### Configuración inicial

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from delta import *
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, TimestampType ,DateType


# Forzamos la limpieza de memoria en el driver antes de la escritura
import gc
gc.collect()

builder = SparkSession.builder \
    .appName("Lab_SECOP_Silver") \
    .master("local[*]") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.shuffle.partitions", "4")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-df371132-736d-4239-86f7-e2f650d4d9c3;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 155ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.0.0 from central in [default]
	io.delta#delta-storage;3.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   

### Leer datos de Bronce

In [2]:
bronze_path = "/app/data/lakehouse/bronze/secop"
df_bronze= spark.read.format("delta").load(bronze_path)

In [3]:
df_bronze.limit(5).toPandas()

26/01/31 00:05:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,anno_bpin,c_digo_bpin,ciudad,codigo_de_categoria_principal,codigo_entidad,codigo_proveedor,condiciones_de_entrega,departamento,descripcion_del_proceso,descripcion_documentos_tipo,...,valor_amortizado,valor_de_pago_adelantado,valor_del_contrato,valor_facturado,valor_pagado,valor_pendiente_de,valor_pendiente_de_ejecucion,valor_pendiente_de_pago,_ingestion_time,_source_file
0,2025,202500000002779,Chinácota,V1.80111600,704851104,730599727,No Definido,Norte de Santander,PRESTAR LOS SERVICIOS PROFESIONALES COMO FORMA...,No definido,...,0,0,12343333,12343333,11500000,0,843333,843333,2026-01-30 22:18:40.643078,API_Socrata_Bogota_2025
1,No D,No Definido,No Definido,V1.80111607,705008498,718547920,Como acordado previamente,No Definido,Brindar acompañamiento jurídico y apoyo profes...,No definido,...,0,0,20000000,20000000,20000000,0,0,0,2026-01-30 22:18:40.643078,API_Socrata_Bogota_2025
2,No D,No Definido,Cartagena,V1.85121600,709192637,712389626,No Definido,Bolívar,PRESTACIÓN DE LOS SERVICIOS COMO GESTORES DE S...,No definido,...,0,0,4000000,4000000,4000000,0,0,0,2026-01-30 22:18:40.643078,API_Socrata_Bogota_2025
3,No D,No Definido,Los Patios,V1.85101601,713088169,731045159,No Definido,Norte de Santander,PRESTAR SUS SERVICIOS EN CONDICIÓN DE TÉCNICO ...,No definido,...,0,0,10000000,10000000,10000000,0,0,0,2026-01-30 22:18:40.643078,API_Socrata_Bogota_2025
4,No D,No Definido,Amalfi,V1.81111819,718317027,718871106,A convenir,Antioquia,Prestación de servicios de Asesoría en Calidad...,No definido,...,0,0,12637800,8425200,8425200,0,4212600,4212600,2026-01-30 22:18:40.643078,API_Socrata_Bogota_2025


### Transformación del tipo de dato

In [4]:
# grupos de columnas 

# Números (Money/Decimal)
cols_decimal = [
    "valor_del_contrato", "valor_de_pago_adelantado", "valor_facturado", 
    "valor_pendiente_de_pago", "valor_pagado", "valor_amortizado", 
    "valor_pendiente_de", "valor_pendiente_de_ejecucion", "saldo_cdp", 
    "saldo_vigencia", "presupuesto_general_de_la_nacion_pgn", 
    "sistema_general_de_participaciones", "sistema_general_de_regal_as",
    "recursos_propios_alcald_as_gobernaciones_y_resguardos_ind_genas_",
    "recursos_de_credito", "recursos_propios"
]

#  Fechas (Timestamp)
cols_timestamp = [
    "fecha_de_firma", "fecha_de_inicio_del_contrato", "fecha_de_fin_del_contrato",
    "ultima_actualizacion", "fecha_inicio_liquidacion", "fecha_fin_liquidacion",
    "fecha_de_notificaci_n_de_prorrogaci_n"
]

# números enteros o IDs
cols_int = ["nit_entidad", "codigo_entidad", "dias_adicionados"]

# 3. Aplicar las transformaciones
df_silver_typed = df_bronze

for c in df_bronze.columns:
    if c in cols_decimal:
        df_silver_typed = df_silver_typed.withColumn(c, F.col(c).cast(DecimalType(18, 2)))
    elif c in cols_timestamp:
        df_silver_typed = df_silver_typed.withColumn(c, F.col(c).cast("timestamp"))
    elif c in cols_int:
        df_silver_typed = df_silver_typed.withColumn(c, F.col(c).cast("long"))
    # El resto se quedan como String automáticamente

print("Tipado completo. Revisando esquema...")
df_silver_typed.printSchema()

26/01/31 00:06:04 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Tipado completo. Revisando esquema...
root
 |-- anno_bpin: string (nullable = true)
 |-- c_digo_bpin: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- codigo_de_categoria_principal: string (nullable = true)
 |-- codigo_entidad: long (nullable = true)
 |-- codigo_proveedor: string (nullable = true)
 |-- condiciones_de_entrega: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- descripcion_del_proceso: string (nullable = true)
 |-- descripcion_documentos_tipo: string (nullable = true)
 |-- destino_gasto: string (nullable = true)
 |-- dias_adicionados: long (nullable = true)
 |-- documento_proveedor: string (nullable = true)
 |-- documentos_tipo: string (nullable = true)
 |-- domicilio_representante_legal: string (nullable = true)
 |-- duraci_n_del_contrato: string (nullable = true)
 |-- el_contrato_puede_ser_prorrogado: string (nullable = true)
 |-- entidad_centralizada: string (nullable = true)
 |-- es_grupo: string (nullable = true)
 |-- es_pym

### Descriptivas

In [5]:
def resumen_robusto_numericas(df, columnas):
    resultados = []

    for col in columnas:
        # 1. Calculamos las estadísticas
        stats = df.select(
            F.count(F.col(col)).alias("n"),
            F.mean(F.when(F.col(col).isNull(), 1).otherwise(0)).alias("missing_prop"),
            F.expr(f"percentile_approx({col}, 0.5)").alias("mediana"),
            F.expr(f"percentile_approx({col}, 0.1)").alias("p10"),
            F.expr(f"percentile_approx({col}, 0.9)").alias("p90"),
            (
                F.expr(f"percentile_approx({col}, 0.75)") -
                F.expr(f"percentile_approx({col}, 0.25)")
            ).alias("iqr"),
            F.mean(F.when(F.col(col) == 0, 1).otherwise(0)).alias("ceros_prop")
        ).collect()[0]

        # 2. Convertimos TODO a float para evitar el error de mezcla de tipos
        # Usamos float() para asegurar que Decimal y Long convivan en paz
        resultados.append((
            col,
            int(stats["n"]),
            round(float(stats["missing_prop"]) * 100, 2),
            float(stats["mediana"]) if stats["mediana"] is not None else 0.0,
            float(stats["p10"]) if stats["p10"] is not None else 0.0,
            float(stats["p90"]) if stats["p90"] is not None else 0.0,
            float(stats["iqr"]) if stats["iqr"] is not None else 0.0,
            round(float(stats["ceros_prop"]) * 100, 2)
        ))

    return spark.createDataFrame(
        resultados,
        [
            "variable", "n", "missing_%", "mediana", "p10", "p90", "iqr", "ceros_%"
        ]
    )

In [6]:
vars_numericas = [
    "valor_del_contrato", 
    "valor_pagado", 
    "valor_facturado", 
    "dias_adicionados" ]

In [7]:
resumen_num = resumen_robusto_numericas(df_silver_typed, vars_numericas)
resumen_num.show()

                                                                                

+------------------+------+---------+-----------+---------+-----------+-----------+-------+
|          variable|     n|missing_%|    mediana|      p10|        p90|        iqr|ceros_%|
+------------------+------+---------+-----------+---------+-----------+-----------+-------+
|valor_del_contrato|456617|      0.0|1.4225834E7|4000000.0|   1.1715E8|     2.45E7|   0.85|
|      valor_pagado|456617|      0.0|        0.0|      0.0|1.6389504E7|  5962064.0|  66.74|
|   valor_facturado|456617|      0.0|        0.0|      0.0|2.4533333E7|1.1187943E7|  51.36|
|  dias_adicionados|456617|      0.0|        0.0|      0.0|       16.0|        0.0|  86.55|
+------------------+------+---------+-----------+---------+-----------+-----------+-------+



In [8]:
# RESUMEN CATEGORICAS

def resumen_categoricas(df, columnas, top=5):
    resultados = {}
    n_total = df.count()

    for col in columnas:
        tabla = (
            df.groupBy(col)
            .count()
            .withColumn("porcentaje", F.round(F.col("count") / n_total * 100, 2))
            .orderBy(F.desc("count"))
            .limit(top)
        )
        resultados[col] = tabla

    return resultados

# variables de interés
vars_categoricas = [
    "tipo_de_contrato", 
    "modalidad_de_contratacion", 
    "estado_contrato",
    "sector",
    "orden"
]


print("Generando descriptivas categóricas...")
resumen_cat = resumen_categoricas(df_silver_typed, vars_categoricas)

# Imprimir los resultados
for columna, tabla_frecuencia in resumen_cat.items():
    print(f"\n Top 5 para la columna: {columna.upper()}")
    tabla_frecuencia.show(truncate=False)

Generando descriptivas categóricas...

 Top 5 para la columna: TIPO_DE_CONTRATO
+-----------------------+------+----------+
|tipo_de_contrato       |count |porcentaje|
+-----------------------+------+----------+
|Prestación de servicios|359310|78.69     |
|Otro                   |26529 |5.81      |
|Decreto 092 de 2017    |24869 |5.45      |
|Compraventa            |14466 |3.17      |
|Suministros            |14406 |3.15      |
+-----------------------+------+----------+


 Top 5 para la columna: MODALIDAD_DE_CONTRATACION
+------------------------------------+------+----------+
|modalidad_de_contratacion           |count |porcentaje|
+------------------------------------+------+----------+
|Contratación directa                |313201|68.59     |
|Contratación régimen especial       |76965 |16.86     |
|Mínima cuantía                      |32876 |7.2       |
|Selección Abreviada de Menor Cuantía|7677  |1.68      |
|Selección abreviada subasta inversa |7623  |1.67      |
+---------------

In [9]:
# 2. ANÁLISIS TEMPORAL (Fechas y Duración)

print("\n Generando análisis temporal...")

# Calculamos la duración en días (Fecha Fin - Fecha Inicio)

df_tiempos = df_silver_typed.withColumn(
    "duracion_contrato_dias",
    F.datediff(F.col("fecha_de_fin_del_contrato"), F.col("fecha_de_inicio_del_contrato"))
)

# rango total de los datos
rango_temporal = df_tiempos.select(
    F.min("fecha_de_firma").alias("Primer_Contrato_Firmado"),
    F.max("fecha_de_firma").alias("Ultimo_Contrato_Firmado"),
    F.min("fecha_de_inicio_del_contrato").alias("Inicio_Ejecucion_Min"),
    F.max("fecha_de_fin_del_contrato").alias("Fin_Ejecucion_Max")
)

# Calculamos la duración promedio y mediana de los contratos
estadisticas_duracion = df_tiempos.select(
    F.round(F.mean("duracion_contrato_dias"), 2).alias("Duracion_Promedio_Dias"),
    F.expr("percentile_approx(duracion_contrato_dias, 0.5)").alias("Duracion_Mediana_Dias"),
    F.min("duracion_contrato_dias").alias("Duracion_Minima"),
    F.max("duracion_contrato_dias").alias("Duracion_Maxima")
)

# Mostrar resultados
print("\n=== RANGO CRONOLÓGICO DE LA DATA ===")
rango_temporal.show()

print("=== ESTADÍSTICAS DE DURACIÓN DE CONTRATOS ===")
estadisticas_duracion.show()


 Generando análisis temporal...

=== RANGO CRONOLÓGICO DE LA DATA ===
+-----------------------+-----------------------+--------------------+-------------------+
|Primer_Contrato_Firmado|Ultimo_Contrato_Firmado|Inicio_Ejecucion_Min|  Fin_Ejecucion_Max|
+-----------------------+-----------------------+--------------------+-------------------+
|    2025-07-01 00:00:00|    2025-12-31 00:00:00| 1899-11-06 00:00:00|5025-12-20 00:00:00|
+-----------------------+-----------------------+--------------------+-------------------+

=== ESTADÍSTICAS DE DURACIÓN DE CONTRATOS ===




+----------------------+---------------------+---------------+---------------+
|Duracion_Promedio_Dias|Duracion_Mediana_Dias|Duracion_Minima|Duracion_Maxima|
+----------------------+---------------------+---------------+---------------+
|                110.34|                   90|        -730477|        1095834|
+----------------------+---------------------+---------------+---------------+



                                                                                

### Validación de Datos

In [10]:
# Calcular la duración del contrato mayor a 0
df_with_duration = df_silver_typed.withColumn(
    "duracion_calculada", 
    F.datediff(F.col("fecha_de_fin_del_contrato"), F.col("fecha_de_inicio_del_contrato"))
)

# límite cronológico (Año 1900 adelante)
fecha_limite = "1900-01-01"

# Identificar automáticamente todas las columnas de tipo fecha/timestamp
cols_temporales = [f.name for f in df_with_duration.schema.fields 
                   if isinstance(f.dataType, (DateType, TimestampType))]

# Reglas: 
# - Precio > 0
# - Fecha de Firma no nula
# - Duración > 0
# - Orden diferente a "No Definido"


# Construir la condición dinámica para TODAS las fechas
# Genera: (F.col(fecha1) >= '1900-01-01') & (F.col(fecha2) >= '1900-01-01') 


condicion_fechas_ok = F.lit(True) # Punto de partida neutro para el operador &
for col_name in cols_temporales:
    condicion_fechas_ok &= (F.col(col_name) >= fecha_limite)

# Condición de Validación (Quality Gate)
condicion_valida = (
    (F.col("valor_del_contrato") > 0) & 
    (F.col("fecha_de_firma").isNotNull()) &
    (F.col("fecha_de_firma") >= fecha_limite) &
    (F.col("fecha_de_inicio_del_contrato") >= fecha_limite) &
    (F.col("fecha_de_fin_del_contrato") >= fecha_limite) &
    (F.col("duracion_calculada") > 0) &
    (F.col("orden").isNotNull()) & 
    (F.upper(F.col("orden")) != "NO DEFINIDO")
)

# BIFURCACIÓN (Split)
df_silver = df_with_duration.filter(condicion_valida)

# Registros Inválidos (Quarantine)
df_quarantine = df_with_duration.filter(~condicion_valida)


#  Asignación de Motivos de Rechazo Detallados
df_quarantine = df_quarantine.withColumn(
    "motivo_rechazo",
    F.when(F.col("valor_del_contrato") <= 0, "Precio base menor o igual a 0")
     .when(F.col("fecha_de_firma").isNull(), "Fecha de firma nula")
     .when(F.col("fecha_de_firma") < fecha_limite, "Fecha de firma anterior a 1900")
     .when(F.col("fecha_de_inicio_del_contrato") < fecha_limite, "Fecha inicio anterior a 1900")
     .when(F.col("fecha_de_fin_del_contrato") < fecha_limite, "Fecha fin anterior a 1900")
     .when(F.col("duracion_calculada") <= 0, "Duración de contrato inválida (<= 0 días)")
     .when(F.col("duracion_calculada").isNull(), "Fechas inconsistentes para calcular duración")
     .when(F.upper(F.col("orden")) == "NO DEFINIDO", "Columna 'Orden' marcada como No Definido")
     .when(~condicion_fechas_ok, "Alguna fecha es anterior a 1900 o inválida")
     .otherwise("Error de integridad o campos nulos")
)

# numero de registros
print(f"Registros listos para Silver: {df_silver.count()}")
print(f"Registros para Cuarentena: {df_quarantine.count()}")

                                                                                

Registros listos para Silver: 438143
Registros para Cuarentena: 6412


### Guardar Datos en Silver y Quarantine

In [11]:
# configuraciones de seguridad para fechas antiguas
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")

# 2. Rutas
silver_path = "/app/data/lakehouse/silver/secop"
quarantine_path = "/app/data/lakehouse/quarantine/secop_errors"

# 3. Guardado de SILVER 
print(" Iniciando guardado de Silver...")
(df_silver
 .repartition(4) 
 .write
 .format("delta")
 .mode("overwrite")
 .option("maxRecordsPerFile", 50000) # Evita archivos gigantes que consumen mucha RAM
 .save(silver_path))

# 4. Guardado de CUARENTENA
print(" Iniciando guardado de Cuarentena...")
(df_quarantine
 .repartition(2) 
 .write
 .format("delta")
 .mode("overwrite")
 .save(quarantine_path))

print(f" Ingesta Silver exitosa")
print(f" Registros en Cuarentena guardados")

 Iniciando guardado de Silver...


26/01/31 00:06:33 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.invoke.DirectMethodHandle.allocateInstance(DirectMethodHandle.java:501)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.newInvokeSpecial(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
	at org.apache.spark.ContextCleaner$$Lambda/0x00007c27506214c8.apply$mcV$sp(Unknown Source)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1356)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
	at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
26/01/31 00:06:33 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: Java heap space
	at java.

Py4JError: py4j.reflection does not exist in the JVM