# Databricks: ETLs y Delta Lakes

## ¬øQu√© es un ETL?

Un ETL (Extract, Transform, Load) es un proceso fundamental en la ingenier√≠a de datos. Consiste en extraer datos de diferentes fuentes, transformarlos para adecuarlos a las necesidades del negocio y cargarlos en un sistema de almacenamiento o an√°lisis, como un Data Lake o un Data Warehouse.

### dbutils
`dbutils` es una colecci√≥n de utilidades proporcionadas por Databricks para interactuar con el entorno, gestionar archivos, par√°metros y flujos de trabajo. Es muy √∫til para la gesti√≥n de datos y automatizaci√≥n de tareas dentro de notebooks.

In [0]:
dbutils.help()

Dentro del cat√°logo de `dbutils` existen subsecciones como:
 - dbutils.fs: Para interactuar con el sistema de archivos
 - dbutils.secrets: Para usar secretos
 - dbutils.notebook: Para poder interactuar con otros notebooks
 - dbutils.widgets: Para poder parametrizar los notebooks


**IMPORTANTE**: Antes de seguir subir a Databricks los ficheros del repositorio

**Pista.** Para hacerlo sigue estos pasos. No obstante, si alg√∫n alumno prefiere guardar los ficheros en otro lugar puede hacerlo sin problema.
1. Ir a _Catalog_
2. Entrar en el Catalogo "workspace"
3. Crear un _schema_ con nombre "ceste"
4. Crear un _volumne_ con nombre "archivos"
5. Pulsar en a√±adir y subir los archivos "ventas.csv", "ventas.json", "ventas.parquet"
6. Comprobar si se han subido bien ejecutando la celda de abajo

In [0]:
base_path = "dbfs:/Volumes/workspace/ceste/archivos/"
dbutils.fs.ls(base_path)

### Ingesta desde Distintos Or√≠genes

En los entornos de Big Data, los datos pueden venir en m√∫ltiples formatos: CSV, Parquet, JSON, entre otros. Cada formato tiene ventajas y desventajas en cuanto a compresi√≥n, velocidad de lectura/escritura y compatibilidad. Parquet, por ejemplo, es columnar y eficiente para grandes vol√∫menes.

#### Leer desde CSV

Leer datos desde un archivo CSV es una de las formas m√°s comunes de ingesta en proyectos de datos. El formato CSV (Comma Separated Values) es ampliamente utilizado por su simplicidad y compatibilidad con la mayor√≠a de las herramientas. Sin embargo, es importante tener en cuenta que no es el formato m√°s eficiente para grandes vol√∫menes de datos, ya que no soporta tipos de datos complejos ni compresi√≥n nativa. Por eso, en entornos de Big Data, a menudo se prefiere convertir los datos a formatos como Parquet o Delta una vez cargados.

In [0]:
df_csv = spark.read.csv(
    path=base_path+"ventas.csv",
    header=True,
    inferSchema=True,
    sep=","
)

In [0]:
display(df_csv.head(5))

#### Leer desde parquet

El formato Parquet es un est√°ndar de almacenamiento columnar ampliamente utilizado en Big Data. Su principal fortaleza es la eficiencia tanto en almacenamiento como en velocidad de lectura, especialmente cuando se trabaja con grandes vol√∫menes de datos y consultas sobre columnas espec√≠ficas. Parquet permite compresi√≥n y soporta tipos de datos complejos, lo que lo hace ideal para an√°lisis y procesamiento distribuido. Por ello, es habitual convertir datos de formatos como CSV a Parquet para optimizar el rendimiento y reducir costes en proyectos de an√°lisis de datos.

In [0]:
df_parquet = spark.read.parquet(base_path+"ventas.parquet")

In [0]:
display(df_parquet)

#### Leer desde Json

El formato JSON (JavaScript Object Notation) es ampliamente utilizado para el intercambio de datos debido a su flexibilidad y legibilidad. Permite almacenar estructuras de datos complejas, como listas y diccionarios anidados. Sin embargo, en comparaci√≥n con Parquet, JSON no es tan eficiente en almacenamiento ni en velocidad de procesamiento para grandes vol√∫menes de datos. Es √∫til cuando se requiere flexibilidad en la estructura de los datos o cuando los datos provienen de APIs y sistemas web.

In [0]:
df_json = spark.read.json(base_path+"ventas.json")

In [0]:
display(df_json)

#### Leer desde BD

La lectura de datos desde bases de datos (BD) es fundamental cuando los datos se encuentran almacenados en sistemas transaccionales o relacionales, como MySQL, SQL Server, PostgreSQL, entre otros. En entornos de Big Data, es com√∫n extraer datos de estas fuentes para integrarlos en un Data Lake o procesarlos con Spark. La conexi√≥n suele realizarse mediante JDBC, permitiendo ejecutar consultas SQL y cargar los resultados como DataFrames. Es importante considerar la seguridad, el rendimiento y la gesti√≥n de credenciales al conectar con bases de datos externas.

*La siguiente celda no va a hacer nada porque para funcionar necesitar√≠amos una de BD activa*

In [0]:
jdbc_url = "jdbc:mysql://<host>:<port>/<database>"
properties = {"user": "user", "password": "pwd"}

 Fuente   | ‚úîÔ∏è Ventajas                                                                 | ‚ùå Desventajas                                                                | Uso Com√∫n                                               |
----------|-------------------------------------------------------------------------|----------------------------------------------------------------------------|---------------------------------------------------------|
 CSV      | Simple y ampliamente compatible. F√°cil de editar manualmente.            | No soporta tipos de datos complejos ni compresi√≥n nativa. Menos eficiente para grandes vol√∫menes. | Ingesta inicial de datos peque√±os o de fuentes externas.|
 Parquet  | Almacenamiento columnar eficiente, compresi√≥n nativa, r√°pido para consultas en columnas espec√≠ficas. | No es legible por humanos. Requiere herramientas espec√≠ficas para edici√≥n.  | Procesamiento de Big Data, an√°lisis y almacenamiento optimizado.|
 JSON     | Flexible para estructuras complejas (anidadas). Legible y compatible con APIs web. | Menos eficiente en almacenamiento y procesamiento para grandes vol√∫menes.   | Intercambio de datos con sistemas web o cuando se necesita flexibilidad en la estructura.|
 BD       | Acceso directo a datos actualizados, integraci√≥n con sistemas empresariales, soporte para consultas SQL. | Requiere configuraci√≥n de conexi√≥n, puede tener limitaciones de rendimiento y permisos. | An√°lisis de datos operacionales, integraci√≥n de datos de negocio.|

#### Ejercicio Guiado: Cargar el Dataset "ventas.csv" con Esquema

En este ejercicio, aprender√°s a cargar el dataset "ventas.csv" utilizando un esquema definido manualmente en lugar de inferirlo autom√°ticamente. Esto es √∫til para asegurar la consistencia de los tipos de datos y mejorar el rendimiento.

**Pasos a seguir:**

1. **Define el esquema manualmente:** Utiliza `StructType` y `StructField` para especificar los tipos de datos de cada columna. Bas√°ndote en el contenido del archivo, el esquema podr√≠a incluir campos como `id` (StringType), `fecha` (StringType), `producto` (StringType), `cantidad` (IntegerType) y `precio` (DoubleType).

2. **Lee el archivo CSV con el esquema:** Usa `spark.read.csv()` con los par√°metros `header=True`, `schema=tu_esquema`, `sep=","` y el path `base_path+"ventas.csv"`.

3. **Verifica el esquema y muestra los datos:** Imprime el esquema con `printSchema()` y muestra las primeras filas con `display()` o `show()`.

**C√≥digo de ejemplo (completa las partes faltantes):**

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

### Tu codigo

### Recordatorio: Leer con/sin esquema

| M√©todo de Lectura         | ‚úîÔ∏è Ventajas                                                                 | ‚ùå Desventajas                                                            | Uso Recomendado                          |
|--------------------------|--------------------------------------------------------------------------|------------------------------------------------------------------------|------------------------------------------|
| **Sin esquema (inferSchema=True)** | - R√°pido para exploraci√≥n inicial<br>- No requiere conocer los tipos de datos previamente | - Puede inferir tipos incorrectos si los datos son inconsistentes<br>- M√°s lento en archivos grandes<br>- Menos robusto en producci√≥n | Exploraci√≥n, pruebas r√°pidas, datos peque√±os o desconocidos |
| **Con esquema definido** | - Tipos de datos consistentes y controlados<br>- Mejor rendimiento en la carga<br>- Evita errores por inferencia incorrecta | - Requiere conocer la estructura de los datos<br>- M√°s trabajo inicial | Procesos productivos, datos cr√≠ticos, ETLs, grandes vol√∫menes |

## Delta Lake

### ¬øQu√© es Delta Lake?
Delta Lake es una capa de almacenamiento open source que se integra con Apache Spark y a√±ade capacidades ACID (Atomicidad, Consistencia, Aislamiento, Durabilidad) a los Data Lakes. Permite versionado, time travel, y operaciones transaccionales sobre los datos.

### ¬øPor qu√© es tan valioso el formato de Delta Table?
Guardar tablas en formato Delta permite aprovechar las ventajas de transacciones ACID, manejo de versiones, y optimizaci√≥n de consultas. Es ideal para entornos donde los datos cambian frecuentemente y se requiere trazabilidad.

Una Delta Table permite realizar operaciones ACID, mantener el hist√≥rico con time travel, gestionar versiones, optimizar el almacenamiento, y escalar en entornos de producci√≥n.

### Guardar una tabla

Guardar una tabla en Databricks puede hacerse de dos formas principales:

| M√©todo | Descripci√≥n | ‚úîÔ∏è Ventajas | Uso Recomendado |
|--------|-------------|----------|-----------------|
| **Por path** | Guarda los datos en una ruta espec√≠fica del sistema de archivos (ej: DBFS) usando formato Delta. | - Flexible y directo<br>- F√°cil de mover entre entornos<br>- Integraci√≥n con sistemas externos | Automatizaci√≥n, migraciones, acceso directo por ruta |
| **Por cat√°logo del metastore** | Registra la tabla en el cat√°logo de Databricks para consultas SQL y control de acceso. | - Ideal para colaboraci√≥n multiusuario<br>- Control de permisos y versiones<br> - Auditor√≠a y seguridad avanzada<br>- F√°cil acceso mediante SQL | Entornos colaborativos, equipos de an√°lisis, integraci√≥n con BI |

**Nota:** Ambas opciones aprovechan las ventajas del formato Delta: transacciones ACID, versionado y optimizaci√≥n de consultas.




In [0]:
# Por path
df_csv.write.format("delta").mode("overwrite").save(base_path+"delta")

# Por catalogo
df_csv.write.format("delta").mode("overwrite").saveAsTable("ceste.productos")


### Leer una tabla

1. Lectura de una tabla Delta desde Spark: Tambi√©n puedes leer una tabla Delta directamente desde Spark usando el API de DataFrame:

In [0]:
df_delta = spark.read.format("delta").load(base_path+"delta")
df_delta.show()

Esto es √∫til cuando necesitas manipular los datos con Python, realizar transformaciones complejas, aplicar l√≥gica de negocio o integrarlo en pipelines de procesamiento.

2. Consulta SQL sobre una tabla Delta: Puedes consultar una tabla Delta registrada en el cat√°logo usando SQL est√°ndar. Por ejemplo:

In [0]:
%sql
SELECT * FROM ceste.productos;

Esto te permite aprovechar toda la potencia del lenguaje SQL para filtrar, agrupar, unir y analizar los datos almacenados en formato Delta. Es especialmente √∫til para usuarios que prefieren trabajar con SQL o para integraciones con herramientas de BI.

| M√©todo                |       ‚úîÔ∏è Ventajas        | Diferencias| Uso recomendado|
|-----------------------|--------------------|------------|----------------|
| **Spark DataFrame API** | - Procesamiento avanzado<br>- Integraci√≥n con ML y ETL<br>- Automatizaci√≥n y pipelines<br>- Flexibilidad en transformaciones                         | Permite l√≥gica compleja y manipulaci√≥n program√°tica de datos.  | Procesos autom√°ticos, machine learning, ETL, integraci√≥n con Python/Scala.|
| **SQL**               | - F√°cil de usar y compartir<br>- Ideal para an√°lisis exploratorio<br>- Integraci√≥n con dashboards y BI<br>- Colaboraci√≥n multiusuario         | Sintaxis declarativa, acceso directo desde notebooks y herramientas BI.| An√°lisis, reporting, dashboards, colaboraci√≥n entre equipos.     |

Ambos m√©todos aprovechan las ventajas de Delta Lake: transacciones ACID, versionado, rendimiento y escalabilidad.


### Modificar una tabla

Cuando trabajamos con tablas Delta, una de las grandes ventajas es la posibilidad de realizar operaciones transaccionales complejas de forma eficiente y segura como:
- Updates
- Deletes
- Merges

In [0]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, base_path+"delta")

#### Update

Permite modificar el valor de una o varias columnas en las filas que cumplen una condici√≥n espec√≠fica. Por ejemplo, se puede actualizar el nombre de un producto o corregir un valor err√≥neo en una tabla sin tener que reescribir todo el dataset. La sintaxis es similar a la de SQL, pero se realiza sobre la API de DeltaTable en Spark.

In [0]:
# UPDATE
delta_table.update(
    condition="id = 1",
    set={"producto": "'Ordenador'"}
)

#### Delete

Permite eliminar filas que cumplen una condici√≥n determinada. Es √∫til para depurar datos, eliminar registros obsoletos o cumplir con requisitos legales de borrado. Al igual que el update, el delete es transaccional y garantiza la integridad de la tabla.

In [0]:
# DELETE
delta_table.delete("precio > 150")

Ambas operaciones aprovechan las transacciones ACID de Delta Lake, lo que significa que los cambios son at√≥micos, consistentes, aislados y duraderos. Esto evita problemas de concurrencia y asegura que los datos siempre est√©n en un estado v√°lido, incluso en entornos multiusuario o de procesamiento distribuido.

#### Merge

En este bloque de c√≥digo se muestra c√≥mo realizar un "merge" (tambi√©n conocido como upsert) sobre una tabla Delta:

1. Se crea un DataFrame con nuevos datos o datos actualizados.
2. Luego, se utiliza el m√©todo `merge` de la API de Delta Lake para comparar los datos existentes en la tabla (target) con los nuevos datos (source) usando una condici√≥n de emparejamiento (en este caso, el campo id).
3. Si el `id` ya existe en la tabla, se actualizan todos los campos de ese registro (`whenMatchedUpdateAll`).
4. Si el `id` no existe, se inserta el nuevo registro (`whenNotMatchedInsertAll`).

In [0]:
# Nuevos datos a insertar/actualizar
columns = ["id", "fecha", "producto", "cantidad", "precio"]

nuevos_datos = [(3, "2025-05-24", "Monitor", 1, 179.99), (4, "2025-05-24", "Impresora", 2, 89.99)]
df_updates = spark.createDataFrame(nuevos_datos, columns)


delta_table.alias("target").merge(
    df_updates.alias("source"),
    "target.id = source.id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

Este tipo de operaci√≥n es fundamental en escenarios de integraci√≥n incremental de datos, donde peri√≥dicamente llegan nuevos registros o actualizaciones y queremos mantener la tabla Delta siempre actualizada y sin duplicados.

Ventajas de usar merge en Delta Lake:

- Permite mantener la integridad y consistencia de los datos.
- Facilita la implementaci√≥n de pipelines de datos incrementales.
- Aprovecha las transacciones ACID de Delta Lake, evitando problemas de concurrencia o corrupci√≥n de datos.
- Es mucho m√°s eficiente y sencillo que realizar operaciones manuales de actualizaci√≥n e inserci√≥n por separado.

### Time Travel

El Time Travel en Delta Lake es una funcionalidad que permite consultar versiones anteriores de una tabla Delta. Cada vez que se realiza una operaci√≥n de escritura (insert, update, delete, merge), Delta Lake crea una nueva versi√≥n de la tabla, manteniendo el historial de cambios.

**¬øPara qu√© sirve el Time Travel?**
- Recuperar datos borrados o modificados accidentalmente.
- Auditar cambios y analizar c√≥mo han evolucionado los datos a lo largo del tiempo.
- Comparar el estado de la tabla en diferentes momentos.
- Reproducir experimentos o an√°lisis sobre datos hist√≥ricos.

**¬øC√≥mo se usa?**
Puedes acceder a una versi√≥n anterior de la tabla especificando el n√∫mero de versi√≥n (`versionAsOf`) o una marca de tiempo (`timestampAsOf`) al leer los datos:

**Ventajas:**
- No necesitas mantener copias manuales de los datos para auditor√≠a o recuperaci√≥n.
- Todas las operaciones de Time Travel son transaccionales y consistentes.
- Facilita la trazabilidad y el cumplimiento normativo en entornos empresariales.

In [0]:
display(delta_table.toDF())

In [0]:
# Ver historial
display(delta_table.history())

In [0]:
# Leer versi√≥n anterior
df_old = spark.read.format("delta").option("versionAsOf", 0).load(base_path+"delta")
display(df_old)

In [0]:
# Leer la tabla tal como estaba en una fecha concreta
df_moment = spark.read.format("delta").option("timestampAsOf", "2025-05-27 10:00:00").load(base_path+"delta")

#### Ejercicio Guiado Time Travel

Seguir los pasos de las siguientes celdas para llegar a ver el resultado final

In [0]:
# Paso 1: Crear una df inicial con datos de empleados

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_empleados = StructType([
    StructField("id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("departamento", StringType(), False),
    StructField("salario", IntegerType(), False)
])

datos_iniciales = [
    (1, "Ana Garc√≠a", "Ventas", 35000),
    (2, "Carlos L√≥pez", "IT", 45000),
    (3, "Mar√≠a Ruiz", "Marketing", 38000)
]

### Completa el c√≥digo para crear el DataFrame con el esquema definido
df_empleados = spark.createDataFrame(.....)
display(df_empleados)

In [0]:
# Paso 2: Guardar el DataFrame como tabla Delta
path_empleados = base_path + "empleados_delta"

df_empleados.write....

print("‚úÖ Tabla creada y guardada en:", path_empleados)
print("\nüìç Para verificar que la tabla existe, ve a:")
print("   - Databricks UI > Data > DBFS > Volumes > workspace > ceste > archivos > empleados_delta")
print("   - O ejecuta: dbutils.fs.ls(base_path + 'empleados_delta')")

In [0]:
# Paso 3: Realizar una operaci√≥n de actualizaci√≥n
print("\nüîÑ Ahora vamos a actualizar el salario de Carlos L√≥pez...")

delta_empleados = DeltaTable.forPath(spark, path_empleados)
delta_empleados.update(
    condition="nombre = 'Carlos L√≥pez'",
    set={"salario": "50000"}
)

print("‚úÖ Salario actualizado de 45000 a 50000")

In [0]:
# Paso 4 Verificar el cambio. Para ello vuelve a leer la tabla Delta
df_actualizado = spark.read....
print("\nüìä Datos actuales de la tabla:")
display(df_actualizado)

In [0]:
# Paso 5: Mostrar el historial de versiones
print("\nüìú Historial de versiones de la tabla:")
print("Ejecuta el siguiente c√≥digo para ver todas las operaciones realizadas:\n")
print("display(delta_empleados.history())")
print("\nüí° Deber√≠as ver 2 versiones:")
print("   - Versi√≥n 0: Creaci√≥n inicial de la tabla (WRITE)")
print("   - Versi√≥n 1: Actualizaci√≥n del salario (UPDATE)")

In [0]:
# Paso 6: Comparar versiones usando Time Travel
print("\n‚è∞ Time Travel: Comparando versi√≥n actual vs versi√≥n inicial")

df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load(path_empleados)
df_version_1 = spark.read.format("delta").option("versionAsOf", 1).load(path_empleados)

print("\nüìå Versi√≥n 0 (Estado inicial):")
display(df_version_0)

print("\nüìå Versi√≥n 1 (Despu√©s de la actualizaci√≥n):")
display(df_version_1)

### Optimizaci√≥n de Tablas

La optimizaci√≥n de tablas en Delta Lake es clave para mejorar el rendimiento de las consultas y reducir el coste de almacenamiento en entornos de Big Data. Existen dos t√©cnicas principales:

- **Compactaci√≥n (OPTIMIZE):** Consiste en reducir el n√∫mero de archivos peque√±os que se generan tras m√∫ltiples escrituras o actualizaciones. Al compactar, se agrupan estos archivos en otros m√°s grandes, lo que acelera las lecturas y reduce la sobrecarga de gesti√≥n de archivos en el sistema distribuido.

- **Z-Ordering:** Es una t√©cnica de ordenaci√≥n f√≠sica de los datos en disco basada en una o varias columnas clave. Al aplicar Z-Ordering, los datos se almacenan de forma que las filas con valores similares en las columnas seleccionadas queden f√≠sicamente pr√≥ximas. Esto mejora notablemente el rendimiento de las consultas filtradas por esas columnas, ya que minimiza la cantidad de datos que Spark necesita leer.

**Ventajas de la optimizaci√≥n:**
- Consultas m√°s r√°pidas y eficientes, especialmente en grandes vol√∫menes de datos.
- Menor latencia en dashboards y an√°lisis interactivos.
- Reducci√≥n de costes de almacenamiento y procesamiento.
- Mejor aprovechamiento de los recursos del cluster.

**Cu√°ndo optimizar:**
- Tras cargas masivas de datos o procesos ETL frecuentes.
- Cuando se detecta degradaci√≥n en el rendimiento de las consultas.
- Antes de ejecutar an√°lisis cr√≠ticos o dashboards de negocio.

En resumen, la optimizaci√≥n peri√≥dica de las tablas Delta es una buena pr√°ctica para mantener el entorno √°gil, eficiente y escalable.

In [0]:
# Optimizar tabla
spark.sql("OPTIMIZE ceste.productos")
# Ordenar f√≠sicamente por "id"
spark.sql("OPTIMIZE ceste.productos ZORDER BY id")

## Ejercicios

1. Crea un DF con datos de clientes. Para ello puedes utilizar la informaci√≥n que te doy y la funcion `list(zip(X,Y))`

In [0]:
nombres = ["Juan", "Mar√≠a", "Carlos", "Ana", "Luis", "Carmen", "Jos√©", "Laura", "Pedro", "Luc√≠a", "Miguel", "Elena", "Javier", "Sof√≠a", "Antonio", "Marta", "Manuel", "Isabel", "Francisco", "Patricia"]
edades = [25, 30, 22, 28, 35, 27, 40, 32, 24, 29, 33, 26, 31, 23, 36, 21, 34, 38, 37, 20]



2. A√±ade la columna indice. Para ello ayudate de la funcion `monotonically_increasing_id`

In [0]:
from pyspark.sql.functions import monotonically_increasing_id


3. Guardar como Delta Table (por path). Utiliza la variable `base_path`

4. Actualiza la edad de un cliente

In [0]:
from delta.tables import DeltaTable



5. Insertar un nuevo cliente

In [0]:
from pyspark.sql.functions import current_timestamp



6. Consulta el historial de la tabla. Posteriormente lee una version anterior con Time Travel

In [0]:
# Ver historial de cambios


# Leer la versi√≥n inicial (version 0)


7. Registrar como tabla del metastore

8. Borra todos los registros

%md
9. Usa widgets para seleccionar din√°micamente la tabla Delta que creamos al principio del notebook ("delta") y mostrar su contenido

%md
10. Time travel con widgets, utiliza un widget tipo texto para poder leer dinamicamente la primera version de la tabla delta inicial ("delta").

%md
11. Atomicidad de Delta Table: Haz una demostracion de la atomicidad en delta Tables, prepara una insercion de dos filas donde la primera vaya a ser correcta y la segunda erronea. Seguidamente comprueba que no se inserto nada

In [0]:
from pyspark.sql.types import *

# Este batch contiene un error: la segunda fila tiene un precio como string
data_erronea = [
    ("10", "2025-05-28", "Tablet", 2, 299.99),
    ("11", "2025-05-28", "Teclado", 3, "caro")  # Error intencionado
]

%md
12. exceptAll() - Permite comparar 2 versiones de dfs y ver sus diferencias.

carga dos versiones de tu tabla delta en dos dfs distintos y aplicalo

In [0]:

df_actual=
df_anterior=



df_actual.exceptAll(df_anterior).display()