# Caso 13: CDC e Ingesta Incremental

**Objetivo**: Aprender a implementar Change Data Capture (CDC) e ingesta incremental desde Oracle a Delta Lake.

## üìã Contenido

1. Setup inicial
2. Generar datos CDC sint√©ticos
3. Implementar pipeline incremental con deduplicaci√≥n
4. MERGE a Delta Lake
5. Validaci√≥n de resultados

---

## ‚ö†Ô∏è Prerequisitos

- Spark 3.x configurado
- Delta Lake instalado
- Datos de prueba generados

In [None]:
# Setup: Inicializar Spark Session con Delta Lake
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("Caso 13: CDC Incremental") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print("‚úÖ Spark Session creado con Delta Lake")
print(f"   Version: {spark.version}")

## Paso 1: Generar Datos CDC Sint√©ticos

Vamos a simular datos de Change Data Capture como si vinieran de Oracle GoldenGate.

In [None]:
# Generar datos CDC sint√©ticos
from datetime import datetime, timedelta

# D√≠a 1: Initial load (1000 customers)
print("üìä Generando initial load (1000 customers)...")
initial = spark.range(1, 1001).selectExpr(
    "id as customer_id",
    "concat('Customer ', id) as name",
    "concat('customer', id, '@example.com') as email",
    "concat('+1-555-', lpad(id, 4, '0')) as phone",
    "'I' as op_type",
    "timestamp('2026-01-10 00:00:00') as op_ts"
)

initial.write \
    .mode("overwrite") \
    .parquet("/tmp/cdc_data/customers/date=2026-01-10")

print(f"   ‚úÖ Initial load: {initial.count():,} registros")

# D√≠a 2: Updates (100 customers) + Deletes (10 customers)
print("\nüìä Generando cambios d√≠a 2...")
updates = spark.range(1, 101).selectExpr(
    "id as customer_id",
    "concat('UPDATED Customer ', id) as name",
    "concat('customer', id, '_new@example.com') as email",
    "concat('+1-555-', lpad(id, 4, '0')) as phone",
    "'U' as op_type",
    "timestamp('2026-01-11 12:00:00') as op_ts"
)

deletes = spark.range(991, 1001).selectExpr(
    "id as customer_id",
    "cast(null as string) as name",
    "cast(null as string) as email",
    "cast(null as string) as phone",
    "'D' as op_type",
    "timestamp('2026-01-11 18:00:00') as op_ts"
)

day2_changes = updates.union(deletes)
day2_changes.write \
    .mode("overwrite") \
    .parquet("/tmp/cdc_data/customers/date=2026-01-11")

print(f"   ‚úÖ Updates: 100 registros")
print(f"   ‚úÖ Deletes: 10 registros")
print(f"   Total d√≠a 2: {day2_changes.count():,} cambios")

## Paso 2: Ver Estructura de Datos CDC

Los datos CDC incluyen:
- `customer_id`: ID del cliente
- `name`, `email`, `phone`: Datos del cliente
- `op_type`: Tipo de operaci√≥n (I=Insert, U=Update, D=Delete)
- `op_ts`: Timestamp de la operaci√≥n

In [None]:
# Ver datos CDC del d√≠a 2
cdc_day2 = spark.read.parquet("/tmp/cdc_data/customers/date=2026-01-11")

print("üìä Datos CDC - D√≠a 2:")
print(f"   Total registros: {cdc_day2.count()}")

print("\nüîç Distribuci√≥n por tipo de operaci√≥n:")
cdc_day2.groupBy("op_type").count().show()

print("\nüëÄ Ejemplos de cada tipo:")
print("\n   Updates (U):")
cdc_day2.filter("op_type = 'U'").show(3, truncate=False)

print("   Deletes (D):")
cdc_day2.filter("op_type = 'D'").show(3, truncate=False)

## Paso 3: Implementar Deduplicaci√≥n

En CDC real, es posible tener m√∫ltiples cambios del mismo registro en el mismo batch.
Debemos quedarnos con el cambio m√°s reciente.

In [None]:
# Deduplicaci√≥n usando Window Function
from pyspark.sql.window import Window

print("üîÑ Aplicando deduplicaci√≥n...")

# Crear window particionado por customer_id, ordenado por timestamp descendente
window_spec = Window.partitionBy("customer_id").orderBy(col("op_ts").desc())

# Agregar row_number y filtrar solo el primer registro
deduplicated = cdc_day2 \
    .withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn")

print(f"   Before dedup: {cdc_day2.count():,} registros")
print(f"   After dedup: {deduplicated.count():,} registros")
print(f"   ‚úÖ Eliminados {cdc_day2.count() - deduplicated.count()} duplicados")

# Guardar deduplicados
deduplicated.write \
    .mode("overwrite") \
    .parquet("/tmp/cdc_data/customers_dedup/date=2026-01-11")

print("\nüìä Distribuci√≥n despu√©s de dedup:")
deduplicated.groupBy("op_type").count().show()

## Paso 4: Crear Tabla Delta (Primera Carga)

Primero cargamos el initial load a Delta Lake.

In [None]:
# Leer initial load y crear tabla Delta
print("üì¶ Creando tabla Delta con initial load...")

initial_load = spark.read.parquet("/tmp/cdc_data/customers/date=2026-01-10") \
    .filter("op_type = 'I'") \
    .select(
        "customer_id",
        "name",
        "email",
        "phone",
        col("op_ts").alias("created_at"),
        col("op_ts").alias("updated_at")
    )

initial_load.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/tmp/delta/customers")

print(f"   ‚úÖ Tabla Delta creada con {initial_load.count():,} registros")

# Verificar
delta_table = spark.read.format("delta").load("/tmp/delta/customers")
print(f"\nüìä Estado de tabla Delta:")
print(f"   Total registros: {delta_table.count():,}")
delta_table.show(5)

## Paso 5: MERGE - Aplicar Cambios Incrementales

Ahora aplicamos los cambios del d√≠a 2 usando MERGE de Delta Lake.

In [None]:
# Leer cambios deduplicados
print("üîÄ Ejecutando MERGE de cambios incrementales...")

cdc_changes = spark.read.parquet("/tmp/cdc_data/customers_dedup/date=2026-01-11")

# Obtener tabla Delta
target_table = DeltaTable.forPath(spark, "/tmp/delta/customers")

# MERGE operation
target_table.alias("target").merge(
    cdc_changes.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    condition="source.op_type = 'U'",
    set={
        "name": "source.name",
        "email": "source.email",
        "phone": "source.phone",
        "updated_at": "source.op_ts"
    }
).whenMatchedDelete(
    condition="source.op_type = 'D'"
).whenNotMatchedInsert(
    condition="source.op_type IN ('I', 'U')",
    values={
        "customer_id": "source.customer_id",
        "name": "source.name",
        "email": "source.email",
        "phone": "source.phone",
        "created_at": "source.op_ts",
        "updated_at": "source.op_ts"
    }
).execute()

print("   ‚úÖ MERGE completado")

# Verificar estado final
final_state = spark.read.format("delta").load("/tmp/delta/customers")
print(f"\nüìä Estado final de tabla Delta:")
print(f"   Total registros: {final_state.count():,}")
print(f"   Esperado: 990 (1000 initial - 10 deletes)")

## Paso 6: Validaci√≥n de Resultados

Verificar que los cambios se aplicaron correctamente.

In [None]:
# Validaci√≥n 1: Verificar UPDATEs
print("‚úÖ Validaci√≥n 1: Verificar registros actualizados")
print("   Buscando registros con 'UPDATED' en el nombre...")

updated_records = final_state.filter("name LIKE 'UPDATED%'")
print(f"   ‚úÖ Encontrados: {updated_records.count()} registros actualizados (esperado: 100)")
updated_records.show(5, truncate=False)

# Validaci√≥n 2: Verificar DELETEs
print("\n‚úÖ Validaci√≥n 2: Verificar que registros fueron eliminados")
deleted_ids = list(range(991, 1001))
remaining_deleted = final_state.filter(col("customer_id").isin(deleted_ids))
print(f"   Registros con IDs 991-1000: {remaining_deleted.count()} (esperado: 0)")
if remaining_deleted.count() == 0:
    print("   ‚úÖ DELETES aplicados correctamente")
else:
    print("   ‚ùå ERROR: Algunos registros no fueron eliminados")

# Validaci√≥n 3: Verificar emails actualizados
print("\n‚úÖ Validaci√≥n 3: Verificar nuevos emails")
new_emails = final_state.filter("email LIKE '%_new@%'")
print(f"   Emails con '_new@': {new_emails.count()} (esperado: 100)")

# Resumen
print("\n" + "="*60)
print("üìä RESUMEN DE VALIDACI√ìN")
print("="*60)
print(f"Total final de registros: {final_state.count():,}")
print(f"Registros actualizados: {updated_records.count()}")
print(f"Registros eliminados: 10")
print(f"Resultado: {'‚úÖ EXITOSO' if final_state.count() == 990 else '‚ùå ERROR'}")
print("="*60)

## Paso 7: Time Travel - Ver Historia de Cambios

Delta Lake permite ver versiones anteriores de la tabla.

In [None]:
# Ver historia de versiones
print("üïê Historia de versiones de la tabla:")
history = DeltaTable.forPath(spark, "/tmp/delta/customers").history()
history.select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)

# Leer versi√≥n anterior (antes del MERGE)
print("\nüìä Versi√≥n 0 (Initial Load):")
version_0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/customers")
print(f"   Registros: {version_0.count():,}")

print("\nüìä Versi√≥n 1 (Despu√©s de MERGE):")
version_1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/customers")
print(f"   Registros: {version_1.count():,}")

print("\n‚úÖ Time Travel permite auditor√≠a completa de cambios")

## üéØ Conclusiones

### ‚úÖ Lo que Aprendimos

1. **Generar datos CDC** simulando Oracle GoldenGate
2. **Deduplicaci√≥n** usando Window Functions
3. **MERGE operation** en Delta Lake:
   - UPDATE cuando hay match y es 'U'
   - DELETE cuando hay match y es 'D'
   - INSERT cuando no hay match
4. **Validaci√≥n** de resultados
5. **Time Travel** para auditor√≠a

### üí° Best Practices

- ‚úÖ **Siempre deduplicar** datos CDC antes del MERGE
- ‚úÖ **Manejar DELETEs** expl√≠citamente
- ‚úÖ **Usar Time Travel** para validaci√≥n
- ‚úÖ **Idempotencia**: Pipeline debe ser re-ejecutable
- ‚úÖ **Particionar datos CDC** por fecha para eficiencia

### üîó Pr√≥ximos Pasos

- Ver **Caso 14**: Data Quality & Error Handling
- Ver **Caso 15**: Spark Structured Streaming (CDC real-time)
- Implementar en producci√≥n con Airflow (Caso 16)