## 🎯 NIVEL 1: FUNDAMENTOS BÁSICOS
Ejercicio 1.1: Crear tu primer DataFrame

In [0]:
# Importar librerías necesarias
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Verificar la sesión de Spark (en Databricks ya está creada como 'spark')
print(f"Versión de Spark: {spark.version}")
# Crear un DataFrame simple desde listas
datos_empleados = [
    (1, "Juan", "Pérez", 30, "Ventas", 3000),
    (2, "María", "García", 25, "IT", 3500),
    (3, "Carlos", "López", 35, "Ventas", 3200),
    (4, "Ana", "Martínez", 28, "IT", 3800),
    (5, "Luis", "Rodríguez", 40, "RRHH", 3100)
]

columnas = ["id", "nombre", "apellido", "edad", "departamento", "salario"]

df_empleados = spark.createDataFrame(datos_empleados, columnas)

# Mostrar el DataFrame
df_empleados.show()

# Ver el esquema
df_empleados.printSchema()

# Información básica
print(f"Número de filas: {df_empleados.count()}")
print(f"Número de columnas: {len(df_empleados.columns)}")

Versión de Spark: 4.0.0
+---+------+---------+----+------------+-------+
| id|nombre| apellido|edad|departamento|salario|
+---+------+---------+----+------------+-------+
|  1|  Juan|    Pérez|  30|      Ventas|   3000|
|  2| María|   García|  25|          IT|   3500|
|  3|Carlos|    López|  35|      Ventas|   3200|
|  4|   Ana| Martínez|  28|          IT|   3800|
|  5|  Luis|Rodríguez|  40|        RRHH|   3100|
+---+------+---------+----+------------+-------+

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- apellido: string (nullable = true)
 |-- edad: long (nullable = true)
 |-- departamento: string (nullable = true)
 |-- salario: long (nullable = true)

Número de filas: 5
Número de columnas: 6


Ejercicio 1.2: Operaciones Básicas de Selección

In [0]:
# Seleccionar columnas específicas
df_empleados.select("nombre", "apellido", "salario").show()

# Seleccionar con alias
df_empleados.select(
    col("nombre"),
    col("salario").alias("salario_mensual"),
    (col("salario") * 12).alias("salario_anual")
).show()

# Filtrar registros
df_empleados.filter(col("edad") > 30).show()

# Múltiples condiciones
df_empleados.filter(
    (col("departamento") == "IT") & (col("salario") > 3500)
).show()

# Ordenar resultados
df_empleados.orderBy(col("salario").desc()).show()

+------+---------+-------+
|nombre| apellido|salario|
+------+---------+-------+
|  Juan|    Pérez|   3000|
| María|   García|   3500|
|Carlos|    López|   3200|
|   Ana| Martínez|   3800|
|  Luis|Rodríguez|   3100|
+------+---------+-------+

+------+---------------+-------------+
|nombre|salario_mensual|salario_anual|
+------+---------------+-------------+
|  Juan|           3000|        36000|
| María|           3500|        42000|
|Carlos|           3200|        38400|
|   Ana|           3800|        45600|
|  Luis|           3100|        37200|
+------+---------------+-------------+

+---+------+---------+----+------------+-------+
| id|nombre| apellido|edad|departamento|salario|
+---+------+---------+----+------------+-------+
|  3|Carlos|    López|  35|      Ventas|   3200|
|  5|  Luis|Rodríguez|  40|        RRHH|   3100|
+---+------+---------+----+------------+-------+

+---+------+--------+----+------------+-------+
| id|nombre|apellido|edad|departamento|salario|
+---+------+-

Ejercicio 1.3: Crear DataFrames con Esquemas Definidos

In [0]:
# Definir esquema explícitamente
schema_empleados = StructType([
    StructField("id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("apellido", StringType(), False),
    StructField("edad", IntegerType(), True),
    StructField("departamento", StringType(), True),
    StructField("salario", DoubleType(), True)
])

df_con_esquema = spark.createDataFrame(datos_empleados, schema_empleados)
df_con_esquema.printSchema()

root
 |-- id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- apellido: string (nullable = false)
 |-- edad: integer (nullable = true)
 |-- departamento: string (nullable = true)
 |-- salario: double (nullable = true)



## 🎯 NIVEL 2: OPERACIONES CON DATAFRAMES
Ejercicio 2.1: Agregaciones y Agrupaciones

In [0]:
# Crear dataset más grande para agregaciones
from random import randint, choice, uniform
import builtins
departamentos = ["Ventas", "IT", "RRHH", "Marketing", "Finanzas"]
ciudades = ["Lima", "Arequipa", "Cusco", "Trujillo"]

datos_grandes = [
    (
        i,
        f"Empleado{i}",
        choice(departamentos),
        choice(ciudades),
        randint(22, 60),
        builtins.round(uniform(2500, 8000), 2),
        randint(1, 10)
    )
    for i in range(1, 1001)
]

columnas_grandes = ["id", "nombre", "departamento", "ciudad", "edad", "salario", "años_experiencia"]

df_grande = spark.createDataFrame(datos_grandes, columnas_grandes)

print(f"Dataset creado con {df_grande.count()} registros")
df_grande.show(10)

# Agregaciones básicas
df_grande.select(
    avg("salario").alias("salario_promedio"),
    max("salario").alias("salario_maximo"),
    min("salario").alias("salario_minimo"),
    sum("salario").alias("masa_salarial_total")
).show()

# Agrupaciones
df_grande.groupBy("departamento").agg(
    count("*").alias("num_empleados"),
    avg("salario").alias("salario_promedio"),
    max("salario").alias("salario_maximo")
).orderBy(col("salario_promedio").desc()).show()

# Agrupaciones múltiples
df_grande.groupBy("departamento", "ciudad").agg(
    count("*").alias("empleados"),
    avg("salario").alias("salario_promedio")
).orderBy("departamento", "ciudad").show()

Dataset creado con 1000 registros
+---+----------+------------+--------+----+-------+----------------+
| id|    nombre|departamento|  ciudad|edad|salario|años_experiencia|
+---+----------+------------+--------+----+-------+----------------+
|  1| Empleado1|        RRHH|   Cusco|  47|5420.13|               9|
|  2| Empleado2|      Ventas|    Lima|  52|4833.88|               6|
|  3| Empleado3|        RRHH|   Cusco|  44|7047.38|               1|
|  4| Empleado4|   Marketing|Trujillo|  32|5525.62|              10|
|  5| Empleado5|   Marketing|   Cusco|  51|3291.58|               4|
|  6| Empleado6|    Finanzas|Arequipa|  22|6054.55|               9|
|  7| Empleado7|    Finanzas|Arequipa|  41|4650.14|               3|
|  8| Empleado8|    Finanzas|Arequipa|  47|3956.11|               8|
|  9| Empleado9|    Finanzas|   Cusco|  37|7054.85|              10|
| 10|Empleado10|   Marketing|   Cusco|  50|5319.26|               9|
+---+----------+------------+--------+----+-------+----------------+


Ejercicio 2.2: Joins (Uniones)

In [0]:
# Crear tabla de bonos
datos_bonos = [
    ("Ventas", 500),
    ("IT", 800),
    ("Marketing", 600),
    ("Finanzas", 700)
]

df_bonos = spark.createDataFrame(datos_bonos, ["departamento", "bono_anual"])

# Inner Join
df_con_bonos = df_grande.join(df_bonos, "departamento", "inner")
df_con_bonos.select("nombre", "departamento", "salario", "bono_anual").show(10)

# Left Join (muestra RRHH aunque no tenga bono)
df_left = df_grande.join(df_bonos, "departamento", "left")
df_left.select("nombre", "departamento", "salario", "bono_anual").show(10)

# Calcular compensación total
df_compensacion = df_con_bonos.withColumn(
    "compensacion_total",
    col("salario") * 12 + col("bono_anual")
)

df_compensacion.select(
    "nombre", "departamento", "salario", "bono_anual", "compensacion_total"
).show(10)

Ejercicio 2.3: Window Functions (Funciones de Ventana)

In [0]:
from pyspark.sql.window import Window

# Ranking de salarios por departamento
ventana_dept = Window.partitionBy("departamento").orderBy(col("salario").desc())

df_ranking = df_grande.withColumn(
    "ranking_dept",
    rank().over(ventana_dept)
).withColumn(
    "row_number_dept",
    row_number().over(ventana_dept)
)

df_ranking.filter(col("ranking_dept") <= 3).select(
    "departamento", "nombre", "salario", "ranking_dept"
).orderBy("departamento", "ranking_dept").show(20)

# Calcular diferencia con el salario promedio del departamento
ventana_dept_sin_orden = Window.partitionBy("departamento")

df_comparacion = df_grande.withColumn(
    "salario_promedio_dept",
    avg("salario").over(ventana_dept_sin_orden)
).withColumn(
    "diferencia_vs_promedio",
    col("salario") - col("salario_promedio_dept")
)

df_comparacion.select(
    "departamento", "nombre", "salario", 
    "salario_promedio_dept", "diferencia_vs_promedio"
).show(10)

# Running total (suma acumulativa)
ventana_running = Window.partitionBy("departamento").orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_running = df_grande.withColumn(
    "suma_acumulativa_salarios",
    sum("salario").over(ventana_running)
)

df_running.select("departamento", "id", "salario", "suma_acumulativa_salarios").show(15)

## 🎯 NIVEL 3: TRANSFORMACIONES AVANZADAS
Ejercicio 3.1: Trabajar con Fechas y Timestamps

In [0]:
from datetime import datetime, timedelta
from random import randint

# Crear dataset con fechas
fecha_inicio = datetime(2020, 1, 1)

datos_transacciones = [
    (
        i,
        f"CLI{randint(1, 100)}",
        fecha_inicio + timedelta(days=randint(0, 1500)),
        round(uniform(10, 5000), 2),
        choice(["Completada", "Pendiente", "Cancelada"])
    )
    for i in range(1, 5001)
]

df_transacciones = spark.createDataFrame(
    datos_transacciones,
    ["id_transaccion", "cliente_id", "fecha", "monto", "estado"]
)

df_transacciones.show(10)

# Extraer componentes de fecha
df_fechas = df_transacciones.withColumn("año", year("fecha")) \
    .withColumn("mes", month("fecha")) \
    .withColumn("dia", dayofmonth("fecha")) \
    .withColumn("dia_semana", dayofweek("fecha")) \
    .withColumn("nombre_dia", date_format("fecha", "EEEE")) \
    .withColumn("trimestre", quarter("fecha"))

df_fechas.select("fecha", "año", "mes", "dia", "nombre_dia", "trimestre").show(10)

# Análisis temporal
df_transacciones.filter(col("estado") == "Completada") \
    .withColumn("año", year("fecha")) \
    .withColumn("mes", month("fecha")) \
    .groupBy("año", "mes") \
    .agg(
        count("*").alias("num_transacciones"),
        sum("monto").alias("ventas_totales"),
        avg("monto").alias("ticket_promedio")
    ) \
    .orderBy("año", "mes") \
    .show(20)

Ejercicio 3.2: Manejo de Valores Nulos y Limpieza de Datos

In [0]:
# Crear dataset con valores nulos
datos_con_nulos = [
    (1, "Juan", 30, 3000, "Lima"),
    (2, "María", None, 3500, "Cusco"),
    (3, "Carlos", 35, None, "Lima"),
    (4, None, 28, 3800, None),
    (5, "Luis", 40, 3100, "Arequipa"),
    (6, "Ana", None, None, "Lima")
]

df_nulos = spark.createDataFrame(
    datos_con_nulos,
    ["id", "nombre", "edad", "salario", "ciudad"]
)

print("Dataset original con nulos:")
df_nulos.show()

# Contar nulos por columna
from pyspark.sql.functions import col, count, when, isnan

df_nulos.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in df_nulos.columns
]).show()

# Eliminar filas con cualquier nulo
df_sin_nulos = df_nulos.dropna()
print(f"Filas después de dropna(): {df_sin_nulos.count()}")

# Eliminar filas solo si todas las columnas son nulas
df_sin_todos_nulos = df_nulos.dropna(how='all')
print(f"Filas después de dropna(how='all'): {df_sin_todos_nulos.count()}")

# Rellenar nulos con valores específicos
df_rellenado = df_nulos.fillna({
    "nombre": "Desconocido",
    "edad": 0,
    "salario": 0.0,
    "ciudad": "No especificada"
})

print("Dataset con nulos rellenados:")
df_rellenado.show()

# Rellenar con el promedio (para columnas numéricas)
promedio_salario = df_nulos.select(avg("salario")).collect()[0][0]
promedio_edad = df_nulos.select(avg("edad")).collect()[0][0]

df_con_promedios = df_nulos.fillna({
    "salario": promedio_salario,
    "edad": int(promedio_edad)
})

df_con_promedios.show()

Ejercicio 3.3: Expresiones Regulares y Manipulación de Strings

In [0]:
# Crear dataset con textos
datos_productos = [
    (1, "  iPhone 14 Pro - 256GB  ", "Apple-2023-TECH", "tech@apple.com"),
    (2, "Samsung Galaxy S23", "SAMSUNG_2023_mobile", "contact@samsung.co.kr"),
    (3, "laptop  DELL XPS 15", "dell-2023-computers", "support@dell.com"),
    (4, "AirPods Pro 2nd Gen", "apple-2023-audio", "SALES@APPLE.COM"),
    (5, "Sony WH-1000XM5", "sony_2023_audio", "info@sony.jp")
]

df_productos = spark.createDataFrame(
    datos_productos,
    ["id", "nombre", "codigo", "email"]
)

print("Dataset original:")
df_productos.show(truncate=False)

# Limpiar espacios
df_limpio = df_productos.withColumn(
    "nombre_limpio",
    trim(col("nombre"))
).withColumn(
    "nombre_upper",
    upper(col("nombre"))
).withColumn(
    "nombre_lower",
    lower(col("nombre"))
)

df_limpio.select("nombre", "nombre_limpio", "nombre_upper", "nombre_lower").show(truncate=False)

# Extraer información con expresiones regulares
df_extraido = df_productos.withColumn(
    "marca",
    regexp_extract(col("codigo"), r"^([a-zA-Z]+)", 1)
).withColumn(
    "año",
    regexp_extract(col("codigo"), r"(\d{4})", 1)
).withColumn(
    "categoria",
    regexp_extract(col("codigo"), r"[_-]([a-zA-Z]+)$", 1)
)

df_extraido.select("codigo", "marca", "año", "categoria").show(truncate=False)

# Validar emails
df_validado = df_productos.withColumn(
    "email_valido",
    when(
        col("email").rlike(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"),
        "Válido"
    ).otherwise("Inválido")
)

df_validado.select("email", "email_valido").show(truncate=False)

# Reemplazar patrones
df_normalizado = df_productos.withColumn(
    "codigo_normalizado",
    regexp_replace(col("codigo"), r"[_-]", " ")
)

df_normalizado.select("codigo", "codigo_normalizado").show(truncate=False)

## 🎯 NIVEL 4: OPTIMIZACIÓN Y PERFORMANCE
Ejercicio 4.1: Particionamiento y Cache

In [0]:
# Crear un dataset más grande
datos_ventas = [ 
    (
        i,
        choice(["2023", "2024", "2025"]),
        choice(["Q1", "Q2", "Q3", "Q4"]),
        choice(["Producto A", "Producto B", "Producto C", "Producto D"]),
        choice(["Norte", "Sur", "Este", "Oeste"]),
        builtins.round(uniform(100, 10000), 2)
    )
    for i in range(1, 100001)
]

df_ventas = spark.createDataFrame(
    datos_ventas,
    ["id", "año", "trimestre", "producto", "region", "ventas"]
)

print(f"Dataset de ventas creado: {df_ventas.count()} registros")

# Ver el plan de ejecución
print("\n=== Plan de ejecución sin optimizar ===")
df_ventas.filter(col("año") == "2024").explain()

# Hacer cache del DataFrame
df_ventas.cache()
df_ventas.count()  # Materializar el cache

# Comparar performance
import time

# Sin cache (aunque ya hicimos cache, lo mostramos conceptualmente)
start = time.time()
result1 = df_ventas.filter(col("año") == "2024").count()
time1 = time.time() - start
print(f"Primera consulta: {result1} registros en {time1:.3f} segundos")

# Con cache (segunda ejecución es más rápida)
start = time.time()
result2 = df_ventas.filter(col("año") == "2024").count()
time2 = time.time() - start
print(f"Segunda consulta (con cache): {result2} registros en {time2:.3f} segundos")

# Liberar cache
df_ventas.unpersist()

Ejercicio 4.2: Broadcast Joins para Tablas Pequeñas

In [0]:
from pyspark.sql.functions import broadcast

# Tabla grande de transacciones
datos_transacciones_grandes = [
    (i, randint(1, 100), round(uniform(10, 1000), 2))
    for i in range(1, 50001)
]

df_transacciones_grandes = spark.createDataFrame(
    datos_transacciones_grandes,
    ["id_transaccion", "id_producto", "monto"]
)

# Tabla pequeña de productos (candidata para broadcast)
datos_productos_catalogo = [
    (i, f"Producto {i}", choice(["Electrónica", "Ropa", "Alimentos", "Hogar"]))
    for i in range(1, 101)
]

df_catalogo = spark.createDataFrame(
    datos_productos_catalogo,
    ["id_producto", "nombre_producto", "categoria"]
)

# Join normal
print("=== Join Normal ===")
df_join_normal = df_transacciones_grandes.join(
    df_catalogo,
    "id_producto"
)
df_join_normal.explain()

# Broadcast Join (más eficiente para tablas pequeñas)
print("\n=== Broadcast Join ===")
df_join_broadcast = df_transacciones_grandes.join(
    broadcast(df_catalogo),
    "id_producto"
)
df_join_broadcast.explain()

# Verificar resultados
df_join_broadcast.groupBy("categoria").agg(
    count("*").alias("transacciones"),
    sum("monto").alias("ventas_totales")
).show()

Ejercicio 4.3: Reparticionamiento

In [0]:
# Ver particiones actuales
print(f"Número de particiones: {df_ventas.rdd.getNumPartitions()}")

# Reparticionar por columna (útil para operaciones por grupo)
df_reparticionado = df_ventas.repartition(8, "año", "trimestre")
print(f"Particiones después de repartition: {df_reparticionado.rdd.getNumPartitions()}")

# Coalesce (reducir particiones sin shuffle completo)
df_reducido = df_ventas.coalesce(2)
print(f"Particiones después de coalesce: {df_reducido.rdd.getNumPartitions()}")

# Análisis por partición
df_reparticionado.groupBy("año", "trimestre").agg(
    sum("ventas").alias("ventas_totales")
).show()

## 🎯 NIVEL 6: TÉCNICAS AVANZADAS
Ejercicio 6.1: Pivot y Unpivot

In [0]:
# Crear datos de ventas mensuales
datos_ventas_mes = [
    ("Norte", "Enero", 10000),
    ("Norte", "Febrero", 12000),
    ("Norte", "Marzo", 11000),
    ("Sur", "Enero", 8000),
    ("Sur", "Febrero", 9000),
    ("Sur", "Marzo", 8500),
    ("Este", "Enero", 15000),
    ("Este", "Febrero", 16000),
    ("Este", "Marzo", 17000),
]

df_ventas_mes = spark.createDataFrame(
    datos_ventas_mes,
    ["region", "mes", "ventas"]
)

print("Datos originales:")
df_ventas_mes.show()

# PIVOT: Convertir filas en columnas
df_pivot = df_ventas_mes.groupBy("region").pivot("mes").sum("ventas")

print("\nDatos con PIVOT (meses como columnas):")
df_pivot.show()

# UNPIVOT: Convertir columnas en filas
from pyspark.sql.functions import expr

df_unpivot = df_pivot.selectExpr(
    "region",
    "stack(3, 'Enero', Enero, 'Febrero', Febrero, 'Marzo', Marzo) as (mes, ventas)"
)

print("\nDatos con UNPIVOT (volver al formato original):")
df_unpivot.show()

# Pivot con múltiples agregaciones
df_pivot_multiple = df_ventas_mes.groupBy("region").pivot("mes").agg(
    sum("ventas").alias("total"),
    count("*").alias("transacciones")
)

df_pivot_multiple.show()

## 🎯 NIVEL 7: DELTA LAKE - FUNDAMENTOS
Ejercicio 7.1: Crear tu Primera Delta Table

In [0]:
# Limpiar directorios anteriores si existen
dbutils.fs.rm("/tmp/delta_tutorial", True)

# Crear datos iniciales
from pyspark.sql.functions import *
from random import randint, choice, uniform
from datetime import datetime, timedelta

# Dataset de clientes
datos_clientes = [
    (i, f"Cliente_{i}", choice(["Lima", "Cusco", "Arequipa", "Trujillo"]), 
     round(uniform(1000, 50000), 2), datetime(2024, 1, 1) + timedelta(days=randint(0, 300)))
    for i in range(1, 1001)
]

df_clientes = spark.createDataFrame(
    datos_clientes,
    ["cliente_id", "nombre", "ciudad", "valor_total", "fecha_registro"]
)

print("Dataset inicial de clientes:")
df_clientes.show(10)

# Escribir como Delta Table
ruta_delta_clientes = "/tmp/delta_tutorial/clientes"

df_clientes.write.format("delta").mode("overwrite").save(ruta_delta_clientes)

print(f"✓ Delta Table creada en: {ruta_delta_clientes}")

# Leer Delta Table
df_delta_clientes = spark.read.format("delta").load(ruta_delta_clientes)

print(f"Registros en Delta Table: {df_delta_clientes.count()}")
df_delta_clientes.show(5)

# Ver el historial de la tabla (versiones)
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, ruta_delta_clientes)

print("\n=== HISTORIAL DE VERSIONES ===")
delta_table.history().select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)

Ejercicio 7.2: Crear Delta Table Administrada (Managed Table)

In [0]:
# Crear tabla administrada (en el metastore de Databricks)
df_clientes.write.format("delta").mode("overwrite").saveAsTable("clientes_delta")

print("✓ Tabla administrada 'clientes_delta' creada")

# Leer desde tabla administrada
df_managed = spark.table("clientes_delta")
df_managed.show(5)

# Ver ubicación de la tabla administrada
spark.sql("DESCRIBE EXTENDED clientes_delta").filter(col("col_name") == "Location").show(truncate=False)

# Listar todas las tablas delta
print("\n=== TABLAS DELTA DISPONIBLES ===")
spark.sql("SHOW TABLES").show()

## 🎯 NIVEL 8: TIME TRAVEL
Ejercicio 8.1: Time Travel Básico

In [0]:
print("=== VERSIÓN 0 (INICIAL) ===")
df_delta_clientes.groupBy("ciudad").count().show()

# Realizar UPDATE (versión 1)
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, ruta_delta_clientes)

delta_table.update(
    condition = "ciudad = 'Lima'",
    set = {"valor_total": "valor_total * 1.10"}  # Incremento 10% para Lima
)

print("\n=== VERSIÓN 1 (DESPUÉS DE UPDATE) ===")
df_v1 = spark.read.format("delta").load(ruta_delta_clientes)
df_v1.filter(col("ciudad") == "Lima").select("cliente_id", "nombre", "ciudad", "valor_total").show(5)

# Realizar DELETE (versión 2)
delta_table.delete("valor_total < 2000")

print("\n=== VERSIÓN 2 (DESPUÉS DE DELETE) ===")
df_v2 = spark.read.format("delta").load(ruta_delta_clientes)
print(f"Registros después de DELETE: {df_v2.count()}")

# Insertar nuevos registros (versión 3)
nuevos_clientes = [
    (1001, "Cliente_Nuevo_1", "Lima", 25000.00, datetime.now()),
    (1002, "Cliente_Nuevo_2", "Cusco", 30000.00, datetime.now()),
]

df_nuevos = spark.createDataFrame(
    nuevos_clientes,
    ["cliente_id", "nombre", "ciudad", "valor_total", "fecha_registro"]
)

df_nuevos.write.format("delta").mode("append").save(ruta_delta_clientes)

print("\n=== VERSIÓN 3 (DESPUÉS DE INSERT) ===")
df_v3 = spark.read.format("delta").load(ruta_delta_clientes)
print(f"Total de registros: {df_v3.count()}")

# Ver todas las versiones en el historial
print("\n=== HISTORIAL COMPLETO ===")
delta_table.history().select(
    "version", "timestamp", "operation", 
    col("operationMetrics.numOutputRows").alias("rows_affected")
).show(truncate=False)

# TIME TRAVEL: Leer versión específica por número
print("\n=== TIME TRAVEL: Leer Versión 0 ===")
df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load(ruta_delta_clientes)
print(f"Registros en versión 0: {df_version_0.count()}")
df_version_0.groupBy("ciudad").count().show()

print("\n=== TIME TRAVEL: Leer Versión 1 ===")
df_version_1 = spark.read.format("delta").option("versionAsOf", 1).load(ruta_delta_clientes)
print(f"Registros en versión 1: {df_version_1.count()}")

print("\n=== TIME TRAVEL: Leer Versión 2 ===")
df_version_2 = spark.read.format("delta").option("versionAsOf", 2).load(ruta_delta_clientes)
print(f"Registros en versión 2: {df_version_2.count()}")

# TIME TRAVEL: Leer por timestamp
timestamp_v1 = delta_table.history().filter(col("version") == 1).select("timestamp").collect()[0][0]

print(f"\n=== TIME TRAVEL: Leer por Timestamp {timestamp_v1} ===")
df_by_timestamp = spark.read.format("delta").option("timestampAsOf", timestamp_v1).load(ruta_delta_clientes)
print(f"Registros: {df_by_timestamp.count()}")

Ejercicio 8.2: Restaurar Versiones Anteriores (RESTORE)

In [0]:
# Ver estado actual
print("=== ESTADO ACTUAL (Versión 3) ===")
df_actual = spark.read.format("delta").load(ruta_delta_clientes)
print(f"Total registros: {df_actual.count()}")

# RESTORE: Volver a la versión 0
print("\n=== RESTAURANDO A VERSIÓN 0 ===")
delta_table.restoreToVersion(0)

# Verificar restauración
df_restaurado = spark.read.format("delta").load(ruta_delta_clientes)
print(f"Registros después de RESTORE: {df_restaurado.count()}")

# Ver historial después del restore
print("\n=== HISTORIAL DESPUÉS DE RESTORE ===")
delta_table.history().select("version", "timestamp", "operation").show(10, truncate=False)

# Nota: RESTORE crea una nueva versión (versión 4 en este caso)
print("\n=== Leer Versión Actual (después de RESTORE) ===")
df_version_actual = spark.read.format("delta").load(ruta_delta_clientes)
df_version_actual.groupBy("ciudad").count().show()

## 🎯 NIVEL 9: OPTIMIZACIÓN DELTA LAKE
Ejercicio 9.1: OPTIMIZE - Compactación de Archivos

In [0]:
# Crear tabla con muchos archivos pequeños
print("=== CREANDO TABLA CON ARCHIVOS PEQUEÑOS ===")
ruta_delta_transacciones = "/tmp/delta_tutorial/transacciones"

# Escribir en múltiples lotes pequeños (simula escrituras incrementales)
for i in range(20):
    datos_batch = [
        (
            j + (i * 100),
            randint(1, 100),
            datetime(2024, 10, 1) + timedelta(hours=i),
            round(uniform(10, 1000), 2),
            choice(["COMPLETADA", "PENDIENTE", "CANCELADA"])
        )
        for j in range(1, 101)
    ]
    
    df_batch = spark.createDataFrame(
        datos_batch,
        ["transaccion_id", "cliente_id", "fecha", "monto", "estado"]
    )
    
    df_batch.write.format("delta").mode("append").save(ruta_delta_transacciones)

print("✓ Datos escritos en 20 lotes")

# Ver archivos antes de optimizar
files_antes = dbutils.fs.ls(ruta_delta_transacciones)
parquet_files_antes = [f for f in files_antes if f.name.endswith('.parquet')]
print(f"\nArchivos .parquet ANTES de OPTIMIZE: {len(parquet_files_antes)}")

# Ver tamaño de archivos
for f in parquet_files_antes[:5]:
    print(f"  {f.name}: {f.size / 1024:.2f} KB")

# OPTIMIZE: Compactar archivos pequeños
print("\n=== EJECUTANDO OPTIMIZE ===")
delta_table_trans = DeltaTable.forPath(spark, ruta_delta_transacciones)

result = delta_table_trans.optimize().executeCompaction()
print("✓ OPTIMIZE completado")

# Mostrar métricas de optimización
result.show(truncate=False)

# Ver archivos después de optimizar
files_despues = dbutils.fs.ls(ruta_delta_transacciones)
parquet_files_despues = [f for f in files_despues if f.name.endswith('.parquet')]
print(f"\nArchivos .parquet DESPUÉS de OPTIMIZE: {len(parquet_files_despues)}")

for f in parquet_files_despues[:5]:
    print(f"  {f.name}: {f.size / 1024:.2f} KB")

Ejercicio 9.2: Z-ORDERING - Optimización de Consultas

In [0]:
# Crear tabla para demostrar Z-ORDERING
ruta_delta_ventas = "/tmp/delta_tutorial/ventas_zorder"

datos_ventas_grande = [
    (
        i,
        choice(["Norte", "Sur", "Este", "Oeste", "Centro"]),
        choice(["Producto_A", "Producto_B", "Producto_C", "Producto_D", "Producto_E"]),
        datetime(2024, 1, 1) + timedelta(days=randint(0, 300)),
        round(uniform(10, 5000), 2)
    )
    for i in range(1, 50001)
]

df_ventas_grande = spark.createDataFrame(
    datos_ventas_grande,
    ["venta_id", "region", "producto", "fecha", "monto"]
)

df_ventas_grande.write.format("delta").mode("overwrite").save(ruta_delta_ventas)

print("✓ Tabla de ventas creada")

delta_table_ventas = DeltaTable.forPath(spark, ruta_delta_ventas)

# Consulta SIN Z-ORDERING
print("\n=== CONSULTA SIN Z-ORDERING ===")
spark.read.format("delta").load(ruta_delta_ventas) \
    .filter((col("region") == "Norte") & (col("producto") == "Producto_A")) \
    .explain()

# Aplicar Z-ORDERING en columnas frecuentemente filtradas
print("\n=== APLICANDO Z-ORDERING en 'region' y 'producto' ===")
result_zorder = delta_table_ventas.optimize().executeZOrderBy("region", "producto")
result_zorder.show(truncate=False)

print("✓ Z-ORDERING completado")

# Consulta CON Z-ORDERING
print("\n=== CONSULTA CON Z-ORDERING ===")
spark.read.format("delta").load(ruta_delta_ventas) \
    .filter((col("region") == "Norte") & (col("producto") == "Producto_A")) \
    .explain()

# Demostrar beneficio con estadísticas
print("\n=== COMPARACIÓN ===")
df_zorder = spark.read.format("delta").load(ruta_delta_ventas)
result = df_zorder.filter((col("region") == "Norte") & (col("producto") == "Producto_A")).count()
print(f"Registros encontrados: {result}")

Ejercicio 9.3: Data Skipping Statistics

In [0]:
# Ver estadísticas de la tabla
print("=== ESTADÍSTICAS DE LA TABLA ===")

# Describir detalles de la tabla
spark.sql(f"DESCRIBE DETAIL delta.`{ruta_delta_ventas}`").show(truncate=False)

# Ver historial con métricas
delta_table_ventas.history().select(
    "version", "timestamp", "operation",
    col("operationMetrics.numFiles").alias("num_files"),
    col("operationMetrics.numOutputRows").alias("output_rows"),
    col("operationMetrics.numOutputBytes").alias("output_bytes")
).show(truncate=False)

# Habilitar estadísticas extendidas (si están disponibles)
spark.sql(f"""
    ANALYZE TABLE delta.`{ruta_delta_ventas}` 
    COMPUTE STATISTICS FOR COLUMNS region, producto, fecha
""")

print("✓ Estadísticas computadas")

## 🎯 NIVEL 10: VACUUM - LIMPIEZA DE ARCHIVOS
Ejercicio 10.1: VACUUM Básico

In [0]:
# Ver archivos actuales
print("=== ESTADO ANTES DE VACUUM ===")
all_files = dbutils.fs.ls(ruta_delta_clientes)
print(f"Total de archivos en la carpeta: {len(all_files)}")

# Ver historial
print("\nHistorial de versiones:")
delta_table.history().select("version", "timestamp", "operation").show(truncate=False)

# VACUUM DRY RUN: Ver qué se eliminaría sin eliminar realmente
print("\n=== VACUUM DRY RUN ===")
result_dry = spark.sql(f"VACUUM delta.`{ruta_delta_clientes}` RETAIN 0 HOURS DRY RUN")
result_dry.show(truncate=False)

# Deshabilitar retención de 7 días (SOLO PARA DEMO - NO HACER EN PRODUCCIÓN)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

print("\n=== EJECUTANDO VACUUM (eliminando archivos antiguos) ===")
try:
    delta_table.vacuum(0)  # 0 horas de retención (solo para demo)
    print("✓ VACUUM completado")
except Exception as e:
    print(f"Error: {e}")

# Ver archivos después de VACUUM
all_files_after = dbutils.fs.ls(ruta_delta_clientes)
print(f"\nTotal de archivos después de VACUUM: {len(all_files_after)}")

# Intentar leer versiones antiguas (fallarán después de VACUUM)
print("\n=== INTENTANDO LEER VERSIÓN ANTIGUA ===")
try:
    df_old = spark.read.format("delta").option("versionAsOf", 1).load(ruta_delta_clientes)
    print(f"Versión 1 disponible: {df_old.count()} registros")
except Exception as e:
    print(f"❌ Error al leer versión antigua: {str(e)[:100]}")

# Restaurar configuración
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")

Ejercicio 10.2: VACUUM con Retención Configurable

In [0]:
# Crear nueva tabla para demostrar retención
ruta_delta_productos = "/tmp/delta_tutorial/productos"

datos_productos = [
    (i, f"Producto_{i}", round(uniform(10, 1000), 2), randint(0, 1000))
    for i in range(1, 501)
]

df_productos = spark.createDataFrame(
    datos_productos,
    ["producto_id", "nombre", "precio", "stock"]
)

df_productos.write.format("delta").mode("overwrite").save(ruta_delta_productos)

delta_table_productos = DeltaTable.forPath(spark, ruta_delta_productos)

# Hacer varios cambios
for i in range(5):
    delta_table_productos.update(
        condition = f"producto_id <= {(i+1)*100}",
        set = {"precio": "precio * 1.05"}
    )

print("✓ 5 actualizaciones realizadas")

# Ver historial
print("\n=== HISTORIAL ===")
delta_table_productos.history().select("version", "timestamp", "operation").show(truncate=False)

# VACUUM con retención de 168 horas (7 días) - valor recomendado en producción
print("\n=== VACUUM con retención de 168 horas (7 días) ===")
delta_table_productos.vacuum(168)

print("✓ VACUUM completado")
print("  Archivos más antiguos que 7 días eliminados")
print("  Time Travel disponible para versiones de los últimos 7 días")

## 🎯 NIVEL 11: MERGE - UPSERTS
Ejercicio 11.1: MERGE Básico (INSERT + UPDATE)

In [0]:
# Crear tabla destino
ruta_delta_inventario = "/tmp/delta_tutorial/inventario"

datos_inventario_inicial = [
    (1, "Laptop Dell", 10, 1200.00, datetime(2024, 10, 1)),
    (2, "Mouse Logitech", 50, 25.00, datetime(2024, 10, 1)),
    (3, "Teclado Mecánico", 30, 80.00, datetime(2024, 10, 1)),
    (4, "Monitor Samsung", 15, 300.00, datetime(2024, 10, 1)),
    (5, "Webcam HD", 20, 60.00, datetime(2024, 10, 1)),
]

df_inventario = spark.createDataFrame(
    datos_inventario_inicial,
    ["producto_id", "nombre", "cantidad", "precio", "ultima_actualizacion"]
)

df_inventario.write.format("delta").mode("overwrite").save(ruta_delta_inventario)

print("=== INVENTARIO INICIAL ===")
spark.read.format("delta").load(ruta_delta_inventario).show()

# Crear datos de actualización (algunos existen, otros son nuevos)
datos_actualizacion = [
    (2, "Mouse Logitech", 45, 25.00, datetime(2024, 10, 23)),  # UPDATE: cantidad cambia
    (4, "Monitor Samsung", 18, 280.00, datetime(2024, 10, 23)),  # UPDATE: cantidad y precio
    (6, "Auriculares Sony", 25, 150.00, datetime(2024, 10, 23)),  # INSERT: nuevo producto
    (7, "SSD 1TB", 40, 120.00, datetime(2024, 10, 23)),  # INSERT: nuevo producto
]

df_actualizacion = spark.createDataFrame(
    datos_actualizacion,
    ["producto_id", "nombre", "cantidad", "precio", "ultima_actualizacion"]
)

print("\n=== DATOS DE ACTUALIZACIÓN ===")
df_actualizacion.show()

# MERGE: Combinar datos (UPSERT)
delta_table_inventario = DeltaTable.forPath(spark, ruta_delta_inventario)

print("\n=== EJECUTANDO MERGE ===")
delta_table_inventario.alias("destino").merge(
    df_actualizacion.alias("origen"),
    "destino.producto_id = origen.producto_id"
).whenMatchedUpdate(
    set = {
        "cantidad": "origen.cantidad",
        "precio": "origen.precio",
        "ultima_actualizacion": "origen.ultima_actualizacion"
    }
).whenNotMatchedInsert(
    values = {
        "producto_id": "origen.producto_id",
        "nombre": "origen.nombre",
        "cantidad": "origen.cantidad",
        "precio": "origen.precio",
        "ultima_actualizacion": "origen.ultima_actualizacion"
    }
).execute()

print("✓ MERGE completado")

# Ver resultado
print("\n=== INVENTARIO DESPUÉS DE MERGE ===")
df_resultado = spark.read.format("delta").load(ruta_delta_inventario)
df_resultado.orderBy("producto_id").show()

# Ver métricas del merge
print("\n=== HISTORIAL DEL MERGE ===")
delta_table_inventario.history(1).select(
    "version", "operation", "operationMetrics"
).show(truncate=False)

Ejercicio 11.2: MERGE Condicional con DELETE

In [0]:
# Crear tabla de pedidos
ruta_delta_pedidos = "/tmp/delta_tutorial/pedidos"

datos_pedidos = [
    (1, 101, "PENDIENTE", 500.00, datetime(2024, 10, 1)),
    (2, 102, "ENVIADO", 750.00, datetime(2024, 10, 2)),
    (3, 103, "PENDIENTE", 300.00, datetime(2024, 10, 3)),
    (4, 104, "COMPLETADO", 1200.00, datetime(2024, 10, 4)),
    (5, 105, "PENDIENTE", 450.00, datetime(2024, 10, 5)),
]

df_pedidos = spark.createDataFrame(
    datos_pedidos,
    ["pedido_id", "cliente_id", "estado", "total", "fecha"]
)

df_pedidos.write.format("delta").mode("overwrite").save(ruta_delta_pedidos)

print("=== PEDIDOS INICIALES ===")
spark.read.format("delta").load(ruta_delta_pedidos).show()

# Actualizaciones: cambiar estados y cancelar algunos pedidos
datos_actualizacion_pedidos = [
    (1, 101, "ENVIADO", 500.00, datetime(2024, 10, 23)),  # PENDIENTE -> ENVIADO
    (3, 103, "CANCELADO", 300.00, datetime(2024, 10, 23)),  # PENDIENTE -> CANCELADO (eliminar)
    (5, 105, "ENVIADO", 450.00, datetime(2024, 10, 23)),  # PENDIENTE -> ENVIADO
    (6, 106, "PENDIENTE", 890.00, datetime(2024, 10, 23)),  # Nuevo pedido
]

df_actualizacion_pedidos = spark.createDataFrame(
    datos_actualizacion_pedidos,
    ["pedido_id", "cliente_id", "estado", "total", "fecha"]
)

print("\n=== ACTUALIZACIONES DE PEDIDOS ===")
df_actualizacion_pedidos.show()

# MERGE con DELETE condicional
delta_table_pedidos = DeltaTable.forPath(spark, ruta_delta_pedidos)

print("\n=== EJECUTANDO MERGE CON DELETE ===")
delta_table_pedidos.alias("destino").merge(
    df_actualizacion_pedidos.alias("origen"),
    "destino.pedido_id = origen.pedido_id"
).whenMatchedDelete(
    condition = "origen.estado = 'CANCELADO'"  # Eliminar pedidos cancelados
).whenMatchedUpdate(
    condition = "origen.estado != 'CANCELADO'",
    set = {
        "estado": "origen.estado",
        "fecha": "origen.fecha"
    }
).whenNotMatchedInsert(
    values = {
        "pedido_id": "origen.pedido_id",
        "cliente_id": "origen.cliente_id",
        "estado": "origen.estado",
        "total": "origen.total",
        "fecha": "origen.fecha"
    }
).execute()

print("✓ MERGE con DELETE completado")

# Ver resultado
print("\n=== PEDIDOS DESPUÉS DE MERGE ===")
df_resultado_pedidos = spark.read.format("delta").load(ruta_delta_pedidos)
df_resultado_pedidos.orderBy("pedido_id").show()

# Ver métricas
print("\n=== MÉTRICAS DEL MERGE ===")
delta_table_pedidos.history(1).select(
    "version", "operation", 
    col("operationMetrics.numTargetRowsInserted").alias("insertados"),
    col("operationMetrics.numTargetRowsUpdated").alias("actualizados"),
    col("operationMetrics.numTargetRowsDeleted").alias("eliminados")
).show(truncate=False)

Ejercicio 11.3: MERGE para Slowly Changing Dimension (SCD Type 2)

In [0]:
# Implementar SCD Type 2: mantener historial de cambios
ruta_delta_clientes_scd = "/tmp/delta_tutorial/clientes_scd"

# Tabla inicial con clientes (incluye columnas de versionado)
datos_clientes_scd = [
    (1, "Juan Pérez", "Lima", "juan@email.com", True, datetime(2024, 1, 1), None),
    (2, "María García", "Cusco", "maria@email.com", True, datetime(2024, 1, 1), None),
    (3, "Carlos López", "Arequipa", "carlos@email.com", True, datetime(2024, 1, 1), None),
]

df_clientes_scd = spark.createDataFrame(
    datos_clientes_scd,
    ["cliente_id", "nombre", "ciudad", "email", "es_actual", "fecha_inicio", "fecha_fin"]
)

df_clientes_scd.write.format("delta").mode("overwrite").save(ruta_delta_clientes_scd)

print("=== CLIENTES INICIALES (SCD Type 2) ===")
spark.read.format("delta").load(ruta_delta_clientes_scd).show(truncate=False)

# Cambios: Juan cambió de ciudad, María cambió email, nuevo cliente
datos_cambios = [
    (1, "Juan Pérez", "Trujillo", "juan@email.com"),  # Cambió ciudad
    (2, "María García", "Cusco", "maria.garcia@newemail.com"),  # Cambió email
    (4, "Ana Martínez", "Lima", "ana@email.com"),  # Cliente nuevo
]

df_cambios = spark.createDataFrame(
    datos_cambios,
    ["cliente_id", "nombre", "ciudad", "email"]
)

print("\n=== CAMBIOS A APLICAR ===")
df_cambios.show(truncate=False)

# Preparar datos para SCD Type 2
# 1. Marcar registros antiguos como no actuales
# 2. Insertar nuevos registros con la información actualizada

df_cambios_preparados = df_cambios.withColumn("es_actual", lit(True)) \
    .withColumn("fecha_inicio", lit(datetime(2024, 10, 23))) \
    .withColumn("fecha_fin", lit(None).cast("timestamp"))

delta_table_clientes_scd = DeltaTable.forPath(spark, ruta_delta_clientes_scd)

# Primero: Cerrar registros antiguos que cambiaron
print("\n=== PASO 1: Cerrando registros antiguos ===")
delta_table_clientes_scd.alias("destino").merge(
    df_cambios.alias("origen"),
    """destino.cliente_id = origen.cliente_id AND 
       destino.es_actual = true AND
       (destino.ciudad != origen.ciudad OR destino.email != origen.email)"""
).whenMatchedUpdate(
    set = {
        "es_actual": "false",
        "fecha_fin": "current_timestamp()"
    }
).execute()

# Segundo: Insertar nuevos registros (actualizados y nuevos clientes)
print("=== PASO 2: Insertando nuevos registros ===")
delta_table_clientes_scd.alias("destino").merge(
    df_cambios_preparados.alias("origen"),
    "destino.cliente_id = origen.cliente_id AND destino.es_actual = true"
).whenNotMatchedInsert(
    values = {
        "cliente_id": "origen.cliente_id",
        "nombre": "origen.nombre",
        "ciudad": "origen.ciudad",
        "email": "origen.email",
        "es_actual": "origen.es_actual",
        "fecha_inicio": "origen.fecha_inicio",
        "fecha_fin": "origen.fecha_fin"
    }
).execute()

print("✓ SCD Type 2 aplicado")

# Ver resultado: historial completo
print("\n=== HISTORIAL COMPLETO DE CLIENTES (SCD Type 2) ===")
df_scd_resultado = spark.read.format("delta").load(ruta_delta_clientes_scd)
df_scd_resultado.orderBy("cliente_id", "fecha_inicio").show(truncate=False)

# Ver solo clientes actuales
print("\n=== SOLO CLIENTES ACTUALES ===")
df_scd_resultado.filter(col("es_actual") == True).show(truncate=False)

# Ver historial de un cliente específico
print("\n=== HISTORIAL DEL CLIENTE 1 (Juan) ===")
df_scd_resultado.filter(col("cliente_id") == 1).orderBy("fecha_inicio").show(truncate=False)

## 🎯 NIVEL 12: CARGA INCREMENTAL Y STREAMING
Ejercicio 12.1: Carga Incremental con MERGE

In [0]:
# Simular carga incremental diaria
ruta_delta_ventas_diarias = "/tmp/delta_tutorial/ventas_diarias"

# Carga inicial
datos_dia_1 = [
    (1, 101, datetime(2024, 10, 21), 500.00, "COMPLETADA"),
    (2, 102, datetime(2024, 10, 21), 750.00, "COMPLETADA"),
    (3, 103, datetime(2024, 10, 21), 300.00, "COMPLETADA"),
]

In [0]:
df_dia_1 = spark.createDataFrame(
    datos_dia_1,
    ["venta_id", "cliente_id", "fecha", "monto", "estado"]
)

df_dia_1.write.format("delta").mode("overwrite").save(ruta_delta_ventas_diarias)

print("=== DÍA 1: CARGA INICIAL ===")
spark.read.format("delta").load(ruta_delta_ventas_diarias).show()

# Simular carga incremental día 2
datos_dia_2 = [
    (2, 102, datetime(2024, 10, 21), 800.00, "COMPLETADA"),  # Actualización de venta 2
    (4, 104, datetime(2024, 10, 22), 1200.00, "COMPLETADA"),  # Nueva venta
    (5, 105, datetime(2024, 10, 22), 450.00, "PENDIENTE"),  # Nueva venta
]

df_dia_2 = spark.createDataFrame(
    datos_dia_2,
    ["venta_id", "cliente_id", "fecha", "monto", "estado"]
)

print("\n=== DÍA 2: DATOS INCREMENTALES ===")
df_dia_2.show()

# MERGE incremental
delta_table_ventas_diarias = DeltaTable.forPath(spark, ruta_delta_ventas_diarias)

delta_table_ventas_diarias.alias("destino").merge(
    df_dia_2.alias("origen"),
    "destino.venta_id = origen.venta_id"
).whenMatchedUpdate(
    set = {
        "monto": "origen.monto",
        "estado": "origen.estado",
        "fecha": "origen.fecha"
    }
).whenNotMatchedInsert(
    values = {
        "venta_id": "origen.venta_id",
        "cliente_id": "origen.cliente_id",
        "fecha": "origen.fecha",
        "monto": "origen.monto",
        "estado": "origen.estado"
    }
).execute()

print("\n=== DESPUÉS DE CARGA DÍA 2 ===")
spark.read.format("delta").load(ruta_delta_ventas_diarias).orderBy("venta_id").show()

# Simular carga incremental día 3
datos_dia_3 = [
    (5, 105, datetime(2024, 10, 22), 450.00, "COMPLETADA"),  # Actualización: PENDIENTE -> COMPLETADA
    (6, 106, datetime(2024, 10, 23), 890.00, "COMPLETADA"),  # Nueva venta
    (7, 107, datetime(2024, 10, 23), 650.00, "COMPLETADA"),  # Nueva venta
]

df_dia_3 = spark.createDataFrame(
    datos_dia_3,
    ["venta_id", "cliente_id", "fecha", "monto", "estado"]
)

print("\n=== DÍA 3: DATOS INCREMENTALES ===")
df_dia_3.show()

delta_table_ventas_diarias.alias("destino").merge(
    df_dia_3.alias("origen"),
    "destino.venta_id = origen.venta_id"
).whenMatchedUpdate(
    set = {
        "monto": "origen.monto",
        "estado": "origen.estado",
        "fecha": "origen.fecha"
    }
).whenNotMatchedInsert(
    values = {
        "venta_id": "origen.venta_id",
        "cliente_id": "origen.cliente_id",
        "fecha": "origen.fecha",
        "monto": "origen.monto",
        "estado": "origen.estado"
    }
).execute()

print("\n=== DESPUÉS DE CARGA DÍA 3 ===")
spark.read.format("delta").load(ruta_delta_ventas_diarias).orderBy("venta_id").show()

# Ver métricas de todas las cargas
print("\n=== HISTORIAL DE CARGAS INCREMENTALES ===")
delta_table_ventas_diarias.history().select(
    "version", "timestamp", "operation",
    col("operationMetrics.numTargetRowsInserted").alias("insertados"),
    col("operationMetrics.numTargetRowsUpdated").alias("actualizados"),
    col("operationMetrics.numOutputRows").alias("total_rows")
).show(truncate=False)

Ejercicio 12.2: Streaming con Delta Lake (Structured Streaming)

In [0]:
# Crear carpeta para archivos de entrada (streaming source)
ruta_streaming_input = "/tmp/delta_tutorial/streaming_input"
ruta_streaming_checkpoint = "/tmp/delta_tutorial/streaming_checkpoint"
ruta_delta_streaming_output = "/tmp/delta_tutorial/streaming_output"

# Limpiar directorios previos
dbutils.fs.rm(ruta_streaming_input, True)
dbutils.fs.rm(ruta_streaming_checkpoint, True)
dbutils.fs.rm(ruta_delta_streaming_output, True)

# Crear directorio de entrada
dbutils.fs.mkdirs(ruta_streaming_input)

print("✓ Directorios de streaming creados")

# Escribir primer batch de datos
datos_batch_1 = [
    (1, "sensor_001", datetime(2024, 10, 23, 10, 0, 0), 25.5, 60.2),
    (2, "sensor_002", datetime(2024, 10, 23, 10, 0, 0), 26.1, 58.5),
    (3, "sensor_003", datetime(2024, 10, 23, 10, 0, 0), 24.8, 62.1),
]

df_batch_1 = spark.createDataFrame(
    datos_batch_1,
    ["id", "sensor_id", "timestamp", "temperatura", "humedad"]
)

df_batch_1.write.format("json").mode("overwrite").save(f"{ruta_streaming_input}/batch_1")

print("✓ Batch 1 escrito")

# Configurar streaming reader
schema_sensores = "id INT, sensor_id STRING, timestamp TIMESTAMP, temperatura DOUBLE, humedad DOUBLE"

stream_reader = spark.readStream \
    .format("json") \
    .schema(schema_sensores) \
    .option("maxFilesPerTrigger", 1) \
    .load(ruta_streaming_input)

print("✓ Stream reader configurado")

# Escribir a Delta Lake con streaming
print("\n=== INICIANDO STREAMING A DELTA LAKE ===")

query = stream_reader.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", ruta_streaming_checkpoint) \
    .trigger(processingTime="5 seconds") \
    .start(ruta_delta_streaming_output)

print("✓ Streaming query iniciado")

# Esperar a que procese el primer batch
import time
time.sleep(10)

# Ver datos procesados
print("\n=== DATOS DESPUÉS DEL BATCH 1 ===")
df_streaming_result = spark.read.format("delta").load(ruta_delta_streaming_output)
df_streaming_result.show()

# Escribir segundo batch
datos_batch_2 = [
    (4, "sensor_001", datetime(2024, 10, 23, 10, 5, 0), 25.8, 59.8),
    (5, "sensor_002", datetime(2024, 10, 23, 10, 5, 0), 26.5, 57.9),
    (6, "sensor_004", datetime(2024, 10, 23, 10, 5, 0), 23.9, 63.5),
]

df_batch_2 = spark.createDataFrame(
    datos_batch_2,
    ["id", "sensor_id", "timestamp", "temperatura", "humedad"]
)

df_batch_2.write.format("json").mode("overwrite").save(f"{ruta_streaming_input}/batch_2")

print("\n✓ Batch 2 escrito")

# Esperar procesamiento
time.sleep(10)

# Ver datos actualizados
print("\n=== DATOS DESPUÉS DEL BATCH 2 ===")
df_streaming_result = spark.read.format("delta").load(ruta_delta_streaming_output)
df_streaming_result.orderBy("timestamp").show()

# Ver estadísticas del streaming
print("\n=== ESTADÍSTICAS DE STREAMING ===")
print(f"Status: {query.status}")
print(f"Query ID: {query.id}")

# Detener el query
query.stop()
print("\n✓ Streaming query detenido")

# Ver historial de la tabla delta generada por streaming
delta_table_streaming = DeltaTable.forPath(spark, ruta_delta_streaming_output)
print("\n=== HISTORIAL DE LA TABLA ===")
delta_table_streaming.history().select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)

Ejercicio 12.3: Deduplicación en Streaming

In [0]:
# Streaming con deduplicación usando watermark
ruta_streaming_dedup_input = "/tmp/delta_tutorial/streaming_dedup_input"
ruta_streaming_dedup_checkpoint = "/tmp/delta_tutorial/streaming_dedup_checkpoint"
ruta_delta_dedup_output = "/tmp/delta_tutorial/streaming_dedup_output"

# Limpiar
dbutils.fs.rm(ruta_streaming_dedup_input, True)
dbutils.fs.rm(ruta_streaming_dedup_checkpoint, True)
dbutils.fs.rm(ruta_delta_dedup_output, True)
dbutils.fs.mkdirs(ruta_streaming_dedup_input)

# Batch 1 con datos (algunos duplicados)
datos_batch_dup_1 = [
    (1, "user_001", "login", datetime(2024, 10, 23, 10, 0, 0)),
    (1, "user_001", "login", datetime(2024, 10, 23, 10, 0, 0)),  # Duplicado
    (2, "user_002", "purchase", datetime(2024, 10, 23, 10, 1, 0)),
    (3, "user_003", "logout", datetime(2024, 10, 23, 10, 2, 0)),
]

df_batch_dup_1 = spark.createDataFrame(
    datos_batch_dup_1,
    ["event_id", "user_id", "event_type", "event_time"]
)

df_batch_dup_1.write.format("json").mode("overwrite").save(f"{ruta_streaming_dedup_input}/batch_1")

print("✓ Batch con duplicados escrito")

# Configurar streaming con deduplicación
schema_eventos = "event_id INT, user_id STRING, event_type STRING, event_time TIMESTAMP"

stream_dedup = spark.readStream \
    .format("json") \
    .schema(schema_eventos) \
    .load(ruta_streaming_dedup_input)

# Aplicar watermark y deduplicación
stream_deduplicado = stream_dedup \
    .withWatermark("event_time", "1 minute") \
    .dropDuplicates(["event_id", "user_id"])

print("✓ Deduplicación configurada")

# Escribir a Delta
query_dedup = stream_deduplicado.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", ruta_streaming_dedup_checkpoint) \
    .trigger(processingTime="5 seconds") \
    .start(ruta_delta_dedup_output)

print("✓ Streaming con deduplicación iniciado")

time.sleep(10)

# Ver resultado (debe tener 3 registros, no 4)
print("\n=== DATOS DEDUPLICADOS ===")
df_dedup_result = spark.read.format("delta").load(ruta_delta_dedup_output)
print(f"Total de registros (esperado: 3): {df_dedup_result.count()}")
df_dedup_result.orderBy("event_time").show()

# Detener
query_dedup.stop()
print("✓ Query detenido")

## 🎯 NIVEL 13: CHANGE DATA FEED (CDF)
Ejercicio 13.1: Habilitar y Usar Change Data Feed

In [0]:
# Crear tabla con Change Data Feed habilitado
ruta_delta_cdf = "/tmp/delta_tutorial/clientes_cdf"

# Limpiar
dbutils.fs.rm(ruta_delta_cdf, True)

# Datos iniciales
datos_clientes_cdf = [
    (1, "Juan", "Lima", 5000.00, "ACTIVO"),
    (2, "María", "Cusco", 7500.00, "ACTIVO"),
    (3, "Carlos", "Arequipa", 3200.00, "ACTIVO"),
    (4, "Ana", "Trujillo", 9100.00, "ACTIVO"),
    (5, "Luis", "Lima", 4300.00, "ACTIVO"),
]

df_clientes_cdf = spark.createDataFrame(
    datos_clientes_cdf,
    ["cliente_id", "nombre", "ciudad", "saldo", "estado"]
)

# Escribir con CDF habilitado
df_clientes_cdf.write.format("delta") \
    .option("delta.enableChangeDataFeed", "true") \
    .mode("overwrite") \
    .save(ruta_delta_cdf)

print("✓ Tabla con Change Data Feed creada")

# Verificar que CDF está habilitado
print("\n=== PROPIEDADES DE LA TABLA ===")
spark.sql(f"DESCRIBE DETAIL delta.`{ruta_delta_cdf}`").select("format", "location", "properties").show(truncate=False)

# Ver datos iniciales (Versión 0)
print("\n=== VERSIÓN 0: DATOS INICIALES ===")
df_v0 = spark.read.format("delta").load(ruta_delta_cdf)
df_v0.show()

# Operación 1: UPDATE (Versión 1)
delta_table_cdf = DeltaTable.forPath(spark, ruta_delta_cdf)

delta_table_cdf.update(
    condition = "ciudad = 'Lima'",
    set = {"saldo": "saldo * 1.10"}
)

print("\n=== VERSIÓN 1: DESPUÉS DE UPDATE ===")
df_v1 = spark.read.format("delta").load(ruta_delta_cdf)
df_v1.show()

# Operación 2: INSERT (Versión 2)
datos_nuevos = [
    (6, "Pedro", "Piura", 6500.00, "ACTIVO"),
    (7, "Sofia", "Ica", 5200.00, "ACTIVO"),
]

df_nuevos = spark.createDataFrame(
    datos_nuevos,
    ["cliente_id", "nombre", "ciudad", "saldo", "estado"]
)

df_nuevos.write.format("delta").mode("append").save(ruta_delta_cdf)

print("\n=== VERSIÓN 2: DESPUÉS DE INSERT ===")
df_v2 = spark.read.format("delta").load(ruta_delta_cdf)
print(f"Total registros: {df_v2.count()}")

# Operación 3: DELETE (Versión 3)
delta_table_cdf.delete("saldo < 4000")

print("\n=== VERSIÓN 3: DESPUÉS DE DELETE ===")
df_v3 = spark.read.format("delta").load(ruta_delta_cdf)
print(f"Total registros: {df_v3.count()}")
df_v3.show()

# LEER CHANGE DATA FEED: Ver todos los cambios
print("\n=== CHANGE DATA FEED: TODOS LOS CAMBIOS ===")
df_changes_all = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load(ruta_delta_cdf)

df_changes_all.select(
    "cliente_id", "nombre", "ciudad", "saldo", "estado",
    "_change_type", "_commit_version", "_commit_timestamp"
).orderBy("_commit_version", "cliente_id").show(50, truncate=False)

print("\n=== TIPOS DE CAMBIOS ===")
df_changes_all.groupBy("_change_type", "_commit_version").count().orderBy("_commit_version").show()

# Ver cambios entre versiones específicas
print("\n=== CAMBIOS ENTRE VERSIÓN 1 Y 2 ===")
df_changes_1_2 = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 1) \
    .option("endingVersion", 2) \
    .load(ruta_delta_cdf)

df_changes_1_2.select(
    "cliente_id", "nombre", "_change_type", "_commit_version"
).show()

# Ver solo inserciones
print("\n=== SOLO INSERCIONES ===")
df_changes_all.filter(col("_change_type") == "insert").select(
    "cliente_id", "nombre", "ciudad", "saldo", "_commit_version"
).show()

# Ver solo actualizaciones (muestra before y after)
print("\n=== SOLO ACTUALIZACIONES ===")
df_changes_all.filter(col("_change_type").isin(["update_preimage", "update_postimage"])).select(
    "cliente_id", "nombre", "saldo", "_change_type", "_commit_version"
).orderBy("cliente_id", "_change_type").show()

# Ver solo eliminaciones
print("\n=== SOLO ELIMINACIONES ===")
df_changes_all.filter(col("_change_type") == "delete").select(
    "cliente_id", "nombre", "ciudad", "saldo", "_commit_version"
).show()

Ejercicio 13.2: CDC con Timestamps

In [0]:
# Leer cambios por rango de tiempo
print("\n=== HISTORIAL DE VERSIONES CON TIMESTAMPS ===")
delta_table_cdf.history().select("version", "timestamp", "operation").show(truncate=False)

# Obtener timestamps
timestamp_v1 = delta_table_cdf.history().filter(col("version") == 1).select("timestamp").collect()[0][0]
timestamp_v3 = delta_table_cdf.history().filter(col("version") == 3).select("timestamp").collect()[0][0]

print(f"\nTimestamp Versión 1: {timestamp_v1}")
print(f"Timestamp Versión 3: {timestamp_v3}")

# Leer cambios por timestamp
print("\n=== CAMBIOS DESDE VERSIÓN 1 (POR TIMESTAMP) ===")
df_changes_by_time = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", timestamp_v1.strftime("%Y-%m-%d %H:%M:%S")) \
    .load(ruta_delta_cdf)

df_changes_by_time.select(
    "cliente_id", "nombre", "_change_type", "_commit_version", "_commit_timestamp"
).orderBy("_commit_version", "cliente_id").show(truncate=False)

Ejercicio 13.3: CDF para Data Pipeline Incremental

In [0]:
# Caso de uso: Procesar solo cambios para actualizar data warehouse
print("\n=== PIPELINE INCREMENTAL CON CDF ===")

# Simular tabla destino (data warehouse)
ruta_delta_dw = "/tmp/delta_tutorial/clientes_dw"
dbutils.fs.rm(ruta_delta_dw, True)

# Primera carga completa (full load)
print("\n--- CARGA INICIAL COMPLETA ---")
df_initial = spark.read.format("delta").option("versionAsOf", 0).load(ruta_delta_cdf)
df_initial.write.format("delta").mode("overwrite").save(ruta_delta_dw)

print(f"Carga inicial completada: {df_initial.count()} registros")

# Guardar última versión procesada
ultima_version_procesada = 0

# Función para procesar cambios incrementales
def procesar_cambios_incrementales(version_inicial, version_final):
    print(f"\n--- PROCESANDO CAMBIOS: Versión {version_inicial} -> {version_final} ---")
    
    # Leer cambios
    df_changes = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", version_inicial + 1) \
        .option("endingVersion", version_final) \
        .load(ruta_delta_cdf)
    
    # Contar cambios por tipo
    print("Cambios detectados:")
    df_changes.groupBy("_change_type").count().show()
    
    # Preparar datos para MERGE en el DW
    # Para updates: usar solo update_postimage (estado final)
    # Para inserts: usar las filas insert
    # Para deletes: usar las filas delete
    
    df_changes_to_apply = df_changes.filter(
        (col("_change_type") == "insert") |
        (col("_change_type") == "update_postimage") |
        (col("_change_type") == "delete")
    )
    
    if df_changes_to_apply.count() > 0:
        # Aplicar cambios al DW usando MERGE
        delta_table_dw = DeltaTable.forPath(spark, ruta_delta_dw)
        
        delta_table_dw.alias("destino").merge(
            df_changes_to_apply.alias("origen"),
            "destino.cliente_id = origen.cliente_id"
        ).whenMatchedDelete(
            condition = "origen._change_type = 'delete'"
        ).whenMatchedUpdate(
            condition = "origen._change_type = 'update_postimage'",
            set = {
                "nombre": "origen.nombre",
                "ciudad": "origen.ciudad",
                "saldo": "origen.saldo",
                "estado": "origen.estado"
            }
        ).whenNotMatchedInsert(
            condition = "origen._change_type = 'insert'",
            values = {
                "cliente_id": "origen.cliente_id",
                "nombre": "origen.nombre",
                "ciudad": "origen.ciudad",
                "saldo": "origen.saldo",
                "estado": "origen.estado"
            }
        ).execute()
        
        print(f"✓ {df_changes_to_apply.count()} cambios aplicados al DW")
    else:
        print("No hay cambios para aplicar")
    
    return version_final

# Procesar cambios hasta la versión 3
ultima_version_disponible = delta_table_cdf.history().select(max("version")).collect()[0][0]

ultima_version_procesada = procesar_cambios_incrementales(
    ultima_version_procesada, 
    ultima_version_disponible
)

print(f"\n✓ Última versión procesada: {ultima_version_procesada}")

# Verificar que el DW está actualizado
print("\n=== DATOS EN DW (DESPUÉS DE PROCESAMIENTO INCREMENTAL) ===")
df_dw_final = spark.read.format("delta").load(ruta_delta_dw)
df_dw_final.orderBy("cliente_id").show()

# Comparar con tabla origen
print("\n=== DATOS EN TABLA ORIGEN (VERSIÓN ACTUAL) ===")
df_origen_actual = spark.read.format("delta").load(ruta_delta_cdf)
df_origen_actual.orderBy("cliente_id").show()

# Simular más cambios y nuevo procesamiento incremental
print("\n\n=== SIMULANDO NUEVOS CAMBIOS ===")

# Nuevo UPDATE (Versión 4)
delta_table_cdf.update(
    condition = "ciudad = 'Cusco'",
    set = {"estado": "'PREMIUM'"}
)

# Nuevos INSERTS (Versión 5)
datos_adicionales = [
    (8, "Roberto", "Lima", 12000.00, "PREMIUM"),
    (9, "Lucia", "Arequipa", 8500.00, "ACTIVO"),
]

df_adicionales = spark.createDataFrame(
    datos_adicionales,
    ["cliente_id", "nombre", "ciudad", "saldo", "estado"]
)

df_adicionales.write.format("delta").mode("append").save(ruta_delta_cdf)

print("✓ Nuevos cambios aplicados (versiones 4 y 5)")

# Procesar cambios incrementales nuevamente
ultima_version_disponible = delta_table_cdf.history().select(max("version")).collect()[0][0]

ultima_version_procesada = procesar_cambios_incrementales(
    ultima_version_procesada,
    ultima_version_disponible
)

# Verificar DW actualizado
print("\n=== DW DESPUÉS DE SEGUNDO PROCESAMIENTO INCREMENTAL ===")
df_dw_final_2 = spark.read.format("delta").load(ruta_delta_dw)
print(f"Total registros en DW: {df_dw_final_2.count()}")
df_dw_final_2.orderBy("cliente_id").show()