In [1]:
# ============================================================================
# FASE 2 - OPTIMIZADO PARA SPARK 3.5.1 + DELTA LAKE 3.0
# ============================================================================

# PASO 0: REINICIAR SPARK CON VERSIONES CORRECTAS
try:
    spark.stop()
except:
    pass

import time
time.sleep(3)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = (
    SparkSession.builder
    .appName("Bronze_to_Silver_Optimized")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
            "io.delta:delta-spark_2.12:3.0.0")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.shuffle.partitions", "50")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
print(f" Spark {spark.version} iniciado\n")


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3c1b76ef-2078-463a-8103-daaea123be2e;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found io.delta#delta-spark_

 Spark 3.5.1 iniciado



In [2]:
# ============================================================================
# 1. LECTURA DE KAFKA
# ============================================================================

print("="*80)
print("PASO 1: LECTURA DE KAFKA")
print("="*80 + "\n")

contract_schema = StructType([
    StructField("id_contrato", StringType()),
    StructField("objeto_contrato", StringType()),
    StructField("entidad", StringType()),
    StructField("departamento", StringType()),
    StructField("municipio", StringType()),
    StructField("region", StringType()),
    StructField("codigo_unspsc", StringType()),
    StructField("descripcion_categoria", StringType()),
    StructField("valor_contrato", DoubleType()),
    StructField("duracion_dias", IntegerType()),
    StructField("fecha_firma", StringType()),
    StructField("tipo_contrato", StringType()),
    StructField("estado_contrato", StringType()),
    StructField("modalidad", StringType()),
    StructField("anno", IntegerType()),
    StructField("id_interno_sistema", StringType()),
    StructField("campo_vacio", StringType()),
    StructField("constante_1", StringType()),
    StructField("constante_2", IntegerType()),
    StructField("duplicate_id", StringType()),
    StructField("timestamp_carga", StringType())
])

print("Leyendo Kafka...")

df_kafka = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "contratos-publicos") \
    .option("startingOffsets", "earliest") \
    .load()

df_bronze = df_kafka.select(
    from_json(col("value").cast("string"), contract_schema).alias("data")
).select("data.*")

df_bronze = df_bronze.cache()
total_kafka = df_bronze.count()

print(f" Mensajes: {total_kafka:,}\n")



PASO 1: LECTURA DE KAFKA

Leyendo Kafka...


[Stage 0:>                                                          (0 + 1) / 1]

 Mensajes: 50,349



                                                                                

In [3]:
# ============================================================================
# 2. ELIMINAR REDUNDANTES
# ============================================================================

print("="*80)
print("PASO 2: ELIMINAR REDUNDANTES")
print("="*80 + "\n")

redundant_columns = [
    "id_interno_sistema", "campo_vacio", "constante_1",
    "constante_2", "duplicate_id", "timestamp_carga"
]

print(f" Eliminando {len(redundant_columns)} columnas redundantes")

df_cleaned = df_bronze.drop(*redundant_columns)

print(f" Columnas restantes: {len(df_cleaned.columns)}\n")

#  LIBERAR bronze, ya no lo necesitamos
df_bronze.unpersist()

PASO 2: ELIMINAR REDUNDANTES

 Eliminando 6 columnas redundantes
 Columnas restantes: 15



DataFrame[id_contrato: string, objeto_contrato: string, entidad: string, departamento: string, municipio: string, region: string, codigo_unspsc: string, descripcion_categoria: string, valor_contrato: double, duracion_dias: int, fecha_firma: string, tipo_contrato: string, estado_contrato: string, modalidad: string, anno: int, id_interno_sistema: string, campo_vacio: string, constante_1: string, constante_2: int, duplicate_id: string, timestamp_carga: string]

In [4]:
# ============================================================================
# 3. LIMPIEZA
# ============================================================================

print("="*80)
print("PASO 3: LIMPIEZA")
print("="*80 + "\n")

df_cleaned = df_cleaned.cache()
total_cleaned = df_cleaned.count()

print(f" Registros: {total_cleaned:,}\n")

# An√°lisis de nulos optimizado
null_counts = df_cleaned.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in df_cleaned.columns
]).collect()[0].asDict()

print("Columnas con nulos:")
for col_name, null_count in sorted(null_counts.items(), key=lambda x: x[1], reverse=True):
    if null_count > 0:
        pct = (null_count / total_cleaned) * 100
        print(f"   {col_name}: {null_count:,} ({pct:.1f}%)")

# Limpieza
df_silver = df_cleaned \
    .filter(col("id_contrato").isNotNull()) \
    .filter(col("objeto_contrato").isNotNull()) \
    .filter(col("valor_contrato").isNotNull()) \
    .filter(col("valor_contrato") > 0) \
    .filter(col("fecha_firma").isNotNull()) \
    .withColumn("fecha_firma", to_date(col("fecha_firma"), "yyyy-MM-dd"))

df_silver = df_silver.cache()
total_silver = df_silver.count()

print(f"\n Limpieza:")
print(f"   Antes: {total_cleaned:,}")
print(f"   Despu√©s: {total_silver:,}")
print(f"   Descartados: {total_cleaned - total_silver:,}\n")

df_cleaned.unpersist()


PASO 3: LIMPIEZA



                                                                                

 Registros: 50,349

Columnas con nulos:
   duracion_dias: 50,349 (100.0%)


[Stage 11:>                                                         (0 + 1) / 1]


 Limpieza:
   Antes: 50,349
   Despu√©s: 50,058
   Descartados: 291



                                                                                

DataFrame[id_contrato: string, objeto_contrato: string, entidad: string, departamento: string, municipio: string, region: string, codigo_unspsc: string, descripcion_categoria: string, valor_contrato: double, duracion_dias: int, fecha_firma: string, tipo_contrato: string, estado_contrato: string, modalidad: string, anno: int]

In [5]:
# ============================================================================
# 4. ESTAD√çSTICAS
# ============================================================================

print("="*80)
print("PASO 4: ESTAD√çSTICAS")
print("="*80 + "\n")

print("üìä Por regi√≥n:")
df_silver.groupBy("region").count().orderBy(desc("count")).show(5)

print("\nüìä Top 5 entidades:")
df_silver.groupBy("entidad").count().orderBy(desc("count")).show(5, truncate=False)


PASO 4: ESTAD√çSTICAS

üìä Por regi√≥n:
+--------------+-----+
|        region|count|
+--------------+-----+
|Centro-Oriente|50058|
+--------------+-----+


üìä Top 5 entidades:
+-------------------------------------------------+-----+
|entidad                                          |count|
+-------------------------------------------------+-----+
|MUNICIPIO DE SOACHA.                             |3182 |
|ALCALD√çA MUNICIPAL COTA                          |1994 |
|ESE MUNICIPAL DE SOACHA JULIO CESAR PE√ëALOZA*    |1919 |
|CUNDINAMARCA-ALCALDIA MUNICIPIO MOSQUERA         |1883 |
|empresa social del estado regi√≥n de salud soacha.|1579 |
+-------------------------------------------------+-----+
only showing top 5 rows



In [6]:
# ============================================================================
# 5. GUARDAR EN DELTA LAKE
# ============================================================================

print("="*80)
print("PASO 5: GUARDAR EN DELTA LAKE")
print("="*80 + "\n")

DELTA_PATH = "/app/notebooks/delta_lake/silver_contracts"

print(f"üíæ Guardando en: {DELTA_PATH}")

df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(DELTA_PATH)

print("‚úÖ Guardado exitosamente\n")

# ‚ö†Ô∏è LIBERAR todo
df_silver.unpersist()
spark.catalog.clearCache()


PASO 5: GUARDAR EN DELTA LAKE

üíæ Guardando en: /app/notebooks/delta_lake/silver_contracts




‚úÖ Guardado exitosamente



                                                                                

In [10]:
print(f"Spark version: {spark.version}")

Spark version: 3.5.1


In [7]:
# ============================================================================
# 6. VERIFICACI√ìN
# ============================================================================

print("="*80)
print("VERIFICACI√ìN FINAL")
print("="*80 + "\n")

df_verify = spark.read.format("delta").load(DELTA_PATH)
print(f"‚úÖ Registros verificados: {df_verify.count():,}")

df_verify.select("id_contrato", "entidad", "valor_contrato", "fecha_firma") \
    .show(5, truncate=False)

print("\nüéØ Fase 2 completada. Siguiente: Fase 3 - Embeddings\n")

VERIFICACI√ìN FINAL

‚úÖ Registros verificados: 50,058
+------------+----------------------------------------------------+--------------+-----------+
|id_contrato |entidad                                             |valor_contrato|fecha_firma|
+------------+----------------------------------------------------+--------------+-----------+
|CPS-045-2024|empresa social del estado regi√≥n de salud soacha.   |7.8624E7      |2024-01-01 |
|CPS 018-2024|E.S.E HOSPITAL NUESTRA SE√ëORA DEL CARMEN DEL COLEGIO|1.07844E7     |2024-01-01 |
|CPS 012-2024|E.S.E HOSPITAL NUESTRA SE√ëORA DEL CARMEN DEL COLEGIO|1.07844E7     |2024-01-01 |
|024-2024    |ESE HOSPITAL SALAZAR DE VILLETA                     |9363575.0     |2024-01-01 |
|CPS-060-2024|empresa social del estado regi√≥n de salud soacha.   |8.3279308E7   |2024-01-01 |
+------------+----------------------------------------------------+--------------+-----------+
only showing top 5 rows


üéØ Fase 2 completada. Siguiente: Fase 3 - Embeddings

