# Codigo Extracción

In [None]:
import requests
import pandas as pd
from google.cloud import storage

def extract_wb_data(request):
    # Definir los países, indicadores y años
    countries = ["ARG", "BOL", "BRA", "CAN", "CHL", "COL", "CRI", "CUB", "DOM", "ECU", "SLV", "GTM", "HTI", "HND", "JAM", "MEX", "NIC", "PAN", "PRY", "PER", "USA", "URY", "VEN"]
    indicators = ['EN.ATM.CO2E.PP.GD', 'SP.DYN.LE00.IN', 'SH.H2O.SMDW.ZS', 'SP.RUR.TOTL.ZS', 'SP.DYN.CDRT.IN','SH.XPD.GHED.GD.ZS','SP.DYN.LE00.FE.IN','SE.ADT.LITR.ZS','SP.DYN.AMRT.FE', 'SP.DYN.AMRT.MA', 'SP.DYN.IMRT.IN','NY.GDP.PCAP.PP.KD', 'SP.DYN.CBRT.IN']
    start_year = "1990"
    end_year = "2021"

    # Lista para almacenar los datos
    data_frames = []

    # URL base de la API del Banco Mundial
    base_url = "http://api.worldbank.org/v2/country"

    # Realizar las consultas para cada país, indicador y año
    for country_code in countries:
        for indicator in indicators:
            # Construir la URL de la consulta
            url = f"{base_url}/{country_code}/indicator/{indicator}?date={start_year}:{end_year}&format=json"

            # Realizar la solicitud GET a la API del Banco Mundial
            response = requests.get(url)

            # Verificar si la solicitud fue exitosa
            if response.status_code == 200:
                data = response.json()

                if data[1]:
                # Los datos se encuentran en data[1]
                    for entry in data[1]:
                        year = entry['date']
                        value = entry['value']
                        indicator_name = entry['indicator']['value']
                        country_name = entry['country']['value']
                        data_frames.append(pd.DataFrame({"Pais": [country_name], "Indicador": [indicator_name], "Anio": [year], "Valor": [value]}))
                else:
                    print(f"No hay datos disponibles para {country_code} y {indicator}.")
            else:
                print(f"Error al obtener datos para {country_code} y {indicator}. Código de estado: {response.status_code}")

    # Concatenar todos los DataFrames individuales en uno
    data_df = pd.concat(data_frames, ignore_index=True)

    # Exportar el DataFrame a un archivo CSV
    csv_data = data_df.to_csv(index=False)

    # Guardar el archivo CSV en Google Cloud Storage
    bucket_name = "bucket-extraction"  # Reemplaza con el nombre de tu bucket
    file_name = "wb_data.csv"  # Nombre del archivo CSV en el bucket

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    try:
        blob.upload_from_string(csv_data, content_type="text/csv")
        return "Datos extraídos y almacenados en Google Cloud Storage."
    except Exception as e:
        return f"Error al cargar el archivo en Google Cloud Storage: {str(e)}"

    #blob.upload_from_string(csv_data, content_type="text/csv")

    return "Datos extraídos y almacenados en Google Cloud Storage."

#requests
#pandas
#google-cloud-storage

# Codigo transformación

In [None]:
import pandas as pd
from flask import jsonify, abort, Request
from google.cloud import storage
import io
import numpy as np
from sklearn.linear_model import LinearRegression

def mark_outliers_with_nan(df, column):
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_whisker = Q1 - 1.5 * IQR
    upper_whisker = Q3 + 1.5 * IQR
    
    outliers_mask = ((df[column] < lower_whisker) | (df[column] > upper_whisker))
    df.loc[outliers_mask, column] = np.nan
    return df

def imputation_for_all_indicators(df):
    for column in df.columns[2:]:
        df = linear_regression_imputation(df, column)
    return df

def linear_regression_imputation(df, column):
    df_subset = df[df[column].notna()]
    if len(df_subset) > 2:
        X_train = df_subset[['Anio']].values.reshape(-1, 1)
        y_train = df_subset[column].values

        model = LinearRegression()
        model.fit(X_train, y_train)

        df_missing = df[df[column].isna()]
        if not df_missing.empty:
            X_missing = df_missing[['Anio']].values.reshape(-1, 1)
            predictions = model.predict(X_missing)
            df.loc[df_missing.index, column] = predictions
    return df

def process_csv(request: Request):
    try:
        # Configura el cliente de Google Cloud Storage
        storage_client = storage.Client()

        # Especifica el nombre del bucket y el archivo CSV
        bucket_name = 'bucket-extraction'
        file_name = 'wb_data.csv'

        # Obtiene el bucket y el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_name)

        # Descarga el archivo CSV a un objeto BytesIO
        csv_data = blob.download_as_text()

        # Crea un DataFrame a partir del CSV
        df = pd.read_csv(io.StringIO(csv_data))

        # Transformaciones adicionales
        pivoted_df = df.pivot(index=['Pais', 'Anio'], columns='Indicador', values='Valor').reset_index()
        pivoted_df.drop_duplicates(inplace=True)
        pivoted_df.replace(0, pd.NA, inplace=True)

        # Detección y manejo de outliers
        for column in pivoted_df.select_dtypes(include=np.number).columns:
            pivoted_df = mark_outliers_with_nan(pivoted_df, column)

        # Imputación de valores nulos
        pivoted_df = imputation_for_all_indicators(pivoted_df)

        # Guardar el DataFrame resultante como un nuevo archivo CSV en el bucket
        new_file_name = 'processed_data.csv'
        new_blob = bucket.blob(new_file_name)
        csv_content = pivoted_df.to_csv(index=False)
        new_blob.upload_from_string(csv_content)

        return jsonify({"success": True})

    except Exception as e:
        print(f"Error: {e}")
        abort(500, description=str(e))

#google-cloud-storage==2.13.0
#pandas==2.1.3
#numpy==1.26.1
#scikit-learn==0.24.2  # Specify the version of scikit-learn

# Codigo trigger

In [None]:
import requests
import pandas as pd
from google.cloud import storage

def trigger_wb_data(request):
    # Definir los países, indicadores y años
    countries = ["ARG", "BOL", "BRA", "CAN", "CHL", "COL", "CRI", "CUB", "DOM", "ECU", "SLV", "GTM", "HTI", "HND", "JAM", "MEX", "NIC", "PAN", "PRY", "PER", "USA", "URY", "VEN"]
    indicators = ['EN.ATM.CO2E.PP.GD', 'SP.DYN.LE00.IN', 'SH.H2O.SMDW.ZS', 'SP.RUR.TOTL.ZS', 'SP.DYN.CDRT.IN','SH.XPD.GHED.GD.ZS','SP.DYN.LE00.FE.IN','SE.ADT.LITR.ZS','SP.DYN.AMRT.FE', 'SP.DYN.AMRT.MA', 'SP.DYN.IMRT.IN','NY.GDP.PCAP.PP.KD', 'SP.DYN.CBRT.IN']
    start_year = "2022"

    # Lista para almacenar los datos
    data_frames = []

    # URL base de la API del Banco Mundial
    base_url = "http://api.worldbank.org/v2/country"

    # Realizar las consultas para cada país, indicador y año
    for country_code in countries:
        for indicator in indicators:
            # Construir la URL de la consulta
            url = f"{base_url}/{country_code}/indicator/{indicator}?date={start_year}&format=json"

            # Realizar la solicitud GET a la API del Banco Mundial
            response = requests.get(url)

            # Verificar si la solicitud fue exitosa
            if response.status_code == 200:
                data = response.json()

                if data[1]:
                # Los datos se encuentran en data[1]
                    for entry in data[1]:
                        year = entry['date']
                        value = entry['value']
                        indicator_name = entry['indicator']['value']
                        country_name = entry['country']['value']
                        data_frames.append(pd.DataFrame({"Pais": [country_name], "Indicador": [indicator_name], "Anio": [year], "Valor": [value]}))
                else:
                    print(f"No hay datos disponibles para {country_code} y {indicator}.")
            else:
                print(f"Error al obtener datos para {country_code} y {indicator}. Código de estado: {response.status_code}")

    # Concatenar todos los DataFrames individuales en uno
    data_df = pd.concat(data_frames, ignore_index=True)

    # Convertir DataFrame a formato CSV en memoria sin incluir encabezados ni índices
    csv_content = data_df.to_csv(index=False, header=False)

    # Configurar el cliente de Cloud Storage
    client = storage.Client()

    # Especificar el nombre del archivo CSV en tu bucket
    bucket_name = 'bucket-extraction'
    file_name = 'wb_data.csv'
    
    # Obtener el bucket
    bucket = client.get_bucket(bucket_name)

    # Verificar si el archivo ya existe en el bucket
    blob = bucket.blob(file_name)

    if blob.exists():
        # Descargar el contenido existente del archivo CSV en el bucket
        existing_content = blob.download_as_text()

        # Verificar si la información del año 2022 ya está en el archivo existente
        if '2022' not in existing_content.split('\n'):
            # Agregar una línea vacía si el archivo existente no termina con una línea vacía
            if existing_content and not existing_content.endswith('\n'):
                existing_content += '\n'
            
            # Concatenar el nuevo contenido con el contenido existente
            csv_content = existing_content + csv_content

            # Cargar el contenido actualizado en el archivo CSV en el bucket
            blob.upload_from_string(csv_content, content_type='text/csv')

            print(f"Información del año 2022 añadida a {bucket_name}/{file_name}")
        else:
            print(f"Información del año 2022 ya existe en {bucket_name}/{file_name}. No se ha añadido nada.")
    else:
        # Si el archivo no existe, cargar el contenido del DataFrame como nuevo archivo
        blob.upload_from_string(csv_content, content_type='text/csv')

        print(f"Nuevo archivo CSV creado en {bucket_name}/{file_name}")

    return "Proceso completado exitosamente"

#requests
#pandas
#google-cloud-storage
#fsspec
#gcsfs