### Incrementales 

### Generar sesión de Spark

In [1]:
from pyspark.sql import SparkSession

# Single source of truth for the SQL Server JDBC driver JAR, which is handed to
# Spark through three different settings below.
MSSQL_JDBC_JAR = "/opt/spark/jars/mssql-jdbc-13.2.0.jre8.jar"

# Stop any session left over from a previous execution of this cell, so that
# getOrCreate() below starts from a clean state.
try:
    spark.stop()
    print("‚úÖ Sesi√≥n anterior cerrada")
except NameError:
    # `spark` is not defined yet: first execution on a fresh kernel.
    print("‚ÑπÔ∏è No hab√≠a sesi√≥n previa")
except Exception as exc:
    # Best effort: a failed shutdown must not prevent creating the new session,
    # but it is reported instead of being mislabelled as "no previous session".
    print(f"Error al cerrar Spark: {exc}")

# Build the session with the JDBC driver JAR configured explicitly for both
# the driver and the executors.
spark = (SparkSession.builder
    .appName("Validacion-incremental")
    .master("spark://spark-master:7077")
    .config("spark.executor.instances", "1")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .config("spark.jars", MSSQL_JDBC_JAR)
    .config("spark.driver.extraClassPath", MSSQL_JDBC_JAR)
    .config("spark.executor.extraClassPath", MSSQL_JDBC_JAR)
    .enableHiveSupport()
    .getOrCreate())

# Report what we actually connected to.
print("‚úÖ Spark version:", spark.version)
print("‚úÖ Master URL:", spark.sparkContext.master)
print("‚úÖ Spark reiniciado con driver JDBC")

‚ÑπÔ∏è No hab√≠a sesi√≥n previa


25/09/11 16:03:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


‚úÖ Spark version: 2.4.5
‚úÖ Master URL: spark://spark-master:7077
‚úÖ Spark reiniciado con driver JDBC


In [2]:
# Make the project's configuration module importable and load the JDBC settings.
import sys
sys.path.append("/scripts/config")
from db_config import db_config

# Connection smoke test: run a trivial SELECT through JDBC and display the result.
try:
    jdbc_reader = spark.read.format("jdbc")
    for opt_name, opt_value in (
        ("url", db_config["jdbc_url"]),
        ("user", db_config["user"]),
        ("password", db_config["password"]),
        ("driver", db_config["driver"]),
        ("query", "SELECT 1 AS test_col"),
    ):
        jdbc_reader = jdbc_reader.option(opt_name, opt_value)
    df_test = jdbc_reader.load()
except Exception as err:
    # Any failure (missing config key, driver, network, credentials) is
    # reported rather than raised, so the cell always completes.
    print(f"‚ùå Error: {str(err)}")
else:
    print("‚úÖ ¬°Conexi√≥n JDBC exitosa!")
    try:
        df_test.show()
    except Exception as err:
        print(f"‚ùå Error: {str(err)}")

‚úÖ ¬°Conexi√≥n JDBC exitosa!


[Stage 0:>                                                          (0 + 1) / 1]

+--------+
|test_col|
+--------+
|       1|
+--------+



                                                                                

In [3]:
# List up to 100 base tables from INFORMATION_SCHEMA (simplified version).
try:
    tables_sql = """
            SELECT TOP 100
                TABLE_SCHEMA as esquema,
                TABLE_NAME as nombre_tabla,
                TABLE_TYPE as tipo_tabla
            FROM INFORMATION_SCHEMA.TABLES 
            WHERE TABLE_TYPE = 'BASE TABLE'
        """
    df_tablas = (
        spark.read.format("jdbc")
        .options(
            url=db_config["jdbc_url"],
            user=db_config["user"],
            password=db_config["password"],
            driver=db_config["driver"],
            query=tables_sql,
        )
        .load()
    )

    print("üìä Tablas disponibles en la base de datos 'olva':")
    df_tablas.show(100, truncate=False)
    # count() is evaluated after show(), exactly where the message is printed.
    total_tablas = df_tablas.count()
    print(f"\nüìà Total de tablas mostradas: {total_tablas}")

except Exception as err:
    # Report the failure instead of raising so the notebook keeps running.
    print(f"‚ùå Error consultando tablas: {str(err)}")

üìä Tablas disponibles en la base de datos 'olva':


                                                                                

+-------+------------+----------+
|esquema|nombre_tabla|tipo_tabla|
+-------+------------+----------+
|dbo    |Clientes    |BASE TABLE|
+-------+------------+----------+


üìà Total de tablas mostradas: 1


In [4]:
def read_sql_query(query: str):
    """
    Read from SQL Server through the project's JDBC configuration.

    The text is sent to Spark as the JDBC ``dbtable`` option, so it must be
    something valid in a FROM clause: a table name or a parenthesised,
    aliased subquery such as ``(SELECT ...) AS t``. As a convenience, a bare
    ``SELECT ...`` statement is wrapped automatically as
    ``(<statement>) AS subquery``; any other input is passed through untouched.

    Parameters
    ----------
    query : str
        Table name, aliased subquery, or plain SELECT statement.

    Returns
    -------
    The Spark DataFrame produced by ``spark.read.format("jdbc")...load()``.

    Raises
    ------
    ValueError
        If ``query`` is empty or contains only whitespace.

    Notes
    -----
    Relies on the notebook-level ``spark`` session and ``db_config`` mapping
    (keys ``jdbc_url``, ``user``, ``password``, ``driver``).
    """
    stripped = query.strip() if query else ""
    if not stripped:
        raise ValueError("read_sql_query: la consulta no puede estar vacia")

    # Only a statement whose first word is exactly SELECT is wrapped; names
    # like "selected_rows" or already-parenthesised subqueries are left alone.
    first_word = stripped.split(None, 1)[0].upper()
    if first_word == "SELECT":
        # A trailing ';' is not valid inside a derived table, so drop it.
        dbtable = f"({stripped.rstrip(';').rstrip()}) AS subquery"
    else:
        dbtable = query

    return (spark.read.format("jdbc")
        .option("url", db_config["jdbc_url"])
        .option("user", db_config["user"])
        .option("password", db_config["password"])
        .option("driver", db_config["driver"])
        .option("dbtable", dbtable)   # dbtable, not query: the caller supplies a FROM-clause fragment
        .load())

In [5]:
# Load the stored watermark table from Bronze and preview it.
ware_df = spark.read.format("parquet").load("hdfs://namenode:8020/bronze/clientes_watermark")
ware_df.show(n=6)

                                                                                

+--------------------+--------+
|      last_watermark|   tabla|
+--------------------+--------+
|2025-09-10 22:19:...|clientes|
+--------------------+--------+



In [6]:
# Extract the last loaded watermark for the "clientes" table as a plain Python
# value. Despite its name, `marca_df` holds a scalar here, not a DataFrame.
watermark_row = ware_df.filter(ware_df.tabla == "clientes").first()

# Fail with a clear message instead of an IndexError (no row) or an invalid
# SQL literal downstream (null watermark).
if watermark_row is None or watermark_row["last_watermark"] is None:
    raise ValueError("No hay watermark registrado para la tabla 'clientes'")

marca_df = watermark_row["last_watermark"]

In [7]:
# 1. Build the incremental query. It is passed as a JDBC `dbtable`, so it must
#    be a parenthesised subquery with an alias.
query_clientes_incr = f"(SELECT * FROM dbo.Clientes WHERE CreateTime > '{marca_df}') as clientes_incr"

# 2. Read only the rows created after the stored watermark.
df_incremental = read_sql_query(query_clientes_incr)

# 3. Check for new rows. head(1) fetches at most one row and avoids the
#    DataFrame-to-RDD conversion that df.rdd.isEmpty() requires.
#    NOTE(review): the source is presumably still queried again by the write
#    below; confirm whether a consistent snapshot is required.
if not df_incremental.head(1):
    print("‚ö†Ô∏è No hay registros nuevos despu√©s de", marca_df)
    spark.stop()  # Optional: if this notebook only loads Clientes, the session can be closed here
    sys.exit("üîö Pipeline finalizado: no hab√≠a registros nuevos.")
else:
    # 4. Append the new rows to the Bronze layer.
    df_incremental.write.mode("append").parquet("hdfs://namenode:8020/bronze/clientes")
    print("‚úÖ Nuevos registros cargados en Bronze.")

                                                                                

‚úÖ Nuevos registros cargados en Bronze.


In [8]:
# Read the Bronze table back after the load and preview up to 30 rows.
df_clie = spark.read.format("parquet").load("hdfs://namenode:8020/bronze/clientes")
df_clie.show(n=30)

+---------+--------------+--------------------+----------+--------------------+--------------------+
|ClienteID|        Nombre|               Email|  Telefono|          CreateTime|          UpdateTime|
+---------+--------------+--------------------+----------+--------------------+--------------------+
|        1|    Juan P√©rez|juan.perez@email.com| 089000100|2025-08-24 21:27:...|2025-08-24 21:31:...|
|        2|   Mar√≠a L√≥pez|maria22.nueva@ema...|0987654321|2025-08-24 21:27:...|2025-08-24 21:31:...|
|        3|   Carlos Ruiz|carlos.ruiz@email...|0971122334|2025-08-24 21:27:...|                null|
|        4|    Ana Torres|ana.torres@email.com|0965544332|2025-08-24 21:27:...|                null|
|        5|   Pedro G√≥mez|pedro.gomez@email...|0956677889|2025-08-24 21:27:...|                null|
|        7|     leo P√©rez| leo.perez@email.com|0991234522|2025-08-29 19:50:...|                null|
|       12|          dary|  dary.pez@email.com|0991234567|2025-09-10 22:19:...|       

In [9]:
from pyspark.sql.functions import max as spark_max, lit

# New watermark: the greatest CreateTime in Bronze, tagged with the table name.
# From here on `marca_df` is a one-row DataFrame (it was a scalar earlier).
marca_df = (
    df_clie
    .agg(spark_max("CreateTime").alias("last_watermark"))
    .withColumn("tabla", lit("clientes"))
)


In [10]:
# Display the freshly computed watermark without truncating the timestamp.
marca_df.show(n=20, truncate=False)

+-----------------------+--------+
|last_watermark         |tabla   |
+-----------------------+--------+
|2025-09-11 16:03:17.118|clientes|
+-----------------------+--------+



In [11]:
# Persist the new watermark, replacing the previous one.
marca_df.write.parquet("hdfs://namenode:8020/bronze/clientes_watermark", mode="overwrite")

In [12]:
# Read the watermark back from HDFS to verify what was written.
ware_df = spark.read.format("parquet").load("hdfs://namenode:8020/bronze/clientes_watermark")
ware_df.show(n=20, truncate=False)

+-----------------------+--------+
|last_watermark         |tabla   |
+-----------------------+--------+
|2025-09-11 16:03:17.118|clientes|
+-----------------------+--------+



In [13]:
# End of the pipeline: stop the Spark session created in the first cell.
spark.stop()