In [1]:
# %% Test: Transformación con validación de datos inválidos
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
from delta import *

spark = (
    configure_spark_with_delta_pip(
        SparkSession.builder
        .appName("Lab_SECOP_TEST_Transform")
        .master("local[*]")
        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .config("spark.executor.memory", "1g")
    )
    .getOrCreate()
)

bronze_path = "data/lakehouse/bronze/test_secop"

print("Leyendo datos de Bronze de prueba...")
df_bronze = spark.read.format("delta").load(bronze_path)

total_bronze = df_bronze.count()
print(f"Total registros en Bronze: {total_bronze}\n")


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-01db2c56-d9e8-4d85-8139-f5ca169f2fb2;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 253ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.0.0 from central in [default]
	io.delta#delta-storage;3.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   

Leyendo datos de Bronze de prueba...


26/02/02 05:28:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
26/02/02 05:28:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

Total registros en Bronze: 10



                                                                                

In [2]:
# Identificar columnas
cols = df_bronze.columns

precio_col = "Precio_Base" if "Precio_Base" in cols else "Precio Base"
fecha_col = "Fecha_de_Firma" if "Fecha_de_Firma" in cols else "Fecha de Firma"

print(f"Usando columna de precio: {precio_col}")
print(f"Usando columna de fecha: {fecha_col}\n")


Usando columna de precio: Precio_Base
Usando columna de fecha: Fecha_de_Firma



In [3]:
# Reglas de calidad
cond_precio_ok = (col(precio_col).isNotNull()) & (col(precio_col).cast("double") > 0)
cond_fecha_ok = col(fecha_col).isNotNull()

cond_registro_valido = cond_precio_ok & cond_fecha_ok

In [4]:
# DataFrame de registros válidos
df_validos = df_bronze.filter(cond_registro_valido)

In [5]:
# DataFrame de registros inválidos con motivo_rechazo
df_invalidos = (
    df_bronze
    .filter(~cond_registro_valido)
    .withColumn(
        "motivo_rechazo",
        when(col(fecha_col).isNull(), lit("Fecha de Firma nula"))
        .when(col(precio_col).isNull(), lit("Precio Base nulo"))
        .when(col(precio_col).cast("double") <= 0, lit("Precio Base <= 0"))
        .otherwise(lit("Incumple reglas de calidad"))
    )
)

print(f"✅ Registros válidos (SILVER): {df_validos.count()}")
print(f"❌ Registros inválidos (QUARANTINE): {df_invalidos.count()}\n")

                                                                                

✅ Registros válidos (SILVER): 6
❌ Registros inválidos (QUARANTINE): 4



In [6]:
# Mostrar los registros inválidos
if df_invalidos.count() > 0:
    print("=" * 80)
    print("REGISTROS INVÁLIDOS CAPTURADOS:")
    print("=" * 80)
    df_invalidos.select("Entidad", "Precio_Base", "Fecha_de_Firma", "motivo_rechazo").show(20, truncate=False)

REGISTROS INVÁLIDOS CAPTURADOS:
+-------------+-----------+--------------+-------------------+
|Entidad      |Precio_Base|Fecha_de_Firma|motivo_rechazo     |
+-------------+-----------+--------------+-------------------+
|HOSPITAL 3   |NULL       |NULL          |Fecha de Firma nula|
|UNIVERSIDAD 4|-100       |2023-04-10    |Precio Base <= 0   |
|MUNICIPIO 6  |NULL       |NULL          |Fecha de Firma nula|
|EMPRESA 8    |NULL       |NULL          |Fecha de Firma nula|
+-------------+-----------+--------------+-------------------+



                                                                                

In [7]:
# Escritura
silver_path = "data/lakehouse/silver/test_secop"
quarantine_path = "data/lakehouse/quarantine/test_secop_errors"

print(f"\nEscribiendo registros VÁLIDOS en: {silver_path}")
(
    df_validos.write
    .format("delta")
    .mode("overwrite")
    .save(silver_path)
)

print(f"Escribiendo registros INVÁLIDOS en: {quarantine_path}")
(
    df_invalidos.write
    .format("delta")
    .mode("overwrite")
    .save(quarantine_path)
)

print("\n✅ Transformación de prueba completada.")
print(f"   - Silver (válidos): {df_validos.count()} registros")
print(f"   - Quarantine (inválidos): {df_invalidos.count()} registros")


Escribiendo registros VÁLIDOS en: data/lakehouse/silver/test_secop


                                                                                

Escribiendo registros INVÁLIDOS en: data/lakehouse/quarantine/test_secop_errors


                                                                                


✅ Transformación de prueba completada.
   - Silver (válidos): 6 registros
   - Quarantine (inválidos): 4 registros
