In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import requests
#Crear sesión de Spark
spark = SparkSession \
    .builder \
    .appName("Weather_ETL_Pipeline") \
    .config("spark.driver.extraJavaOptions", "-Djava.library.path=C:\\hadoop\\bin") \
    .config("spark.executor.extraJavaOptions", "-Djava.library.path=C:\\hadoop\\bin") \
    .getOrCreate()

In [None]:
# Obtener datos meteorológicos de la API
url = "https://api.open-meteo.com/v1/forecast?latitude=18.9261&longitude=-99.2308&hourly=temperature_2m,relative_humidity_2m,wind_speed_10m"
response = requests.get(url)
data = response.json()


In [None]:
# Procesar datos horarios y crear DataFrame de Spark
hourly = data['hourly']
rows = list(zip(
    hourly['time'],
    hourly['temperature_2m'],
    hourly['relative_humidity_2m'],
    hourly['wind_speed_10m']

))
hourly_data = spark.createDataFrame(rows, ['time', 'temperature_2m', 'relative_humidity_2m', 'wind_speed_10m'])
hourly_data = hourly_data.withColumn("time", F.to_timestamp(F.col("time"), "yyyy-MM-dd'T'HH:mm"))


+-------------------+--------------+--------------------+--------------+
|               time|temperature_2m|relative_humidity_2m|wind_speed_10m|
+-------------------+--------------+--------------------+--------------+
|2026-01-14 00:00:00|          20.8|                  46|           2.1|
|2026-01-14 01:00:00|          19.0|                  51|           6.6|
|2026-01-14 02:00:00|          16.7|                  56|           9.5|
|2026-01-14 03:00:00|          17.4|                  51|           8.9|
|2026-01-14 04:00:00|          17.3|                  49|           8.3|
|2026-01-14 05:00:00|          16.8|                  51|           7.1|
|2026-01-14 06:00:00|          16.0|                  53|           7.9|
|2026-01-14 07:00:00|          15.9|                  54|           8.0|
|2026-01-14 08:00:00|          15.1|                  57|           7.6|
|2026-01-14 09:00:00|          14.5|                  59|           7.2|
|2026-01-14 10:00:00|          14.3|               

In [None]:
# Calcular promedios diarios
df_temp = hourly_data.groupBy(F.to_date("time").alias("date")) \
            .agg(F.avg("temperature_2m").alias("avg_temp_daily")) \
            .orderBy("date")
df_humidity = hourly_data.groupBy(F.to_date("time").alias("date")) \
                    .agg(F.avg("relative_humidity_2m").alias("avg_humidity_daily")).orderBy("date")

In [16]:
df_temp.show()


+----------+------------------+
|      date|    avg_temp_daily|
+----------+------------------+
|2026-01-14|            18.325|
|2026-01-15|           18.4625|
|2026-01-16|16.670833333333334|
|2026-01-17|19.150000000000002|
|2026-01-18|18.212500000000002|
|2026-01-19|16.525000000000002|
|2026-01-20|17.104166666666668|
+----------+------------------+



In [17]:
df_humidity.show()

+----------+------------------+
|      date|avg_humidity_daily|
+----------+------------------+
|2026-01-14|              50.0|
|2026-01-15|             49.75|
|2026-01-16|            52.875|
|2026-01-17|              54.0|
|2026-01-18|             58.75|
|2026-01-19|              56.0|
|2026-01-20|              54.5|
+----------+------------------+



In [None]:
# Guardar resultados en archivos Parquet
df_temp_pandas = df_temp.toPandas()
df_temp_pandas.to_parquet("output/avg_temp_daily", index=False)
df_humidity_pandas = df_humidity.toPandas()
df_humidity_pandas.to_parquet("output/avg_humidity_daily", index=False)

In [None]:
spark.stop()