In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Build (or reuse) a single Spark session: pull in the PostgreSQL JDBC driver
# and set the SQL session time zone to Buenos Aires.
spark = (
    SparkSession.builder
    .appName("Data Loader")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0")
    .config("spark.sql.session.timeZone", "America/Argentina/Buenos_Aires")
    .getOrCreate()
)

def process_and_save_data(spark, table_name, renaming_dict, db_table):
    """
    Pivot a long-format Spark table to wide format and write it to PostgreSQL.

    The source table is read with ``spark.table`` and is expected to expose the
    columns ``Timestamp``, ``nombre_variable`` and ``Measurement_Numeric``.
    Every distinct ``nombre_variable`` becomes its own column holding the first
    ``Measurement_Numeric`` found for that timestamp, so the result has one row
    per ``Timestamp``. Columns are then renamed through ``renaming_dict``, a
    ``created_at`` column is appended, and the frame is written over JDBC in
    ``overwrite`` mode.

    Args:
        spark: SparkSession used to read ``table_name``.
        table_name: Name of the Spark table to read.
        renaming_dict: Mapping ``pivoted column name -> final column name``.
            Entries whose key is not a column of the pivoted frame have no
            effect (``withColumnRenamed`` is a no-op for unknown columns).
        db_table: Destination table name handed to the JDBC writer.

    Returns:
        None. Any exception raised while reading, transforming or writing is
        caught and reported with ``print``; it is not re-raised.
    """
    try:
        # Load the source table from the Spark catalog.
        source_df = spark.table(table_name)

        # Preview the raw rows.
        print(f"Datos de la tabla '{table_name}':")
        source_df.show()

        # Pivot to one row per timestamp. "Mapped_Unit" is intentionally NOT
        # part of the grouping key: variables of the same table are measured in
        # different units, so grouping by unit would split a single timestamp
        # into several mostly-null rows (one per unit) once the unit column is
        # removed. The unit is carried by the renamed column titles instead.
        pivot_df = (
            source_df.groupBy("Timestamp")
            .pivot("nombre_variable")
            .agg(F.first("Measurement_Numeric"))
        )

        # Apply the caller-supplied column names.
        for old_name, new_name in renaming_dict.items():
            pivot_df = pivot_df.withColumnRenamed(old_name, new_name)

        # Stamp each row with the load time, shifted back three hours.
        # NOTE(review): presumably this offset makes the stored value read as
        # Buenos Aires wall-clock time in the database — confirm it is still
        # wanted together with the session time zone setting.
        pivot_df = pivot_df.withColumn(
            "created_at",
            F.current_timestamp() - F.expr("INTERVAL 3 HOURS"),
        )

        # Preview the reshaped frame without truncating cell contents.
        print(f"Datos pivotados y renombrados para '{db_table}':")
        pivot_df.show(truncate=False)

        # Replace the destination table with the new contents.
        # NOTE(review): url, user and password are empty strings here
        # (presumably redacted) — supply real values, ideally from a secret
        # store rather than literals in the notebook.
        (
            pivot_df.write
            .format("jdbc")
            .option("url", "")
            .option("dbtable", db_table)
            .option("user", "")
            .option("password", "")
            .mode("overwrite")
            .save()
        )

        print(f"Datos guardados exitosamente en '{db_table}'.\n")

    except Exception as e:
        # Best-effort: report the failure and return so the caller can go on.
        print(f"Error al procesar la tabla '{table_name}': {e}")

# Column-rename dictionaries, one per table. Each maps a pivoted variable name
# to the same name followed by its unit in parentheses.
def _label_with_units(units_by_variable):
    """Return {variable: "variable (unit)"} preserving the given order."""
    return {
        variable: f"{variable} ({unit})"
        for variable, unit in units_by_variable.items()
    }


rain_renaming = _label_with_units({
    "Rain accumulation": "mm",
    "Rain duration": "s",
    "Rain intensity": "mm/h",
})

wind_renaming = _label_with_units({
    "Wind speed maximum": "m/s",
    "Wind speed minimum": "m/s",
    "Wind speed average": "m/s",
    "Wind direction maximum": "degrees",
    "Wind direction minimum": "degrees",
    "Wind direction average": "degrees",
})

temperature_pressure_renaming = _label_with_units({
    "Air pressure": "hPa",
    "Air temperature": "°C",
})

humidity_renaming = _label_with_units({
    "Relative humidity": "%RH",
})

hail_renaming = _label_with_units({
    "Hail accumulation": "hits/cm²",
    "Hail duration": "s",
})

# Process and store every table in turn:
# (Spark source table, column renames, PostgreSQL destination table).
for source_table, column_renames, target_table in (
    ("rain_table", rain_renaming, "rain_data"),
    ("wind_table", wind_renaming, "wind_data"),
    ("temperature_pressure_table", temperature_pressure_renaming, "temperature_pressure_data"),
    ("Relative_humidity_table", humidity_renaming, "humidity_data"),
    ("hail_table", hail_renaming, "hail_data"),
):
    process_and_save_data(spark, source_table, column_renames, target_table)

# Shut down the Spark session once all tables have been handled.
spark.stop()


StatementMeta(, c64217ee-c22f-4a43-9f37-0d958dc20c2e, 3, Finished, Available, Finished)

Datos de la tabla 'rain_table':
+-----------------+-------------------+-----------+-------------------+
|  nombre_variable|Measurement_Numeric|Mapped_Unit|          Timestamp|
+-----------------+-------------------+-----------+-------------------+
|Rain accumulation|               1.34|         mm|2024-11-07 14:54:22|
|Rain accumulation|               1.09|         mm|2024-11-02 20:31:25|
|Rain accumulation|               1.59|         mm|2024-11-12 18:41:20|
|Rain accumulation|               1.07|         mm|2024-11-02 12:30:25|
|Rain accumulation|               1.34|         mm|2024-11-09 12:07:21|
|Rain accumulation|               1.34|         mm|2024-11-06 17:00:23|
|Rain accumulation|               1.34|         mm|2024-11-05 19:27:23|
|Rain accumulation|               1.59|         mm|2024-11-13 16:16:19|
|Rain accumulation|               1.59|         mm|2024-11-13 16:17:19|
|Rain accumulation|               1.59|         mm|2024-11-13 16:08:19|
|Rain accumulation|             