# Inicializaci√≥n SPARK

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, broadcast, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType

# 1. INICIALIZACI√ìN DE SPARK
# Los packages se especifican aqu√≠ para que Spark los descargue y use
SPARK_PACKAGES = (
    "io.delta:delta-spark_2.12:3.1.0,"
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
)
SPARK_MASTER = "spark://0.0.0.0:7077" # Conecta al Spark Master dentro del contenedor

spark = SparkSession.builder \
    .appName("BronzeToSilverStreaming") \
    .master(SPARK_MASTER) \
    .config("spark.jars.packages", SPARK_PACKAGES) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("Sesi√≥n de Spark iniciada con soporte para Kafka y Delta Lake.")

Sesi√≥n de Spark iniciada con soporte para Kafka y Delta Lake.


# Esquema y lectura de Kafka (Capa Bronze)

In [4]:
# Esquema de los datos REALES del productor
CONTRACT_SCHEMA = StructType([
    StructField("id_contrato", StringType(), True),
    StructField("objeto_contrato", StringType(), True),
    StructField("entidad", StringType(), True),
    StructField("codigo_unspsc", StringType(), True), 
    StructField("duracion_dias", LongType(), True),
    StructField("valor_contrato", DoubleType(), True),
    StructField("fecha_firma", StringType(), True),
    StructField("departamento", StringType(), True),
    
    # Variables a ser eliminadas
    StructField("sector_ruidoso", StringType(), True), 
    StructField("estado_proceso_ruidoso", StringType(), True),
])
# Leer el stream desde Kafka (Capa Bronze)
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "contratos-publicos") \
    .option("startingOffsets", "earliest") \
    .load()

print("Stream de Kafka configurado.")

Stream de Kafka configurado.


# Transformaci√≥n y limpieza (Bronze -> Silver)

In [None]:
# 3. DATOS DE CRUCE Y FILTRADO (SOLO EJE CAFETERO)

# Dataset de Regiones: Solo incluye los departamentos del Eje Cafetero
regiones_eje_cafetero = [
    ("ANTIOQUIA", "Eje Cafetero"), 
    ("CALDAS", "Eje Cafetero"),
    ("QUINDIO", "Eje Cafetero"), 
    ("RISARALDA", "Eje Cafetero"),
    ("TOLIMA", "Eje Cafetero"), 
    ("VALLE DEL CAUCA", "Eje Cafetero")
]

df_regiones = (
    spark.createDataFrame(regiones_eje_cafetero)
         .toDF("departamento_join", "macrorregion_turistica")
)

df_regiones_broadcast = broadcast(df_regiones)

print("‚úÖ DataFrame de Regiones (Broadcast) listo para el Join.")


# 4. APLICACI√ìN DE TRANSFORMACIONES (BRONZE -> SILVER)

df_silver = (
    kafka_stream
        # Explosi√≥n del JSON y metadatos de Kafka
        .withColumn(
            "value_content",
            from_json(col("value").cast("string"), CONTRACT_SCHEMA)
        )
        .select(
            col("value_content.*"),
            col("timestamp").alias("kafka_ingestion_time"),
            col("offset").alias("kafka_offset")
        )

        # LIMPIEZA: Eliminaci√≥n de Redundantes/Ruidosas
        .drop("sector_ruidoso", "estado_proceso_ruidoso")

        # CRUCE: Broadcasting Join con Regiones (solo Eje Cafetero)
        .join(
            df_regiones_broadcast,
            on=df_regiones_broadcast.departamento_join == col("departamento"),
            how="inner"  # INNER JOIN asegura que solo pasen los que coinciden
        )
        .drop("departamento_join")

        # Filtrar expl√≠citamente Eje Cafetero y a√±adir tiempo de procesamiento
        .filter(col("macrorregion_turistica") == "Eje Cafetero")
        .withColumn("processing_time", current_timestamp())
)

print("‚úÖ Pipeline de limpieza, filtro (Eje Cafetero) y transformaci√≥n (Silver) definido.")


‚úÖ DataFrame de Regiones (Broadcast) listo para el Join.


NameError: name 'to_timestamp' is not defined

# Persistencia en Delta-Lake

In [None]:
# 5. PERSISTENCIA EN DELTA LAKE (CAPA SILVER)

DELTA_LAKE_PATH = "/opt/spark/data/delta/silver_contracts"
CHECKPOINT_PATH = "/opt/spark/data/checkpoints/silver_contracts"

# Iniciar la escritura del stream
query = (
    df_silver.writeStream
        .format("delta")
        .outputMode("append")   # ‚Üê aqu√≠ faltaba el backslash
        .option("checkpointLocation", CHECKPOINT_PATH)
        .option("path", DELTA_LAKE_PATH)
        .trigger(processingTime="10 seconds")
        .start()
)

print(f"‚úÖ Escritura del Stream a Delta Lake iniciada. Estado del Query ID: {query.id}")

# query.awaitTermination()  # Descomenta para mantener la ejecuci√≥n viva
