# Project: Lunar Events

- Author:   Edgar Rios
- Date:     2025-01-19
- Version:  1.0

In [0]:
# ******************************************************************************************************************
# || DESCRIPTION
# || -------------------------------------------------------------------------------------------------------------
# || PROJECT       	: Lunar Events
# || FILE        	: analysis.ipynb
# || SOURCE         : 
# || TARGET         : /
# || OBJETIVE		: Analysis data of lunar events
# || Reprocess      : Yes
# || NOTES      	: TBD
# || SCHEDULER		: TBD
# || JOB			: TBD
# || VERSION  DEVELOPER	        PROVIDER              DATE			 DESCRIPTION
# || -------------------------------------------------------------------------------------------------------------
# || 	1	  EDGAR RIOS        SYNTHETIC       	  2025-01-19	Analysis of lunar events
# ******************************************************************************************************************


## Import libaries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import uuid
from datetime import datetime, timedelta
import random

## Analysis

### Leer datos

In [0]:
# load data 
metastore="default.astronomical_events"
df = spark.read.table(metastore)


### 1. Filtrar eventos lunares y solares de los últimos dos años

In [0]:
filtered_df = df.filter((col("event_type").isin(["lunar", "solar"])) & (year(col("timestamp")) >= 2023))
filtered_df.show()

+--------------------+--------------------+----------+-------------+--------------------+
|             details|            event_id|event_type|     location|           timestamp|
+--------------------+--------------------+----------+-------------+--------------------+
|Event description...|c7b0109d-d7f0-448...|     solar|South_America|2023-07-04 02:24:...|
|Event description...|4945f5b1-fb64-471...|     solar|North_America|2023-02-20 02:24:...|
|Event description...|f3158d08-c1bc-4e9...|     lunar|North_America|2024-06-12 02:24:...|
|Event description...|765f9c43-0abf-441...|     solar|South_America|2024-01-24 02:24:...|
|Event description...|025c20fa-dd19-4cd...|     solar|North_America|2023-07-17 02:24:...|
|Event description...|8556cc38-70d0-402...|     lunar|South_America|2023-02-01 02:24:...|
|Event description...|8ff7ee39-0e15-474...|     lunar|North_America|2023-12-11 02:24:...|
|Event description...|00849c99-32ed-426...|     lunar|North_America|2024-10-05 02:24:...|
|Event des

### 2. Agrupar eventos por tipo y calcular la media de eventos por año.


In [0]:
df_with_year = df.withColumn("year", year(col("timestamp")))

events_per_year = (
    df_with_year.groupBy("event_type", "year")
    .agg(count("*").alias("events_count"))
)

average_events = (
    events_per_year.groupBy("event_type")
    .agg(avg("events_count").alias("avg_events_per_year"))
)

# Unir la media de eventos al DataFrame original agrupado
result_with_average = events_per_year.join(
    average_events, on="event_type", how="inner"
)

# Mostrar el resultado
result_with_average.show()



+----------+----+------------+-------------------+
|event_type|year|events_count|avg_events_per_year|
+----------+----+------------+-------------------+
|  asteroid|2023|         686|             504.75|
|  asteroid|2024|         693|             504.75|
|  asteroid|2022|         614|             504.75|
|  asteroid|2025|          26|             504.75|
|    meteor|2025|          32|              510.0|
|    meteor|2024|         692|              510.0|
|    meteor|2023|         674|              510.0|
|    meteor|2022|         642|              510.0|
|     comet|2022|         648|              514.0|
|     comet|2023|         667|              514.0|
|     comet|2024|         699|              514.0|
|     lunar|2023|         597|              468.5|
|     lunar|2022|         624|              468.5|
|     solar|2022|         674|             502.75|
|     solar|2024|         645|             502.75|
|     lunar|2024|         624|              468.5|
|     comet|2025|          42| 

3. Implementar un join utilizando broadcast para relacionar un dataset de eventos con uno de ubicaciones pequeñas.

In [0]:
#obtener catálogo de eventos
event_types_catalog = df.select("event_type").distinct().withColumn("event_type_id", monotonically_increasing_id())
event_types_catalog.show()


+----------+-------------+
|event_type|event_type_id|
+----------+-------------+
|  asteroid|            0|
|    meteor|            1|
|     solar|            2|
|     lunar|            3|
|     comet|            4|
+----------+-------------+



In [0]:
# Realizar un broadcast join, cruzar los eventos con el catalogo para obtener el ID
result_df = event_types_catalog.join(
    broadcast(df),  # Indicar que locations_df debe ser transmitido
    on="event_type",         # Clave de unión
    how="inner"               # Tipo de join
)

# Mostrar resultados
result_df.show()

+----------+-------------+--------------------+--------------------+--------+--------------------+
|event_type|event_type_id|             details|            event_id|location|           timestamp|
+----------+-------------+--------------------+--------------------+--------+--------------------+
|  asteroid|            0|Event description...|e7f55a9d-3dc7-4c6...|    Asia|2023-05-02 02:24:...|
|  asteroid|            0|Event description...|f614697d-169c-43c...|    Asia|2022-03-31 02:24:...|
|  asteroid|            0|Event description...|172325ed-0524-42c...|    Asia|2023-03-07 02:24:...|
|  asteroid|            0|Event description...|979db314-b3ba-484...|    Asia|2024-01-26 02:24:...|
|  asteroid|            0|Event description...|92684258-8492-4be...|    Asia|2023-12-03 02:24:...|
|  asteroid|            0|Event description...|697a5d63-b281-4a9...|    Asia|2023-12-12 02:24:...|
|  asteroid|            0|Event description...|de979cca-135c-4a4...|    Asia|2024-03-20 02:24:...|
|  asteroi

4. Escribir los datos procesados en formato Parquet con particionamiento por año y ubicación.

In [0]:
    # save in parquet format
    result_df.write.mode("overwrite") \
        .partitionBy("timestamp") \
        .format("delta") \
        .option("mergeSchema", "true")\
        .saveAsTable("default.lunar_events_id")


5. Leer un CSV con datos corruptos y manejar los errores

In [0]:
# necesito un bucket para cargarlo y la autenticación hacia el bucket

6. Calcular la duración promedio de eventos por tipo

In [0]:
# no veo como obtener cuanto duro cada evento dado que solo viene el datetime del evento,

7. Diseñar un pipeline que procese datos de manera incremental.


**1. Leer los datos**

- Crear una sesión de Spark
- Definir las rutas de entrada y salida apuntando a buckets, ejempo AWS

  - input_path = ""
  - output_path = ""

**Cargar los datos que ya han sido procesados**

processed_data = spark.read.parquet(output_path)

**Obtener la última fecha de los datos procesados**

last_processed_timestamp = processed_data.agg(max("timestamp")).collect()[0][0]

**Leer los nuevos datos que han llegado desde la última ejecución**

new_data = spark.read.parquet(input_path).filter(col("timestamp") > last_processed_timestamp)

**2.Transformación de Datos**

transformed_data = ...

**3.Escribir los nuevos datos**

transformed_data.write.mode("append").parquet(output_path)

**4. Guardar fecha de actualización del registro**

last_processed_timestamp = ...

**5. Automatización**

Usar Apache Airflow o Databricks Workflows

### 8. Ajustar configuraciones de Spark para manejar errores de memoria en el procesamiento de 50 GB.


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DataPipeline") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.executor.instances", "10") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
    .getOrCreate()


### 9. Comparar el uso de cache vs persist en una operación de agregación

In [0]:
#cache
# Supón que ya tienes el DataFrame 'df' con las ventas
df_with_year.cache()
df_agg = df_with_year.groupBy("event_type", "year") .agg(count("*").alias("events_count"))
df_agg.show()


+----------+----+------------+
|event_type|year|events_count|
+----------+----+------------+
|  asteroid|2023|         686|
|  asteroid|2024|         693|
|  asteroid|2022|         614|
|  asteroid|2025|          26|
|    meteor|2025|          32|
|    meteor|2024|         692|
|    meteor|2023|         674|
|    meteor|2022|         642|
|     comet|2022|         648|
|     comet|2023|         667|
|     comet|2024|         699|
|     lunar|2022|         624|
|     solar|2022|         674|
|     solar|2023|         659|
|     lunar|2023|         597|
|     solar|2024|         645|
|     lunar|2024|         624|
|     comet|2025|          42|
|     lunar|2025|          29|
|     solar|2025|          33|
+----------+----+------------+



In [0]:
#pesistente 
from pyspark import StorageLevel
df_with_year.persist(StorageLevel.MEMORY_AND_DISK)
df_agg = df_with_year.groupBy("event_type", "year") .agg(count("*").alias("events_count"))
df_agg.show()


+----------+----+------------+
|event_type|year|events_count|
+----------+----+------------+
|  asteroid|2023|         686|
|  asteroid|2024|         693|
|  asteroid|2022|         614|
|  asteroid|2025|          26|
|    meteor|2025|          32|
|    meteor|2024|         692|
|    meteor|2023|         674|
|    meteor|2022|         642|
|     comet|2022|         648|
|     comet|2023|         667|
|     comet|2024|         699|
|     lunar|2022|         624|
|     solar|2022|         674|
|     solar|2023|         659|
|     lunar|2023|         597|
|     solar|2024|         645|
|     lunar|2024|         624|
|     comet|2025|          42|
|     lunar|2025|          29|
|     solar|2025|          33|
+----------+----+------------+



### 10. Optimizar un flujo PySpark con particionamiento adecuado.

In [0]:
# Reparticionar los datos
num_partitions = sc.defaultParallelism  # Usa el número de núcleos disponibles
# Reparticionar el DataFrame
df_repartitioned = df_with_year.repartition(num_partitions)

# particionar por una columa
df_repartitioned = df_with_year.repartition("year")


### SQL y Modelado de Datos

**1. Escribir una consulta para obtener eventos lunares por ubicación:**

In [0]:
%sql
SELECT location, COUNT(*) AS event_count
FROM astronomical_events
WHERE event_type = 'lunar'
GROUP BY location;

location,event_count
South_America,303
North_America,337
Oceania,301
Europe,309
Africa,310
Asia,314


**2. Identificar problemas en el siguiente esquema estrella y proponer mejoras:**

   ```
   fact_events (event_id, event_type_id, timestamp_id, location_id, details)

   dim_event_type (event_type_id, event_name)

   dim_timestamp (timestamp_id, year, month, day)
   
   dim_location (location_id, location_name, coordinates)
   ```

  1. En la tabla fact_events detail puede ser una campo con muchos valores o texto muy grande o json o xml, se recomienda separar de la FACT por rendimiento y almacenamiento y crear una tabla de **details**

  2. La dimension dim_timestamp solo tiene year, month y day, se recomienda que tenga la el timestamp del evento, semana, trimestre, semestre y temporada para poder generar mas consultas agrupadas por diferentes rubros.

  3. dim_location parece que puede estar redundante en las corrdenadas, ya que se repiten las coordenadas en los eventos, sería conveniente hacer otra tabla de dim_coordinates

### 

3. Escribir una consulta para calcular el rango de eventos entre años.


**3. Escribir una consulta para calcular el rango de eventos entre años.**

In [0]:
%sql
SELECT 
    year(timestamp),
    count(event_id) events_yearly
FROM 
    astronomical_events 
GROUP BY 
    year(timestamp)
ORDER BY 
    year(timestamp);


year(timestamp),events_yearly
2022,3202
2023,3283
2024,3353
2025,162


**4. Crear índices para mejorar el rendimiento de consultas en tablas dimensionales.**

La actividad indica Diseñar un modelo dimensional, no construí el esquema del modelo en una BD Anlitica, ni en el Catalog Unity de Databricks, aquí no aplican los indices sino particiones


In [0]:
%sql
-- una opción de crear los indices sería:
CREATE INDEX idx_timestamp_id ON dim_time (timestamp_id);


**5. Diseñar una tabla que audite cambios en la tabla de hechos.**

In [0]:
%sql
CREATE TABLE default.fact_events_audit (
    audit_id STRING,
    event_id INT,
    event_type_id INT,
    timestamp_id INT,
    location_id INT,
    details STRING,
    change_type STRING, -- INSERT, UPDATE, DELETE
    old_event_type_id INT,
    old_timestamp_id INT,
    old_location_id INT,
    old_details STRING,
    changed_by STRING,
    change_timestamp TIMESTAMP,
    operation_timestamp TIMESTAMP
)
USING DELTA
LOCATION 'dbfs:/user/hive/warehouse/fact_events_audit_new';


**6. Calcular promedios de eventos por año y ubicación.**

In [0]:
%sql

SELECT
    year(timestamp),
    location,
    count(event_id) events
FROM 
    astronomical_events 
GROUP BY 
    year(timestamp), location
ORDER BY 
    year(timestamp), location;




year(timestamp),location,events
2022,Africa,540
2022,Asia,522
2022,Europe,533
2022,North_America,514
2022,Oceania,577
2022,South_America,516
2023,Africa,559
2023,Asia,576
2023,Europe,547
2023,North_America,533


**7. Relacionar dimensiones de tiempo y ubicación en un esquema Snowflake.**

EL esquema snowflake requiere que unas dimensiones se disgreguen en varias dimensiones, ejemplo dim_tiempo, quedaria en:

dim_day -> que a su vez esta relacionada con dim_year y dim_month

**8. Diseñar una consulta que identifique picos de eventos lunares por trimestre.**

In [0]:
En general como no se construyo el esquema ejemplo: DW_LUNAR_EVENTS hay que contruir en este momento con queries

In [0]:
%sql
WITH event_counts_per_quarter AS (
    SELECT
        quarter(timestamp) as quarter_name,
        year(timestamp) as year,
        COUNT(event_id) AS event_count
    FROM
        astronomical_events
    WHERE event_type = 'lunar'  -- Asumiendo que 'Lunar Event' es el tipo de evento lunar
    GROUP BY
      quarter(timestamp),
      year(timestamp)
),
quarter_avg AS (
    SELECT
        year,
        AVG(event_count) AS avg_event_count
    FROM
        event_counts_per_quarter
    GROUP BY
        year
)
SELECT
    eq.year,
    eq.quarter_name,
    eq.event_count,
    qa.avg_event_count,
    CASE
        WHEN eq.event_count > qa.avg_event_count * 1.5 THEN 'Peak'  -- Identifica picos como eventos más de 1.5 veces el promedio
        ELSE 'Normal'
    END AS event_peak_status
FROM
    event_counts_per_quarter eq
JOIN
    quarter_avg qa ON eq.year = qa.year
ORDER BY
    eq.year,
    CASE
        WHEN eq.event_count > qa.avg_event_count * 1.5 THEN 1
        ELSE 2
    END;


year,quarter_name,event_count,avg_event_count,event_peak_status
2022,2,173,156.0,Normal
2022,3,168,156.0,Normal
2022,1,117,156.0,Normal
2022,4,166,156.0,Normal
2023,2,160,149.25,Normal
2023,1,146,149.25,Normal
2023,3,159,149.25,Normal
2023,4,132,149.25,Normal
2024,3,168,156.0,Normal
2024,1,153,156.0,Normal


**9. Ajustar una consulta SQL para evitar escaneos completos de tabla.**

In [0]:
%sql
WITH filtered_events AS (
    SELECT
        event_id,
        timestamp
    FROM
        astronomical_events
    WHERE event_type = 'lunar'  -- Filtrar primero por el tipo de evento lunar
),
event_counts_per_quarter AS (
    SELECT
        EXTRACT(QUARTER FROM timestamp) AS quarter_name,  -- Usar EXTRACT para evitar la función 'quarter()' costosa
        EXTRACT(YEAR FROM timestamp) AS year,  -- Usar EXTRACT para evitar la función 'year()' costosa
        COUNT(event_id) AS event_count
    FROM
        filtered_events
    GROUP BY
        EXTRACT(QUARTER FROM timestamp),
        EXTRACT(YEAR FROM timestamp)
),
quarter_avg AS (
    SELECT
        year,
        AVG(event_count) AS avg_event_count
    FROM
        event_counts_per_quarter
    GROUP BY
        year
)
SELECT
    eq.year,
    eq.quarter_name,
    eq.event_count,
    qa.avg_event_count,
    CASE
        WHEN eq.event_count > qa.avg_event_count * 1.5 THEN 'Peak'  -- Identifica picos como eventos más de 1.5 veces el promedio
        ELSE 'Normal'
    END AS event_peak_status
FROM
    event_counts_per_quarter eq
JOIN
    quarter_avg qa ON eq.year = qa.year
ORDER BY
    eq.year,
    CASE
        WHEN eq.event_count > qa.avg_event_count * 1.5 THEN 1
        ELSE 2
    END;


year,quarter_name,event_count,avg_event_count,event_peak_status
2022,2,173,156.0,Normal
2022,3,168,156.0,Normal
2022,1,117,156.0,Normal
2022,4,166,156.0,Normal
2023,2,160,149.25,Normal
2023,1,146,149.25,Normal
2023,3,159,149.25,Normal
2023,4,132,149.25,Normal
2024,3,168,156.0,Normal
2024,1,153,156.0,Normal


**10. Proponer una estrategia para manejar datos históricos en un esquema dimensional.**

1. Si las dimensiones no se necesitan controlar, se puede realizar la actualizacion direca en los dimensiones

2. Si se necesita mantener el hisótico de los cambios en las dimensiones, se crean nuevos registros con una nueva clave, indicando fecha inicio y fin de ese nuevo valor

3. En las tablas de hechos se debe agregar un campo para saber si un valor está activo o no.

4. Fechas de vigencia en las tablas de hechos, si está activo o no, si está actualziado.

**Parte 5: Capacidad de Entender el Problema de Negocio**

Situación 1:

Un cliente interno tiene la hipótesis de que conocer la ruta de la luna podría incrementar las ventas. Tu tarea es:

1. Diseñar un flujo de análisis que permita relacionar eventos lunares con tendencias de ventas.

2. Proponer métricas clave que respalden o refuten la hipótesis.

Fase 1: Recopilar  datos de Ventas

Obtener los datos históricos de venta que tengan el campo timestamp para poder empatare los datos de ventas con eventos lunares.

La venta debe tener, producto, categoria, dia, hora.

Fase 2: Preprocesamiento y limpieza de datos

integrar los datos de los evebtos lunares con las ventas en una tabla de tal manera que queden las ventas y los eventos lunares usando los datos de las ventas y de los eventos lunares como variables (columnas).

Fase 3: Análisis exploratorio de datos (EDA)

Matriz de correlacón: Buscar la correlación de las variables, fecha, evento lunar y venta usando matriz de correlación de pearson o spearman,

Gráfica de tendencias: Graficar por día en el eje X y graficar sobre el eje Y los eventos lunares y los montos de venta para buscar un patron.

Segmentar datos:
En grupos por semanas antes y despues del evento de lunar, para revisar si el evento genera un efecto en las ventas.

Fase 4: Modelos de predicción

Usar moelos de serie de tiempo

Tablas:

Incluir tablas de resumen que muestren las medias de ventas en las diferentes fases lunares.

Informe de hallazgos:

Resumir los hallazgos, explicando si existe o no una correlación significativa entre los eventos lunares y las ventas.

Situación 2:

Otro cliente interno menciona que conocer la altura de la marea en base a eventos lunares ayudará a reducir el riesgo de operaciones comerciales en la playa. Tu tarea es:

1. Diseñar un flujo para predecir la altura de la marea usando eventos lunares.
2. Proponer un reporte para alertar de riesgos potenciales basados en la altura de la marea.


Fase 1: Recopilar datos de la marea

Obtener los datos históricos de la marea  para poder empatarse con los datos de eventos lunares.

Obtener datos

La marea debe tener, marea_id, timestamp, altura_marea, ubicacion_playa, evento_lunar_id.

Fase 2: Preprocesamiento y limpieza de datos

integrar los datos de los eventos lunares con las mareas en una tabla de tal manera que queden las mareas y los eventos lunares usando los datos de las mareas y de los eventos lunares como variables (columnas).

Fase 3: Análisis exploratorio de datos (EDA)

Matriz de correlacón: Buscar la correlación de las variables, fecha, evento lunar y altura de la marea usando matriz de correlación de pearson o spearman,

Gráfica de tendencias: Graficar por día en el eje X y graficar sobre el eje Y los eventos lunares y las alturas de la marea para buscar un patron.

Etiquetar datos:
Etiquetar cada registro para saber si hay un evento lunar y se presenta marea alta, como True o False

Fase 4: Modelos de predicción

Usar un modelo supervisado para entrenar con base a los datos etiquetados y después poder predecir con base al entrenamiento para los nuevos datos, se puede usar: Regresión Logística, Árboles de Decisión, SVM, K-NN, Naive Bayes

Informe de hallazgos:

Resumir los hallazgos, explicando si es posible predecir y con que porcentaje de exactitud si es posbile usar un modelo de Machine Learning para predecir las alturas de la marea con base a eventos lunares. 

****

### 