# UD1 – Bloque 7 e 8: Exercicios prácticos con Delta Lake e Apache Iceberg

Este notebook recolle exercicios guiados para repetir e probar os exemplos dos apuntamentos:

- UD1 – Bloque 7: Delta Lake sobre HDFS e S3 (MinIO)
- UD1 – Bloque 8: Apache Iceberg sobre HDFS e S3 (MinIO)

O obxectivo é que poidas:

- Crear e consultar táboas Delta e Iceberg en HDFS e MinIO.
- Practicar operacións ACID: inserción, actualización, *time travel*.
- Ver como cambian os ficheiros físicos e os metadatos.

## 0. Preparación da sesión de Spark

No noso clúster, a sesión de Spark adoita estar xa creada como `spark`.
Se non é así, crea unha nova sesión co seguinte código.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.version

---

# Parte 1 – Delta Lake sobre HDFS

Nesta parte imos reproducir o **Escenario 1** do Bloque 7:
creación dunha táboa Delta en HDFS, lectura, consulta, MERGE e *time travel*.

## 1.1 Crear un DataFrame de exemplo

Pequeno conxunto de datos en memoria con lecturas de temperatura.

In [None]:
data = [
    ("2024-01-01", "A Coruña", 11.2),
    ("2024-01-01", "Vigo", 13.5),
    ("2024-01-02", "A Coruña", 9.8),
    ("2024-01-02", "Vigo", 14.1)
]

df = spark.createDataFrame(data, ["data", "cidade", "temperatura"])
df.show()
df.printSchema()

## 1.2 Gardar como Delta Table en HDFS

Escribimos o DataFrame en formato Delta en HDFS, na ruta `/datalake/meteo_delta`.

In [None]:
delta_path = "/datalake/meteo_delta"

(df.write
 .format("delta")
 .mode("overwrite")
 .save(delta_path))

Comproba o contido en HDFS co seguinte comando (desde o contedor do namenode):

```bash
hdfs dfs -ls /datalake/meteo_delta
hdfs dfs -ls /datalake/meteo_delta/_delta_log
```

## 1.3 Lectura e consulta da táboa Delta

Lemos os datos dende HDFS e facemos unha consulta SQL de agregación.

In [None]:
df_delta = spark.read.format("delta").load(delta_path)
df_delta.show()

df_delta.createOrReplaceTempView("meteo_delta")

spark.sql("""
    SELECT cidade, AVG(temperatura) AS media
    FROM meteo_delta
    GROUP BY cidade
""").show()

## 1.4 Actualización (upsert) con MERGE INTO

Usamos `DeltaTable.forPath` para cargar a táboa e facemos un MERGE
que actualiza un rexistro e insire outro novo.

In [None]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, delta_path)

novos_datos = [
    ("2024-01-02", "A Coruña", 10.2),  # actualización
    ("2024-01-03", "Ourense", 6.4)     # inserción
]

df_novos = spark.createDataFrame(novos_datos, ["data", "cidade", "temperatura"])

(delta_table.alias("old")
 .merge(
     df_novos.alias("new"),
     "old.data = new.data AND old.cidade = new.cidade"
 )
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())

spark.read.format("delta").load(delta_path).show()

## 1.5 Versionado e *time travel* en Delta Lake

Cada escritura ou MERGE crea unha nova versión. Podemos ver o historial e ler versións antigas.

In [None]:
# Historial de versións
spark.sql(f"""
    DESCRIBE HISTORY delta.`{delta_path}`
""").show(truncate=False)

In [None]:
# Ler a versión inicial (version 0)
df_v0 = (spark.read.format("delta")
         .option("versionAsOf", 0)
         .load(delta_path))

print("Versión 0 da táboa:")
df_v0.show()

---

# Parte 2 – Delta Lake sobre MinIO (S3A)

Agora repetimos un escenario similar, pero gardando os datos nun bucket de MinIO usando o *filesystem* `s3a://`.

## 2.1 Gardar e ler datos Delta en MinIO

Neste exemplo asumimos que existe un bucket chamado `deltalake`.

In [None]:
delta_s3_path = "s3a://deltalake/meteo_delta"

# Gardar no bucket de MinIO
(df.write
 .format("delta")
 .mode("overwrite")
 .save(delta_s3_path))

# Ler desde MinIO
df_s3 = spark.read.format("delta").load(delta_s3_path)
df_s3.show()

Podes comprobar o contido dende:

- A liña de comandos de Hadoop:
  ```bash
  hadoop fs -ls s3a://deltalake/meteo_delta
  ```
- A consola web de MinIO.

---

# Parte 3 – Apache Iceberg sobre HDFS

Nesta parte imos reproducir os exemplos principais do Bloque 8, usando o catálogo `local`
configurado para traballar con Iceberg en HDFS.

## 3.1 Crear un namespace en Iceberg (catálogo local)

Creamos un namespace chamado `meteo` no catálogo `local`.

In [None]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.meteo")
spark.sql("SHOW NAMESPACES IN local").show()

## 3.2 Importar o CSV `meteo.csv` como DataFrame

Usamos o ficheiro `meteo.csv` gardado en `/home/hadoop/work/meteo.csv`.

In [None]:
meteo_csv_path = "/home/hadoop/work/meteo.csv"

df_meteo = (spark.read
            .option("header", True)
            .option("inferSchema", True)
            .csv(meteo_csv_path))

df_meteo.show()
df_meteo.printSchema()

## 3.3 Crear unha táboa Iceberg en HDFS a partir do DataFrame

Creamos a táboa `local.meteo.lecturas` usando Iceberg.

In [None]:
(df_meteo.writeTo("local.meteo.lecturas")
 .using("iceberg")
 .create())

spark.sql("SELECT * FROM local.meteo.lecturas").show()

## 3.4 Consultas SQL sobre a táboa Iceberg

Facemos unha agregación simple por cidade.

In [None]:
spark.sql("""
    SELECT cidade, AVG(temperatura) AS media
    FROM local.meteo.lecturas
    GROUP BY cidade
""").show()

## 3.5 Evolución de esquema en Iceberg

Engadimos unha nova columna chamada `humidade`.

In [None]:
spark.sql("ALTER TABLE local.meteo.lecturas ADD COLUMN humidade DOUBLE")
spark.sql("DESCRIBE TABLE local.meteo.lecturas").show(truncate=False)

## 3.6 Inserción de datos en Iceberg (tipos estritos)

Iceberg é estrito co tipo DATE, polo que usamos `DATE('YYYY-MM-DD')`.

In [None]:
spark.sql("""
    INSERT INTO local.meteo.lecturas
    VALUES (DATE('2024-01-03'), 'Lugo', 7.3, 65.0)
""")

spark.sql("SELECT * FROM local.meteo.lecturas WHERE cidade = 'Lugo'").show()

## 3.7 Time travel en Iceberg

Primeiro listamos os snapshots dispoñibles, e despois usamos un `snapshot_id`
para consultar o estado pasado da táboa.

In [None]:
history_df = spark.sql("""
    SELECT snapshot_id, parent_id, operation, made_current_at
    FROM local.meteo.lecturas.history
""")
history_df.show(truncate=False)

In [None]:
# Substitúe este valor por un snapshot_id real visto na cela anterior
snapshot_id_exemplo = 0  # EXEMPLO: cambia este número por un snapshot_id válido

if snapshot_id_exemplo != 0:
    df_old = spark.sql(f"""
        SELECT *
        FROM local.meteo.lecturas VERSION AS OF {snapshot_id_exemplo}
    """)
    df_old.show()
else:
    print("Actualiza 'snapshot_id_exemplo' cun valor válido antes de executar esta cela.")

---

# Parte 4 – Apache Iceberg sobre MinIO

Agora imos crear unha táboa Iceberg no catálogo `minio`, que apunta a un warehouse en S3A/MinIO.

## 4.1 Crear namespace en MinIO

Creamos o namespace `meteo` dentro do catálogo `minio`.

In [None]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS minio.meteo")
spark.sql("SHOW NAMESPACES IN minio").show()

## 4.2 Crear a táboa Iceberg en MinIO

Reutilizamos o DataFrame `df_meteo` e escribímolo como táboa Iceberg en MinIO.

In [None]:
(df_meteo.writeTo("minio.meteo.lecturas")
 .using("iceberg")
 .create())

spark.sql("SELECT * FROM minio.meteo.lecturas").show()

## 4.3 Reescritura con particionamento dinámico

Reescribimos os datos particionando por `cidade` para mellorar o rendemento en consultas filtradas.

In [None]:
spark.sql("""
    ALTER TABLE minio.meteo.lecturas
    REWRITE DATA USING PARTITION BY (cidade)
""")

## 4.4 Ver ficheiros físicos e metadatos da táboa en MinIO

Listamos os ficheiros coñecidos por Iceberg para esta táboa.

In [None]:
spark.sql("SELECT * FROM minio.meteo.lecturas.files").show(truncate=False)

---

## 5. Exercicios adicionais propostos

1. Modifica os exemplos para engadir máis cidades e datas ao conxunto de datos inicial.
2. Proba a engadir novas columnas tanto en Delta como en Iceberg e observa as diferenzas de comportamento.
3. En Delta Lake, fai varias actualizacións e comproba como cambian as versións no `DESCRIBE HISTORY`.
4. En Iceberg, crea outra táboa con particionamento por `data` e compara o rendemento de consultas filtrando por rango de datas.
5. Explora no HDFS e en MinIO a estrutura de directorios e ficheiros que crean Delta e Iceberg para as distintas táboas.