Ejercicio 1: Filtrar eventos lunares y solares de los últimos dos años

In [None]:
# Keep only lunar and solar events from the current and the previous calendar
# year. The cutoff is derived from the current date instead of a hard-coded
# year, so the "last two years" window keeps moving as time passes.
from pyspark.sql.functions import col, current_date, year

df = spark.read.parquet("data/eventos")

# Filter events: type in {lunar, solar} and year >= (current year - 1).
eventos_filtrados = (
    df.withColumn("year", year("timestamp"))
    .filter(
        col("event_type").isin(["lunar", "solar"])
        & (col("year") >= year(current_date()) - 1)
    )
)

# Show results
print("Eventos lunares y solares de los últimos dos años:")
eventos_filtrados.show()

Ejercicio 2: Agrupar eventos por tipo y calcular la media de eventos por año

In [None]:
# Average number of events per calendar year for each event type: first count
# events per (event_type, year), then average those yearly counts per type.
# Years in which a type has no events produce no group, so they are not
# included in that type's average.
media_eventos = (
    df.withColumn("year", year("timestamp"))
    .groupBy("event_type", "year")
    .agg(count("*").alias("total_eventos"))
    .groupBy("event_type")
    .agg(avg("total_eventos").alias("media_eventos_por_año"))
)

print("Media de eventos por tipo:")
media_eventos.show()

Ejercicio 3: Implementar broadcast join con dataset de ubicaciones

In [None]:

from pyspark.sql.functions import broadcast

# Small static lookup of regional metadata: (location, time zone, hemisphere).
ubicaciones_data = [
    ("NorteAmerica", "GMT-5", "Norte"),
    ("SurAmerica", "GMT-3", "Sur"),
    ("Europa", "GMT+1", "Norte"),
    ("Asia", "GMT+8", "Norte"),
    ("Africa", "GMT+2", "Sur"),
    ("Oceania", "GMT+10", "Sur"),
]

ubicaciones_df = spark.createDataFrame(
    ubicaciones_data,
    schema=["location", "zona_horaria", "hemisferio"],
)

# broadcast() hints Spark to ship the small lookup to every executor so the
# event table does not need to be shuffled for the join. The join is inner:
# events whose location is absent from the lookup are dropped.
eventos_con_ubicacion = (
    df.join(broadcast(ubicaciones_df), on="location", how="inner")
    .select("event_id", "event_type", "location", "zona_horaria", "hemisferio")
)

print("Eventos con información de ubicación:")
eventos_con_ubicacion.show()

Ejercicio 4: Escribir datos procesados en Parquet con particionamiento

In [None]:

# Derive year/month from the event timestamp and write Parquet laid out as
# year=<y>/location=<loc>/ directories, replacing any previous output.
# `month` is stored as a regular data column, not a partition key.
(
    df.withColumn("year", year("timestamp"))
    .withColumn("month", month("timestamp"))
    .write.mode("overwrite")
    .partitionBy("year", "location")
    .parquet("data/eventos_particionados")
)

Ejercicio 5: Leer CSV con datos corruptos

In [None]:
# Read the CSV (first line is the header) and drop every record the CSV parser
# flags as malformed instead of failing or null-filling it.
# NOTE(review): no schema is given, so every column is read as a string;
# presumably only rows whose field count does not match the header are
# dropped, and type-level corruption goes undetected — pass an explicit
# schema if that matters.
df_corrupto = (
    spark.read
    .options(header="true", mode="DROPMALFORMED")
    .csv("data/eventos_corruptos.csv")
)

print("Datos leídos ignorando filas corruptas:")
df_corrupto.show()

Ejercicio 6: Calcular duración promedio por tipo

In [None]:
# Average gap, in minutes, between consecutive events of the same type.
# For each event, subtract the timestamp of the previous event of the same
# event_type (ordered by time). unix_timestamp() yields seconds, so the
# difference is divided by 60 to match the "minutos" naming. The first event
# of each type has no predecessor (lag() -> null) and avg() ignores it.
ventana_tipo = Window.partitionBy("event_type").orderBy("timestamp")

eventos_duracion = (
    df.withColumn(
        "duracion_minutos",
        (
            unix_timestamp("timestamp")
            - unix_timestamp(lag("timestamp").over(ventana_tipo))
        ) / 60,
    )
    .groupBy("event_type")
    .agg(avg("duracion_minutos").alias("duracion_promedio_minutos"))
)

print("Duración promedio por tipo de evento:")
eventos_duracion.show()

Ejercicio 7: Pipeline incremental

In [None]:
from datetime import datetime, timedelta

from airflow import DAG
# `airflow.operators.python_operator` is the deprecated Airflow 1.x path;
# `airflow.operators.python` is its Airflow 2 location.
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Hourly incremental pipeline.
# catchup=False: otherwise, on first deploy the scheduler would create one run
# for every hour elapsed since start_date (a large backfill).
# max_active_runs=1: keep incremental runs from overlapping.
dag = DAG(
    'pipeline_eventos_incrementales',
    default_args=default_args,
    description='Pipeline incremental para procesar eventos',
    schedule_interval='0 * * * *',  # every hour, on the hour
    catchup=False,
    max_active_runs=1,
)

# NOTE(review): `main` is not defined in this cell; it must be defined
# (the incremental processing entry point) before this cell runs.
procesar_task = PythonOperator(
    task_id='procesar_datos_incrementales',
    python_callable=main,
    dag=dag,
)

Ejercicio 8: Configuración de memoria

In [None]:
# Memory and shuffle settings. Only runtime SQL confs (e.g.
# spark.sql.shuffle.partitions) can be changed on a running session. Core
# settings such as executor/driver memory and off-heap memory are fixed when
# the application starts, and since Spark 3.0 spark.conf.set() on them raises
# an error. Those must be supplied at launch (spark-submit --conf, or
# SparkSession.builder.config(...) before any session exists).
# NOTE(review): off-heap memory is allocated in addition to the executor heap,
# so each executor needs roughly 8g + 8g plus overhead — confirm cluster sizing.
configuracion_memoria = {
    "spark.executor.memory": "8g",
    "spark.driver.memory": "4g",
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "8g",
    "spark.sql.shuffle.partitions": "200",
}

for clave, valor in configuracion_memoria.items():
    if spark.conf.isModifiable(clave):
        spark.conf.set(clave, valor)
    else:
        print(
            f"'{clave}' no se puede cambiar en tiempo de ejecución; "
            f"defínala al lanzar la aplicación (valor deseado: {valor})."
        )

Ejercicio 9: Cache vs Persist


In [None]:

import time

from pyspark.storagelevel import StorageLevel


def _tiempo_conteo_por_tipo():
    """Run the per-type count on ``df`` (printing it) and return elapsed seconds."""
    inicio = time.perf_counter()
    df.groupBy("event_type").count().show()
    return time.perf_counter() - inicio


# Cache. Caching is lazy: the first action only fills the cache. Materialize it
# with count() first so the timed query actually reads from the cache rather
# than also reading the source and building the cache.
df.cache()
df.count()
print(f"Tiempo con cache: {_tiempo_conteo_por_tipo()}")

# Persist on disk only, with the same materialization step before timing.
df.unpersist()
df.persist(StorageLevel.DISK_ONLY)
df.count()
print(f"Tiempo con persist: {_tiempo_conteo_por_tipo()}")

# Release the disk-persisted copy once the comparison is done.
df.unpersist()

Ejercicio 10: Optimizar con particionamiento

In [None]:

# Co-locate all rows of each event_type in the same task and order them by
# time inside each partition, so each event_type directory is written with
# time-sorted files.
df_optimizado = df.repartition(col("event_type")).sortWithinPartitions("timestamp")

# Save optimized: one directory per event_type, at most 1,000,000 rows per file.
(
    df_optimizado.write
    .option("maxRecordsPerFile", 1_000_000)
    .partitionBy("event_type")
    .parquet("data/eventos_optimizados")
)

SQL

Ejercicio 1: Consulta de eventos lunares por ubicación

In [None]:
-- Count lunar events per location. Per the star schema described in
-- Exercise 2, fact_events stores only surrogate keys (event_type_id,
-- location_id), so the type name and location name come from the dimensions.
SELECT
    l.location_name AS location,
    COUNT(*) AS event_count
FROM fact_events fe
JOIN dim_event_type et ON fe.event_type_id = et.event_type_id
JOIN dim_location l ON fe.location_id = l.location_id
WHERE et.event_name = 'lunar'
GROUP BY l.location_name;

Ejercicio 2: Análisis de esquema estrella

fact_events (event_id, event_type_id, timestamp_id, location_id, details)
dim_event_type (event_type_id, event_name)
dim_timestamp (timestamp_id, year, month, day)
dim_location (location_id, location_name, coordinates)

Mejoras propuestas:

Agregar índices en claves foráneas
Particionar por año
Agregar columnas de auditoría

Ejercicio 3: Rango de eventos entre años

In [None]:
-- First year, last year and total number of events for each event type.
-- The event type name is resolved through dim_event_type, since fact_events
-- only holds event_type_id (star schema from Exercise 2).
SELECT
    et.event_name AS event_type,
    MIN(dt.year) AS primer_año,
    MAX(dt.year) AS ultimo_año,
    COUNT(*) AS total_eventos
FROM fact_events fe
JOIN dim_event_type et ON fe.event_type_id = et.event_type_id
JOIN dim_timestamp dt ON fe.timestamp_id = dt.timestamp_id
GROUP BY et.event_name;

Ejercicio 4: Índices para dimensiones

In [None]:
-- One index per foreign-key column of fact_events, to support joins and
-- filters on the keys that reference the dimension tables.
CREATE INDEX idx_event_type_id ON fact_events(event_type_id);
CREATE INDEX idx_timestamp_id ON fact_events(timestamp_id);
CREATE INDEX idx_location_id ON fact_events(location_id);

Ejercicio 5: Tabla de auditoría

In [None]:
-- Audit log for changes to fact_events: one row per recorded change.
CREATE TABLE audit_fact_events (
    audit_id SERIAL PRIMARY KEY,
    event_id VARCHAR(36),
    action_type VARCHAR(10),                        -- presumably 'INSERT' / 'UPDATE' / 'DELETE' — confirm
    changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- filled automatically when the writer omits it
    changed_by VARCHAR(50)
);

Ejercicio 6: Promedios por año y ubicación

In [None]:
-- Yearly event count per location, next to that location's average yearly count.
WITH conteo_anual AS (
    SELECT
        loc.location_name,
        ts.year,
        COUNT(*) AS total_eventos
    FROM fact_events AS fe
    INNER JOIN dim_location AS loc ON loc.location_id = fe.location_id
    INNER JOIN dim_timestamp AS ts ON ts.timestamp_id = fe.timestamp_id
    GROUP BY loc.location_name, ts.year
)
SELECT
    location_name,
    year,
    total_eventos,
    AVG(total_eventos) OVER (PARTITION BY location_name) AS promedio_ubicacion
FROM conteo_anual;

Ejercicio 7: Esquema Snowflake para tiempo y ubicación

In [None]:
-- Snowflake normalization of the location dimension: region data lives in its
-- own table and dim_location references it through region_id.
-- NOTE(review): the heading also mentions time, but no time hierarchy
-- (e.g. year/month tables) is defined here. This dim_location also omits the
-- `coordinates` column listed in the star schema of Exercise 2 — confirm.
CREATE TABLE dim_region (
    region_id INT PRIMARY KEY,
    region_name VARCHAR(50)
);

CREATE TABLE dim_location (
    location_id INT PRIMARY KEY,
    location_name VARCHAR(50),
    region_id INT REFERENCES dim_region(region_id)
);

Ejercicio 8: Picos de eventos lunares por trimestre

In [None]:
-- Lunar-event peaks by quarter: quarters whose event count is above the
-- average count over all quarters that had lunar events.
-- The quarter is derived from dim_timestamp.month (the Exercise 2 schema has
-- no quarter column); assumes month is an integer 1-12 so that integer
-- division maps it to quarters 1-4.
WITH eventos_trimestre AS (
    SELECT
        t.year,
        (t.month + 2) / 3 AS quarter,
        COUNT(*) AS eventos,
        AVG(COUNT(*)) OVER () AS promedio_general
    FROM fact_events fe
    JOIN dim_event_type et ON fe.event_type_id = et.event_type_id
    JOIN dim_timestamp t ON fe.timestamp_id = t.timestamp_id
    WHERE et.event_name = 'lunar'
    GROUP BY t.year, (t.month + 2) / 3
)
SELECT *
FROM eventos_trimestre
WHERE eventos > promedio_general
ORDER BY year, quarter;

Ejercicio 9: Optimizar consulta

In [None]:
-- Before: every column of every lunar event. fact_events only holds
-- event_type_id (star schema from Exercise 2), so the type name is resolved
-- through dim_event_type.
SELECT fe.*
FROM fact_events fe
JOIN dim_event_type et ON fe.event_type_id = et.event_type_id
WHERE et.event_name = 'lunar';

-- After: covering index on the filtered foreign key. INCLUDE (PostgreSQL 11+)
-- stores the join keys in the index, which may allow an index-only scan on the
-- fact side. (Exercise 4's idx_event_type_id indexes the same column without
-- the included columns.)
CREATE INDEX idx_event_type ON fact_events(event_type_id)
INCLUDE (event_id, location_id, timestamp_id);

-- Project only the needed columns. No optimizer hint: PostgreSQL treats
-- Oracle-style /*+ INDEX(...) */ hints as plain comments unless an extension
-- such as pg_hint_plan is installed. dim_timestamp stores year/month/day, so
-- the event date is rebuilt with make_date().
SELECT
    fe.event_id,
    l.location_name,
    make_date(t.year, t.month, t.day) AS event_date
FROM fact_events fe
JOIN dim_event_type et ON fe.event_type_id = et.event_type_id
JOIN dim_location l ON fe.location_id = l.location_id
JOIN dim_timestamp t ON fe.timestamp_id = t.timestamp_id
WHERE et.event_name = 'lunar';


Ejercicio 10: Estrategia para datos históricos

In [None]:
-- Range partitioning by date: history table partitioned on timestamp_id,
-- one partition per year.
-- Assumes timestamp_id encodes the date as YYYYMMDD (e.g. 20200315) — TODO confirm.
-- NOTE(review): the fact_events schema from Exercise 2 also has a `details`
-- column that this history table does not keep — confirm that is intended.
CREATE TABLE fact_events_history (
    event_id VARCHAR(36),
    event_type_id INT,
    timestamp_id INT,
    location_id INT
) PARTITION BY RANGE (timestamp_id);

-- Yearly partitions (lower bound inclusive, upper bound exclusive).
CREATE TABLE fact_events_2020 PARTITION OF fact_events_history
    FOR VALUES FROM (20200101) TO (20210101);
CREATE TABLE fact_events_2021 PARTITION OF fact_events_history
    FOR VALUES FROM (20210101) TO (20220101);

-- Catch-all partition (PostgreSQL 11+). Without it, inserting a row whose
-- timestamp_id falls outside every yearly range fails with
-- "no partition of relation ... found for row". Rows stored here must be moved
-- out before a new yearly partition covering them can be attached.
CREATE TABLE fact_events_history_default PARTITION OF fact_events_history DEFAULT;