In [172]:
!pip install -qq pyodbc 

In [341]:
from pyspark.sql.functions import col, count, when, to_timestamp,  regexp_replace, expr, concat, lit

In [174]:
appName = "pypspark"
serverName = "localhost"

In [175]:
driver = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
jdbc_driver_path = r"C:\\JDBC_sql_server\\sqljdbc_12.6\\enu\\mssql-jdbc-12.6.3.jre8.jar"

In [176]:
from pyspark.sql import SparkSession

try:
    spark = SparkSession.builder \
        .appName(appName) \
        .config("spark.jars", jdbc_driver_path) \
        .getOrCreate()
except Exception as e:
    print(e)

In [177]:
csv1 = "data_1.csv"
csv2 = "data_2.csv"

df1 = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", ";") \
    .load(csv1)

df2 = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", ";") \
    .load(csv2)

df1.show(5)
df2.show(5)

+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|     1|       Blondie|         McGauhy|        false|     4/02/2023|
|     2|        Sherye|           Pilch|        false|    27/06/2023|
|     3|        Julius|  Chinge de Hals|        false|     7/02/2023|
|     4|         Lanni|         Wegenen|         true|    20/08/2023|
|     5|         Betty|          Klimus|         true|    26/08/2023|
+------+--------------+----------------+-------------+--------------+
only showing top 5 rows

+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|  1001|        Blinny|        Scollick|        false|     5/04/2024|
|  1002|         Corri|            Lude|        false|     9/06/2

In [178]:
df1.count()

1000

In [179]:
null_counts = df1.select(count(when(col("pedido").isNull(), "pedido")),
                         count(when(col("nombre_cliente").isNull(), "nombre_cliente")).alias("nombre_cliente"),
                         count(when(col("apellido_cliente").isNull(), "apellido_cliente")).alias("apellido_cliente"),
                         count(when(col("estado_pedido").isNull(), "estado_pedido")).alias("estado_pedido"),
                         count(when(col("fecha_registro").isNull(), "fecha_registro")).alias("fecha_registro"))
null_counts.show()

results = null_counts.collect()

+-------------------------------------------------+--------------+----------------+-------------+--------------+
|count(CASE WHEN (pedido IS NULL) THEN pedido END)|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+-------------------------------------------------+--------------+----------------+-------------+--------------+
|                                                0|             0|               0|            0|             0|
+-------------------------------------------------+--------------+----------------+-------------+--------------+



In [180]:
df1.columns

['pedido',
 'nombre_cliente',
 'apellido_cliente',
 'estado_pedido',
 'fecha_registro']

In [181]:
null_counts = df1.select([count(when(col(c).isNull(), c)).alias(c) for c in df1.columns])
null_counts.show()

+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|     0|             0|               0|            0|             0|
+------+--------------+----------------+-------------+--------------+



In [242]:
#Verificar que las filas cumplan con el formato para DateTime_Field dd-MM-yyyy
pattern = r'^\d{2}/\d{2}/\d{4}$'

In [243]:
filtered_df = df1.filter(col("fecha_registro").rlike(pattern))

filtered_df.show(5, truncate=False)
print(filtered_df.count())

+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|2     |Sherye        |Pilch           |false        |27/06/2023    |
|4     |Lanni         |Wegenen         |true         |20/08/2023    |
|5     |Betty         |Klimus          |true         |26/08/2023    |
|6     |Ibbie         |Chowne          |false        |24/03/2023    |
|7     |Sonny         |Paddon          |false        |25/02/2024    |
+------+--------------+----------------+-------------+--------------+
only showing top 5 rows

716


In [245]:
non_filtered_df = df1.filter(~col("fecha_registro").rlike(pattern))
non_filtered_df.show(5)
print(non_filtered_df.count())

+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|     1|       Blondie|         McGauhy|        false|     4/02/2023|
|     3|        Julius|  Chinge de Hals|        false|     7/02/2023|
|     8|       Marcile|        Hurdedge|        false|     8/09/2023|
|    12|          Addi|        Bernardi|         true|     6/06/2024|
|    16|          Hube|           Slide|         true|     3/06/2023|
+------+--------------+----------------+-------------+--------------+
only showing top 5 rows

284


In [None]:
#Corregir fechas

In [351]:
df_correct_date = df1.withColumn(
    "fecha_registro",
    when(
        col("fecha_registro").rlike(r'^\d{1}/\d{2}/\d{4}$'),
        regexp_replace(col("fecha_registro"), r'^(\d{1})/(\d{2}/\d{4})$', concat(lit("0"),col("fecha_registro")))
    ).otherwise(col("fecha_registro"))
)

non_filtered_df = df_correct_date.filter(~col("fecha_registro").rlike(pattern))
print(non_filtered_df.count())


0


In [353]:
estado_pedido_count = df_correct_date.select(
    count(when(~((col("estado_pedido") == "true") | (col("estado_pedido") == "false")), "estado_pedido")).alias("estado_pedido_count")
)

collect_estado_pedido_count = estado_pedido_count.collect()
estado_pedido_count.show()

print(collect_estado_pedido_count)
print(estado_pedido_count[0]["estado_pedido_count"])

+-------------------+
|estado_pedido_count|
+-------------------+
|                 18|
+-------------------+

[Row(estado_pedido_count=18)]
Column<'estado_pedido_count[estado_pedido_count]'>


In [359]:
df_estado_pedido_not_bool = df_correct_date.filter(~(col("estado_pedido") == "true") & ~(col("estado_pedido") == "false"))
df_estado_pedido_not_bool.show()

+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|    36|      Beverley|          Newiss|        talse|    15/04/2023|
|    75|        Julian|          Lordon|        tru e|    07/07/2023|
|   294|       Jermain|        Steffens|        truue|    19/09/2023|
|   410|        Karoly|       Lovegrove|        true_|    23/06/2024|
|   441|         Raoul|          Barter|  false      |    17/03/2024|
|   445|     Millisent|         Dickons|        false|    22/04/2023|
|   463|         Lorne|         Heatlie|         true|    02/04/2023|
|   481|       Joshuah|        Cottrell|       true  |    09/03/2024|
|   498|         Daune|             Gye|       true  |    16/06/2023|
|   521|         Gavan|          Gaggen|       trueke|    12/05/2023|
|   677|      Jessalyn|       McGifford|        trrue|    20/12/2023|
|   873|          Jo

In [360]:
df1_transformed_no_bool = df_correct_date.withColumn(
    "estado_pedido",
    when(col("estado_pedido").startswith("t"), "true")
    .when(col("estado_pedido").startswith("f"), "false")
    .otherwise("false")
)

df1_transformed_no_bool.show()


+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|     1|       Blondie|         McGauhy|        false|    04/02/2023|
|     2|        Sherye|           Pilch|        false|    27/06/2023|
|     3|        Julius|  Chinge de Hals|        false|    07/02/2023|
|     4|         Lanni|         Wegenen|         true|    20/08/2023|
|     5|         Betty|          Klimus|         true|    26/08/2023|
|     6|         Ibbie|          Chowne|        false|    24/03/2023|
|     7|         Sonny|          Paddon|        false|    25/02/2024|
|     8|       Marcile|        Hurdedge|        false|    08/09/2023|
|     9|      Flemming|          Middas|         true|    20/04/2023|
|    10|          Jule|          Vivers|         true|    10/05/2023|
|    11|       Shaughn|         Canario|         true|    27/10/2023|
|    12|          Ad

In [362]:
estado_pedido_count = df1_transformed_no_bool.select(
    count(when(~((col("estado_pedido") == "true") | (col("estado_pedido") == "false")), "estado_pedido")).alias("estado_pedido_count")
).collect()

print(estado_pedido_count[0]["estado_pedido_count"])

0


In [364]:
nombre_cliente_null_count = results[0]["nombre_cliente"]
print(nombre_cliente_null_count)

0


In [381]:
df1_transformed_no_bool.printSchema()

root
 |-- pedido: string (nullable = true)
 |-- nombre_cliente: string (nullable = true)
 |-- apellido_cliente: string (nullable = true)
 |-- estado_pedido: string (nullable = false)
 |-- fecha_registro: string (nullable = true)



In [400]:
df1_transformed_no_bool.show(10)

+------+--------------+----------------+-------------+--------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|fecha_registro|
+------+--------------+----------------+-------------+--------------+
|     1|       Blondie|         McGauhy|        false|    04/02/2023|
|     2|        Sherye|           Pilch|        false|    27/06/2023|
|     3|        Julius|  Chinge de Hals|        false|    07/02/2023|
|     4|         Lanni|         Wegenen|         true|    20/08/2023|
|     5|         Betty|          Klimus|         true|    26/08/2023|
|     6|         Ibbie|          Chowne|        false|    24/03/2023|
|     7|         Sonny|          Paddon|        false|    25/02/2024|
|     8|       Marcile|        Hurdedge|        false|    08/09/2023|
|     9|      Flemming|          Middas|         true|    20/04/2023|
|    10|          Jule|          Vivers|         true|    10/05/2023|
+------+--------------+----------------+-------------+--------------+
only showing top 10 

In [402]:
df1_converted = df1_transformed_no_bool.select(
    col("pedido").cast("integer"),
    col("nombre_cliente").cast("string"),
    col("apellido_cliente").cast("string"),
    col("estado_pedido").cast("boolean"),
    to_timestamp(col("fecha_registro"), "dd/MM/yyyy").alias("fecha_registro")
)

df1_converted.printSchema()

root
 |-- pedido: integer (nullable = true)
 |-- nombre_cliente: string (nullable = true)
 |-- apellido_cliente: string (nullable = true)
 |-- estado_pedido: boolean (nullable = true)
 |-- fecha_registro: timestamp (nullable = true)



In [408]:
df1_converted.show(20)

+------+--------------+----------------+-------------+-------------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|     fecha_registro|
+------+--------------+----------------+-------------+-------------------+
|     1|       Blondie|         McGauhy|        false|2023-02-04 00:00:00|
|     2|        Sherye|           Pilch|        false|2023-06-27 00:00:00|
|     3|        Julius|  Chinge de Hals|        false|2023-02-07 00:00:00|
|     4|         Lanni|         Wegenen|         true|2023-08-20 00:00:00|
|     5|         Betty|          Klimus|         true|2023-08-26 00:00:00|
|     6|         Ibbie|          Chowne|        false|2023-03-24 00:00:00|
|     7|         Sonny|          Paddon|        false|2024-02-25 00:00:00|
|     8|       Marcile|        Hurdedge|        false|2023-09-08 00:00:00|
|     9|      Flemming|          Middas|         true|2023-04-20 00:00:00|
|    10|          Jule|          Vivers|         true|2023-05-10 00:00:00|
|    11|       Shaughn|  

## Crear tabla para guardar el df

In [412]:
import pyodbc
conn  = pyodbc.connect( 'Driver={SQL Server};'
                        'Server=localhost;'
                        'Trusted_Connection=yes;'
                        'autocommit=True;')

cursor = conn.cursor()

In [413]:
create_database_query = '''
IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = 'PySpark_MSSQL')
BEGIN
    CREATE DATABASE [PySpark_MSSQL]
END
'''

use_database_query = '''
USE [PySpark_MSSQL]
'''

create_table_query = '''
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='test' AND xtype='U')
    BEGIN
        CREATE TABLE pedidos (
            pedido INTEGER NULL,
            nombre_cliente NVARCHAR(255) NULL,
            apellido_cliente NVARCHAR(255) NULL,
            estado_pedido BIT NULL,
            fecha_registro DATETIME NULL
        )
    END
'''

In [414]:
try:
    cursor.execute(create_database_query)
    conn.commit()

    cursor.execute(use_database_query)
    conn.commit()

    cursor.execute(create_table_query)
    conn.commit()

except pyodbc.Error as e:
    print(e)

conn.close()

In [415]:
db_name = "PySpark_MSSQL"
table = "pedidos"
url = f"jdbc:sqlserver://{serverName}:1433;DatabaseName={db_name};integratedSecurity=true;trustServerCertificate=true;"


In [416]:
try:
    df1_converted.write \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", table) \
        .option("driver", driver) \
        .mode("append") \
        .save()

except Exception as e:
    print(e)

In [417]:
#Leer la data
query = '''
SELECT * FROM pedidos
'''

In [418]:
try:
    df_final = spark.read.format("jdbc") \
        .option("url", url) \
        .option("query", query) \
        .option("driver", driver) \
        .load()

    df_final.show(10)
except Exception as e:
    print(e)

+------+--------------+----------------+-------------+-------------------+
|pedido|nombre_cliente|apellido_cliente|estado_pedido|     fecha_registro|
+------+--------------+----------------+-------------+-------------------+
|     1|       Blondie|         McGauhy|        false|2023-02-04 00:00:00|
|     2|        Sherye|           Pilch|        false|2023-06-27 00:00:00|
|     3|        Julius|  Chinge de Hals|        false|2023-02-07 00:00:00|
|     4|         Lanni|         Wegenen|         true|2023-08-20 00:00:00|
|     5|         Betty|          Klimus|         true|2023-08-26 00:00:00|
|     6|         Ibbie|          Chowne|        false|2023-03-24 00:00:00|
|     7|         Sonny|          Paddon|        false|2024-02-25 00:00:00|
|     8|       Marcile|        Hurdedge|        false|2023-09-08 00:00:00|
|     9|      Flemming|          Middas|         true|2023-04-20 00:00:00|
|    10|          Jule|          Vivers|         true|2023-05-10 00:00:00|
+------+--------------+--