In [None]:
## Usar Spark para procesar el dataset limpio de terremotos, realizar agregaciones escalables y generar datasets finales optimizados para análisis y visualización.


import os   
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month  
from pyspark import SparkContext

os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] += r";C:\hadoop\bin"

## Se utiliza Spark para simular un entorno distribuido, preparado para escalar a volúmenes mayores.
# Inicializar SparkSession
spark = SparkSession \
      .builder \
      .appName("Earthquake ETL Spark") \
      .config("spark.sql.shuffle.partitions", "8") \
      .getOrCreate()

# Cargar dataset limpio
input_path = '../data/processed/earthquakes_clean.csv'  
df = spark.read.csv(input_path, header=True, inferSchema=True)
print("Esquema del DataFrame cargado:")     
df.printSchema()
print("Primeras 20 filas del DataFrame cargado:")
df.show(20, truncate=False) 


In [None]:
# Agregar datos por año y contar terremotos
## Esta agregación permite analizar la evolución temporal de la actividad sísmica.

yearly_counts = df.groupBy(col("year").alias("year")) \
    .count() \
    .orderBy("year")
 
print("Conteo anual de terremotos:")
yearly_counts.show()


In [None]:
# magnitud media por magnitude_category

avg_mag_by_type = df.groupBy("magnitude_category") \
    .avg("mag") \
    .withColumnRenamed("avg(mag)", "avg_magnitude") \
    .orderBy("magnitude_category")
print("Magnitud media por categoria de magnitud:")

avg_mag_by_type.show()

In [None]:
#correlacion entre depth y magnitud
## Analizar la relación entre la profundidad del terremoto y su magnitud.
correlation = df.stat.corr("depth", "mag")
print(f"Correlación entre profundidad y magnitud: {correlation}")   


In [None]:
## guardar los datasets agregados en parquet usando mode("overwrite")
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = os.environ["PATH"] + ";" + r"C:\hadoop\bin"

avg_mag_by_type.write.mode("overwrite").parquet("../data/processed/spark/avg_mag_by_type.parquet")
yearly_counts.write.mode("overwrite").parquet("../data/processed/spark/yearly_counts.parquet")

print("Datasets agregados guardados en formato Parquet en la carpeta 'data/processed/spark'")


In [None]:
import folium
from folium.plugins import MarkerCluster

# Cargar dataset limpio
input_path = '../data/processed/earthquakes_clean.csv'  
df = pd.read_csv(input_path)

# Filtrar datos para el primer trimestre de 2025
df['time'] = pd.to_datetime(df['time'])
df_q1_2025 = df[(df['time'].dt.year == 2025) & (df['time'].dt.month.isin([1, 2, 3]))]

# Crear mapa centrado en la media de las coordenadas
map_center = [df_q1_2025['latitude'].mean(), df_q1_2025['longitude'].mean()]
m = folium.Map(location=map_center, zoom_start=2)

# Agregar marcadores con clustering
marker_cluster = MarkerCluster().add_to(m)  
for idx, row in df_q1_2025.iterrows():
    popup_text = f"Place: {row['place']}<br>Magnitude: {row['mag']}"
    folium.Marker(
        location=[row['latitude'], row['longitude']],
        popup=popup_text
    ).add_to(marker_cluster)

# Guardar mapa en archivo HTML
output_map_path = '../data/processed/earthquakes_q1_2025_map.html'  
m.save(output_map_path)
print(f'Mapa de terremotos del primer trimestre de 2025 guardado en: {os.path.abspath(output_map_path)}')   
