In [1]:
# All PySpark imports used by the notebook, grouped in one place.
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp
import pyspark.sql.functions as F

# Obtain the active SparkContext (creating one if none exists) and stop it
# right away — presumably so the session built below starts on a fresh context.
sc = SparkContext.getOrCreate()
sc.stop()

# Session used by every later cell.
spark = (
    SparkSession.builder
    .appName("MeteorologiaPySpark")
    .getOrCreate()
)



In [2]:
# Load the CSV with a header row, letting Spark infer each column's type.
df = spark.read.csv(
    "historico_provincias_espana.csv",
    header=True,
    inferSchema=True,
    sep=",",
)

In [3]:
# Print the inferred schema, then convert the frame to pandas so the notebook
# renders it as a table.
# NOTE(review): toPandas() collects every row to the driver just to show a
# preview; a bounded preview (e.g. df.limit(n).toPandas()) would be cheaper.
df.printSchema()
df.toPandas()

root
 |-- time: date (nullable = true)
 |-- temperature_2m_max: double (nullable = true)
 |-- temperature_2m_min: double (nullable = true)
 |-- temperature_2m_mean: double (nullable = true)
 |-- precipitation_sum: double (nullable = true)
 |-- windspeed_10m_max: double (nullable = true)
 |-- date: date (nullable = true)
 |-- provincia: string (nullable = true)



Unnamed: 0,time,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,windspeed_10m_max,date,provincia
0,2013-01-01,12.3,9.3,10.8,1.2,19.8,2013-01-01,A Coruña
1,2013-01-02,12.4,6.0,9.1,0.0,20.4,2013-01-02,A Coruña
2,2013-01-03,16.1,6.4,10.1,0.0,17.1,2013-01-03,A Coruña
3,2013-01-04,15.2,5.0,9.0,0.0,11.3,2013-01-04,A Coruña
4,2013-01-05,14.2,4.1,8.2,0.0,9.7,2013-01-05,A Coruña
...,...,...,...,...,...,...,...,...
235230,2025-02-21,15.0,7.4,10.7,11.0,30.8,2025-02-21,Zamora
235231,2025-02-22,11.9,3.4,7.5,0.1,14.9,2025-02-22,Zamora
235232,2025-02-23,13.8,-0.6,5.8,0.0,6.2,2025-02-23,Zamora
235233,2025-02-24,5.4,4.6,,,10.3,2025-02-24,Zamora


In [4]:
# Row count of the frame as loaded, before any de-duplication.
df.count()



235235

In [5]:
# Count how many times each complete row (all columns equal) occurs, then keep
# only the rows that occur more than once.
row_counts = df.groupBy(*df.columns).count()
duplicados = row_counts.filter(F.col("count") > 1)
duplicados.show()

+----------+------------------+------------------+-------------------+-----------------+-----------------+----------+---------+-----+
|      time|temperature_2m_max|temperature_2m_min|temperature_2m_mean|precipitation_sum|windspeed_10m_max|      date|provincia|count|
+----------+------------------+------------------+-------------------+-----------------+-----------------+----------+---------+-----+
|2013-01-23|               6.1|              -0.7|                2.4|              0.3|             26.5|2013-01-23|   Zamora|    2|
|2013-09-10|              25.7|              13.1|               19.2|              0.0|             20.7|2013-09-10|   Zamora|    2|
|2013-10-25|              16.9|              11.7|               13.5|             16.6|             15.7|2013-10-25|   Zamora|    2|
|2014-08-26|              27.7|              16.7|               22.2|              0.0|             24.9|2014-08-26|   Zamora|    2|
|2016-06-24|              27.0|              15.0|            

In [6]:
# Remove rows that are exact duplicates across every column.
df = df.dropDuplicates()

In [7]:
# Row count after de-duplication, for comparison with the earlier count.
df.count()

230796

In [8]:
# Preview the de-duplicated frame as a pandas table.
# NOTE(review): toPandas() collects every row to the driver.
df.toPandas()

Unnamed: 0,time,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,windspeed_10m_max,date,provincia
0,2014-02-18,11.5,6.9,9.0,1.5,17.1,2014-02-18,A Coruña
1,2014-07-25,23.5,17.0,20.0,0.0,26.0,2014-07-25,A Coruña
2,2015-02-22,12.5,7.1,9.9,1.4,25.5,2015-02-22,A Coruña
3,2015-05-25,19.7,12.5,15.5,0.0,32.2,2015-05-25,A Coruña
4,2016-07-01,19.6,14.8,17.2,0.5,16.6,2016-07-01,A Coruña
...,...,...,...,...,...,...,...,...
230791,2017-05-18,17.9,7.5,12.8,6.0,24.6,2017-05-18,Teruel
230792,2017-06-30,19.8,9.2,15.1,0.2,19.6,2017-06-30,Teruel
230793,2018-12-19,9.7,0.2,5.6,0.0,14.4,2018-12-19,Teruel
230794,2023-12-26,12.0,-2.6,3.5,0.0,5.7,2023-12-26,Teruel


In [9]:
# Drop the "time" column; the schema also has a "date" column, which is the
# one used from here on.
# NOTE(review): the previews suggest "time" and "date" hold the same values —
# confirm against the full data.
df = df.drop("time")
# Preview the result (collects the whole frame to the driver).
df.toPandas()

Unnamed: 0,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,windspeed_10m_max,date,provincia
0,11.5,6.9,9.0,1.5,17.1,2014-02-18,A Coruña
1,23.5,17.0,20.0,0.0,26.0,2014-07-25,A Coruña
2,12.5,7.1,9.9,1.4,25.5,2015-02-22,A Coruña
3,19.7,12.5,15.5,0.0,32.2,2015-05-25,A Coruña
4,19.6,14.8,17.2,0.5,16.6,2016-07-01,A Coruña
...,...,...,...,...,...,...,...
230791,17.9,7.5,12.8,6.0,24.6,2017-05-18,Teruel
230792,19.8,9.2,15.1,0.2,19.6,2017-06-30,Teruel
230793,9.7,0.2,5.6,0.0,14.4,2018-12-19,Teruel
230794,12.0,-2.6,3.5,0.0,5.7,2023-12-26,Teruel


In [10]:
# Sort the rows chronologically by "date".
df = df.orderBy("date")
# Preview the sorted frame (collects the whole frame to the driver).
df.toPandas()

Unnamed: 0,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,windspeed_10m_max,date,provincia
0,10.0,4.9,7.2,4.9,20.7,2013-01-01,La Rioja
1,14.0,10.9,12.6,0.0,21.0,2013-01-01,Baleares
2,15.0,3.2,8.7,0.0,17.1,2013-01-01,Murcia
3,9.0,2.2,5.7,0.0,17.6,2013-01-01,León
4,13.2,1.9,6.6,0.0,17.1,2013-01-01,Lleida
...,...,...,...,...,...,...,...
230791,,,,,,2025-02-25,Álava
230792,,,,,,2025-02-25,Sevilla
230793,,,,,,2025-02-25,Pontevedra
230794,,,,,,2025-02-25,Zaragoza


In [11]:
# Move "date" to the first position, keeping the other columns in their
# current relative order.
cols = [name for name in df.columns if name != "date"]
df = df.select("date", *cols)
df.show(5)

+----------+------------------+------------------+-------------------+-----------------+-----------------+---------+
|      date|temperature_2m_max|temperature_2m_min|temperature_2m_mean|precipitation_sum|windspeed_10m_max|provincia|
+----------+------------------+------------------+-------------------+-----------------+-----------------+---------+
|2013-01-01|              15.0|               3.2|                8.7|              0.0|             17.1|   Murcia|
|2013-01-01|              15.4|               6.3|               10.0|              0.0|              9.8|Castellón|
|2013-01-01|               8.5|               1.9|                5.1|              1.8|             21.4| Albacete|
|2013-01-01|               9.8|               2.8|                6.6|              0.2|             17.3|   Girona|
|2013-01-01|               9.0|               2.2|                5.7|              0.0|             17.6|     León|
+----------+------------------+------------------+--------------

In [12]:
# Number of null values in each column, as {column name: null count}.
#
# All counts are computed in ONE aggregation (a single Spark job) instead of
# running a separate filter+count job per column. F.when(...) without an
# otherwise() yields null for non-matching rows and F.count() skips nulls, so
# each aggregate counts exactly the rows where that column is null. A global
# aggregate always returns one row, so first() is safe even on an empty frame.
nulos_por_columna = (
    df.select(
        [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
    )
    .first()
    .asDict()
)
nulos_por_columna

{'date': 0,
 'temperature_2m_max': 52,
 'temperature_2m_min': 52,
 'temperature_2m_mean': 104,
 'precipitation_sum': 104,
 'windspeed_10m_max': 52,
 'provincia': 0}

In [13]:
# Convert "date" to a timestamp column, then sort by province and, within each
# province, by date.
df = (
    df.withColumn("date", to_timestamp("date"))
    .orderBy("provincia", "date")
)

In [14]:
# Preview the frame after the timestamp cast and the (provincia, date) sort.
# NOTE(review): toPandas() collects every row to the driver.
df.toPandas()

Unnamed: 0,date,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,windspeed_10m_max,provincia
0,2013-01-01,12.3,9.3,10.8,1.2,19.8,A Coruña
1,2013-01-02,12.4,6.0,9.1,0.0,20.4,A Coruña
2,2013-01-03,16.1,6.4,10.1,0.0,17.1,A Coruña
3,2013-01-04,15.2,5.0,9.0,0.0,11.3,A Coruña
4,2013-01-05,14.2,4.1,8.2,0.0,9.7,A Coruña
...,...,...,...,...,...,...,...
230791,2025-02-20,15.6,1.5,7.5,0.0,7.1,Ávila
230792,2025-02-21,11.1,4.2,7.3,5.1,28.2,Ávila
230793,2025-02-22,8.9,1.6,4.9,0.4,13.4,Ávila
230794,2025-02-23,1.4,0.9,,,4.6,Ávila


In [15]:
# Forward-fill window: within a province, ordered by date, every row from the
# start of the partition up to and including the current row.
w_ffill = (
    Window.partitionBy("provincia")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Backward-fill window: same partitioning and ordering, but from the current
# row through the end of the partition.
w_bfill = (
    Window.partitionBy("provincia")
    .orderBy("date")
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)

# Every column except the partition/order keys gets its nulls filled.
cols_to_fill = [name for name in df.columns if name not in ("provincia", "date")]


In [16]:
# Fill nulls per province: prefer the most recent non-null value at or before
# the current row (forward fill); if there is none, fall back to the next
# non-null value at or after it (backward fill).
for col in cols_to_fill:
    previous_value = F.last(F.col(col), ignorenulls=True).over(w_ffill)
    next_value = F.first(F.col(col), ignorenulls=True).over(w_bfill)
    df = df.withColumn(col, F.coalesce(previous_value, next_value))

# Preview the filled frame (collects the whole frame to the driver).
df.toPandas()

Unnamed: 0,date,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,windspeed_10m_max,provincia
0,2013-01-01,8.5,1.9,5.1,1.8,21.4,Albacete
1,2013-01-02,10.1,1.8,5.0,0.0,22.7,Albacete
2,2013-01-03,9.6,-0.9,3.5,0.0,13.4,Albacete
3,2013-01-04,10.4,-1.4,3.7,0.0,9.8,Albacete
4,2013-01-05,12.5,-1.2,4.4,0.0,17.2,Albacete
...,...,...,...,...,...,...,...
230791,2025-02-21,15.8,5.4,9.9,10.9,24.0,Valladolid
230792,2025-02-22,11.6,4.3,7.6,0.5,20.4,Valladolid
230793,2025-02-23,13.0,0.7,6.0,0.0,6.1,Valladolid
230794,2025-02-24,5.2,4.3,6.0,0.0,7.6,Valladolid


In [17]:
# Null count per column after the fill step, as {column name: null count}.
#
# Computed in ONE aggregation rather than one filter+count job per column:
# the frame now carries the window-based fills in its plan, so each separate
# job would re-evaluate them. F.when(...) without otherwise() yields null for
# non-matching rows and F.count() skips nulls, so each aggregate counts exactly
# the rows where that column is null. A global aggregate always returns one
# row, so first() is safe even on an empty frame.
nulos_por_columna_2 = (
    df.select(
        [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
    )
    .first()
    .asDict()
)
nulos_por_columna_2

{'date': 0,
 'temperature_2m_max': 0,
 'temperature_2m_min': 0,
 'temperature_2m_mean': 0,
 'precipitation_sum': 0,
 'windspeed_10m_max': 0,
 'provincia': 0}

In [18]:
# Export the cleaned data as CSV with a header row. coalesce(1) reduces the
# frame to a single partition before writing, and "overwrite" replaces any
# previous output at this path so the cell can be re-run.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("data_transf_open_meteo_pyspark.csv")
)


In [None]:
# Export the cleaned data as Parquet. The save mode is set to "overwrite"
# explicitly: the writer's default mode raises an error when the target path
# already exists, which would make this cell fail on every re-run of the
# notebook. This also matches the mode used for the CSV export.
df.write.mode("overwrite").parquet("transformed_openmeteo.parquet")