In [1]:
import sys
import os

sys.path.append(os.path.abspath('..'))

from src.utilities import *

In [2]:
# --- Source (Bronze) tables -------------------------------------------------
TABLE_BRONZE_EXP = "bronze_dev.sctr_emision.expuestos_bronze"
TABLE_BRONZE_CONT = "bronze_dev.sctr_emision.contratantes_bronze"

# --- Target (Silver) tables -------------------------------------------------
TABLE_SILVER_EXP = "silver_dev.sctr_emision.expuestos_silver"
TABLE_SILVER_CONT = "silver_dev.sctr_emision.contratantes_silver"

# Columns (and their order) selected into the Silver "expuestos" output.
COLS_NAM_EXP_FINAL = [
    'POLIZA', 'F_INI_VIGEN_POLIZA', 'F_FIN_VIGEN_POLIZA',
    'CERTIFICADO', 'F_INI_COBERT', 'F_FIN_COBERT',
    'TIPO_DOC', 'NUM_DOC', 'ULT_DIGI_DOC', 'EXPUESTO',
    'YEAR_MOV', 'MONTH_MOV', 'FECHA_CARGA',
]

# Columns (and their order) selected into the Silver "contratantes" output.
COLS_NAM_CONT_FINAL = [
    'POLIZA', 'TIPO_DOC', 'NUM_DOC_CONT', 'CONTRATANTE',
    'YEAR_MOV', 'MONTH_MOV', 'FECHA_CARGA',
]

# Start the run log for this layer, then record which cluster is executing it.
open_log("Silver")

cluster_id = spark.conf.get('spark.databricks.clusterUsageTags.clusterId')
logger.info(f"‚ö™ Iniciando proceso Silver en Databricks. Cl√∫ster: {cluster_id}")

üîÑ Historial detectado en Volumen. Restaurando 'ETL_Emision_SCTR_Silver_2026-01-12.log' a local...
[36m[1m12/01/2026 16:39:32[0m[36m[0m | [36mAVISO  [0m | [36müìù Log local iniciado en: /tmp/ETL_Emision_SCTR_Silver_2026-01-12.log[0m
[36m[1m12/01/2026 16:39:32[0m[36m[0m | [36mAVISO  [0m | [36müîÑ Sincronizaci√≥n autom√°tica a: /Volumes/landing_dev/sctr_emision/inputs_volumen/Logs/ETL_Emision_SCTR_Silver_2026-01-12.log[0m
[36m[1m12/01/2026 16:39:32[0m[36m[0m | [36mAVISO  [0m | [36m‚ö™ Iniciando proceso Silver en Databricks. Cl√∫ster: 0102-190343-jcacerrl[0m


In [3]:
def transform_expuestos_silver(periodo = PERIODO) -> DataFrame | None:
    """Build the Silver "expuestos" DataFrame from the Bronze table for one load period.

    Steps performed on the rows of TABLE_BRONZE_EXP whose FECHA_CARGA equals ``periodo``:
      * NUM_DOC: single quotes, double quotes and underscores are removed; all-digit
        values of length 5, 6 or 7 are left-padded with zeros to 8 characters.
      * ULT_DIGI_DOC: last character of the cleaned NUM_DOC, try-cast to INT.
      * POLIZA / YEAR_MOV / MONTH_MOV / TIPO_DOC: cast through DecimalType(scale=2)
        to an integral type.
      * EXPUESTO: the four name columns joined with single spaces.
      * TIPO_DOC: numeric code mapped to DNI (1), CE (2), PAS (5) or OTROS.
      * The four policy/coverage date columns are parsed from their first 10
        characters using the first format in DATE_FORMATS that succeeds.
      * Rows with a null POLIZA are dropped, COLS_NAM_EXP_FINAL is selected and exact
        duplicate rows are removed.

    Args:
        periodo: Value compared against the Bronze FECHA_CARGA column.

    Returns:
        The transformed DataFrame, or None when the period has no Bronze rows or when
        any exception is raised (the error is logged, not propagated).
    """
    try:
        logger.info(f"   üîç Leyendo Bronze Expuestos (Periodo: {periodo})...")

        df = spark.read.table(TABLE_BRONZE_EXP) \
                 .filter(F.col("FECHA_CARGA") == F.lit(periodo))

        total_rows = df.count()
        if total_rows == 0:
            logger.warning(f"   ‚ö†Ô∏è La tabla Bronze Expuestos no contiene registros.")
            return None

        logger.info(f"   üîÑ Transformando Bronze Expuestos...")

        def try_parse_dates(col_name):
            # Only the first 10 characters are parsed; the first format that yields a
            # non-null date wins.
            return F.coalesce(*[F.to_date(F.substring(F.col(col_name), 1, 10), fmt) for fmt in DATE_FORMATS])

        # NOTE(review): DecimalType defaults to precision 10, so with scale=2 only 8
        # integer digits fit; longer POLIZA values would presumably become null and be
        # dropped by the POLIZA filter below - confirm against the data.
        df_clean = (
            df
            .withColumn("NUM_DOC",
                F.translate(F.col("NUM_DOC"), "'\"_", "")
            )
            .withColumn("ULT_DIGI_DOC", F.expr("try_cast(substring(NUM_DOC, -1, 1) as INT)"))
            .withColumn("POLIZA", F.col("POLIZA").cast(DecimalType(scale=2)).cast(LongType()))
            .withColumn("YEAR_MOV", F.col("YEAR_MOV").cast(DecimalType(scale=2)).cast(IntegerType()))
            .withColumn("MONTH_MOV", F.col("MONTH_MOV").cast(DecimalType(scale=2)).cast(IntegerType()))
            .withColumn("TIPO_DOC_RAW", F.col("TIPO_DOC").cast(DecimalType(scale=2)).cast(IntegerType()))
            .withColumn("EXPUESTO",
                F.regexp_replace(
                    F.trim(F.concat_ws(" ",
                        F.col("P_NOMBRE"), F.col("S_NOMBRE"),
                        F.col("AP_PATERNO"), F.col("AP_MATERNO")
                    )),
                    # Collapse ANY run of 2+ spaces. Replacing the literal two-space
                    # string only halves a run, so 3+ blanks (e.g. two empty name
                    # parts in a row) still left a double space behind.
                    " {2,}", " "
                )
            )
            .withColumn("TIPO_DOC_DESC",
                F.when(F.col("TIPO_DOC_RAW") == 1, "DNI")
                .when(F.col("TIPO_DOC_RAW") == 2, "CE")
                .when(F.col("TIPO_DOC_RAW") == 5, "PAS")
                .otherwise("OTROS")
            )
            .withColumn("NUM_DOC_CLEAN",
                F.when(
                    (F.length(F.col("NUM_DOC")).isin([5, 6, 7])) &
                    (F.col("NUM_DOC").rlike("^\\d+$")),
                    F.lpad(F.col("NUM_DOC"), 8, '0') # Zfill
                ).otherwise(F.col("NUM_DOC"))
            )
            # Swap the raw columns for their cleaned counterparts.
            .drop("TIPO_DOC", "NUM_DOC")
            .withColumnRenamed("TIPO_DOC_DESC", "TIPO_DOC")
            .withColumnRenamed("NUM_DOC_CLEAN", "NUM_DOC")
        )

        cols_date = ['F_INI_VIGEN_POLIZA','F_FIN_VIGEN_POLIZA','F_INI_COBERT','F_FIN_COBERT']
        for c in cols_date:
            df_clean = df_clean.withColumn(c, try_parse_dates(c))

        df_clean = df_clean.filter(
            F.col("POLIZA").isNotNull()
        )

        # FECHA_CARGA is intentionally not re-stamped: every row was already filtered
        # to FECHA_CARGA == periodo when reading. The previous withColumn result was
        # discarded anyway because the select below started again from df_clean.
        df_final = df_clean.select(*COLS_NAM_EXP_FINAL) \
                    .dropDuplicates(COLS_NAM_EXP_FINAL)

        total_rows = df_final.count()
        logger.info(f"   üìä Total Registros Guardados: {total_rows:,.0f}")

        return df_final
    except Exception as e:
        logger.error(f"   ‚ùå Error en Transformaci√≥n Silver Expuestos. {e}")
        return None
    
def transform_contratantes_silver(periodo = PERIODO) -> DataFrame | None:
    """Build the Silver "contratantes" DataFrame from the Bronze table for one load period.

    Steps performed on the rows of TABLE_BRONZE_CONT whose FECHA_CARGA equals ``periodo``:
      * NUM_DOC_CONT: single quotes, double quotes and underscores are removed; for
        rows typed as DNI, all-digit values of length 5, 6 or 7 are left-padded with
        zeros to 8 characters.
      * POLIZA is cast through DecimalType(scale=2) to long; YEAR_MOV, MONTH_MOV and
        TIPO_DOC are cast through long to int.
      * TIPO_DOC: numeric code mapped to DNI (1), RUC (6) or OTRO.
      * Rows with a null POLIZA or a null CONTRATANTE are dropped, COLS_NAM_CONT_FINAL
        is selected and exact duplicate rows are removed.

    Args:
        periodo: Value compared against the Bronze FECHA_CARGA column.

    Returns:
        The transformed DataFrame, or None when the period has no Bronze rows or when
        any exception is raised (the error is logged, not propagated).
    """
    try:
        logger.info(f"   üîç Leyendo Bronze Contratantes (Periodo: {periodo})...")

        df = spark.read.table(TABLE_BRONZE_CONT) \
                 .filter(F.col("FECHA_CARGA") == F.lit(periodo))

        total_rows = df.count()
        if total_rows == 0:
            logger.warning(f"   ‚ö†Ô∏è La tabla Bronze Contratantes no contiene registros.")
            return None

        logger.info(f"   üîÑ Transformando Bronze Contratantes...")

        # NOTE(review): YEAR_MOV / MONTH_MOV / TIPO_DOC are cast via LongType here but
        # via DecimalType(scale=2) in transform_expuestos_silver - confirm whether the
        # difference is intentional.
        df_clean = (
            df
            .withColumn("NUM_DOC_CONT",
                F.translate(F.col("NUM_DOC_CONT"), "'\"_", "")
            )
            .withColumn("POLIZA", F.col("POLIZA").cast(DecimalType(scale=2)).cast(LongType()))
            .withColumn("YEAR_MOV", F.col("YEAR_MOV").cast(LongType()).cast(IntegerType()))
            .withColumn("MONTH_MOV", F.col("MONTH_MOV").cast(LongType()).cast(IntegerType()))
            .withColumn("TIPO_DOC_RAW", F.col("TIPO_DOC").cast(LongType()).cast(IntegerType()))
            .withColumn("TIPO_DOC_DESC",
                F.when(F.col("TIPO_DOC_RAW") == 1, "DNI")
                .when(F.col("TIPO_DOC_RAW") == 6, "RUC")
                .otherwise("OTRO")
            )
            .withColumn("NUM_DOC_CONT_CLEAN",
                F.when(
                    (F.col("TIPO_DOC_DESC") == "DNI") &
                    (F.length(F.col("NUM_DOC_CONT")).isin([5, 6, 7])) &
                    (F.col("NUM_DOC_CONT").rlike("^\\d+$")),
                    F.lpad(F.col("NUM_DOC_CONT"), 8, '0')
                ).otherwise(F.col("NUM_DOC_CONT"))
            )
            # Swap the raw columns for their cleaned counterparts.
            .drop("TIPO_DOC", "NUM_DOC_CONT")
            .withColumnRenamed("TIPO_DOC_DESC", "TIPO_DOC")
            .withColumnRenamed("NUM_DOC_CONT_CLEAN", "NUM_DOC_CONT")
        )

        df_clean = df_clean.filter(
            F.col("POLIZA").isNotNull() & F.col("CONTRATANTE").isNotNull()
        )

        # FECHA_CARGA is intentionally not re-stamped: every row was already filtered
        # to FECHA_CARGA == periodo when reading. The previous withColumn result was
        # discarded anyway because the select below started again from df_clean.
        df_final = df_clean.select(*COLS_NAM_CONT_FINAL) \
                    .dropDuplicates(COLS_NAM_CONT_FINAL)

        total_rows = df_final.count()
        logger.info(f"   üìä Total Registros Guardados: {total_rows:,.0f}")

        return df_final
    except Exception as e:
        logger.error(f"   ‚ùå Error en Transformaci√≥n Silver Contratantes. {e}")
        return None
    
def merge_to_delta(df_new: DataFrame, table_name: str, unique_keys: list) -> bool:
    """Upsert ``df_new`` into the Delta table ``table_name``.

    Source rows are matched to target rows on equality of every column listed in
    ``unique_keys``; matched rows are fully updated and unmatched rows are inserted.
    After the merge, the inserted/updated counters from the table's latest history
    entry and the total row count of the table are logged.

    Args:
        df_new: Source rows (aliased ``s`` in the merge condition).
        table_name: Name of the target Delta table (aliased ``t``).
        unique_keys: Column names that identify a row in both source and target.

    Returns:
        True when every step completed; False when any exception was raised (the
        error is logged, not propagated).
    """
    try:
        logger.info(f"   üîÑ Iniciando MERGE (Upsert) en {table_name}...")

        # NOTE(review): plain "=" never matches NULL key values, so rows with a NULL
        # in any key column are presumably re-inserted on every run - confirm whether
        # key columns can be null.
        key_predicates = [f"t.{key} = s.{key}" for key in unique_keys]
        merge_condition = " AND ".join(key_predicates)

        delta_target = DeltaTable.forName(spark, table_name)

        upsert = delta_target.alias("t").merge(df_new.alias("s"), merge_condition)
        upsert = upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll()
        upsert.execute()
        logger.info(f"   üíæ MERGE Guardado en {table_name}")

        # Metrics are read from the most recent history entry of the table.
        latest_history = delta_target.history(1).select("operationMetrics").collect()
        metrics = latest_history[0][0]
        num_inserted = metrics.get("numTargetRowsInserted", "0")
        num_updated = metrics.get("numTargetRowsUpdated", "0")
        logger.info(f"   üìà Merge Reporte: Insertados={num_inserted}, Actualizados={num_updated}")

        print(delta_target.detail())
        total_rows = delta_target.toDF().count()
        logger.info(f"   üìä Total Registros Guardados (Post Merge): {total_rows:,.0f}")
    except Exception as e:
        logger.error(f"   ‚ùå Error en Merge Delta ({table_name}). {e}")
        return False
    else:
        return True

In [4]:
def start_process(df: DataFrame|None, process: str, table_name: str, unique_keys: list) -> bool:
    """Persist a transformed DataFrame into its Silver table and run table maintenance.

    Write strategy:
      * table exists and has rows  -> merge_to_delta (upsert on ``unique_keys``)
      * table exists but is empty  -> save_to_table_delta in "overwrite" mode
      * table does not exist       -> save_to_table_delta in "overwrite" mode

    When the write step did not report failure and the table validates, the table is
    OPTIMIZEd with ZORDER BY on ``unique_keys`` and statistics are computed for the
    POLIZA column.

    Args:
        df: DataFrame to persist; None means the transformation failed or was empty.
        process: Human-readable process name, used only in log messages.
        table_name: Name of the target Delta table.
        unique_keys: Merge key columns, also used as the ZORDER columns.

    Returns:
        False when ``df`` is None or the write step returned False; otherwise True
        once maintenance ran, or the write step's own status when the table does not
        validate afterwards.
    """
    if df is None:
        return False

    zorder_cols = ", ".join(unique_keys)

    if validate_table_delta(table_name):
        df_count = spark.read.table(table_name).count()
        if df_count > 0:
            status = merge_to_delta(df, table_name, unique_keys)
        else:
            status = save_to_table_delta(df, table_name, "overwrite", "false")
    else:
        status = save_to_table_delta(df, table_name, "overwrite", "false")

    # A failed write must stay a failure. Previously the maintenance branch below
    # reset status to True whenever the table existed, so a failed merge was reported
    # as success. Only an explicit False is treated as failure, so any other return
    # value of save_to_table_delta keeps being judged by the validation below.
    if status is False:
        return False

    if validate_table_delta(table_name, False):
        logger.info(f"   üßπ Optimizando tabla Silver {process}...")
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({zorder_cols})")

        logger.info(f"   üìã Analizando tabla Silver {process}...")
        spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS FOR COLUMNS POLIZA")

        status = True

    return status

In [None]:
# Switches selecting which Silver processes this run executes.
RUN_EXPUESTOS = True
RUN_CONTRATANTES = True
STATUS = False
# When True, both processes reprocess ON_DEMAND_PERIODO instead of the transform
# functions' default period.
ON_DEMAND = False
ON_DEMAND_PERIODO = "2026-01-05"  # YYYY-MM-DD

try:
    # One entry per executed process, so that a later success cannot hide an earlier
    # failure (STATUS used to keep only the result of the last process).
    process_statuses = []
    # Positional arguments for the transform functions: the on-demand date, or nothing
    # so that their default period applies.
    periodo_args = (datetime.strptime(ON_DEMAND_PERIODO, "%Y-%m-%d").date(),) if ON_DEMAND else ()

    if RUN_EXPUESTOS:
        df_exp_silver = transform_expuestos_silver(*periodo_args)
        keys_exp = ['POLIZA', 'CERTIFICADO', 'NUM_DOC', 'YEAR_MOV', 'MONTH_MOV']
        process_statuses.append(start_process(df_exp_silver, "Expuestos", TABLE_SILVER_EXP, keys_exp))

    if RUN_CONTRATANTES:
        df_cont_silver = transform_contratantes_silver(*periodo_args)
        keys_cont = ['POLIZA', 'NUM_DOC_CONT', 'YEAR_MOV', 'MONTH_MOV']
        process_statuses.append(start_process(df_cont_silver, "Contratantes", TABLE_SILVER_CONT, keys_cont))

    # Success requires every executed process to succeed; if nothing ran, STATUS
    # stays False as before.
    STATUS = bool(process_statuses) and all(process_statuses)

    if STATUS:
        logger.success("üèÅ Ejecuci√≥n Completa: Proceso Silver Finalizado con √©xito.")
    else:
        logger.error("üèÅ Ejecuci√≥n Incompleta: Proceso Silver Finalizado con Error.")
except Exception as e:
    logger.error(f"‚ùå Error cr√≠tico en proceso Silver. {e}")
finally:
    # Always log the elapsed time and close the log, even after a failure.
    # NOTE(review): HORA_INICIAL is not defined in the visible cells; presumably it
    # comes from src.utilities - confirm.
    HORA_FINAL = datetime.now()
    difference_time = HORA_FINAL-HORA_INICIAL
    total_seconds = int(difference_time.total_seconds())
    difference_formated = "{} minuto(s), {} segundo(s)".format((total_seconds // 60), total_seconds % 60)
    logger.info(f"Tiempo de proceso: {difference_formated}")
    close_log()

[36m[1m12/01/2026 16:39:32[0m[36m[0m | [36mAVISO  [0m | [36m   üîç Leyendo Bronze Expuestos (Periodo: 2026-01-05)...[0m
[36m[1m12/01/2026 16:39:33[0m[36m[0m | [36mAVISO  [0m | [36m   üîÑ Transformando Bronze Expuestos...[0m
[36m[1m12/01/2026 16:39:47[0m[36m[0m | [36mAVISO  [0m | [36m   üìä Total Registros Guardados: 8,262,012[0m
[36m[1m12/01/2026 16:39:48[0m[36m[0m | [36mAVISO  [0m | [36m   ‚úÖ La tabla Delta existe en silver_dev.sctr_emision.expuestos_silver.[0m
[36m[1m12/01/2026 16:40:07[0m[36m[0m | [36mAVISO  [0m | [36m   üßπ Optimizando tabla Silver Expuestos...[0m
[36m[1m12/01/2026 16:40:16[0m[36m[0m | [36mAVISO  [0m | [36m   üìã Analizando tabla Silver Expuestos...[0m
[36m[1m12/01/2026 16:40:17[0m[36m[0m | [36mAVISO  [0m | [36m   üîç Leyendo Bronze Contratantes (Periodo: 2026-01-05)...[0m
[36m[1m12/01/2026 16:40:17[0m[36m[0m | [36mAVISO  [0m | [36m   üîÑ Transformando Bronze Contratantes...[0m
[36m[1