#### Silver Layer - Incremental Load
- Purpose: Incrementally update Silver tables from Bronze layer changes
- Layer: Silver (Clean Data)
- Load Type: Incremental

---
### Dependencias
---

In [15]:
import os
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import logging
from pyspark import StorageLevel

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 17, Finished, Available, Finished)

---
### Parámetros de ejecución y logging
---

In [16]:
# Runtime parameters, overridable through environment variables.
_default_execution_date = datetime.now().isoformat()
execution_date = os.environ.get("execution_date", _default_execution_date)
# NOTE(review): lookback_days is not referenced by any of the cells shown in this notebook.
lookback_days = int(os.environ.get("lookback_days", "1"))

# Logger shared by the processing cells; the root logger is configured at INFO
# (basicConfig is a no-op if the root logger already has handlers).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 18, Finished, Available, Finished)

---
### Configuraciones de optimización
---

In [17]:
# Session-level Spark / Delta settings, applied in this exact order.
SPARK_OPTIMIZATION_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
    "spark.databricks.delta.schema.autoMerge.enabled": "true",
}
for setting, value in SPARK_OPTIMIZATION_SETTINGS.items():
    spark.conf.set(setting, value)

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 19, Finished, Available, Finished)

---
### Tratamiento de los datos
---

In [18]:
class SilverIncrementalProcessor:
    """Incrementally propagate ``bronze_*`` tables into their ``silver_*`` counterparts.

    Per table: read the bronze rows newer than the stored watermark, standardize types,
    deduplicate, quarantine suspicious rows, merge into the silver table (full Delta
    overwrite) and append a row to ``silver_incremental_control``.

    NOTE: the methods read the notebook globals ``logger`` and ``execution_date`` defined in
    earlier cells, plus the Spark helpers brought in by ``from pyspark.sql.functions import *``.
    That star import shadows the builtins ``max``/``min``: every ``max(...)`` below is the
    Spark aggregate, so never use ``max``/``min`` on plain Python numbers in this class.
    """

    # Lower-cased technical columns ignored when comparing dimension rows.
    _META_COLUMNS = ('dwcreateddate', 'silver_created_date', 'silver_execution_id')
    # A fact column belongs to the business key when its lower-cased name contains one of
    # these substrings ('customer_key' / 'product_key' are already covered by '_key').
    _FACT_KEY_PATTERNS = ('_number', '_key')
    # Placeholder tokens (compared after trim + upper) treated as missing in key columns...
    _KEY_NULL_TOKENS = ("", "NULL", "N/A", "UNKNOWN", "NONE")
    # ...and in every other string column.
    _NULL_TOKENS = ("", "NULL", "N/A", "UNKNOWN", "NONE", "#N/A")

    def __init__(self, spark_session):
        """Keep a handle on the SparkSession used for every read and write."""
        self.spark = spark_session

    def _table_exists(self, table_name: str) -> bool:
        """Return whether ``table_name`` exists in the session catalog."""
        # Private JVM catalog API, kept as in the original implementation.
        return self.spark._jsparkSession.catalog().tableExists(table_name)

    def _watermark_from_silver_table(self, table_name: str, log_prefix: str):
        """Fallback watermark: max ``silver_created_date`` of the silver table for ``table_name``.

        Returns None when that silver table does not exist. Otherwise logs ``log_prefix``
        followed by the value found (itself None for an empty table) and returns it.
        """
        silver_table_name = table_name.replace('bronze_', 'silver_')
        if not self._table_exists(silver_table_name):
            return None
        max_date = self.spark.table(silver_table_name).agg(max("silver_created_date")).collect()[0][0]
        logger.info(f"{log_prefix}{max_date}")
        return max_date

    def get_last_silver_execution_timestamp(self, table_name: str):
        """Return the watermark from which to resume ``table_name`` (a bronze table name).

        Uses ``last_processed_timestamp`` of the most recent successful run recorded in
        ``silver_incremental_control``. Without a control table, or without a successful run,
        falls back to the max ``silver_created_date`` of the matching silver table.
        Returns None (meaning "load everything") when nothing is found or on any error.

        NOTE(review): ``silver_created_date`` is the silver processing time, while the
        watermark is later compared against bronze ``dwcreateddate``; the fallback assumes
        processing time is >= the bronze timestamps already loaded.
        """
        try:
            if not self._table_exists("silver_incremental_control"):
                return self._watermark_from_silver_table(
                    table_name, f"Usando max silver_created_date para {table_name}: ")

            last_run = (
                self.spark.table("silver_incremental_control")
                .filter((col("table_name") == table_name) & (col("status") == "success"))
                .orderBy(col("execution_timestamp").desc())
                .limit(1)
                .collect()
            )
            if last_run:
                return last_run[0]["last_processed_timestamp"]
            return self._watermark_from_silver_table(
                table_name, f"Primera ejecución incremental para {table_name}. Usando max date: ")
        except Exception as e:
            # Best effort: an unreadable watermark degrades to a full load instead of aborting.
            logger.warning(f"No se pudo obtener la marca de tiempo para {table_name}: {e}")
            return None

    def update_silver_execution_control(self, table_name: str, last_processed_timestamp, status: str, record_count: int = 0):
        """Append one run record to the ``silver_incremental_control`` Delta table.

        Args:
            table_name: bronze table the run refers to.
            last_processed_timestamp: watermark to store (may be None).
            status: run status, e.g. "success" or "failed".
            record_count: number of new records handled by the run.

        The notebook-global ``execution_date`` is stored as ``execution_id``.
        NOTE(review): ``record_count`` is declared IntegerType; counts above 2**31-1 would not fit.
        """
        control_data = [(
            table_name,
            execution_date,
            datetime.now(),
            last_processed_timestamp,
            status,
            record_count
        )]

        control_schema = StructType([
            StructField("table_name", StringType(), True),
            StructField("execution_id", StringType(), True),
            StructField("execution_timestamp", TimestampType(), True),
            StructField("last_processed_timestamp", TimestampType(), True),
            StructField("status", StringType(), True),
            StructField("record_count", IntegerType(), True)
        ])

        control_df = self.spark.createDataFrame(control_data, control_schema)
        control_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("silver_incremental_control")

    def get_bronze_tables(self):
        """Return the names of the ``bronze_*`` business tables, excluding bronze log/summary tables."""
        tables_df = self.spark.sql("""
            SHOW TABLES LIKE 'bronze_*'
        """)
        all_tables = [row.tableName for row in tables_df.collect()]

        # Exclude control and metadata tables.
        excluded_tables = ['bronze_execution_log', 'bronze_notebook_execution_summary']
        filtered_tables = [table for table in all_tables if table not in excluded_tables]
        # Report how many tables were actually removed, not the size of the exclusion list.
        excluded_count = len(all_tables) - len(filtered_tables)

        logger.info(f"Found {len(filtered_tables)} business tables to process (excluded {excluded_count} metadata tables)")
        return filtered_tables

    def standardize_data_types(self, df, table_name):
        """Normalize column types and placeholder values, column by column.

        - ``*date`` bigint columns -> timestamp (values > 1e12 read as nanoseconds since the
          epoch, smaller values as seconds).
        - ``month`` bigint -> timestamp, read as nanoseconds since the epoch.
        - ``dwcreateddate`` stored as string -> timestamp.
        - ``*key`` strings -> trimmed and upper-cased; placeholder tokens become NULL.
        - other strings -> trimmed; placeholder tokens (including ``#N/A``) become NULL.
        - double/float NaN -> NULL.
        All other columns pass through unchanged; ``table_name`` is currently unused.

        NOTE(review): epochs in milliseconds or microseconds also exceed 1e12 and would be
        misread as nanoseconds (landing near 1970); confirm the bronze unit.
        """
        transformations = []
        for col_name, col_type in df.dtypes:
            lowered = col_name.lower()
            source = col(col_name)

            if lowered.endswith('date') and col_type == 'bigint':
                # 1e12 seconds would be around year 33658, so larger values are taken as nanoseconds.
                transformations.append(
                    when(source > 1000000000000,
                         from_unixtime(source / 1000000000).cast(TimestampType()))
                    .otherwise(from_unixtime(source).cast(TimestampType()))
                    .alias(col_name)
                )
            elif lowered == 'month' and col_type == 'bigint':
                transformations.append(
                    from_unixtime(source / 1000000000).cast(TimestampType()).alias(col_name))
            elif lowered == 'dwcreateddate' and col_type != 'timestamp':
                # 'datetime' is not a Spark dtype name; kept for parity with the original check.
                if col_type in ('string', 'datetime'):
                    transformations.append(to_timestamp(source).alias(col_name))
                else:
                    transformations.append(source)
            elif lowered.endswith('key') and col_type == 'string':
                normalized = trim(upper(source))
                transformations.append(
                    when(normalized.isin(*self._KEY_NULL_TOKENS) | trim(source).isNull(), None)
                    .otherwise(normalized)
                    .alias(col_name)
                )
            elif col_type == 'string':
                transformations.append(
                    when(trim(upper(source)).isin(*self._NULL_TOKENS) | trim(source).isNull(), None)
                    .otherwise(trim(source))
                    .alias(col_name)
                )
            elif col_type in ('double', 'float'):
                transformations.append(
                    when(isnan(source) | source.isNull(), None).otherwise(source).alias(col_name))
            else:
                transformations.append(source)

        if transformations:
            df = df.select(*transformations)
        return df

    def create_quarantine_records(self, df, table_name):
        """Move suspicious rows into ``silver_quarantine_<name>`` and return the remaining rows.

        A row is suspicious when any ``*date`` column (except ``dwcreateddate``) is more than
        730 days after today, or any double/float column whose name contains ``value`` is
        above 1e8 or below -1e7. Quarantined rows are appended with ``quarantine_date``,
        ``quarantine_reason`` and ``source_table`` columns. If the split fails, the error is
        logged and ``df`` is returned unchanged.
        """
        future_cutoff = date_add(current_date(), 730)  # 2 years into the future
        conditions = [
            col(c) > future_cutoff
            for c in df.columns
            if c.lower().endswith('date') and c.lower() != 'dwcreateddate'
        ]
        conditions += [
            (col(c) > 100000000) | (col(c) < -10000000)
            for c, t in df.dtypes
            if t in ('double', 'float') and 'value' in c.lower()
        ]
        if not conditions:
            return df

        problem_filter = conditions[0]
        for condition in conditions[1:]:
            problem_filter = problem_filter | condition
        # A comparison against NULL yields NULL, and filter() drops NULL for both a condition
        # and its negation: without coalesce, rows with a NULL in any checked column would
        # vanish from both the quarantine and the clean output.
        problem_filter = coalesce(problem_filter, lit(False))

        try:
            quarantine_df = df.filter(problem_filter)
            clean_df = df.filter(~problem_filter)

            quarantined_count = quarantine_df.count()
            if quarantined_count > 0:
                quarantine_table = f"silver_quarantine_{table_name.replace('silver_', '')}"
                (
                    quarantine_df
                    .withColumn("quarantine_date", current_timestamp())
                    .withColumn("quarantine_reason", lit("data_quality_issues"))
                    .withColumn("source_table", lit(table_name))
                    .write.mode("append").format("delta").saveAsTable(quarantine_table)
                )
                logger.info(f"Quarantined {quarantined_count} problematic records from {table_name}")

            return clean_df
        except Exception as e:
            logger.warning(f"Quarantine process failed for {table_name}: {str(e)}. Returning original data.")
            return df

    def optimize_partitioning(self, df, table_name):
        """Repartition ``df`` to a size-based partition count when it differs from the current one.

        Above 1,000,000 rows: one partition per 150,000 rows, clamped to [4, 200].
        Otherwise: one partition per 50,000 rows, at least 1.
        Note: triggers a full ``count()`` of ``df``.
        """
        row_count = df.count()
        current_partitions = df.rdd.getNumPartitions()

        if row_count > 1000000:
            optimal_partitions = row_count // 150000
            # Clamp explicitly: builtin min/max are shadowed by pyspark.sql.functions here.
            if optimal_partitions < 4:
                optimal_partitions = 4
            elif optimal_partitions > 200:
                optimal_partitions = 200
        else:
            optimal_partitions = row_count // 50000
            if optimal_partitions < 1:
                optimal_partitions = 1

        if current_partitions != optimal_partitions:
            logger.info(f"{table_name}: Repartitioning from {current_partitions} to {optimal_partitions}")
            df = df.repartition(optimal_partitions)
        return df

    def remove_duplicates(self, df, table_name):
        """Keep the most recent row (by ``dwcreateddate``) of each duplicate group.

        - ``silver_dim_*``: rows equal on every column except the technical ones in
          ``_META_COLUMNS`` form a group.
        - ``silver_fact_*``: rows equal on every column whose name contains ``_number`` or
          ``_key`` form a group.
        Other tables, and tables without an exact ``dwcreateddate`` column, are returned unchanged.

        NOTE(review): fact rows with NULL keys share one partition, so distinct rows with NULL
        keys collapse into one; confirm keys are never NULL.
        """
        if 'dwcreateddate' not in df.columns:
            return df

        if table_name.startswith('silver_dim_'):
            partition_cols = [c for c in df.columns if c.lower() not in self._META_COLUMNS]
        elif table_name.startswith('silver_fact_'):
            partition_cols = [c for c in df.columns
                              if any(p in c.lower() for p in self._FACT_KEY_PATTERNS)]
        else:
            return df

        if not partition_cols:
            return df

        window_spec = Window.partitionBy(*partition_cols).orderBy(desc('dwcreateddate'))
        return (
            df.withColumn('rn', row_number().over(window_spec))
            .filter(col('rn') == 1)
            .drop('rn')
        )

    def data_quality_checks(self, df, table_name):
        """Drop all-NULL rows, quarantine suspicious rows and log how many rows were removed.

        Note: runs two full ``count()`` passes over the data (before and after cleaning).
        """
        initial_count = df.count()
        df_clean = df.dropna(how='all')

        # Apply quarantine.
        df_clean = self.create_quarantine_records(df_clean, table_name)

        final_count = df_clean.count()
        processed_rows = initial_count - final_count

        if processed_rows > 0 and initial_count > 0:
            logger.info(f"{table_name}: Processed {processed_rows} problematic rows ({(processed_rows/initial_count)*100:.2f}%)")

        return df_clean

    def get_incremental_bronze_data(self, bronze_table_name, last_processed_timestamp, overlap=timedelta(hours=1)):
        """Return the bronze rows to process for this run.

        Args:
            bronze_table_name: source bronze table.
            last_processed_timestamp: watermark; None means "return the whole table".
            overlap: safety margin subtracted from the watermark so late rows are re-read
                (duplicates are removed later when merging). Defaults to 1 hour.

        Without a ``dwcreateddate`` column the whole table is returned (with a warning).
        """
        bronze_df = self.spark.table(bronze_table_name)

        if last_processed_timestamp is None:
            logger.info(f"Primera carga incremental para {bronze_table_name} - procesando todos los datos")
            return bronze_df

        safe_timestamp = last_processed_timestamp - overlap
        logger.info(f"Carga incremental para {bronze_table_name} desde {safe_timestamp}")

        if 'dwcreateddate' not in bronze_df.columns:
            logger.warning(f"No existe la columna dwcreateddate en {bronze_table_name}, procesando todos los datos")
            return bronze_df

        return bronze_df.filter(col('dwcreateddate') > safe_timestamp)

    def _silver_writer(self, df, silver_table_name):
        """Build the overwrite Delta writer; tables whose name contains 'fact' get extra options."""
        writer = df.write.mode("overwrite").format("delta").option("optimizeWrite", "true")
        if 'fact' in silver_table_name:
            writer = writer.option("autoCompact", "true").option("dataSkippingNumIndexedCols", "5")
        return writer

    def merge_to_silver_table(self, new_df, silver_table_name):
        """Write ``new_df`` into ``silver_table_name`` and return the table's resulting row count.

        A missing table is created from ``new_df``. An existing table is unioned with
        ``new_df`` by column name (columns missing on either side become NULL), deduplicated
        with ``remove_duplicates`` and overwritten in full.

        NOTE(review): this reads and overwrites the same table in one job, which relies on
        Delta's snapshot isolation.
        """
        if not self._table_exists(silver_table_name):
            logger.info(f"Creando nueva tabla silver: {silver_table_name}")
            final_df = new_df
        else:
            logger.info(f"Fusionando datos en la tabla silver existente: {silver_table_name}")
            existing_df = self.spark.table(silver_table_name)
            # By-name union: a positional union would silently mix columns if their order differs.
            combined_df = existing_df.unionByName(new_df, allowMissingColumns=True)
            final_df = self.remove_duplicates(combined_df, silver_table_name)

        record_count = final_df.count()
        self._silver_writer(final_df, silver_table_name).saveAsTable(silver_table_name)
        logger.info(f"Final partitions for {silver_table_name}: {final_df.rdd.getNumPartitions()}")
        return record_count

    def _record_failure(self, bronze_table_name: str) -> None:
        """Best-effort write of a 'failed' control row; logs instead of raising if that write fails."""
        try:
            self.update_silver_execution_control(bronze_table_name, None, "failed", 0)
        except Exception as control_error:
            logger.error(f"No se pudo registrar el fallo de {bronze_table_name} en silver_incremental_control: {control_error}")

    def process_silver_incremental(self, bronze_table_name):
        """Run the incremental bronze -> silver load for one table.

        Returns the number of new records merged (0 when there is nothing new). On error,
        the failure is recorded in the control table (best effort) and the original
        exception is re-raised.
        """
        silver_table_name = bronze_table_name.replace('bronze_', 'silver_')
        df_processed = None
        try:
            last_timestamp = self.get_last_silver_execution_timestamp(bronze_table_name)
            logger.info(f"Procesando {bronze_table_name} -> {silver_table_name}")
            bronze_incremental = self.get_incremental_bronze_data(bronze_table_name, last_timestamp)

            # limit(1) answers "is there anything new?" without counting the whole increment.
            if bronze_incremental.limit(1).count() == 0:
                logger.info(f"No hay nuevos datos para {bronze_table_name}")
                # Keep the previous watermark (None means a full load next time) rather than
                # datetime.now(): advancing it to "now" could skip rows not yet seen.
                self.update_silver_execution_control(bronze_table_name, last_timestamp, "success", 0)
                return 0

            df_processed = self.standardize_data_types(bronze_incremental, silver_table_name)
            df_processed = self.remove_duplicates(df_processed, silver_table_name)
            df_processed = self.data_quality_checks(df_processed, silver_table_name)
            df_processed = self.optimize_partitioning(df_processed, silver_table_name)
            df_processed = (
                df_processed
                .withColumn('silver_created_date', current_timestamp())
                .withColumn('silver_execution_id', lit(execution_date))
                .persist(StorageLevel.MEMORY_AND_DISK)
            )

            # Materialize the cache once; this count is reused for the control row and the logs.
            new_record_count = df_processed.count()

            max_timestamp = None
            if 'dwcreateddate' in df_processed.columns:
                max_timestamp = df_processed.agg(max('dwcreateddate')).collect()[0][0]
            if max_timestamp is None:
                max_timestamp = datetime.now()

            final_record_count = self.merge_to_silver_table(df_processed, silver_table_name)
            self.update_silver_execution_control(bronze_table_name, max_timestamp, "success", new_record_count)

            logger.info(f"Procesado exitosamente {silver_table_name}: {new_record_count:,} nuevos registros, {final_record_count:,} en total")
            return new_record_count

        except Exception as e:
            logger.error(f"Error procesando {bronze_table_name}: {str(e)}")
            self._record_failure(bronze_table_name)
            raise
        finally:
            # Release the cache on every path, including failures.
            if df_processed is not None:
                df_processed.unpersist()

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 20, Finished, Available, Finished)

In [19]:
# One processor instance bound to the notebook's SparkSession, reused for every table below.
silver_processor = SilverIncrementalProcessor(spark_session=spark)

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 21, Finished, Available, Finished)

In [20]:
# Discover the bronze tables to process (get_bronze_tables already drops the bronze log/summary tables).
bronze_tables = silver_processor.get_bronze_tables()
logger.info(f"Found {len(bronze_tables)} bronze tables for incremental processing")

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 22, Finished, Available, Finished)

INFO:__main__:Found 14 bronze tables for incremental processing


In [21]:
# Process every bronze table independently: a failure is recorded in `results` and
# printed, but does not stop the remaining tables.
results = []
total_records = 0

for bronze_table in bronze_tables:
    try:
        record_count = silver_processor.process_silver_incremental(bronze_table)
        silver_table = bronze_table.replace('bronze_', 'silver_')
        results.append(dict(
            bronze_table=bronze_table,
            silver_table=silver_table,
            record_count=record_count,
            status='success',
        ))
        total_records += record_count
        print(f"✓ {silver_table}: {record_count:,} records processed")
    except Exception as e:
        results.append(dict(
            bronze_table=bronze_table,
            silver_table=bronze_table.replace('bronze_', 'silver_'),
            record_count=0,
            status='failed',
            error=str(e),
        ))
        print(f"✗ {bronze_table}: FAILED - {str(e)}")
    print("-" * 50)

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 23, Finished, Available, Finished)

INFO:__main__:Procesando bronze_dim_brands -> silver_dim_brands
INFO:__main__:Carga incremental para bronze_dim_brands desde 2025-09-18 21:17:48.136527
INFO:__main__:No hay nuevos datos para bronze_dim_brands
INFO:__main__:Procesando bronze_dim_budget_rate -> silver_dim_budget_rate
INFO:__main__:Carga incremental para bronze_dim_budget_rate desde 2025-09-18 21:17:56.290482
INFO:__main__:No hay nuevos datos para bronze_dim_budget_rate
INFO:__main__:Procesando bronze_dim_customers -> silver_dim_customers
INFO:__main__:Carga incremental para bronze_dim_customers desde 2025-09-18 21:18:04.726647
INFO:__main__:No hay nuevos datos para bronze_dim_customers
INFO:__main__:Procesando bronze_dim_employees -> silver_dim_employees
INFO:__main__:Carga incremental para bronze_dim_employees desde 2025-09-18 21:18:14.040441
INFO:__main__:No hay nuevos datos para bronze_dim_employees
INFO:__main__:Procesando bronze_dim_exchange_rate -> silver_dim_exchange_rate
INFO:__main__:Carga incremental para bronz

✓ silver_dim_brands: 0 records processed
--------------------------------------------------
✓ silver_dim_budget_rate: 0 records processed
--------------------------------------------------
✓ silver_dim_employees: 0 records processed
--------------------------------------------------
✓ silver_dim_exchange_rate: 0 records processed
--------------------------------------------------
✓ silver_dim_invoice_doctype: 0 records processed
--------------------------------------------------
✓ silver_dim_order_status: 0 records processed
--------------------------------------------------
✓ silver_dim_products: 0 records processed
--------------------------------------------------
✓ silver_dim_regions: 0 records processed
--------------------------------------------------
✓ silver_fact_budget: 0 records processed
--------------------------------------------------
✓ silver_fact_orders: 0 records processed
--------------------------------------------------


---
### Logs del proceso
---

In [22]:
# Append one summary row for this run to the silver_execution_log Delta table.
execution_log_schema = StructType([
    StructField("execution_id", StringType(), True),
    StructField("pipeline_name", StringType(), True),
    StructField("execution_timestamp", TimestampType(), True),
    StructField("status", StringType(), True),
    StructField("layer", StringType(), True),
    StructField("load_type", StringType(), True),
    StructField("total_records", LongType(), True),
    StructField("successful_tables", IntegerType(), True),
    StructField("failed_tables", IntegerType(), True),
    StructField("updated_tables", IntegerType(), True),
    StructField("details", StringType(), True)
])

run_status = "completed" if all(r["status"] == "success" for r in results) else "completed_with_errors"
success_count = sum(1 for r in results if r["status"] == "success")
failure_count = sum(1 for r in results if r["status"] == "failed")
updated_count = sum(1 for r in results if r.get("record_count", 0) > 0)  # tables with new data

execution_log_data = [(
    execution_date,
    "silver_incremental_load",
    datetime.now(),
    run_status,
    "silver",
    "incremental",
    total_records,
    success_count,
    failure_count,
    updated_count,
    str(results)[:1000],  # details truncated to 1000 characters
)]

execution_log = spark.createDataFrame(execution_log_data, execution_log_schema)
execution_log.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("silver_execution_log")

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 24, Finished, Available, Finished)

---
### Resumen del proceso
---

In [23]:
# Print the run summary, then OPTIMIZE every silver table that received new records.
successful_loads = sum(1 for r in results if r["status"] == "success")
failed_loads = sum(1 for r in results if r["status"] == "failed")
updated_tables = sum(1 for r in results if r.get("record_count", 0) > 0)

summary_lines = [
    "=" * 60,
    "SILVER INCREMENTAL LOAD SUMMARY:",
    f"Successful processes: {successful_loads}",
    f"Failed processes: {failed_loads}",
    f"Tables with updates: {updated_tables}",
    f"Total records processed: {total_records:,}",
    f"Execution Date: {execution_date}",
    "=" * 60,
]
print("\n".join(summary_lines))

tables_to_optimize = [r for r in results if r["status"] == "success" and r["record_count"] > 0]

if updated_tables > 0:
    print("\nTables with incremental updates:")
    for result in tables_to_optimize:
        print(f"  - {result['silver_table']}: {result['record_count']:,} records")

for result in tables_to_optimize:
    try:
        spark.sql(f"OPTIMIZE {result['silver_table']}")
    except Exception as e:
        print(f"✗ Error optimizing {result['silver_table']}: {e}")

StatementMeta(, 9cc54385-f449-4f4e-b6f3-310c3a7da6b1, 25, Finished, Available, Finished)

SILVER INCREMENTAL LOAD SUMMARY:
Successful processes: 14
Failed processes: 0
Tables with updates: 0
Total records processed: 0
Execution Date: 2025-09-18T23:14:10.292146
