In [None]:
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
import json
import sys
import os
import shutil

def procesar_directorio(initial_path, cadena):
    """
    Busca mediante recursividad en el directorio inicial todos los directorios hasta encontrar un directorio que contenga
    la cadena de texto "cadena". Una vez encontrado, busca en el directorio todos los ficheros .csv y los concatena en un
    único dataframe. Por último, mueve el directorio encontrado a la carpeta "Procesados" y devuelve el dataframe.
    
    Parameters:
        initial_path (str): Ruta del directorio inicial.
        cadena (str): Cadena de texto a buscar en los nombres de los directorios.
    
    Returns:
        pandas.DataFrame: Dataframe que contiene la concatenación de todos los ficheros .csv encontrados.
    """
    
    df = pd.DataFrame()
    for filename in os.listdir(initial_path):
        complete_path = os.path.join(initial_path, filename)
        if (os.path.isdir(complete_path)) and (cadena in filename):
            for internal_filename in os.listdir(complete_path):
                if internal_filename.endswith(".csv"):
                    csv_path = os.path.join(complete_path, internal_filename)
                    try:
                        temp_df = pd.read_csv(csv_path, delimiter = ";")
                        if temp_df.shape[1] == 1:
                            temp_df = pd.read_csv(csv_path, delimiter = ",")
                    except pd.errors.ParserError:
                        temp_df = pd.read_csv(csv_path, delimiter = ",")
                    df = pd.concat([df, temp_df])
                    if not os.path.exists(complete_path.replace("Nuevos", "Procesados")):
                        os.makedirs(complete_path.replace("Nuevos", "Procesados"))
                    #shutil.move(csv_path, csv_path.replace("Nuevos", "Procesados"))   
        elif (os.path.isfile(complete_path)) and (cadena in filename):
            csv_path = os.path.join(initial_path, filename)
            try:
                temp_df = pd.read_csv(csv_path, delimiter = ";")
                if temp_df.shape[1] == 1:
                    temp_df = pd.read_csv(csv_path, delimiter = ",")
            except pd.errors.ParserError:
                temp_df = pd.read_csv(csv_path, delimiter = ",")
            df = pd.concat([df, temp_df])
            if not os.path.exists(initial_path.replace("Nuevos", "Procesados")):
                os.makedirs(initial_path.replace("Nuevos", "Procesados"))
            #shutil.move(csv_path, csv_path.replace("Nuevos", "Procesados"))
        elif (os.path.isdir(complete_path)):
            partial_df = procesar_directorio(complete_path, cadena)
            if not partial_df.empty:
                df = pd.concat([df, partial_df])
        else:
            continue
    return df

if __name__ == "__main__":
    # # Carga, desde la ruta de ejecución, de los parámetros para conexión a la base de datos  
    # root_path = os.getcwd()
    # params = None
    # for filename in os.listdir(root_path):
    #     if "params.json" in filename:
    #         with open(os.path.join(root_path, filename)) as f:
    #             params = json.load(f)
    # if params is None:
    #     print("No se ha encontrado el archivo de parámetros para la conexión a la base de datos")
    #     sys.exit()
    # else:
    #     print(f"Parámetros de la planta {params['schema'].capitalize()} cargados correctamente")
    # data_path = os.path.join(root_path, params["data_path"])
    # schema_name = params["schema"]

    # Carga y comprobación de la existencia de nuevos ficheros para procesar
    data_path = "/home/upo/Desktop/Test_FVPredictive/FVPredictive_TEST/Alcazar/Datos/Nuevos"
    df = procesar_directorio(data_path, 'IAZ')
    if df.empty:
        print(f"No se han encontrado nuevos ficheros para procesar en {data_path}")
        sys.exit()
    print(f"Se han encontrado {df.shape[0]} nuevos registros para procesar")

    columnas = df.columns

    # Crear un diccionario para almacenar grupos de columnas y los nombres de las columnas para el dataframe reestructurado
    grupos_columnas = {}
    col_names = []

    # Iterar sobre las columnas y agruparlas por el patrón común
    for columna in columnas:
        if columna != "Datetime":
            if columna.split(" - ")[1] not in col_names:
                col_names.append(columna.split(" - ")[1])
            else:
                pass
        
            grupo = columna.split(" - ")[0]
            if grupo not in grupos_columnas:
                grupos_columnas[grupo] = [columna]
            else:
                grupos_columnas[grupo].append(columna)

    # Crear un dataframe para cada grupo de columnas y concatenarlos en un único dataframe
    meteo_df = pd.DataFrame()
    for grupo, columnas_grupo in grupos_columnas.items():
        single_meteo_df = pd.concat([df["Datetime"], df[columnas_grupo].copy().rename(columns={col: col.split(" - ")[1] for col in columnas_grupo})], axis = 1)
        single_meteo_df["disp_name"] = grupo
        meteo_df = pd.concat([meteo_df, single_meteo_df], axis=0)

    # Asignación de identificadores a los dispositivos y parque
    meteo_df["parque_id"] = meteo_df["disp_name"].str[3]
    meteo_df["dispositivo_id"] = meteo_df["dispositivo_id"] = meteo_df.groupby("parque_id")["disp_name"].transform(lambda x: x.astype("category").cat.codes + 41)
    meteo_df.drop("disp_name", axis=1, inplace=True)

    # Procesado de columnas: conversión de tipos
    columns = []
    for column in meteo_df.columns:
        try:
            meteo_df[column] = meteo_df[column].str.replace(",", ".").astype(float)
            if meteo_df[column].iloc[0].is_integer():
                meteo_df[column] = meteo_df[column].astype(int)
        except Exception as e:
            pass
        columns.append(column.strip().replace(" ", "_").split("(")[0].lower())
    meteo_df.columns = columns

    meteo_df = meteo_df.rename(columns={'datetime': "datetime_utc", 
                                'wind_speed_1': "vel_viento", 
                                'wind_direction_1': "dir_viento",
                                'average_external_ambient_temperature': "temp_amb", 
                                'average_humidity': "hum_rel",
                                'average_atmospheric_pressure': "presion_atm", 
                                'radiation_pyranometer_1': "rad_hor",
                                'radiation_pyranometer_2': "rad_poa"})

    # Parseo de fechas y ordenación del dataframe
    meteo_df["datetime_utc"] = pd.to_datetime(meteo_df["datetime_utc"])
    num_duplicates = meteo_df[meteo_df.duplicated(subset = ["datetime_utc", "dispositivo_id"])].shape[0]
    meteo_df = meteo_df.drop_duplicates(subset = ["datetime_utc", "dispositivo_id"], keep = "last").reset_index(drop = True)

    # Conexión a la base de datos y carga de datos
    try:
        password = params['password'].replace('@', '%40')
        engine = create_engine(f'postgresql://{params["user"]}:{password}@{params["host"]}:{params["port"]}/{params["dbname"]}')
        print(f"Conexión a la base de datos {params['dbname']} (esquema {schema_name}) establecida")
    except Exception as error:
        print("Error en la apertura de base de datos: \n\t{}".format(error))
        sys.exit()

    # Comprobación de la existencia de registros duplicados en la base de datos
    check_query = f"SELECT datetime_utc, dispositivo_id FROM {schema_name}.meteo_raw"
    check_df = pd.read_sql_query(check_query, engine)
    check_df["datetime_utc"] = pd.to_datetime(check_df["datetime_utc"], utc=True)
    merged_df = meteo_df.merge(check_df, how='left', indicator=True, left_on = ["datetime_utc", "dispositivo_id"], right_on = ["datetime_utc", "dispositivo_id"])
    meteo_df = merged_df[merged_df["_merge"] == "left_only"]
    meteo_df = meteo_df.drop(columns = "_merge")

    print(f"Se han encontrado {num_duplicates} registros duplicados")
    print(f"Se han encontrado {merged_df.shape[0] - meteo_df.shape[0]} registros ya existentes en la base de datos")

    # Ordenación del dataframe por fecha y dispositivo
    meteo_df = meteo_df.sort_values(by = ["datetime_utc", "dispositivo_id"])\
            .reset_index(drop = True)