In [52]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, explode
import os
import re

In [53]:
# Build (or reuse an existing) Spark session for this notebook.
# The Hadoop setting maps the "file" URI scheme to Hadoop's LocalFileSystem implementation.
conf = SparkConf().set(
    "spark.hadoop.fs.file.impl",
    "org.apache.hadoop.fs.LocalFileSystem",
)
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName('Clima Procesamiento')
    .getOrCreate()
)

In [3]:
def obtener_ultimo_archivo(directorio, extension = '*.json'):
    '''
    Return the most recently modified file in ``directorio`` whose name ends with ``extension``.

    Args:
        directorio (str): Directory to scan (only its direct entries; not recursive).
        extension (str): Filename suffix to match, e.g. '.json' (default '*.json').
            A leading glob wildcard is ignored, so '*.json' and '.json' are equivalent.

    Returns:
        str: ``os.path.join(directorio, <name>)`` of the matching regular file with the
        newest modification time (``os.path.getmtime``).

    Raises:
        FileNotFoundError: If ``directorio`` does not exist or holds no matching file.
        Exception: Any other error is printed and re-raised unchanged.
    '''
    try:
        # str.endswith has no wildcard support, so a glob-style suffix such as the
        # default '*.json' would never match a real filename; keep only the literal part.
        sufijo = extension.lstrip('*')

        # Candidate paths: regular files only, so a directory named e.g. 'x.json' is skipped
        archivos = []
        for nombre in os.listdir(directorio):
            ruta = os.path.join(directorio, nombre)
            if nombre.endswith(sufijo) and os.path.isfile(ruta):
                archivos.append(ruta)

        if not archivos:
            raise FileNotFoundError('No se encontraron archivos en el directorio especificado.')

        # Newest file by modification time
        return max(archivos, key=os.path.getmtime)

    except FileNotFoundError as e:
        print(f'Error: {e}')
        raise
    except Exception as e:
        print(f'Error inesperado: {e}')
        raise

# Folder (relative to the notebook's working directory) that holds the input JSON files
data_dir = 'Datos'
ultimo_archivo = obtener_ultimo_archivo(directorio=data_dir, extension='.json')

In [54]:
# Load the newest input JSON file into a Spark DataFrame and preview its first rows
df = spark.read.json(path=ultimo_archivo)
df.show(n=20)


+-------+------+--------------------+--------------------+--------------------+--------+---------+---------+---------------+--------------------+-----------+--------+
|address|alerts|   currentConditions|                days|         description|latitude|longitude|queryCost|resolvedAddress|            stations|   timezone|tzoffset|
+-------+------+--------------------+--------------------+--------------------+--------+---------+---------+---------------+--------------------+-----------+--------+
|Sicilia|    []|{25.0, Partially ...|[{42.1, Partially...|Similar temperatu...| 38.1221|  13.3611|        1|Sicilia, Italia|{{0.0, 54398.0, C...|Europe/Rome|     1.0|
+-------+------+--------------------+--------------------+--------------------+--------+---------+---------+---------------+--------------------+-----------+--------+



In [38]:
# Print the (nested) schema of the raw DataFrame loaded from the JSON file above
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- alerts: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentConditions: struct (nullable = true)
 |    |-- cloudcover: double (nullable = true)
 |    |-- conditions: string (nullable = true)
 |    |-- datetime: string (nullable = true)
 |    |-- datetimeEpoch: long (nullable = true)
 |    |-- dew: double (nullable = true)
 |    |-- feelslike: double (nullable = true)
 |    |-- humidity: double (nullable = true)
 |    |-- icon: string (nullable = true)
 |    |-- moonphase: double (nullable = true)
 |    |-- precip: double (nullable = true)
 |    |-- precipprob: double (nullable = true)
 |    |-- preciptype: string (nullable = true)
 |    |-- pressure: double (nullable = true)
 |    |-- snow: double (nullable = true)
 |    |-- snowdepth: double (nullable = true)
 |    |-- solarenergy: double (nullable = true)
 |    |-- solarradiation: double (nullable = true)
 |    |-- source: string (nullable = true)
 |    

In [89]:
# NOTE(review): `dataframes_exploded_Days` is not defined in any visible cell; unless it is defined
# elsewhere, this cell raises NameError on a fresh kernel (it relies on leftover kernel state).
# Its two displays in this notebook (In [78] and In [89]) show different contents.
dataframes_exploded_Days

{'col': DataFrame[col_cloudcover: double, col_conditions: string, col_datetime: string, col_datetimeEpoch: bigint, col_description: string, col_dew: double, col_feelslike: double, col_feelslikemax: double, col_feelslikemin: double, col_hours: array<struct<cloudcover:double,conditions:string,datetime:string,datetimeEpoch:bigint,dew:double,feelslike:double,humidity:double,icon:string,precip:double,precipprob:double,preciptype:string,pressure:double,severerisk:double,snow:double,snowdepth:double,solarenergy:double,solarradiation:double,source:string,stations:array<string>,temp:double,uvindex:double,visibility:double,winddir:double,windgust:double,windspeed:double>>, col_humidity: double, col_icon: string, col_moonphase: double, col_precip: double, col_precipcover: double, col_precipprob: double, col_preciptype: string, col_pressure: double, col_severerisk: double, col_snow: double, col_snowdepth: double, col_solarenergy: double, col_solarradiation: double, col_source: string, col_stations

In [73]:
# NOTE(review): `dataframes_exploded_Alerts_Days` is not defined in any visible cell (leftover kernel
# state; NameError on a fresh kernel). Its 'alerts'/'days' keys suggest it is an earlier version
# of dfExplodedArray_Alerts_Days_1 from the processing section — confirm and remove.
dataframes_exploded_Alerts_Days

{'alerts': DataFrame[alerts_explode: string],
 'days': DataFrame[days_explode: struct<cloudcover:double,conditions:string,datetime:string,datetimeEpoch:bigint,description:string,dew:double,feelslike:double,feelslikemax:double,feelslikemin:double,hours:array<struct<cloudcover:double,conditions:string,datetime:string,datetimeEpoch:bigint,dew:double,feelslike:double,humidity:double,icon:string,precip:double,precipprob:double,preciptype:string,pressure:double,severerisk:double,snow:double,snowdepth:double,solarenergy:double,solarradiation:double,source:string,stations:array<string>,temp:double,uvindex:double,visibility:double,winddir:double,windgust:double,windspeed:double>>,humidity:double,icon:string,moonphase:double,precip:double,precipcover:double,precipprob:double,preciptype:string,pressure:double,severerisk:double,snow:double,snowdepth:double,solarenergy:double,solarradiation:double,source:string,stations:array<string>,sunrise:string,sunriseEpoch:bigint,sunset:string,sunsetEpoch:bigi

In [78]:
# NOTE(review): `dataframes_exploded_Days` is not defined in any visible cell (leftover kernel
# state; NameError on a fresh kernel). It is displayed again with different contents in In [89],
# so it was redefined between runs.
dataframes_exploded_Days

{'days_explode': DataFrame[days_explode_cloudcover: double, days_explode_conditions: string, days_explode_datetime: string, days_explode_datetimeEpoch: bigint, days_explode_description: string, days_explode_dew: double, days_explode_feelslike: double, days_explode_feelslikemax: double, days_explode_feelslikemin: double, days_explode_hours: array<struct<cloudcover:double,conditions:string,datetime:string,datetimeEpoch:bigint,dew:double,feelslike:double,humidity:double,icon:string,precip:double,precipprob:double,preciptype:string,pressure:double,severerisk:double,snow:double,snowdepth:double,solarenergy:double,solarradiation:double,source:string,stations:array<string>,temp:double,uvindex:double,visibility:double,winddir:double,windgust:double,windspeed:double>>, days_explode_humidity: double, days_explode_icon: string, days_explode_moonphase: double, days_explode_precip: double, days_explode_precipcover: double, days_explode_precipprob: double, days_explode_preciptype: string, days_explo

In [81]:
# NOTE(review): `dataframes_exploded_DaysHours` is not defined in any visible cell (leftover kernel
# state; NameError on a fresh kernel). It appears to be an earlier version of
# dfExplodeArray_DaysHours_DayStation_3 from the processing section — confirm and remove.
dataframes_exploded_DaysHours

{'days_explode_hours': DataFrame[days_explode_hours_explode3: struct<cloudcover:double,conditions:string,datetime:string,datetimeEpoch:bigint,dew:double,feelslike:double,humidity:double,icon:string,precip:double,precipprob:double,preciptype:string,pressure:double,severerisk:double,snow:double,snowdepth:double,solarenergy:double,solarradiation:double,source:string,stations:array<string>,temp:double,uvindex:double,visibility:double,winddir:double,windgust:double,windspeed:double>],
 'days_explode_stations': DataFrame[days_explode_stations_explode3: string]}

In [83]:
# NOTE(review): `dataframes_exploded_DaysHours` is not defined in any visible cell, so this cell only
# runs against leftover kernel state; it appears to correspond to
# dfExplodeArray_DaysHours_DayStation_3 built in the processing section.
# The loop variable is deliberately not called `df`: that would overwrite the global `df`
# (the raw DataFrame read from JSON) that the processing cell passes to aplicar_dataframe.
for key, df_resultado in dataframes_exploded_DaysHours.items():
    print(f'- LLAVE DF: {key}')
    df_resultado.show()

- LLAVE DF: days_explode_hours
+---------------------------+
|days_explode_hours_explode3|
+---------------------------+
|       {76.2, Partially ...|
|       {100.0, Overcast,...|
|       {96.7, Overcast, ...|
|       {98.0, Overcast, ...|
|       {91.8, Overcast, ...|
|       {53.7, Partially ...|
|       {85.8, Partially ...|
|       {100.0, Overcast,...|
|       {99.4, Overcast, ...|
|       {100.0, Overcast,...|
|       {100.0, Overcast,...|
|       {100.0, Overcast,...|
|       {25.0, Partially ...|
|       {25.0, Partially ...|
|       {25.0, Partially ...|
|       {25.0, Partially ...|
|       {25.0, Partially ...|
|       {26.6, Partially ...|
|       {100.0, Overcast,...|
|       {100.0, Overcast,...|
+---------------------------+
only showing top 20 rows

- LLAVE DF: days_explode_stations
+------------------------------+
|days_explode_stations_explode3|
+------------------------------+
|                         C6242|
|                          LICJ|
|                       

In [49]:
# NOTE(review): `dataframes_desanidados` is not defined in any visible cell (leftover kernel state);
# its 'currentConditions' entry looks like an earlier version of dfDesanidadoStruct_Current_Station_1.
# The loop variable is not named `df` so the global raw DataFrame `df` is not overwritten.
for key, df_resultado in dataframes_desanidados.items():
    print(f'- LLAVE DF: {key}')
    df_resultado.printSchema()

- LLAVE DF: currentConditions
root
 |-- currentConditions_cloudcover_desanidado: double (nullable = true)
 |-- currentConditions_conditions_desanidado: string (nullable = true)
 |-- currentConditions_datetime_desanidado: string (nullable = true)
 |-- currentConditions_datetimeEpoch_desanidado: long (nullable = true)
 |-- currentConditions_dew_desanidado: double (nullable = true)
 |-- currentConditions_feelslike_desanidado: double (nullable = true)
 |-- currentConditions_humidity_desanidado: double (nullable = true)
 |-- currentConditions_icon_desanidado: string (nullable = true)
 |-- currentConditions_moonphase_desanidado: double (nullable = true)
 |-- currentConditions_precip_desanidado: double (nullable = true)
 |-- currentConditions_precipprob_desanidado: double (nullable = true)
 |-- currentConditions_preciptype_desanidado: string (nullable = true)
 |-- currentConditions_pressure_desanidado: double (nullable = true)
 |-- currentConditions_snow_desanidado: double (nullable = true)
 

### PROCESAMIENTO DE DATOS 

In [10]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import array
from pyspark.sql.types import StructType, ArrayType, StringType, DoubleType, LongType, FloatType
from pyspark.sql.types import StringType, IntegerType, LongType, DoubleType



def explotar_columnas_array(df, diccionario_resultado, sufijo_explode=None, columnas_target=None):
    """Explode each selected ArrayType column of ``df`` into its own single-column DataFrame.

    For every top-level array column (optionally restricted to ``columnas_target``),
    ``diccionario_resultado[<column name>]`` is set to
    ``df.select(explode(<column>).alias(<column name>))`` — the exploded column keeps
    the original name and no other column of ``df`` is carried over.
    (Per Spark's ``explode`` semantics, rows whose array is null or empty produce no rows.)

    Args:
        df: Spark DataFrame whose top-level schema is inspected.
        diccionario_resultado (dict): Output dict, mutated in place; an existing entry
            with the same column name is overwritten.
        sufijo_explode: Accepted but not used by the current implementation.
        columnas_target: Optional collection of column names to process; ``None``
            processes every array column.
    """
    nombres_array = [
        campo.name
        for campo in df.schema
        if isinstance(campo.dataType, ArrayType)
        and (columnas_target is None or campo.name in columnas_target)
    ]
    for nombre in nombres_array:
        diccionario_resultado[nombre] = df.select(explode(col(nombre)).alias(nombre))
            
def desanidar_columnas_struct(df, diccionario_resultado, sufijo_desanidado=None, columnas_target=None):
    """Flatten each selected StructType column of ``df`` into its own DataFrame.

    For a top-level struct column ``c`` with fields ``f1, f2, ...``,
    ``diccionario_resultado['c']`` becomes ``df.select(c.f1 AS c_f1, c.f2 AS c_f2, ...)``;
    only the struct's fields are kept, other columns of ``df`` are not carried over.

    Args:
        df: Spark DataFrame whose top-level schema is inspected.
        diccionario_resultado (dict): Output dict, mutated in place; an existing entry
            with the same column name is overwritten.
        sufijo_desanidado: Accepted but not used by the current implementation.
        columnas_target: Optional collection of column names to process; ``None``
            processes every struct column.
    """
    for campo in df.schema:
        if not isinstance(campo.dataType, StructType):
            continue

        nombre = campo.name
        if columnas_target is not None and nombre not in columnas_target:
            continue

        subcolumnas = []
        for subcampo in campo.dataType.fields:
            subcolumnas.append(col(f'{nombre}.{subcampo.name}').alias(f'{nombre}_{subcampo.name}'))
        diccionario_resultado[nombre] = df.select(*subcolumnas)
                    
def aplicar_dataframe(metodo:str, diccionario_df, diccionario_dfResultado, sufijo=None, columnas_target=None):
    """Apply an explode ('explotar') or flatten ('desanidar') step to one or several DataFrames.

    Args:
        metodo (str): 'explotar' runs explotar_columnas_array; 'desanidar' runs
            desanidar_columnas_struct.
        diccionario_df: A single Spark DataFrame, or a dict whose values are Spark
            DataFrames (its keys are not used); every DataFrame is processed.
        diccionario_dfResultado (dict): Output dict, filled in place by the chosen step.
        sufijo: Forwarded unchanged to the chosen step.
        columnas_target: Optional collection of column names forwarded to the chosen step.

    Raises:
        ValueError: If ``metodo`` is neither 'explotar' nor 'desanidar'.
        TypeError: If ``diccionario_df`` is neither a DataFrame nor a dict.
    """
    # Resolve the step first so a mistyped method fails loudly instead of leaving the
    # result dict empty and causing confusing KeyErrors further down the pipeline.
    if metodo == 'explotar':
        operacion = explotar_columnas_array
    elif metodo == 'desanidar':
        operacion = desanidar_columnas_struct
    else:
        raise ValueError(f"Método desconocido: {metodo!r}. Use 'explotar' o 'desanidar'.")

    if isinstance(diccionario_df, dict):
        fuentes = list(diccionario_df.values())
    elif isinstance(diccionario_df, DataFrame):
        fuentes = [diccionario_df]
    else:
        raise TypeError(
            f'Se esperaba un DataFrame o un dict de DataFrames, se recibió {type(diccionario_df).__name__}.'
        )

    for fuente in fuentes:
        operacion(fuente, diccionario_dfResultado, sufijo, columnas_target)
        
        


# REEMPLAZO VALORES NULOS

# Null-replacement default per scalar Spark type. Keys are type classes, looked up by
# reemplazar_nulos with type(column.dataType); FloatType is included so float columns
# are filled like double columns instead of being skipped.
valores_reemplazo = {
    StringType : 'Sin Dato',
    IntegerType : 0,
    LongType : 0,
    FloatType : 0.0,
    DoubleType : 0.0,
    }


def reemplazar_nulos(diccionario_df):
    """Replace nulls, according to each column's data type, in every DataFrame of a dict.

    Rules:
      * ``array<string>`` columns: a null array becomes ``['Sin Dato']``.
      * Columns whose type class is a key of ``valores_reemplazo``: nulls become the
        mapped default value.
      * Every other column (structs, arrays of non-string elements, ...) is left as is.

    Args:
        diccionario_df (dict): Mapping of key -> Spark DataFrame. Each value is replaced
            in place with its null-filled version. A non-dict argument is ignored.

    Returns:
        None; the dictionary is mutated in place.
    """
    if not isinstance(diccionario_df, dict):
        return

    for key, df in list(diccionario_df.items()):
        diccionario_df[key] = _reemplazar_nulos_df(df)


def _reemplazar_nulos_df(df):
    """Return ``df`` with the type-based null replacement of reemplazar_nulos applied."""
    rellenos = {}
    for columna in df.schema.fields:
        tipo = columna.dataType

        if isinstance(tipo, ArrayType) and isinstance(tipo.elementType, StringType):
            # fillna only accepts scalar replacement values, so null arrays are rewritten explicitly
            df = df.withColumn(
                columna.name,
                when(col(columna.name).isNull(), array(lit('Sin Dato')))
                .otherwise(col(columna.name))
            )
        elif type(tipo) in valores_reemplazo:
            rellenos[columna.name] = valores_reemplazo[type(tipo)]

    # A single fillna for all scalar columns instead of one projection per column
    if rellenos:
        df = df.fillna(rellenos)
    return df


In [21]:

# Output dictionaries, one per pipeline step; aplicar_dataframe fills them in place.
# NOTE: `df` must still be the raw DataFrame read from the JSON file when this cell runs.
dfExplodedArray_Alerts_Days_1 = {}
dfDesanidadoStruct_Days_2 = {}
dfExplodeArray_DaysHours_DayStation_3 = {}
dfDesanidadoStruct_DaysHours_4 = {}

dfDesanidadoStruct_Current_Station_1 = {}

dfDesanidadoStruct_Stations_2 = {}


# Columns targeted by each step
columnas_array_1 = ['alerts', 'days']
columnas_struct_1 = ['currentConditions', 'stations']

columnas_struct_2 = ['days']
columnas_array_2 = ['days_hours', 'days_stations']

columnas_struct_3 = ['days_hours']


aplicar_dataframe('explotar', df, dfExplodedArray_Alerts_Days_1, 'explode1', columnas_array_1)
aplicar_dataframe('desanidar', df, dfDesanidadoStruct_Current_Station_1, 'desanidar1', columnas_struct_1)
aplicar_dataframe('desanidar', dfExplodedArray_Alerts_Days_1, dfDesanidadoStruct_Days_2, 'desanidar2', columnas_struct_2)
aplicar_dataframe('explotar', dfDesanidadoStruct_Days_2, dfExplodeArray_DaysHours_DayStation_3, 'desanidar2', columnas_array_2)
aplicar_dataframe('desanidar', dfExplodeArray_DaysHours_DayStation_3, dfDesanidadoStruct_DaysHours_4, 'desanidar2', columnas_struct_3)

# Per-station columns are named 'stations_<station id>' (one struct per station). Take them from
# the flattened 'stations' DataFrame instead of hard-coding IDs, so a file for another location
# (with different weather stations) is also split per station.
df_estaciones = dfDesanidadoStruct_Current_Station_1.get('stations')
columnas_struct_4 = set(df_estaciones.columns) if df_estaciones is not None else set()
aplicar_dataframe('desanidar', dfDesanidadoStruct_Current_Station_1, dfDesanidadoStruct_Stations_2, 'desanidar2', columnas_struct_4)


# Replace nulls in every result dictionary (each DataFrame is swapped in place)
for resultado in (
    dfExplodedArray_Alerts_Days_1,
    dfDesanidadoStruct_Days_2,
    dfExplodeArray_DaysHours_DayStation_3,
    dfDesanidadoStruct_DaysHours_4,
    dfDesanidadoStruct_Current_Station_1,
    dfDesanidadoStruct_Stations_2,
):
    reemplazar_nulos(resultado)

In [25]:
# Preview every flattened DataFrame. The loop variable is not named `df` so the global raw
# DataFrame `df` used by the processing cell is left intact if that cell is re-run.
for key, df_resultado in dfDesanidadoStruct_Current_Station_1.items():
    print(f'- DATAFRAME {key}:')
    df_resultado.show()


- DATAFRAME currentConditions:
+----------------------------+----------------------------+--------------------------+-------------------------------+---------------------+---------------------------+--------------------------+----------------------+---------------------------+------------------------+----------------------------+----------------------------+--------------------------+----------------------+---------------------------+-----------------------------+--------------------------------+------------------------+--------------------------+-------------------------+------------------------------+------------------------+-----------------------------+----------------------+-------------------------+----------------------------+-------------------------+--------------------------+---------------------------+
|currentConditions_cloudcover|currentConditions_conditions|currentConditions_datetime|currentConditions_datetimeEpoch|currentConditions_dew|currentConditions_feelslike|current

### GUARDAR ARCHIVOS PROCESADOS

In [45]:
# Take each Spark DataFrame out of the dictionary that holds it
dfSP_currentConditions = dfDesanidadoStruct_Current_Station_1['currentConditions']
dfSP_currentConditionsStations = dfDesanidadoStruct_Current_Station_1['stations']

# Columns excluded before export: per the author, the array column "currentConditions_stations"
# is converted incorrectly when turning the Spark DataFrame into a pandas DataFrame
columnas_no_exportables = ['currentConditions_stations']
dfSP_currentConditions = dfSP_currentConditions.drop(*columnas_no_exportables)

In [None]:
# Output folder for processed files; to_csv does not create missing directories,
# so create it up front to avoid an OSError on a fresh checkout
ruta_salida = os.path.join('Datos', 'Datos_Procesados')
os.makedirs(ruta_salida, exist_ok=True)

# Convert the Spark DataFrame to pandas (all rows are collected to the driver)
dfPandas_currentConditions = dfSP_currentConditions.toPandas()

# Save the pandas DataFrame as CSV.
# NOTE(review): to_csv writes the pandas index as an unnamed first column by default;
# pass index=False if downstream readers do not expect it.
dfPandas_currentConditions.to_csv(os.path.join(ruta_salida, 'CurrentConditions.csv'))