In [0]:
# Configuración de widgets para credenciales y parámetros

# Widget para el host de la base de datos PostgreSQL
dbutils.widgets.text("host", "datavision-dev.c340wcgc05b7.us-west-2.rds.amazonaws.com", "Host PostgreSQL")

# Widget para el usuario (tipo texto normal)
dbutils.widgets.text("username", "dateneo_readonly", "Usuario BD")

# Widget para la contraseña (tipo password para seguridad)
dbutils.widgets.text("password", "", "Contraseña BD")

# Widget para el nombre del alumno (usado en nombres de tablas)
dbutils.widgets.text("alumno", "bruno", "Nombre del Alumno")

# Widget para el catálogo de destino en Databricks
dbutils.widgets.text("catalogo", "bronce_dev", "Catálogo Destino")

# Leer los valores de los widgets
HOST = dbutils.widgets.get("host")
USERNAME = dbutils.widgets.get("username")
PASSWORD = dbutils.widgets.get("password")
ALUMNO = dbutils.widgets.get("alumno")
CATALOGO = dbutils.widgets.get("catalogo")

# Ingesta de datos con JDBC en Apache Spark

## ¿Qué es JDBC?

JDBC (Java Database Connectivity) es una API de Java que permite conectar aplicaciones Java con bases de datos relacionales. En el contexto de Apache Spark, JDBC nos permite:

- **Leer datos** desde bases de datos relacionales (PostgreSQL, MySQL, SQL Server, Oracle, etc.)
- **Escribir datos** hacia estas bases de datos
- **Realizar consultas SQL** directamente sobre las bases de datos fuente

## JDBC en Spark

Spark incluye un **data source JDBC** que facilita la conexión con bases de datos relacionales. Según la documentación oficial de Spark ([spark.apache.org/docs/latest/sql-data-sources-jdbc.html](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)):

- **DataFrameReader.jdbc()**: Para leer datos desde JDBC
- **DataFrameWriter.jdbc()**: Para escribir datos hacia JDBC
- Soporta múltiples opciones de configuración como `url`, `dbtable`, `query`, `partitionColumn`, `numPartitions`, etc.

### Ventajas del data source JDBC:
- ✅ **Distribuido**: Spark puede paralelizar la lectura/escritura
- ✅ **Escalable**: Maneja grandes volúmenes de datos eficientemente
- ✅ **Flexible**: Soporta queries complejas y particionamiento
- ✅ **Compatible**: Funciona con cualquier base de datos que tenga driver JDBC

### Desventajas:
- ❌ **Driver JDBC**: Debe estar disponible en el classpath de Spark
- ❌ **Conexiones**: Cada partición requiere una conexión JDBC
- ❌ **Rendimiento**: Puede ser más lento que conectores nativos

## Configuración de la conexión JDBC

### URL de conexión
La URL JDBC sigue el formato estándar: `jdbc:<subprotocol>://<host>:<port>/<database>`

En nuestro caso:
- **Subprotocol**: `postgresql` (para PostgreSQL)
- **Host**: servidor de base de datos
- **Puerto**: `5432` (puerto estándar de PostgreSQL)
- **Base de datos**: `datavision`

### Propiedades de conexión
Según la documentación de Spark, las propiedades principales son:

| Propiedad | Descripción | Ejemplo |
|-----------|-------------|---------|
| `url` | URL completa de conexión JDBC | `jdbc:postgresql://host:5432/db` |
| `dbtable` | Tabla o query a leer | `accounts` o `(SELECT * FROM accounts WHERE active=1) AS subq` |
| `user` | Usuario de la base de datos | |
| `password` | Contraseña del usuario | |
| `driver` | Clase del driver JDBC | `org.postgresql.Driver` |

### Opciones avanzadas importantes:
- **`partitionColumn`**: Columna numérica para particionar la lectura
- **`lowerBound` / `upperBound`**: Límites para el particionamiento
- **`numPartitions`**: Número de particiones para paralelizar
- **`fetchsize`**: Número de filas por fetch (mejora rendimiento)
- **`batchsize`**: Tamaño del batch para escrituras

### Driver JDBC
El driver debe estar disponible en el classpath de Spark. Para PostgreSQL usamos `org.postgresql.Driver`.

In [0]:
# =============================================================================
# CONFIGURACIÓN DE LA CONEXIÓN JDBC
# =============================================================================

# Construcción de la URL JDBC para PostgreSQL
# Formato: jdbc:postgresql://host:puerto/base_de_datos
jdbc_url = f"jdbc:postgresql://{HOST}:5432/datavision"

# Propiedades de conexión JDBC (según documentación de Spark)
# Estas propiedades se pasan al driver JDBC
properties = {
    "user": USERNAME,                    # Usuario de la base de datos
    "password": PASSWORD,               # Contraseña del usuario
    "driver": "org.postgresql.Driver"   # Clase del driver JDBC de PostgreSQL
}

## Carga full idempotente

### ¿Qué significa "idempotente"?

Una operación es **idempotente** cuando puede ejecutarse múltiples veces sin cambiar el resultado final. En el contexto de ETL/ELT:

- ✅ **Primera ejecución**: Inserta los datos
- ✅ **Ejecuciones posteriores**: No duplica datos, mantiene consistencia
- ✅ **Reintentos seguros**: Si falla, se puede re-ejecutar sin problemas

### Estrategia de carga full idempotente

Esta notebook implementa una **carga full idempotente** con la siguiente lógica:

1. **Lectura completa**: Lee TODOS los datos de las tablas fuente via JDBC
2. **Fecha de extracción**: Agrega columna `fecha_extraccion` con la fecha actual
3. **Borrado selectivo**: Elimina SOLO los registros del día actual en destino
4. **Inserción**: Agrega los nuevos datos en modo `append`

```sql
-- Pseudocódigo de lo que hace:
DELETE FROM tabla_destino WHERE fecha_extraccion = '2026-01-01';
INSERT INTO tabla_destino SELECT *, '2026-01-01' FROM tabla_fuente;
```

### Ventajas de esta estrategia:
- ✅ **Idempotente**: Se puede ejecutar múltiples veces en el mismo día
- ✅ **Histórico**: Mantiene datos de días anteriores
- ✅ **Simple**: Lógica fácil de entender y mantener
- ✅ **Consistente**: Datos del día siempre actualizados

### Desventajas:
- ❌ **No incremental**: Lee toda la tabla fuente cada vez
- ❌ **Borrado completo del día**: Si hay cambios en fuente, se pierden
- ❌ **Alto volumen**: Puede ser ineficiente para tablas muy grandes

In [0]:
# Importaciones necesarias para el procesamiento de datos
from pyspark.sql.functions import current_date  # Para agregar fecha de extracción
from datetime import date  # Para obtener fecha actual en formato ISO
from pyspark.sql.functions import col  # Para manipular columnas (no se usa aquí pero es común)

# =============================================================================
# CONFIGURACIÓN DE LAS TABLAS A PROCESAR
# =============================================================================

# Lista de tuplas: (nombre_tabla_fuente, nombre_tabla_destino)
# Cada tabla se lee desde PostgreSQL y se escribe en Delta Lake
tables = [
    ("accounts", f"{CATALOGO}.datavision_{ALUMNO}.accounts"),
    ("subscriptions", f"{CATALOGO}.datavision_{ALUMNO}.subscriptions"),
    ("accounts_subscription", f"{CATALOGO}.datavision_{ALUMNO}.accounts_subscription"),
    ("premium_features", f"{CATALOGO}.datavision_{ALUMNO}.premium_features")
]

# =============================================================================
# PROCESAMIENTO DE DATOS - CARGA FULL IDEMPOTENTE
# =============================================================================

# Diccionario para almacenar los DataFrames leídos
dfs = {}

# Obtener la fecha actual en formato ISO (YYYY-MM-DD)
today = date.today().isoformat()

# Procesar cada tabla definida en la lista
for table, target in tables:
    print(f"Procesando tabla: {table} -> {target}")

    # 1. LECTURA DESDE JDBC
    # spark.read.jdbc() utiliza el JDBC Data Source de Spark
    # Lee TODOS los datos de la tabla fuente
    df = spark.read.jdbc(url=jdbc_url, table=table, properties=properties)
    df_strings = (
        df.select([col(c).cast("string").alias(c) for c in df.columns])
    )

    # 2. AGREGAR FECHA DE EXTRACCIÓN
    # Esto nos permite saber cuándo se extrajeron los datos
    # y hacer la carga idempotente
    df_strings = df_strings.withColumn("fecha_extraccion", current_date())

    # 3. ESCRITURA SELECTIVA EN MODO OVERWRITE CON replaceWhere
    # Sobrescribe SOLO los registros del día actual para evitar duplicados
    # Hace la carga idempotente y evita duplicados sin borrar datos históricos de otros días
    (df_strings.write
        .mode("overwrite")
        .option("replaceWhere", f"fecha_extraccion = '{today}'")
        .saveAsTable(target)
    )

    # Guardar el DataFrame en el diccionario para reporting
    dfs[table] = df_strings

# =============================================================================
# REPORTING - MOSTRAR RESULTADOS
# =============================================================================

print("=== RESUMEN DE LA CARGA ===")
# Mostrar el conteo de registros procesados por tabla
for table, df in dfs.items():
    display(spark.createDataFrame([(table, df.count())], ["Tabla", "Registros Procesados"]))

# Ejercicio: Modificar la estrategia de carga

## Objetivo
Esta notebook implementa una **carga full idempotente**. Ahora te proponemos **modificar la estrategia** para explorar diferentes enfoques de carga de datos.

## Alternativas de carga

### A) Carga incremental sin borrado
**Modificar la carga para que se carguen datos ante cada ejecución sin borrar nada**

```python
# En lugar de borrar por fecha simplemente hacer append siempre:
df.write.mode("append").saveAsTable(target)
```

**Preguntas para reflexionar:**
- ¿Qué sucede si ejecuto la notebook múltiples veces?
- ¿Cómo identifico qué datos son nuevos vs. duplicados?
- ¿Cuándo sería útil esta estrategia?

### B) Carga con overwrite completo
**Hacer un overwrite completo de la tabla**

```python
# En lugar de borrar selectivo hacer overwrite completo:
df.write.mode("overwrite").saveAsTable(target)
```

**Preguntas para reflexionar:**
- ¿Qué pasa con los datos históricos?
- ¿Es idempotente esta operación?
- ¿Cuándo usaría overwrite en lugar de append?

### C) Merge por clave primaria
**Merge por clave primaria de la tabla (más avanzado)**

```python
from pyspark.sql.functions import current_date

# Leer datos existentes
existing_df = spark.read.table(target)

# Hacer merge basado en clave primaria
# (pseudocódigo - requeriría implementar lógica de merge)
# MERGE INTO {target} t
# USING (SELECT *, '{today}' as fecha_extraccion FROM fuente) s
# ON t.primary_key = s.primary_key
# WHEN MATCHED THEN UPDATE SET ...
# WHEN NOT MATCHED THEN INSERT ...
```

**Preguntas para reflexionar:**
- ¿Qué necesito saber de la tabla para implementar merge?
- ¿Cómo identifico la clave primaria?
- ¿Cuáles son las ventajas sobre las estrategias anteriores?

## Instrucciones del ejercicio

1. **Elige una alternativa** de las tres presentadas
2. **Explica por qué** elegiste esa alternativa considerando:
   - El caso de uso específico
   - Ventajas y desventajas
   - Requisitos de idempotencia
3. **Implementa la modificación** copiando esta notebook
4. **Prueba tu solución** ejecutándola múltiples veces
5. **Documenta los resultados** y qué aprendiste