In [3]:
import pandas as pd
import psycopg2
import json
import logging

In [5]:
logging.basicConfig(level=logging.INFO)


def load_data(csv_path, config_path):
    
    logging.info(f"Leyendo datos desde el archivo CSV en: {csv_path}")
    df_water = pd.read_csv(csv_path)
    df_water.fillna(value=pd.NA, inplace=True)  # Normalizar NaN a NA para compatibilidad con la base de datos

    logging.info("Tipos de datos del DataFrame:")
    logging.info(df_water.dtypes)

    try:
        with open(config_path, 'r') as config_json:
            config = json.load(config_json)

        conx = psycopg2.connect(**config)
        cursor = conx.cursor()
        logging.info(f"Conectado a la base de datos: {config['dbname']} en {config['host']}")

        # Insertando en la tabla de dimensión de fechas
        for year in df_water['año'].drop_duplicates():
            cursor.execute("""
                INSERT INTO dimension_date ("Año") VALUES (%s) 
                ON CONFLICT ("Año") DO NOTHING RETURNING "ID_Tiempo";
            """, (year,))
            id_tiempo = cursor.fetchone()[0] if cursor.rowcount != 0 else None
            logging.info(f"Insertado año {year} con ID {id_tiempo}")

        # Insertando en las demás tablas de dimensión y la tabla de hechos
        for _, row in df_water.iterrows():
            cursor.execute("""
                INSERT INTO dimension_ubication ("nombre_departamento", "div_dpto", "nombre_municipio", "divi_muni") 
                VALUES (%s, %s, %s, %s) 
                ON CONFLICT ("nombre_departamento", "div_dpto", "nombre_municipio", "divi_muni") DO NOTHING RETURNING "ID_Ubicacion";
            """, (row['nombre_departamento'], row['Div_dpto'], row['nombre_municipio_x'], row['Divi_muni']))
            id_ubicacion = cursor.fetchone()[0] if cursor.rowcount != 0 else None
            logging.info(f"Insertado ubicación con ID {id_ubicacion}")

            cursor.execute("""
                INSERT INTO dimension_parameters ("nombre_parametro_analisis") 
                VALUES (%s) 
                ON CONFLICT ("nombre_parametro_analisis") DO NOTHING RETURNING "ID_Parametro";
            """, (row['nombre_parametro_analisis'],))
            id_parametro = cursor.fetchone()[0] if cursor.rowcount != 0 else None
            logging.info(f"Insertado parámetro con ID {id_parametro}")

            cursor.execute("""
                INSERT INTO dimension_tratamiento ("rango_irca", "tratamiento_categoría") 
                VALUES (%s, %s) 
                ON CONFLICT ("rango_irca", "tratamiento_categoría") DO NOTHING RETURNING "ID_Rango";
            """, (row['rango_irca'], row['tratamiento_categoria']))
            id_rango = cursor.fetchone()[0] if cursor.rowcount != 0 else None
            logging.info(f"Insertado tratamiento con ID {id_rango}")

            cursor.execute("""
                INSERT INTO dimension_proyecto ("indicador", "nombre_proyecto", "origen", "estado_seguimiento", "num_municipios", "region", "total_financiamiento", "duracion_proyecto_dias", "ano_proyecto") 
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) 
                ON CONFLICT ("indicador", "nombre_proyecto", "origen", "estado_seguimiento", "num_municipios", "region", "total_financiamiento", "duracion_proyecto_dias", "ano_proyecto") DO NOTHING RETURNING "ID_Proyecto";
            """, (row['indicador'], row['nombre_proyecto'], row['origen'], row['estado_seguimiento'], row['num_municipios'], row['región'], row['total_financiamiento'], row['duracion_proyecto_dias'], row['año_proyecto']))
            id_proyecto = cursor.fetchone()[0] if cursor.rowcount != 0 else None
            logging.info(f"Insertado proyecto con ID {id_proyecto}")

            cursor.execute("""
                INSERT INTO Fact_WaterQuality ("ID_Tiempo", "ID_Ubicacion", "ID_Parametro", "ID_Rango", "ID_Proyecto", "irca_promedio", "numero_parametros_promedio", "proporcion_critica", "fecha_proyecto") 
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
            """, (id_tiempo, id_ubicacion, id_parametro, id_rango, id_proyecto, row['irca_promedio'], row['numero_parametros_promedio'], row['proporción_crítica'], row['fecha_proyecto']))
            logging.info(f"Insertado medición con ID_Tiempo {id_tiempo}, ID_Ubicacion {id_ubicacion}, ID_Parametro {id_parametro}, ID_Rango {id_rango}, ID_Proyecto {id_proyecto}")

        conx.commit()
        logging.info("Los datos se han cargado exitosamente a la base de datos.")
        logging.info(f"Primeras filas del DataFrame cargado:\n{df_water.head()}")

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        if 'conx' in locals() and conx:
            conx.rollback()
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()
        if 'conx' in locals() and conx:
            conx.close()

    return "Data loaded successfully"

# Ruta al archivo CSV y al archivo de configuración
csv_path = 'P:/ETL/Proyecto 3/ETL-Proyect/merged_data (1).csv'
config_path = 'P:/ETL/Proyecto 3/ETL-Proyect/Airflow_water/db_config.json'

# Ejecutar la función
load_data(csv_path, config_path)

INFO:root:Leyendo datos desde el archivo CSV en: P:/ETL/Proyecto 3/ETL-Proyect/merged_data (1).csv
INFO:root:Tipos de datos del DataFrame:
INFO:root:año                             int64
nombre_departamento            object
Div_dpto                        int64
nombre_municipio_x             object
Divi_muni                       int64
irca_promedio                 float64
nombre_parametro_analisis      object
numero_parametros_promedio      int64
is_top_20                        bool
rango_irca                     object
tratamiento_categoria          object
proporción_crítica            float64
clave                          object
fecha_proyecto                float64
codigo_departamento           float64
departamento                   object
c_digo_divipola_municipio      object
nombre_municipio_y             object
indicador                      object
nombre_proyecto                object
origen                         object
estado_seguimiento             object
num_municipios 

'Data loaded successfully'

: 

In [None]:
def read_water():
    with open('./dag_water/db_config.json') as file:
        db_config = json.load(file)

    engine = create_engine(f'postgresql+psycopg2://{db_config["user"]}:{db_config["password"]}@{db_config["host"]}:3033/{db_config["dbname"]}')

    water = pd.read_sql('SELECT * FROM water_table LIMIT 100000', con=engine)

    return water.to_json(orient='records')


def transform_water(**kwargs):
    ti = kwargs['ti']
    json_data = json.loads(ti.xcom_pull(task_ids='read_water'))
    water = pd.json_normalize(data=json_data)

    water = transformations_water(water)

    logging.info("Datos transformados de agua: %s", water.to_string())  # Limitar la cantidad de datos logueados si es necesario

    return water.to_json(orient='records')



def extract_api():
    try:
        client = Socrata("www.datos.gov.co", None)
        results = client.get("tcwu-r53g", limit=2000)
        api_data = pd.DataFrame.from_records(results)
        print(api_data.shape)
        return api_data.to_json(orient='records')
    except Exception as e:
        logging.error("Se produjo un error: %s", e)



def transform_api(**kwargs):
    ti = kwargs['ti']

    json_data = ti.xcom_pull(task_ids='extract_api')

    api = pd.read_json(json_data, orient='records')

    api_transformed = transformations_api(api)

    logging.info("Datos transformados de API: %s", api_transformed.to_string())

    return api_transformed.to_json(orient='records')



def expectation_water(**kwargs):
    ti = kwargs['ti']
    water_json = ti.xcom_pull(task_ids='transform_water')
    df_water = pd.read_json(water_json, orient='records')
    water_ge = ge.from_pandas(df_water)

    logging.info("Validating column types")
    water_ge.expect_column_values_to_be_of_type('numero_parametros_promedio', 'int64')
    water_ge.expect_column_values_to_be_of_type('irca_promedio', 'float64')
    water_ge.expect_column_values_to_be_of_type('nombre_municipio', 'str')
    water_ge.expect_column_values_to_be_of_type('nombre_departamento', 'str')
    water_ge.expect_column_values_to_be_of_type('año', 'int64')  # Ajustamos a 'int64' porque 'año' es un año extraído como int

    logging.info("Validating place name normalization")
    water_ge.expect_column_values_to_match_regex('nombre_departamento', r'^[A-Z][a-z]+(?: [A-Z][a-z]+)*$')
    water_ge.expect_column_values_to_match_regex('nombre_municipio', r'^[A-Z][a-z]+(?: [A-Z][a-z]+)*$')

    logging.info("Validating categorical column values")
    water_ge.expect_column_values_to_be_in_set('rango_irca', [
        'Sin información', 'Sin riesgo', 'Riesgo bajo',
        'Riesgo medio', 'Riesgo alto', 'Riesgo inviable sanitariamente', 'No clasificado'
    ])
    water_ge.expect_column_values_to_be_in_set('tratamiento_categoria', [
        'Sin tratamiento', 'Tratamiento completo', 'Tratamiento parcial'
    ])

    logging.info("Validating scaled values")
    water_ge.expect_column_values_to_be_between('proporción_crítica', 0, 1)

    results = water_ge.validate()
    logging.info(f"Validation results: {results}")

    if not results['success']:
        logging.error("Water data validation failed")
        raise ValueError("Water data validation failed")

    return df_water.to_json(orient='records')







def expectation_api(**kwargs):
    ti = kwargs['ti']
    api_json = ti.xcom_pull(task_ids='transform_api')
    df_api = pd.read_json(api_json, orient='records')
    api_ge = ge.from_pandas(df_api)

    logging.info("Validating column types")
    api_ge.expect_column_values_to_be_of_type('nombre_municipio', 'str')
    api_ge.expect_column_values_to_be_of_type('fecha_proyecto', 'datetime64[ns]')
    api_ge.expect_column_values_to_be_of_type('codigo_departamento', 'int64')
    api_ge.expect_column_values_to_be_of_type('num_municipios', 'int64')
    api_ge.expect_column_values_to_be_of_type('departamento', 'str')
    api_ge.expect_column_values_to_be_of_type('región', 'str')
    api_ge.expect_column_values_to_be_of_type('total_financiamiento', 'float64')
    api_ge.expect_column_values_to_be_of_type('duracion_proyecto_dias', 'int64')

    logging.info("Validating place name normalization")
    api_ge.expect_column_values_to_match_regex('departamento', r'^[A-Z ]+$')
    api_ge.expect_column_values_to_match_regex('nombre_municipio', r'^[A-Z][a-z]+(?: [A-Z][a-z]+)*$')

    logging.info("Validating numerical ranges")
    api_ge.expect_column_values_to_be_between('total_financiamiento', 0, 1e9)  # Ajusta según los valores esperados
    api_ge.expect_column_values_to_be_between('duracion_proyecto_dias', 0, 1e4)  # Ajusta según los valores esperados

    logging.info("Remove parentheses validation")
    api_ge.expect_column_values_to_not_match_regex('nombre_municipio', r"\(.*?\)")

    logging.info("Space and capitalize validation")
    api_ge.expect_column_values_to_match_regex('nombre_municipio', r'^[A-Z][a-z]+(?: [A-Z][a-z]+)*$')
    
    results = api_ge.validate()
    logging.info(f"Validation results: {results}")

    return df_api.to_json(orient='records')




def merge_task(**kwargs):
    ti = kwargs['ti']

    logging.info("Recuperando los datos transformados de agua y API desde XCom.")
    # Recuperar los datos transformados de agua y API desde XCom
    water_json = ti.xcom_pull(task_ids='expectation_water')
    api_json = ti.xcom_pull(task_ids='expectation_api')

    logging.info("Convirtiendo los datos de JSON a DataFrame.")
    water_cleaned_df = pd.read_json(water_json, orient='records')
    api_done_df = pd.read_json(api_json, orient='records')

    logging.info("Ejecutando la función de merge.")
    # Ejecutar la función de merge
    merged_df = merge_datasets(api_done_df, water_cleaned_df)

    logging.info("Merge completado y datos convertidos a JSON.")
    merged_json = merged_df.to_json(orient='records')
    ti.xcom_push(key='merged_data', value=merged_json)  # Push JSON result to XCom
    return merged_json

def merge_datasets(api_done_df, water_cleaned_df):
    logging.info("Verificando las columnas disponibles en los DataFrames.")
    logging.info(f"Columnas en water_cleaned_df: {list(water_cleaned_df.columns)}")
    logging.info(f"Columnas en api_done_df: {list(api_done_df.columns)}")
    
    logging.info("Asegurando que las columnas son de tipo string y año como int.")
    water_cleaned_df['año'] = water_cleaned_df['año'].astype(int)
    water_cleaned_df['nombre_departamento'] = water_cleaned_df['nombre_departamento'].astype(str)
    water_cleaned_df['nombre_municipio'] = water_cleaned_df['nombre_municipio'].astype(str)

    api_done_df['fecha_proyecto'] = pd.to_datetime(api_done_df['fecha_proyecto'], errors='coerce')
    api_done_df['codigo_departamento'] = api_done_df['codigo_departamento'].astype(str)
    api_done_df['departamento'] = api_done_df['departamento'].astype(str)
    api_done_df['nombre_municipio'] = api_done_df['nombre_municipio'].astype(str)
    api_done_df['año_proyecto'] = api_done_df['fecha_proyecto'].dt.year

    logging.info("Filtrando los DataFrames para incluir solo los años 2017, 2018 y 2019.")
    water_cleaned_df = water_cleaned_df[water_cleaned_df['año'].isin([2017, 2018, 2019])]
    api_done_df = api_done_df[api_done_df['año_proyecto'].isin([2017, 2018, 2019])]

    logging.info("Creando claves únicas para el merge.")
    water_cleaned_df['clave'] = (water_cleaned_df['nombre_departamento'].str.lower().str.strip() + "_" +
                                 water_cleaned_df['nombre_municipio'].str.lower().str.strip() + "_" +
                                 water_cleaned_df['año'].astype(str))

    api_done_df['clave'] = (api_done_df['departamento'].str.lower().str.strip() + "_" +
                            api_done_df['nombre_municipio'].str.lower().str.strip() + "_" +
                            api_done_df['año_proyecto'].astype(str))

    logging.info(f"Claves únicas en water_cleaned_df:\n{water_cleaned_df['clave'].unique()}")
    logging.info(f"Claves únicas en api_done_df:\n{api_done_df['clave'].unique()}")


    logging.info("Realizando el merge de los datasets.")
    merged_df = pd.merge(water_cleaned_df, api_done_df, on='clave', how='outer')

    logging.info("Llenando valores faltantes con valores predeterminados.")
    merged_df.fillna({
        'fecha_proyecto': '1971-01-01', 'codigo_departamento': '-1', 'departamento': 'Desconocido',
        'nombre_municipio': 'Desconocido', 'indicador': '-1', 'nombre_proyecto': 'Desconocido',
        'origen': 'Desconocido', 'estado_seguimiento': 'Desconocido', 'num_municipios': -1,
        'región': 'Desconocido', 'total_financiamiento': -1.0, 'duracion_proyecto_dias': -1.0,
        'año_proyecto': -1
    }, inplace=True)

    logging.info(f"El DataFrame combinado tiene {merged_df.shape[0]} filas y {merged_df.shape[1]} columnas.")
    logging.info(f"Primeras filas del DataFrame combinado:\n{merged_df.head()}")
    logging.info(f"Número de proyectos únicos: {merged_df['nombre_proyecto'].nunique()}")
    logging.info(f"Años presentes en los datos combinados: {merged_df['año'].unique()}")
    logging.info(f"Tipos de datos de las columnas:\n{merged_df.dtypes}")



    csv_path = '/root/airflow_water12/dag_water/merged_data.csv'
    merged_df.to_csv(csv_path, index=False)
    logging.info(f"Archivo CSV guardado en: {csv_path}")

    return merged_df




def load(**kwargs):
    csv_path = '/root/airflow_water12/dag_water/merged_data.csv'
    db_config_path = '/root/airflow_water12/dag_water/db_config.json'

    logging.info(f"Leyendo datos desde el archivo CSV en: {csv_path}")

    df_water = pd.read_csv(csv_path)
    df_water.fillna(value=pd.NA, inplace=True)  # Normalizar NaN a NA para compatibilidad con la base de datos

    try:
        with open(db_config_path, 'r') as config_json:
            db_config = json.load(config_json)
            conx = psycopg2.connect(**db_config)
            cursor = conx.cursor()

            # Mostrar la base de datos y la ubicación
            logging.info(f"Conectado a la base de datos: {db_config['dbname']} en {db_config['host']}")
            
            
            
            # Creación de tablas de dimensiones y tabla de hechos con restricciones únicas
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS dimension_date (
                    "ID_Tiempo" SERIAL PRIMARY KEY,
                    "Año" INT NOT NULL UNIQUE
                );
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS dimension_ubication (
                    "ID_Ubicacion" SERIAL PRIMARY KEY,
                    "nombre_departamento" VARCHAR(255) NOT NULL,
                    "div_dpto" INT NOT NULL,
                    "nombre_municipio" VARCHAR(255) NOT NULL,
                    "divi_muni" INT NOT NULL,
                    UNIQUE ("nombre_departamento", "div_dpto", "nombre_municipio", "divi_muni")
                );
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS dimension_parameters (
                    "ID_Parametro" SERIAL PRIMARY KEY,
                    "nombre_parametro_analisis" VARCHAR(255) NOT NULL UNIQUE
                );
            """)
            
            
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS dimension_tratamiento (
                    "ID_Rango" SERIAL PRIMARY KEY,
                    "rango_irca" VARCHAR(255) NOT NULL,
                    "tratamiento_categoría" VARCHAR(255) NOT NULL,
                    UNIQUE ("rango_irca", "tratamiento_categoría")
                );
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS dimension_proyecto (
                    "ID_Proyecto" SERIAL PRIMARY KEY,
                    "indicador" VARCHAR(255) NOT NULL,
                    "nombre_proyecto" VARCHAR(255) NOT NULL,
                    "origen" VARCHAR(255) NOT NULL,
                    "estado_seguimiento" VARCHAR(255) NOT NULL,
                    "num_municipios" INT NOT NULL,
                    "region" VARCHAR(255) NOT NULL,
                    "total_financiamiento" FLOAT NOT NULL,
                    "duracion_proyecto_dias" INT NOT NULL,
                    "ano_proyecto" INT NOT NULL,
                    UNIQUE ("indicador", "nombre_proyecto", "origen", "estado_seguimiento", "num_municipios", "region", "total_financiamiento", "duracion_proyecto_dias", "ano_proyecto")
                );
            """)
            
            
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS Fact_WaterQuality (
                    "ID_Medicion" SERIAL PRIMARY KEY,
                    "ID_Tiempo" INT NOT NULL REFERENCES dimension_date("ID_Tiempo"),
                    "ID_Ubicacion" INT NOT NULL REFERENCES dimension_ubication("ID_Ubicacion"),
                    "ID_Parametro" INT NOT NULL REFERENCES dimension_parameters("ID_Parametro"),
                    "ID_Rango" INT NOT NULL REFERENCES dimension_tratamiento("ID_Rango"),
                    "ID_Proyecto" INT NOT NULL REFERENCES dimension_proyecto("ID_Proyecto"),
                    "irca_promedio" FLOAT NOT NULL,
                    "numero_parametros_promedio" INT NOT NULL,
                    "proporcion_critica" FLOAT NOT NULL,
                    "fecha_proyecto" DATE NOT NULL
                );
            """)

            conx.commit()
            logging.info("Tablas creadas exitosamente.")
            
            
            
            
            for year in df_water['año'].drop_duplicates():
                cursor.execute("""
                    INSERT INTO dimension_date ("Año") VALUES (%s) ON CONFLICT ("Año") DO NOTHING RETURNING "ID_Tiempo";
                """, (year,))
                id_tiempo = cursor.fetchone()[0] if cursor.rowcount != 0 else None
                logging.info(f"Insertado año {year} con ID {id_tiempo}")

            for _, row in df_water.iterrows():
                cursor.execute("""
                    INSERT INTO dimension_ubication ("nombre_departamento", "div_dpto", "nombre_municipio", "divi_muni")                    VALUES (%s, %s, %s, %s) ON CONFLICT ("nombre_departamento", "div_dpto", "nombre_municipio", "divi_muni") DO NOTHING RETURNING "ID_Ubicacion";
                """, (row['nombre_departamento'], row['Div_dpto'], row['nombre_municipio_x'], row['Divi_muni']))
                id_ubicacion = cursor.fetchone()[0] if cursor.rowcount != 0 else None
                logging.info(f"Insertado ubicación con ID {id_ubicacion}")

                cursor.execute("""
                    INSERT INTO dimension_parameters ("nombre_parametro_analisis")
                    VALUES (%s) ON CONFLICT ("nombre_parametro_analisis") DO NOTHING RETURNING "ID_Parametro";
                """, (row['nombre_parametro_analisis'],))
                id_parametro = cursor.fetchone()[0] if cursor.rowcount != 0 else None
                logging.info(f"Insertado parámetro con ID {id_parametro}")

                cursor.execute("""
                    INSERT INTO dimension_tratamiento ("rango_irca", "tratamiento_categoría")
                    VALUES (%s, %s) ON CONFLICT ("rango_irca", "tratamiento_categoría") DO NOTHING RETURNING "ID_Rango";                """, (row['rango_irca'], row['tratamiento_categoria']))
                id_rango = cursor.fetchone()[0] if cursor.rowcount != 0 else None
                
                
                cursor.execute("""
                    INSERT INTO dimension_tratamiento ("rango_irca", "tratamiento_categoría")
                    VALUES (%s, %s) ON CONFLICT ("rango_irca", "tratamiento_categoría") DO NOTHING RETURNING "ID_Rango";
                """, (row['rango_irca'], row['tratamiento_categoria']))
                id_rango = cursor.fetchone()[0] if cursor.rowcount != 0 else None
                logging.info(f"Insertado tratamiento con ID {id_rango}")

                cursor.execute("""
                    INSERT INTO dimension_proyecto ("indicador", "nombre_proyecto", "origen", "estado_seguimiento", "num_municipios", "region", "total_financiamiento", "duracion_proyecto_dias", "ano_proyecto")
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT ("indicador", "nombre_proyecto", "origen", "estado_seguimiento", "num_municipios", "region", "total_financiamiento", "duracion_proyecto_dias", "ano_proyecto") DO NOTHING RETURNING "ID_Proyecto";
                """, (row['indicador'], row['nombre_proyecto'], row['origen'], row['estado_seguimiento'], row['num_municipios'], row['región'], row['total_financiamiento'], row['duracion_proyecto_dias'], row['año_proyecto']))
                id_proyecto = cursor.fetchone()[0] if cursor.rowcount != 0 else None
                logging.info(f"Insertado proyecto con ID {id_proyecto}")

                cursor.execute("""
                    INSERT INTO Fact_WaterQuality ("ID_Tiempo", "ID_Ubicacion", "ID_Parametro", "ID_Rango", "ID_Proyecto", "irca_promedio", "numero_parametros_promedio", "proporcion_critica", "fecha_proyecto")
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                """, (id_tiempo, id_ubicacion, id_parametro, id_rango, id_proyecto, row['irca_promedio'], row['numero_parametros_promedio'], row['proporción_crítica'], row['fecha_proyecto']))
                logging.info(f"Insertado medición con ID_Tiempo {id_tiempo}, ID_Ubicacion {id_ubicacion}, ID_Parametro {id_parametro}, ID_Rango {id_rango}, ID_Proyecto {id_proyecto}")

            conx.commit()
            logging.info("Los datos se han cargado exitosamente a la base de datos.")
            logging.info("Los datos se han cargado exitosamente a la base de datos.")
            logging.info(f"Primeras filas del DataFrame cargado:\n{df_water.head()}")
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        if 'conx' in locals() and conx:
            conx.rollback()
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()
        if 'conx' in locals() and conx:
            conx.close()

    return "Data loaded successfully"







    
    

