# Transaction log en acción

In [0]:
%sql
DROP DATABASE IF EXISTS workspace.bronze CASCADE;
CREATE DATABASE IF NOT EXISTS workspace.bronze;
CREATE VOLUME bronze.external_tables;


In [0]:
%sql
CREATE TABLE delta.`/Volumes/workspace/bronze/external_tables/productos`
(
  id INT,
  nombre STRING
);

In [0]:
%sql
INSERT INTO delta.`/Volumes/workspace/bronze/external_tables/productos`
VALUES
(1, 'Producto 1'),
(2, 'Producto 2'),
(3, 'Producto 3');

In [0]:
%sql
SELECT * FROM delta.`/Volumes/workspace/bronze/external_tables/productos`;

In [0]:
df = spark.read.format("json").load("/Volumes/workspace/bronze/external_tables/productos/_delta_log/00000000000000000001.json")
display(df)

# Realizando Viajes en el tiempo

In [0]:
%sql
DROP TABLE IF EXISTS bronze.rubros;
CREATE TABLE bronze.rubros (
  rubro_sk BIGINT GENERATED ALWAYS AS IDENTITY,
  rubro_id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  rubro_nombre STRING COMMENT 'Nombre del rubro' NOT NULL
)
USING DELTA

In [0]:
%sql
INSERT INTO bronze.rubros (rubro_id, rubro_nombre)
VALUES 
(1, 'Rubro 1'),
(2, 'Rubro 2');

In [0]:
%sql
SELECT * FROM bronze.rubros;

In [0]:
%sql
INSERT INTO bronze.rubros (rubro_id, rubro_nombre)
VALUES 
(3,'Rubro 3'),
(4,'Rubro 4'),
(5,'Rubro 5');

In [0]:
%sql
SELECT * FROM bronze.rubros;

In [0]:
%sql
DESCRIBE HISTORY bronze.rubros;

In [0]:
%sql
SELECT * FROM bronze.rubros VERSION AS OF 1;

In [0]:
%sql
-- Para ver este ejemplo ingresar fecha y hora mayor a la versión 1 y menor a la versión 2
SELECT * FROM bronze.rubros TIMESTAMP AS OF '2025-11-30 12:00:00';

In [0]:
%sql
RESTORE TABLE bronze.rubros TO VERSION AS OF 1;
--RESTORE TABLE bronze.rubros TO TIMESTAMP AS OF '2025-11-30 12:00:00';

In [0]:
%sql
DESCRIBE HISTORY bronze.rubros;

# Propiedades de las tablas Delta

In [0]:
%sql
DESCRIBE DETAIL bronze.rubros;

In [0]:
%sql
ALTER TABLE bronze.rubros SET
TBLPROPERTIES (
  delta.logRetentionDuration = 'interval 7 days',
  equipo.nombre = 'Equipo A',
  equipo.responsable = 'Dario Bernabeu'
);

In [0]:
%sql
DESCRIBE DETAIL bronze.rubros;

In [0]:
%sql
ALTER TABLE bronze.rubros
UNSET TBLPROPERTIES (
    equipo.nombre
);


# Change Data Feed (CDF)

In [0]:
%sql
ALTER TABLE bronze.rubros SET TBLPROPERTIES (
    delta.enableChangeDataFeed = true
);

In [0]:
%sql
UPDATE bronze.rubros SET rubro_nombre = 'Rubro 1 Actualizado' WHERE rubro_id = 1;
    

In [0]:
%sql
DESCRIBE HISTORY bronze.rubros;

In [0]:
%sql
SELECT * FROM table_changes('bronze.rubros', 6)

# Particionamiento de tablas Delta

In [0]:
%sql
CREATE TABLE bronze.ventas (
    id BIGINT,
    producto STRING,
    cantidad DOUBLE,
    fecha DATE
)
USING DELTA
PARTITIONED BY (fecha)

In [0]:
%sql
INSERT INTO bronze.ventas
VALUES
(1, 'Producto 1', 10, '2025-01-01'),
(2, 'Producto 2', 25, '2025-01-01'),
(3, 'Producto 3', 20, '2025-01-01'),
(4, 'Producto 4', 13, '2025-01-02'),
(5, 'Producto 5', 15, '2025-01-02'),
(6, 'Producto 6', 28, '2025-01-02');

In [0]:
%sql
SELECT * FROM bronze.ventas 
WHERE fecha = '2025-01-01';

# OPTIMIZE en Delta Lake

In [0]:
%sql
CREATE TABLE bronze.clientes (
  id BIGINT,
  cliente STRING
)
USING DELTA;

In [0]:
from pyspark.sql import SparkSession

for i in range(1, 20 + 1):    
    spark.sql( f"""
        INSERT INTO bronze.clientes (id, cliente)
        VALUES ({i}, 'Cliente {i}');
    """
    )



In [0]:
%sql
OPTIMIZE bronze.clientes;

In [0]:
%sql
DESCRIBE HISTORY bronze.clientes;

# Z-ORDERING en Delta Lake

In [0]:
%sql
OPTIMIZE bronze.clientes
ZORDER BY (id)

# Limpiando datos obsoletos con Vacuum

In [0]:
%sql
ALTER TABLE bronze.clientes SET TBLPROPERTIES (
    delta.deletedFileRetentionDuration = 'interval 3 days'
);

In [0]:
%sql
VACUUM bronze.clientes;

# Procesamiento Streaming en Delta Lake

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS workspace.streaming;
CREATE VOLUME IF NOT EXISTS workspace.streaming.checkpoints;
CREATE VOLUME IF NOT EXISTS workspace.streaming.raw;

In [0]:
%sql
CREATE TABLE IF NOT EXISTS workspace.bronze.clima (
    latitud FLOAT,
    longitud FLOAT,
    temperatura FLOAT,
    velocidad_viento FLOAT,
    direccion_viento INT,
    codigo_clima INT,
    hora_evento TIMESTAMP,
    hora_ingesta TIMESTAMP
) USING DELTA;

In [0]:
import requests
import json
import time
import os
from datetime import datetime

# Configuración inicial
LANDING_ZONE = "/Volumes/workspace/streaming/raw"
API_URL = "https://api.open-meteo.com/v1/forecast?latitude=-34.90&longitude=-56.16&current_weather=true" # API de clima

# Nos aseguramos que el directorio existe
os.makedirs(LANDING_ZONE, exist_ok=True)

print(f"📡 Iniciando simulador de sensor. Escribiendo en: {LANDING_ZONE}")
print("Presiona Ctrl+C para detener el generador.\n")

# Condición de corte para que solo itere un número limitado de veces
contador = 0
limite = 10

try:
    while True:
        # Obtener datos de la API
        response = requests.get(API_URL)
        
        if response.status_code == 200:
            data = response.json()
            
            # Extraemos la información relevante
            evento_clima = {
                "latitud": data['latitude'],
                "longitud": data['longitude'],
                "temperatura": data['current_weather']['temperature'],
                "velocidad_viento": data['current_weather']['windspeed'],
                "direccion_viento": data['current_weather']['winddirection'],
                "codigo_clima": data['current_weather']['weathercode'],
                "hora_evento": data['current_weather']['time'],
                "hora_ingesta": datetime.now().isoformat()
            }
            
            # Generamos nombre de archivo único
            filename = f"clima_{int(time.time())}.json"
            filepath = os.path.join(LANDING_ZONE, filename)
            
            # Guardamos el archivo JSON
            with open(filepath, 'w') as f:
                json.dump(evento_clima, f)
            
            print(f"✅ Archivo generado: {filename} | Temp: {evento_clima['temperatura']}°C")
        else:
            print(f"❌ Error en API: {response.status_code}")

        contador += 1
        if contador >= limite:
            break
        
        # Esperar 5 segundos antes de la siguiente iteración
        time.sleep(5)

except KeyboardInterrupt:
    print("\n🛑 Generador detenido.")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType
from pyspark.sql.functions import col, to_timestamp

# Configuración inicial
INPUT_PATH = "/Volumes/workspace/streaming/raw"
DELTA_TABLE_OUTPUT = "workspace.bronze.clima"
CHECKPOINT_PATH = "/Volumes/workspace/streaming/checkpoints/clima"

# Definimos el esquema
json_schema = StructType([
    StructField("latitud", FloatType(), True),
    StructField("longitud", FloatType(), True),
    StructField("temperatura", FloatType(), True),
    StructField("velocidad_viento", FloatType(), True),
    StructField("direccion_viento", IntegerType(), True),
    StructField("codigo_clima", IntegerType(), True),
    StructField("hora_evento", StringType(), True),
    StructField("hora_ingesta", StringType(), True)
])

# Leemos los datos generados por el Stream de entrada
# maxFilesPerTrigger=1 hace que procese archivo por archivo
df_stream = spark.readStream \
    .format("json") \
    .schema(json_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(INPUT_PATH)

# Convertirmos strings de tiempo a Timestamp
df_transformed = df_stream \
    .withColumn("hora_evento", to_timestamp(col("hora_evento"))) \
    .withColumn("hora_ingesta", to_timestamp(col("hora_ingesta")))

# Escribimos el Stream de salida en tabla Delta
query = df_transformed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", CHECKPOINT_PATH) \
    .trigger(availableNow=True) \
    .toTable(DELTA_TABLE_OUTPUT)

query.awaitTermination()