In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os
spark = SparkSession.builder \
    .appName('Homogenizar datos') \
    .getOrCreate()

sc = spark.sparkContext

In [2]:
## Cambiar la ruta de los datos

In [149]:
df_bar = spark.read.option("delimiter", ",").option("header", True).csv("../../../data/raw/data_aemet/estacion_0201X_data.csv")
df_mad = spark.read.option("delimiter", ",").option("header", True).csv("../../../data/raw/data_aemet/estacion_3195_data.csv")

In [150]:
df_bar = df_bar.withColumnRenamed("tmed", "temperatura_media") \
       .withColumnRenamed("hrMedia", "humedad_media") \
       .withColumnRenamed("tmax", "temperatura_maxima") \
       .withColumnRenamed("tmin", "temperatura_minima")
df_mad = df_mad.withColumnRenamed("tmed", "temperatura_media") \
       .withColumnRenamed("hrMedia", "humedad_media") \
       .withColumnRenamed("tmax", "temperatura_maxima") \
       .withColumnRenamed("tmin", "temperatura_minima")

In [151]:
columnas = ["temperatura_media", "humedad_media", "temperatura_maxima", "temperatura_minima"]
#dataframes_bar = ["df_temp_media_bar", "df_humedad_bar", "df_temp_maxima_bar", "df_temp_minima_bar"]
#dataframes_mad = ["df_temp_media_mad","df_humedad_mad","df_temp_maxima_mad","df_temp_minima_mad"]
#dataframes_hoyo = ["df_temp_media_hoyo","df_humedad_hoyo","df_temp_maxima_hoyo","df_temp_minima_hoyo"]

In [152]:
for col_name in columnas:
    df_bar = df_bar.withColumn(f"{col_name}", F.regexp_replace(F.col(f"{col_name}"), ",", "."))
    df_bar = df_bar.withColumn(f"{col_name}", F.col(f"{col_name}").cast("float"))
    df_mad = df_mad.withColumn(f"{col_name}", F.regexp_replace(F.col(f"{col_name}"), ",", "."))
    df_mad = df_mad.withColumn(f"{col_name}", F.col(f"{col_name}").cast("float"))

In [153]:
df_bar = df_bar.withColumn("fecha", F.to_date(df_bar["fecha"], "yyyy-MM-dd"))
df_mad = df_mad.withColumn("fecha", F.to_date(df_mad["fecha"], "yyyy-MM-dd"))

In [154]:
df_bar = df_bar.withColumn("anio", F.year(df_bar["fecha"])) \
        .withColumn("mes", F.month(df_bar["fecha"]))
df_mad = df_mad.withColumn("anio", F.year(df_mad["fecha"])) \
        .withColumn("mes", F.month(df_mad["fecha"]))

In [155]:
df_temp_media_bar = df_bar.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("temperatura_media"))
df_humedad_bar = df_bar.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("humedad_media"))
df_temp_maxima_bar = df_bar.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("temperatura_maxima"))
df_temp_minima_bar = df_bar.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("temperatura_minima"))

df_temp_media_mad = df_mad.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("temperatura_media"))
df_humedad_mad = df_mad.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("humedad_media"))
df_temp_maxima_mad = df_mad.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("temperatura_maxima"))
df_temp_minima_mad = df_mad.select(F.col("fecha"), F.col("indicativo"), F.col("anio"), F.col("mes"), F.col("temperatura_minima"))

In [159]:
df_humedad_mad.dtypes

[('fecha', 'date'),
 ('indicativo', 'string'),
 ('anio', 'int'),
 ('mes', 'int'),
 ('tipo_dato', 'string'),
 ('value', 'float')]

In [157]:
df_temp_media_bar = df_temp_media_bar.withColumn("tipo_dato", F.lit("temperatura_media")) \
                                     .withColumn("value", df_temp_media_bar["temperatura_media"]) \
                                     .drop("temperatura_media")
df_humedad_bar = df_humedad_bar.withColumn("tipo_dato", F.lit("humedad_media")) \
                                     .withColumn("value", df_humedad_bar["humedad_media"]) \
                                     .drop("humedad_media")
df_temp_maxima_bar = df_temp_maxima_bar.withColumn("tipo_dato", F.lit("temperatura_maxima")) \
                                     .withColumn("value", df_temp_maxima_bar["temperatura_maxima"]) \
                                     .drop("temperatura_maxima")
df_temp_minima_bar = df_temp_minima_bar.withColumn("tipo_dato", F.lit("temperatura_minima")) \
                                     .withColumn("value", df_temp_minima_bar["temperatura_minima"]) \
                                     .drop("temperatura_minima")

df_temp_media_mad = df_temp_media_mad.withColumn("tipo_dato", F.lit("temperatura_media")) \
                                     .withColumn("value", df_temp_media_mad["temperatura_media"]) \
                                     .drop("temperatura_media")
df_humedad_mad = df_humedad_mad.withColumn("tipo_dato", F.lit("humedad_media")) \
                                     .withColumn("value", df_humedad_mad["humedad_media"]) \
                                     .drop("humedad_media")
df_temp_maxima_mad = df_temp_maxima_mad.withColumn("tipo_dato", F.lit("temperatura_maxima")) \
                                     .withColumn("value", df_temp_maxima_mad["temperatura_maxima"]) \
                                     .drop("temperatura_maxima")
df_temp_minima_mad = df_temp_minima_mad.withColumn("tipo_dato", F.lit("temperatura_minima")) \
                                     .withColumn("value", df_temp_minima_mad["temperatura_minima"]) \
                                     .drop("temperatura_minima")

In [160]:
df_bar_total = df_temp_media_bar.unionByName(df_humedad_bar).unionByName(df_temp_maxima_bar).unionByName(df_temp_minima_bar)
df_mad_total = df_temp_media_mad.unionByName(df_humedad_mad).unionByName(df_temp_maxima_mad).unionByName(df_temp_minima_mad)

df_completo = df_bar_total.unionByName(df_mad_total)

In [166]:
df_mad_total.count()

8932

In [168]:
df_completo.write.format("parquet").partitionBy("indicativo","tipo_dato","anio","mes").mode("overwrite").save("../../../data/homogenized/diarios/")
print("finished")

finished
