# Checklist Pre- entrega

Desarrollar un script que extraiga datos de una API pública. A su vez, el alumno debe crear una tabla en
Redshift para posterior carga de los datos extraidos.

In [1]:
import psycopg2
import requests
from tqdm import tqdm
import pandas as pd
import datetime

# Datos de configuración para la conexión
url = 'data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com'
base_de_datos = 'data-engineer-database'
archivo_usuario = 'C:/Users/jeshu/Documents/user_redshift.txt'
archivo_contraseña = 'C:/Users/jeshu/Documents/password_redshift.txt'

## Elección de la API

In [30]:
def obtener_datos_desde_api(codigo_pais, codigo_indicador):
    """
    Realiza una petición a la API del Banco Mundial para obtener datos de un indicador específico de un país.
    
    Parámetros:
    - codigo_pais (str): El código ISO del país.
    - codigo_indicador (str): El código del indicador económico.
    
    Retorna:
    - list: Lista de datos obtenidos de la API, o una lista vacía si hubo un error o no se encontraron datos.
    """
    url = f"http://api.worldbank.org/v2/country/{codigo_pais}/indicator/{codigo_indicador}?format=json&date=2000:2021"
    try:
        response = requests.get(url)
        data = response.json()
        if len(data) == 2 and 'page' in data[0]:
            return data[1]  # data[1] contiene los datos reales
    except requests.RequestException as e:
        print(f"Error al obtener datos desde la API: {e}")
    return []

def transformar_datos(datos_crudos):
    """
    Transforma los datos crudos obtenidos de la API en una lista de diccionarios, preparados para ser convertidos en DataFrame.
    
    Parámetros:
    - datos_crudos (list): Lista de datos crudos obtenidos de la API.
    
    Retorna:
    - list: Lista de diccionarios con los datos transformados, adecuados para conversión a DataFrame.
    """
    if datos_crudos:
        return [
            {'Country Code': dato.get('country', {}).get('id', 'N/A'),
             'Country Name': dato.get('country', {}).get('value', 'N/A'),
             'Indicator Code': dato.get('indicator', {}).get('id', 'N/A'),
             'Year': int(dato.get('date', '1900')),
             'Value': dato.get('value', None),}
            for dato in datos_crudos if dato.get('value') is not None
        ]
    return []

#
# Inicializar sesión de requests
session = requests.Session()

# Indicadores y países
indicadores = ['FR.INR.RINR', 'PA.NUS.FCRF', 'NY.GDP.MKTP.CD', 'NY.GDP.PCAP.CD', 'SP.POP.TOTL', 'FP.CPI.TOTL', 'GC.DOD.TOTL.GD.ZS', 'NE.EXP.GNFS.CD', 'NE.IMP.GNFS.CD', 'FI.RES.TOTL.CD', 'DT.DOD.DECT.CD', 'GC.TAX.TOTL.GD.ZS', 'SL.UEM.TOTL.NE.ZS', 'SH.XPD.CHEX.GD.ZS', 'SE.XPD.TOTL.GD.ZS', 'EG.ELC.ACCS.ZS', 'EG.USE.PCAP.KG.OE', 'EN.ATM.CO2E.PC', 'IT.NET.USER.ZS', 'SP.DYN.CBRT.IN']
indicadores_sanitizados = [indicador.replace('.', '_') for indicador in indicadores]
paises = ['MEX']

resultados = []
for pais in tqdm(paises, desc="Procesando países"):
    for indicador in tqdm(indicadores, desc=f"Indicadores para {pais}"):
        datos_crudos = obtener_datos_desde_api(pais, indicador)
        resultados.extend(transformar_datos(datos_crudos))

# Crear DataFrame
df = pd.DataFrame(resultados)

# Reemplazar puntos por guiones bajos en los nombres de los indicadores en el DataFrame
df['Indicator Code'] = df['Indicator Code'].str.replace('.', '_')

print(df)

Procesando países:   0%|                                                                         | 0/1 [00:00<?, ?it/s]
Indicadores para MEX:   0%|                                                                     | 0/20 [00:00<?, ?it/s][A
Indicadores para MEX:   5%|███                                                          | 1/20 [00:00<00:11,  1.65it/s][A
Indicadores para MEX:  10%|██████                                                       | 2/20 [00:00<00:08,  2.23it/s][A
Indicadores para MEX:  15%|█████████▏                                                   | 3/20 [00:01<00:08,  1.89it/s][A
Indicadores para MEX:  20%|████████████▏                                                | 4/20 [00:01<00:07,  2.23it/s][A
Indicadores para MEX:  25%|███████████████▎                                             | 5/20 [00:02<00:06,  2.47it/s][A
Indicadores para MEX:  30%|██████████████████▎                                          | 6/20 [00:02<00:05,  2.63it/s][A
Indicadores para ME

    Country Code Country Name  Indicator Code  Year      Value
0             MX       Mexico     FR_INR_RINR  2021   0.697497
1             MX       Mexico     FR_INR_RINR  2020   1.452698
2             MX       Mexico     FR_INR_RINR  2019   3.999376
3             MX       Mexico     FR_INR_RINR  2018   2.695227
4             MX       Mexico     FR_INR_RINR  2017   0.717765
..           ...          ...             ...   ...        ...
401           MX       Mexico  SP_DYN_CBRT_IN  2004  22.351000
402           MX       Mexico  SP_DYN_CBRT_IN  2003  22.802000
403           MX       Mexico  SP_DYN_CBRT_IN  2002  23.256000
404           MX       Mexico  SP_DYN_CBRT_IN  2001  23.717000
405           MX       Mexico  SP_DYN_CBRT_IN  2000  24.158000

[406 rows x 5 columns]





## Estructura de la tabla

In [31]:
def crear_conexion_redshift(host, base_de_datos, archivo_usuario, archivo_contraseña, puerto=5439):
    """
    Crea y devuelve una conexión a una base de datos de Redshift.
    
    Parámetros:
        host (str): El hostname del cluster de Redshift.
        base_de_datos (str): El nombre de la base de datos a la que conectar.
        archivo_usuario (str): Ruta al archivo que contiene el nombre de usuario.
        archivo_contraseña (str): Ruta al archivo que contiene la contraseña.
        puerto (int): Puerto para conectar al cluster de Redshift (por defecto 5439).
    
    Retorna:
        Una conexión psycopg2 a Redshift si es exitosa, None en caso contrario.
    """
    try:
        # Leer el usuario y la contraseña de los archivos
        with open(archivo_usuario, 'r') as f:
            usuario = f.read().strip()
        with open(archivo_contraseña, 'r') as f:
            contraseña = f.read().strip()

        # Establecer la conexión
        conexion = psycopg2.connect(
            host=host,
            dbname=base_de_datos,
            user=usuario,
            password=contraseña,
            port=puerto
        )
        print("Conectado a Redshift con éxito!")
        return conexion

    except Exception as e:
        print("No es posible conectar a Redshift")
        print(e)
        return None

def crear_tablas(conexion):
    """
    Crea las tablas 'countries' y 'country_indicators' en la base de datos especificada.

    Parámetros:
    - conexion: Objeto de conexión a la base de datos.

    Las tablas creadas son:
    1. 'countries': Almacena la información de los países con los siguientes campos:
       - country_code VARCHAR(3) PRIMARY KEY,
       - country_name VARCHAR(100).

    2. 'country_indicators': Almacena los indicadores económicos de los países con los siguientes campos:
       - id INTEGER IDENTITY(1,1) PRIMARY KEY,
       - country_code VARCHAR(3) REFERENCES countries(country_code),
       - year INT,
       - ingest_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
       - Una columna para cada indicador con su nombre sanitizado.
    """
    cursor = conexion.cursor()
    
    crear_tabla_paises = """
    CREATE TABLE IF NOT EXISTS countries (
        country_code VARCHAR(3) PRIMARY KEY,
        country_name VARCHAR(100)
    );
    """
    
    crear_tabla_indicadores = f"""
    CREATE TABLE IF NOT EXISTS country_indicators (
        id INTEGER IDENTITY(1,1) PRIMARY KEY,
        country_code VARCHAR(3) REFERENCES countries(country_code),
        year INT,
        {", ".join([f'"{indicador}" DECIMAL(14, 5)' for indicador in indicadores_sanitizados])},
        ingest_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """
    
    try:
        cursor.execute(crear_tabla_paises)
        cursor.execute(crear_tabla_indicadores)
        conexion.commit()
        print("Tablas creadas con éxito")

    except Exception as e:
        print("Ocurrió un error al crear las tablas:", e)
        conexion.rollback()

    finally:
        cursor.close()

# Crear y probar la conexión
conexion = crear_conexion_redshift(url, base_de_datos, archivo_usuario, archivo_contraseña)
if conexion:
    crear_tablas(conexion)
    conexion.close()

Conectado a Redshift con éxito!
Tablas creadas con éxito


## Variedad de los datos

In [4]:
def ejecutar_consulta(conexion, consulta_sql):
    """
    Ejecuta una consulta SQL en la base de datos y devuelve los resultados como un DataFrame de pandas.
    
    Parámetros:
        conexion (connection): Conexión a la base de datos.
        consulta_sql (str): La consulta SQL a ejecutar.
    
    Retorna:
        DataFrame: Resultados de la consulta en un DataFrame de pandas.
    """
    try:
        cursor = conexion.cursor()
        cursor.execute(consulta_sql)
        resultados = cursor.fetchall()
        columnas = [desc[0] for desc in cursor.description]
        df_resultados = pd.DataFrame(resultados, columns=columnas)
        return df_resultados
    except Exception as e:
        print("Error al ejecutar la consulta:", e)
        return pd.DataFrame()  # Devuelve un DataFrame vacío en caso de error
    finally:
        cursor.close()

In [5]:
conexion = crear_conexion_redshift(url, base_de_datos, archivo_usuario, archivo_contraseña)
consulta = "SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog') ORDER BY table_schema, table_name;"
consulta_df = ejecutar_consulta(conexion, consulta)
print(consulta_df)
conexion.close()

Conectado a Redshift con éxito!
               table_schema          table_name
0  jeshua_romero_coderhouse           countries
1  jeshua_romero_coderhouse  country_indicators


In [38]:
def insertar_datos(conexion, df):
    """
    Inserta los datos del DataFrame en las tablas 'countries' y 'country_indicators' en la base de datos.

    Parámetros:
    - conexion: Objeto de conexión a la base de datos.
    - df (DataFrame): DataFrame con los datos a insertar.
    """
    cursor = conexion.cursor()

    # Verificar que las columnas necesarias existan en el DataFrame
    required_columns = ['Country Code', 'Country Name', 'Year', 'Indicator Code', 'Value']
    for col in required_columns:
        if col not in df.columns:
            raise KeyError(f"La columna '{col}' no se encuentra en el DataFrame.")
    
    # Insertar datos en la tabla 'countries'
    for index, row in df.iterrows():
        cursor.execute("""
            INSERT INTO countries (country_code, country_name)
            VALUES (%s, %s)
            ON CONFLICT (country_code) DO NOTHING;
        """, (row['Country Code'], row['Country Name']))
    
    # Pivotear el DataFrame para que cada indicador sea una columna
    df_pivot = df.pivot_table(index=['Country Code', 'Country Name', 'Year'], columns='Indicator Code', values='Value').reset_index()
    indicadores_sanitizados = [col for col in df_pivot.columns if col not in ['Country Code', 'Country Name', 'Year']]
    
    # Insertar datos en la tabla 'country_indicators'
    for index, row in df_pivot.iterrows():
        valores = [row['Country Code'], row['Year']] + [row[col] if pd.notna(row[col]) else None for col in indicadores_sanitizados]
        cursor.execute(f"""
            INSERT INTO country_indicators (country_code, year, {", ".join(indicadores_sanitizados)})
            VALUES ({", ".join(["%s"] * len(valores))});
        """, valores)

    try:
        conexion.commit()
        print("Datos insertados con éxito")
    except Exception as e:
        print("Ocurrió un error al insertar los datos:", e)
        conexion.rollback()
    finally:
        cursor.close()

conexion = crear_conexion_redshift(url, base_de_datos, archivo_usuario, archivo_contraseña)
# Crear y probar la conexión
if conexion:
    insertar_datos(conexion, df)
    conexion.close()

Conectado a Redshift con éxito!


SyntaxError: syntax error at or near "ON"
LINE 4:             ON CONFLICT (country_code) DO NOTHING;
                    ^


In [35]:
conexion = crear_conexion_redshift(url, base_de_datos, archivo_usuario, archivo_contraseña)
consulta = "SELECT * FROM countries;"
consulta_df = ejecutar_consulta(conexion, consulta)
print(consulta_df)
conexion.close()

Conectado a Redshift con éxito!
Empty DataFrame
Columns: [country_code, country_name]
Index: []


In [36]:
conexion = crear_conexion_redshift(url, base_de_datos, archivo_usuario, archivo_contraseña)
consulta = "SELECT * FROM country_indicators;"
consulta_df = ejecutar_consulta(conexion, consulta)
print(consulta_df)
conexion.close()

Conectado a Redshift con éxito!
Empty DataFrame
Columns: [id, country_code, indicator_code, year, value, last_updated]
Index: []
