# üè¶ Notebook 02: Bronze Layer - Change Data Capture (CDC)

## Capturando Cambios Incrementales en Delta Lake

---

### Objetivos de este notebook:
1. Entender c√≥mo funciona Change Data Feed (CDF) en Delta Lake
2. Simular operaciones de INSERT, UPDATE y DELETE
3. Leer el Change Data Feed en modo batch y streaming
4. Procesar cambios incrementales para la capa Silver

### Conceptos clave:
- **Change Data Feed (CDF)**: Caracter√≠stica de Delta Lake que registra cambios a nivel de fila
- **_change_type**: Columna metadata que indica el tipo de cambio (insert, update_preimage, update_postimage, delete)
- **_commit_version**: Versi√≥n del commit donde ocurri√≥ el cambio
- **_commit_timestamp**: Timestamp del commit

---

## 1Ô∏è‚É£ Configuraci√≥n Inicial

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Configuraci√≥n
DATABASE_NAME = "financial_lakehouse"
spark.sql(f"USE {DATABASE_NAME}")

# Nombres de tablas
BRONZE_CLIENTES = "bronze_clientes"
BRONZE_CUENTAS = "bronze_cuentas"
BRONZE_TRANSACCIONES = "bronze_transacciones"

print(f"‚úÖ Usando base de datos: {DATABASE_NAME}")

## 2Ô∏è‚É£ Verificar CDC Habilitado

In [None]:
# ============================================
# VERIFICAR QUE CDC EST√Å HABILITADO
# ============================================

def check_cdf_enabled(table_name):
    """Verifica si Change Data Feed est√° habilitado en una tabla"""
    props = spark.sql(f"SHOW TBLPROPERTIES {table_name}")
    cdf_prop = props.filter(col("key") == "delta.enableChangeDataFeed").collect()
    
    if cdf_prop and cdf_prop[0]["value"] == "true":
        return True
    return False

for table in [BRONZE_CLIENTES, BRONZE_CUENTAS, BRONZE_TRANSACCIONES]:
    status = "‚úÖ Habilitado" if check_cdf_enabled(table) else "‚ùå No habilitado"
    print(f"{table}: CDC {status}")

In [None]:
# Si alguna tabla no tiene CDC habilitado, habilitarlo
def enable_cdf(table_name):
    """Habilita Change Data Feed en una tabla existente"""
    spark.sql(f"""
        ALTER TABLE {table_name} 
        SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
    """)
    print(f"‚úÖ CDC habilitado en {table_name}")

# Ejemplo: enable_cdf(BRONZE_CLIENTES)

## 3Ô∏è‚É£ Simular Operaciones de Cambio

Vamos a simular diferentes operaciones para generar el Change Data Feed:
- **INSERT**: Nuevos clientes
- **UPDATE**: Cambios en datos de clientes existentes
- **DELETE**: Eliminaci√≥n de clientes

In [None]:
# ============================================
# OBTENER VERSI√ìN ACTUAL ANTES DE LOS CAMBIOS
# ============================================

# Obtener la versi√≥n actual de la tabla
history = spark.sql(f"DESCRIBE HISTORY {BRONZE_CLIENTES} LIMIT 1").collect()
version_before = history[0]["version"]
print(f"üìå Versi√≥n actual de {BRONZE_CLIENTES}: {version_before}")

### 3.1 Simular INSERTs (Nuevos Clientes)

In [None]:
# ============================================
# INSERTAR NUEVOS CLIENTES
# ============================================

nuevos_clientes = [
    ("CLI-99990001", "Roberto Nuevocliente", "roberto.nuevo@gmail.com", "+51 999000001", 
     "Av. Nueva 123", "Lima", "Per√∫", "15001", "1985-03-15", "M", "VIP", "ACTIVO",
     "2024-01-15 10:30:00", "CORE_BANKING", "INSERT"),
    ("CLI-99990002", "Carolina Cliente", "carolina.cliente@hotmail.com", "+51 999000002",
     "Jr. Reciente 456", "Arequipa", "Per√∫", "04001", "1990-07-22", "F", "PREMIUM", "ACTIVO",
     "2024-01-16 14:45:00", "CORE_BANKING", "INSERT"),
    ("CLI-99990003", "Fernando Fresco", "fernando.fresco@yahoo.com", "+51 999000003",
     "Calle Actual 789", "Trujillo", "Per√∫", "13001", "1978-11-30", "M", "CORPORATE", "ACTIVO",
     "2024-01-17 09:15:00", "MOBILE_APP", "INSERT")
]

schema = StructType([
    StructField("cliente_id", StringType(), False),
    StructField("nombre", StringType(), True),
    StructField("email", StringType(), True),
    StructField("telefono", StringType(), True),
    StructField("direccion", StringType(), True),
    StructField("ciudad", StringType(), True),
    StructField("pais", StringType(), True),
    StructField("codigo_postal", StringType(), True),
    StructField("fecha_nacimiento", StringType(), True),
    StructField("genero", StringType(), True),
    StructField("segmento_cliente", StringType(), True),
    StructField("estado", StringType(), True),
    StructField("fecha_registro", StringType(), True),
    StructField("fuente", StringType(), True),
    StructField("operacion", StringType(), True)
])

df_nuevos = spark.createDataFrame(nuevos_clientes, schema) \
    .withColumn("fecha_nacimiento", to_date(col("fecha_nacimiento"))) \
    .withColumn("fecha_registro", to_timestamp(col("fecha_registro"))) \
    .withColumn("fecha_ingesta", current_timestamp())

# Insertar nuevos clientes
df_nuevos.write.format("delta").mode("append").saveAsTable(BRONZE_CLIENTES)

print(f"‚úÖ Insertados {df_nuevos.count()} nuevos clientes")
df_nuevos.show(truncate=False)

### 3.2 Simular UPDATEs (Cambios en Clientes Existentes)

In [None]:
# ============================================
# ACTUALIZAR CLIENTES EXISTENTES
# ============================================

# Obtener DeltaTable para hacer MERGE/UPDATE
delta_clientes = DeltaTable.forName(spark, BRONZE_CLIENTES)

# Escenario 1: Cliente cambia de direcci√≥n (generar√° update_preimage y update_postimage)
# Actualizamos uno de los clientes que acabamos de insertar
spark.sql(f"""
    UPDATE {BRONZE_CLIENTES}
    SET 
        direccion = 'Av. Actualizada 999',
        ciudad = 'Cusco',
        codigo_postal = '08001',
        operacion = 'UPDATE',
        fecha_ingesta = current_timestamp()
    WHERE cliente_id = 'CLI-99990001'
""")

print("‚úÖ Cliente CLI-99990001 actualizado (cambio de direcci√≥n)")

# Escenario 2: Cliente cambia de segmento (upgrade a VIP)
spark.sql(f"""
    UPDATE {BRONZE_CLIENTES}
    SET 
        segmento_cliente = 'VIP',
        operacion = 'UPDATE',
        fecha_ingesta = current_timestamp()
    WHERE cliente_id = 'CLI-99990002'
""")

print("‚úÖ Cliente CLI-99990002 actualizado (upgrade a VIP)")

### 3.3 Simular DELETEs (Eliminaci√≥n de Clientes)

In [None]:
# ============================================
# ELIMINAR CLIENTE
# ============================================

# Soft delete (cambiar estado) - M√°s com√∫n en sistemas financieros
spark.sql(f"""
    UPDATE {BRONZE_CLIENTES}
    SET 
        estado = 'INACTIVO',
        operacion = 'SOFT_DELETE',
        fecha_ingesta = current_timestamp()
    WHERE cliente_id = 'CLI-99990003'
""")

print("‚úÖ Cliente CLI-99990003 marcado como INACTIVO (soft delete)")

# Hard delete (eliminar f√≠sicamente) - Menos com√∫n pero genera evento DELETE en CDF
# spark.sql(f"DELETE FROM {BRONZE_CLIENTES} WHERE cliente_id = 'CLI-99990003'")

## 4Ô∏è‚É£ Leer Change Data Feed (Modo Batch)

Ahora vamos a leer los cambios que acabamos de hacer usando el Change Data Feed.

In [None]:
# ============================================
# LEER CDF - MODO BATCH (POR VERSI√ìN)
# ============================================

# Leer cambios desde la versi√≥n que guardamos antes
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", version_before + 1) \
    .table(BRONZE_CLIENTES)

print(f"üìä Cambios capturados desde versi√≥n {version_before + 1}:")
print(f"   Total de registros de cambio: {changes_df.count()}")

# Mostrar los cambios con columnas metadata de CDC
changes_df.select(
    "cliente_id",
    "nombre",
    "direccion",
    "ciudad",
    "segmento_cliente",
    "estado",
    "_change_type",
    "_commit_version",
    "_commit_timestamp"
).orderBy("cliente_id", "_commit_version").show(truncate=False)

In [None]:
# ============================================
# ANALIZAR TIPOS DE CAMBIO
# ============================================

print("üìà Resumen de tipos de cambio:")
changes_df.groupBy("_change_type").count().show()

# Explicaci√≥n de los tipos de cambio:
print("""
üìö Tipos de cambio en Change Data Feed:

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ _change_type        ‚îÇ Descripci√≥n                                       ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ insert              ‚îÇ Nueva fila insertada                              ‚îÇ
‚îÇ update_preimage     ‚îÇ Valor ANTES de la actualizaci√≥n                   ‚îÇ
‚îÇ update_postimage    ‚îÇ Valor DESPU√âS de la actualizaci√≥n                 ‚îÇ
‚îÇ delete              ‚îÇ Fila eliminada                                    ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
""")

In [None]:
# ============================================
# VER DETALLE DE UN UPDATE (PRE vs POST IMAGE)
# ============================================

print("üîÑ Detalle del UPDATE para cliente CLI-99990001:")

update_detail = changes_df.filter(
    (col("cliente_id") == "CLI-99990001") & 
    (col("_change_type").isin(["update_preimage", "update_postimage"]))
)

update_detail.select(
    "cliente_id",
    "direccion",
    "ciudad",
    "codigo_postal",
    "_change_type"
).show(truncate=False)

## 5Ô∏è‚É£ Leer Change Data Feed (Modo Streaming)

Para pipelines en tiempo real, podemos leer el CDF como un stream.

In [None]:
# ============================================
# LEER CDF - MODO STREAMING
# ============================================

# Configurar stream desde el CDF
stream_changes = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table(BRONZE_CLIENTES)

print("‚úÖ Stream de CDC configurado")
print(f"üìä Schema del stream:")
stream_changes.printSchema()

In [None]:
# ============================================
# PROCESAR STREAM A TABLA DE AUDITOR√çA
# ============================================

# Crear tabla de auditor√≠a para CDC
spark.sql("""
    CREATE TABLE IF NOT EXISTS audit_cdc_clientes (
        cliente_id STRING,
        nombre STRING,
        email STRING,
        direccion STRING,
        ciudad STRING,
        segmento_cliente STRING,
        estado STRING,
        change_type STRING,
        commit_version LONG,
        commit_timestamp TIMESTAMP,
        processed_timestamp TIMESTAMP
    )
    USING DELTA
    CLUSTER BY (cliente_id, commit_timestamp)
""")

print("‚úÖ Tabla de auditor√≠a creada")

In [None]:
# Procesar el stream y escribir a la tabla de auditor√≠a
# NOTA: En un ambiente real, esto correr√≠a continuamente

audit_stream = stream_changes.select(
    col("cliente_id"),
    col("nombre"),
    col("email"),
    col("direccion"),
    col("ciudad"),
    col("segmento_cliente"),
    col("estado"),
    col("_change_type").alias("change_type"),
    col("_commit_version").alias("commit_version"),
    col("_commit_timestamp").alias("commit_timestamp"),
    current_timestamp().alias("processed_timestamp")
)

# Escribir con trigger once (procesar todo lo disponible y parar)
query = audit_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoint/audit_cdc_clientes") \
    .trigger(availableNow=True) \
    .toTable("audit_cdc_clientes")

# Esperar a que termine
query.awaitTermination()

print("‚úÖ Stream procesado exitosamente")

In [None]:
# Ver el resultado en la tabla de auditor√≠a
print("üìã Tabla de Auditor√≠a CDC:")
spark.table("audit_cdc_clientes") \
    .orderBy(col("commit_version").desc(), "cliente_id") \
    .show(20, truncate=False)

## 6Ô∏è‚É£ Casos de Uso Pr√°cticos del CDC

### 6.1 Detectar Cambios Espec√≠ficos (Alertas)

In [None]:
# ============================================
# CASO 1: DETECTAR CAMBIOS DE SEGMENTO (ALERTAS)
# ============================================

# Identificar clientes que cambiaron de segmento
cambios_segmento = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", version_before + 1) \
    .table(BRONZE_CLIENTES) \
    .filter(col("_change_type").isin(["update_preimage", "update_postimage"]))

# Comparar pre y post image para detectar cambios de segmento
pre_image = cambios_segmento.filter(col("_change_type") == "update_preimage") \
    .select(
        col("cliente_id"),
        col("segmento_cliente").alias("segmento_anterior"),
        col("_commit_version")
    )

post_image = cambios_segmento.filter(col("_change_type") == "update_postimage") \
    .select(
        col("cliente_id"),
        col("segmento_cliente").alias("segmento_nuevo"),
        col("nombre"),
        col("_commit_version")
    )

alertas_segmento = pre_image.join(
    post_image,
    ["cliente_id", "_commit_version"]
).filter(
    col("segmento_anterior") != col("segmento_nuevo")
)

print("üîî ALERTAS: Cambios de Segmento Detectados")
alertas_segmento.show(truncate=False)

In [None]:
# ============================================
# CASO 2: DETECTAR CAMBIOS DE DIRECCI√ìN (COMPLIANCE)
# ============================================

# Similar al anterior pero para direcci√≥n
pre_direccion = cambios_segmento.filter(col("_change_type") == "update_preimage") \
    .select(
        col("cliente_id"),
        col("direccion").alias("direccion_anterior"),
        col("ciudad").alias("ciudad_anterior"),
        col("_commit_version")
    )

post_direccion = cambios_segmento.filter(col("_change_type") == "update_postimage") \
    .select(
        col("cliente_id"),
        col("direccion").alias("direccion_nueva"),
        col("ciudad").alias("ciudad_nueva"),
        col("nombre"),
        col("_commit_version"),
        col("_commit_timestamp")
    )

cambios_direccion = pre_direccion.join(
    post_direccion,
    ["cliente_id", "_commit_version"]
).filter(
    (col("direccion_anterior") != col("direccion_nueva")) |
    (col("ciudad_anterior") != col("ciudad_nueva"))
)

print("üìç Cambios de Direcci√≥n (para Compliance KYC):")
cambios_direccion.show(truncate=False)

### 6.2 M√©tricas de Cambios

In [None]:
# ============================================
# M√âTRICAS DE CDC
# ============================================

# Leer todo el historial de CDC
all_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table(BRONZE_CLIENTES)

print("üìä M√©tricas de Change Data Feed:")
print("="*50)

# Cambios por tipo
print("\nüìà Cambios por tipo:")
all_changes.groupBy("_change_type").count() \
    .withColumnRenamed("count", "total") \
    .orderBy("_change_type").show()

# Cambios por versi√≥n
print("\nüìà Cambios por versi√≥n:")
all_changes.groupBy("_commit_version").count() \
    .withColumnRenamed("count", "total_cambios") \
    .orderBy("_commit_version").show(10)

# Timeline de cambios
print("\nüìà Timeline de cambios por hora:")
all_changes.withColumn(
    "hora", date_trunc("hour", col("_commit_timestamp"))
).groupBy("hora", "_change_type").count() \
    .orderBy("hora", "_change_type").show()

## 7Ô∏è‚É£ Funci√≥n Reutilizable para Procesar CDC

In [None]:
# ============================================
# FUNCI√ìN GEN√âRICA PARA PROCESAR CDC
# ============================================

def process_cdc_batch(
    source_table: str,
    start_version: int = None,
    end_version: int = None,
    start_timestamp: str = None,
    end_timestamp: str = None
):
    """
    Procesa Change Data Feed de una tabla Delta.
    
    Args:
        source_table: Nombre de la tabla fuente
        start_version: Versi√≥n inicial (opcional)
        end_version: Versi√≥n final (opcional)
        start_timestamp: Timestamp inicial (opcional)
        end_timestamp: Timestamp final (opcional)
        
    Returns:
        DataFrame con los cambios y columnas metadata de CDC
    """
    reader = spark.read.format("delta").option("readChangeFeed", "true")
    
    # Configurar rango por versi√≥n
    if start_version is not None:
        reader = reader.option("startingVersion", start_version)
    if end_version is not None:
        reader = reader.option("endingVersion", end_version)
    
    # Configurar rango por timestamp
    if start_timestamp is not None:
        reader = reader.option("startingTimestamp", start_timestamp)
    if end_timestamp is not None:
        reader = reader.option("endingTimestamp", end_timestamp)
    
    return reader.table(source_table)

def get_only_latest_changes(cdc_df, key_columns: list):
    """
    De un DataFrame de CDC, obtiene solo el estado m√°s reciente de cada registro.
    √ötil para sincronizaci√≥n de tablas.
    
    Args:
        cdc_df: DataFrame con datos de CDC
        key_columns: Lista de columnas que forman la clave
        
    Returns:
        DataFrame con solo el estado m√°s reciente de cada registro
    """
    from pyspark.sql.window import Window
    
    # Solo tomar inserts y post-images (estado actual)
    current_state = cdc_df.filter(
        col("_change_type").isin(["insert", "update_postimage"])
    )
    
    # Ordenar por versi√≥n y tomar el m√°s reciente
    window = Window.partitionBy(*key_columns).orderBy(col("_commit_version").desc())
    
    return current_state.withColumn(
        "rn", row_number().over(window)
    ).filter(col("rn") == 1).drop("rn")

print("‚úÖ Funciones de utilidad para CDC definidas")

In [None]:
# Ejemplo de uso de las funciones
cdc_data = process_cdc_batch(BRONZE_CLIENTES, start_version=0)
latest_state = get_only_latest_changes(cdc_data, ["cliente_id"])

print(f"üìä Total cambios en CDC: {cdc_data.count()}")
print(f"üìä Registros √∫nicos (estado actual): {latest_state.count()}")

# Mostrar algunos registros
latest_state.select(
    "cliente_id", "nombre", "segmento_cliente", "estado", "_change_type", "_commit_version"
).show(5, truncate=False)

## 8Ô∏è‚É£ Verificar Historial de la Tabla

In [None]:
# ============================================
# VER HISTORIAL COMPLETO
# ============================================

print(f"üìú Historial de {BRONZE_CLIENTES}:")
spark.sql(f"DESCRIBE HISTORY {BRONZE_CLIENTES}").select(
    "version",
    "timestamp",
    "operation",
    "operationParameters",
    "operationMetrics"
).show(truncate=False)

---

## ‚úÖ Resumen del Notebook

### Lo que aprendimos:

1. ‚úÖ **Habilitar CDC** en tablas Delta existentes
2. ‚úÖ **Simular operaciones** (INSERT, UPDATE, DELETE)
3. ‚úÖ **Leer Change Data Feed** en modo batch y streaming
4. ‚úÖ **Analizar los tipos de cambio** (insert, update_preimage, update_postimage, delete)
5. ‚úÖ **Casos de uso pr√°cticos**: Alertas, compliance, m√©tricas
6. ‚úÖ **Funciones reutilizables** para procesar CDC

### Columnas Metadata de CDC:

| Columna | Descripci√≥n |
|---------|-------------|
| `_change_type` | Tipo de cambio (insert, update_preimage, update_postimage, delete) |
| `_commit_version` | Versi√≥n del commit en Delta Lake |
| `_commit_timestamp` | Timestamp del commit |

### Pr√≥ximo paso:
Continuar con el **Notebook 03: Silver Layer - SCD** para implementar Slowly Changing Dimensions.

---