# HW 06: ETLs y ELTs on MBanco de Mexico and INEGI

## Introducción


## Objetivo

* Mostrar cómo se crea un ETL
* Mostrar cómo se crea un ELT
* Mostrar cómo se utilizan los servicios Amazon S3, AWS Glue y Amazon Athena 
para tareas de ingeniería de datos y analítica.

## Herramientas que aprenderas a utilizar:

#

## ETL


El objetivo de esta primera parte es sacar de los siguientes links la siguiente
información:

* [Banco de Mexico mxn/uds](https://www.banxico.org.mx/tipcamb/tipCamIHAction.do)
* [Banco de Mexico tase de interes](https://www.banxico.org.mx/apps/gc/tasas-interes-grafica-coyuntu.html)
* [INEGI](https://www.inegi.org.mx/temas/inpc/)



### a) Extract

Lo que queremos es automatizar la extracción de datos, y no hacer un típico
*copy and paste*, para lo cual lo vamos a implementar un en un webscrapper, que
haga un *crawl* sobre las tablas del HTML. 


* [BeautifulSoup](https://beautiful-soup-4.readthedocs.io/en/latest/#)
* [Selenium](https://www.selenium.dev/)

In [1]:
# Importar librerías
import os
import pandas as pd
import numpy as np

import requests
import json
from sie_banxico import SIEBanxico

#### Tabla del Banco de Mexico y INEGI

Para extraer la información de la tabla del Banco de Mexico y del INEGI vamos a la fucente de datos y utilizamos la función `pandas.read_html` para extraer la información de la tabla.

In [2]:
def download_bmx_serie(serie, fechainicio, fechafin, token):
    """
    Descarga una serie de datos del Banco de México (Banxico) utilizando su API.

    Parameters
    ----------
    serie : str
        El identificador de la serie de datos a descargar.
    fechainicio : str
        La fecha de inicio del rango de datos en formato 'YYYY-MM-DD'.
    fechafin : str
        La fecha de fin del rango de datos en formato 'YYYY-MM-DD'.
    token : str
        El token de autenticación para acceder a la API de Banxico.

    Returns
    -------
    pd.DataFrame
        Un DataFrame de pandas con los datos de la serie solicitada. La columna 'dato' contiene los valores de la serie y la columna 'fecha' contiene las fechas correspondientes.
    
    Raises
    ------
    ValueError
        Si la respuesta de la API no es exitosa (código de estado diferente de 200).
    """
    url = f'https://www.banxico.org.mx/SieAPIRest/service/v1/series/{serie}/datos/{fechainicio}/{fechafin}'
    print(url)
    headers = {'Bmx-Token': token}
    response = requests.get(url, headers=headers)
    status = response.status_code
    if status != 200:
        raise ValueError(f'Error en la consulta, codigo {status}')
    raw_data = response.json()
    data = raw_data['bmx']['series'][0]['datos']
    df = pd.DataFrame(data)
    df['dato'] = df['dato'].apply(lambda x: float(x))
    df['fecha'] = pd.to_datetime(df['fecha'], format='%d/%m/%Y')
    df.set_index('fecha', inplace=True)
    return df



In [3]:
# Cargar variables de configuración
import configparser
config = configparser.ConfigParser()
config.read('../../config.ini')

BANXICO_TOKEN = "98230f9bf97afa035012284a92d8fff12edd05c027bc2cde505c7ec1848533eb" #config['TOKEN']['BANXICO_TOKEN']
# get time series of exchange rate (mxn/usd) from Banxico
# Download the information from PATH_BDM_P2D
serie_exchange = 'SF60653'
fechainicio = '2015-01-01'
fechafin = '2025-03-15'
df_mxn2usd = download_bmx_serie(serie_exchange, fechainicio, fechafin, BANXICO_TOKEN)
df_mxn2usd.to_csv("../../data/raw/tipo_de_cambio.csv", index=False)

serie_interes = 'SF61745'
df_interes = download_bmx_serie(serie_interes, fechainicio, fechafin, BANXICO_TOKEN)
# get time series of exchange rate (mxn/usd) from Banxico
# Save the dataframe to tipo_de_cambio.csv in data/raw directory
df_interes.to_csv("../../data/raw/tasa_de_interes.csv", index=False)

https://www.banxico.org.mx/SieAPIRest/service/v1/series/SF60653/datos/2015-01-01/2025-03-15
https://www.banxico.org.mx/SieAPIRest/service/v1/series/SF61745/datos/2015-01-01/2025-03-15


In [4]:
print(df_interes.size)
df_interes['fecha'] = df_interes.index
df_interes.reset_index(drop=True, inplace=True)
df_interes.head()

3716


Unnamed: 0,dato,fecha
0,3.0,2015-01-01
1,3.0,2015-01-02
2,3.0,2015-01-03
3,3.0,2015-01-04
4,3.0,2015-01-05


In [5]:
print(df_mxn2usd.size)
df_mxn2usd['fecha']=df_mxn2usd.index
df_mxn2usd.reset_index(drop=True, inplace=True)
df_mxn2usd.head()

3727


Unnamed: 0,dato,fecha
0,14.7348,2015-01-01
1,14.7348,2015-01-02
2,14.7414,2015-01-03
3,14.7414,2015-01-04
4,14.7414,2015-01-05


In [None]:
# AP

def download_inegi_serie(token, serie):
    """
    Descarga una serie de datos del INEGI utilizando su API.

    Parameters
    ----------
    token : str
        El token de autenticación para acceder a la API de INEGI.
    serie : str
        El identificador de la serie de datos a descargar.

    Returns
    -------
    pd.DataFrame
        Un DataFrame de pandas con los datos de la serie solicitada. La columna 'OBS_VALUE' contiene los valores de la serie y la columna 'TIME_PERIOD' contiene los periodos correspondientes.
    
    Raises
    ------
    ValueError
        Si la respuesta de la API no es exitosa (código de estado diferente de 200).
    """
    url = f"https://www.inegi.org.mx/app/api/indicadores/desarrolladores/jsonxml/INDICATOR/{serie}/es/0700/false/BIE/2.0/{token}?type=json"
    response = requests.get(url)
    if response.status_code != 200:
        raise ValueError(f'Error en la consulta, codigo {response.status_code}')
    
    data_general = response.text
    flow_data = json.loads(data_general)

    # Extract inflation data
    inflation_series = flow_data['Series'][0]['OBSERVATIONS']
    inflation_data = [(obs['TIME_PERIOD'], obs['OBS_VALUE']) for obs in inflation_series]

    # Save the inflation data to a DataFrame
    df_inflation = pd.DataFrame(inflation_data, columns=['TIME_PERIOD', 'OBS_VALUE'])
    return df_inflation





In [None]:
# Call INEGI series
#INEGI_TOKEN = config['TOKEN']['INEGI_TOKEN']
INEGI_TOKEN = "68ffaa78-1a6b-e3a6-c7cc-91d2c7a04426"
serie_inflation = "910417"
df_inflation = download_inegi_serie(INEGI_TOKEN, serie_inflation)
df_inflation.to_csv("../../data/raw/inflation.csv", index=False)

Observa que el scrapper regreso 14 tablas.

Puedes investigar que hay en cada una de las 14 tablas, para que identifiques
dónde se encuentra la tabla que estamos buscando.

In [8]:
df_inflation.head()

Unnamed: 0,TIME_PERIOD,OBS_VALUE
0,2025/02,-0.53
1,2025/01,-0.14
2,2024/12,5.95
3,2024/11,5.98
4,2024/10,4.18


### b) Transform

Transform the data in the same format: *period* and *value*

In [9]:
# Define un diccionario con los nombres limpios
DICT_CLEAN_NAMES_INEGI = {
    'TIME_PERIOD': 'period',
    'OBS_VALUE': 'inflation'
}
print(DICT_CLEAN_NAMES_INEGI)

DICT_CLEAN_NAMES_BANXICO_EXCHANGE = {
    'fecha': 'period',
    'dato': 'exchange_rate'
}
print(DICT_CLEAN_NAMES_BANXICO_EXCHANGE)

DICT_CLEAN_NAMES_BANXICO_INTERES = {
    'fecha': 'period',
    'dato': 'interes_rate'
}
print(DICT_CLEAN_NAMES_BANXICO_INTERES)

{'TIME_PERIOD': 'period', 'OBS_VALUE': 'inflation'}
{'fecha': 'period', 'dato': 'exchange_rate'}
{'fecha': 'period', 'dato': 'interes_rate'}


In [10]:
# Aqui va el preprocesamiento: exchange rate
df_mxn2usd_raw = (
    df_mxn2usd
        # Renombrar variables
        .rename(columns = DICT_CLEAN_NAMES_BANXICO_EXCHANGE)
)
df_mxn2usd_raw = df_mxn2usd_raw[['period', 'exchange_rate']]    
df_mxn2usd_raw.head()


Unnamed: 0,period,exchange_rate
0,2015-01-01,14.7348
1,2015-01-02,14.7348
2,2015-01-03,14.7414
3,2015-01-04,14.7414
4,2015-01-05,14.7414


In [11]:
# Aqui va el preprocesamiento: interes rate
df_interes_raw = (
    df_interes
        # Renombrar variables
        .rename(columns = DICT_CLEAN_NAMES_BANXICO_INTERES)
        
)
df_interes_raw = df_interes_raw[['period', 'interes_rate']]
df_interes_raw.head()

Unnamed: 0,period,interes_rate
0,2015-01-01,3.0
1,2015-01-02,3.0
2,2015-01-03,3.0
3,2015-01-04,3.0
4,2015-01-05,3.0


Guarda los datos de guests en local.

In [12]:
# Aqui va el preprocesamiento: infaltion
df_inflation_raw = (
    df_inflation
        # Renombrar variables
        .rename(columns = DICT_CLEAN_NAMES_INEGI)
        
)
df_inflation_raw['period'] = pd.to_datetime(df_inflation_raw['period'], format='%Y/%m').dt.to_period('M').dt.to_timestamp('M')
df_inflation_raw.head()

Unnamed: 0,period,inflation
0,2025-02-28,-0.53
1,2025-01-31,-0.14
2,2024-12-31,5.95
3,2024-11-30,5.98
4,2024-10-31,4.18


and save the data in a csv file in the loccal directory data/raw/.

In [13]:
# Guarda los datos en local
df_inflation_raw.to_csv("../../data/processed/inflation.csv", index=False)
df_mxn2usd_raw.to_csv("../../data/processed/tipo_de_cambio.csv", index=False)
df_interes_raw.to_csv("../../data/processed/tasa_de_interes.csv", index=False)

In [14]:
# Convert period to datetime format and set to the first day of the month
df_inflation_raw['period'] = pd.to_datetime(df_inflation_raw['period'], format='%Y-%m-%d')
df_interes_raw['period'] = pd.to_datetime(df_interes_raw['period'])

# Filter to keep only the first day of the month
df_inflation_raw = df_inflation_raw[df_inflation_raw['period'].dt.is_month_start]
df_interes_raw = df_interes_raw[df_interes_raw['period'].dt.is_month_start]

# Merge the dataframes on the period column
df_combined = pd.merge(df_interes_raw, df_inflation_raw, on='period', how='inner')

# Convert inflation to float
df_combined['inflation'] = df_combined['inflation'].astype(float)

# Display the combined dataframe
df_combined = df_combined.drop_duplicates(subset='period')
print(df_combined)
# save the result in a csv at data/raw
df_combined.to_csv("../../data/processed/econ.csv", index=False)

Empty DataFrame
Columns: [period, interes_rate, inflation]
Index: []


In [15]:
df_interes_raw

Unnamed: 0,period,interes_rate
0,2015-01-01,3.00
31,2015-02-01,3.00
59,2015-03-01,3.00
90,2015-04-01,3.00
120,2015-05-01,3.00
...,...,...
3582,2024-11-01,10.50
3611,2024-12-01,10.25
3642,2025-01-01,10.00
3673,2025-02-01,10.00


### c) Load to data lake

A
* Para crear el bucket, tenemos que crear un cliente, para conectarnos al 
servicio desde Python. Para ello vamos a utilizar la libería `boto3` que es el
Python SDK de AWS para interactuar con todos los servicios. Checa la
[documentación de boto3 para S3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_file)

* En S3 tienes que guardar cada tabla, osea cada S3, en un directorio por 
separado. Si no lo haces así vas a tener muchos problemas para utilizar otros 
servicios como el AWS Glue y Amazon Athena.
    - Si tienes muchos archivos de ventas, no hay problema pueden estar dentro
    de un bucket en un mismo path. Lo que no conviene es tener en el mismo
    path ventas y precios.

Ahora ejecuta y revisa con cuidado cada línea del código.

In [16]:
# Abres un cliente de S3
import boto3
session = boto3.Session(profile_name='arquitectura', 
                        region_name='us-east-1') ## added this line to define the region
s3 = session.client('s3')

In [17]:
# Crea un nuevo Bucket
# OJO: Aqui cambia el nombre del bucket, utiliza tu nombre como sufijo, para
# que sea único en el mundo, si no te va a salir error. Si tuviste éxito, el
# API de boto3 te va a regresar un HTTPStatusCode de 200. Eso es éxito.
BUCKET_NAME = "itam-analytics-thmrudolf"
s3.create_bucket(Bucket=BUCKET_NAME)

{'ResponseMetadata': {'RequestId': 'WWZH9T4SZY8S3Q6J',
  'HostId': 'Nb9f83REDUfgdxSzR4Amd7YY0XUpPRamyrS+IAEpPPv8Mt/yNsLZGjTZqqmW0Gg78UmOXeiyV6T2XPwUl5d9/w==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Nb9f83REDUfgdxSzR4Amd7YY0XUpPRamyrS+IAEpPPv8Mt/yNsLZGjTZqqmW0Gg78UmOXeiyV6T2XPwUl5d9/w==',
   'x-amz-request-id': 'WWZH9T4SZY8S3Q6J',
   'date': 'Thu, 20 Mar 2025 03:15:57 GMT',
   'location': '/itam-analytics-thmrudolf',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': '/itam-analytics-thmrudolf'}

Now, all three csv files are in the S3 bucket. (**comment**: a new bucket is created for this task)

Doumentation for futher use [S3 upload a file from boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_file).

In [18]:
# Upload los dos archivos a S3 (raw)
s3.upload_file(Filename="../../data/raw/inflation.csv", Bucket=BUCKET_NAME, Key="hw06/raw/inflation/inflacion.csv")
s3.upload_file(Filename="../../data/raw/tipo_de_cambio.csv", Bucket=BUCKET_NAME, Key="hw06/raw/exchange/tipo_de_cambio.csv")
s3.upload_file(Filename="../../data/raw/tasa_de_interes.csv", Bucket=BUCKET_NAME, Key="hw06/raw/interes_rate/tasa_de_interes.csv")

# Upload los dos archivos a S3 (processed)
s3.upload_file(Filename="../../data/processed/inflation.csv", Bucket=BUCKET_NAME, Key="hw06/processed/inflation/inflacion.csv")
s3.upload_file(Filename="../../data/processed/tipo_de_cambio.csv", Bucket=BUCKET_NAME, Key="hw06/processed/exchange/tipo_de_cambio.csv")
s3.upload_file(Filename="../../data/processed/tasa_de_interes.csv", Bucket=BUCKET_NAME, Key="hw06/processed/interes_rate/tasa_de_interes.csv")

In [19]:
## Upload los dos archivos a S3 (combined "econ")
s3.upload_file(Filename="../../data/processed/econ.csv", Bucket=BUCKET_NAME, Key="econ/processed/econ/econ.csv")

# ELT

Ahora que ya subimos los datos al data lake, podemos utilizarlos para hacer
nuestros análisis o aplicaciones.

En esta sección vamos a utilizar Amazon Athena para hacer transformaciones a 
nuestros datos crudos y cargarlos en nuestro ambiente de Python local para 
hacer los análisis que necesitemos.

Para poder utilizar Amazon Athena, por primera vez, tenemos que configurar un
bucket en Amazon S3 donde se puedan guardar nuestras queries. Esto lo podemos
hacer siguiendo estas instrucciones:

* [Specifying a query result location using the Athena console](https://docs.aws.amazon.com/athena/latest/ug/querying.html#query-results-specify-location-console)

Este paso lo tenemos que hacer una sola vez.

Una vez configurado, para usar nuestros datos en Amazon Athena, lo que tenemos
que hacer es crear una base de datos en AWS Glue Catalog, que lo que hace es
llevar un catálogo de todas las tablas que tenemos en data lake, y que podemos
utilizar rápidamente con otros servicios en la nube.

Después solo tenemos que crear el schema de las tablas y agregarlas a la base 
de datos que creamos. Una vez hecho esto podemos realizar las operaciones en
SQL sobre nuestros datos.


## Configuración 

Sigamos paso a paso cada uno de los pasos descritos anteriormente.

### Crear una base de datos en el Glue Catalog

Para crear la base de datos en AWS Glue Catalog, primero tenemos que llamar
al cliente de Glue.

Aqui te dejo la [documentación](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html) para que veas que funciones puedes llamar de 
este servicio.

In [20]:
from botocore.config import Config
my_config = Config(
    region_name = 'us-east-1'
)

glue = boto3.client('glue', config=my_config)

Vamos a llamar la función `create_database`, que necesita un nombre
y una descripción.

In [22]:
response = glue.create_database(
    DatabaseInput={
        'Name': 'hw06',
        'Description': 'Contains data about exchange rate mxn-usd, infaltion and interes rate obtained form Banco de Mexico and INEGI.',
    },
)
response

AlreadyExistsException: An error occurred (AlreadyExistsException) when calling the CreateDatabase operation: Database already exists.

**NOTA**: Si no puedes correr esta línea porque no encuentras la sesión las credenciales, entonces entra a la consola de AWS, y crea la database desde el servicio de Glue.

Los pasos que tienes que seguir son:

* Entra a tu cuenta de AWS
* Navega al servicio de Glue.
* En el menú izquierdo buscao catalog, y luego databases.
* En databases, crea una nueva base de datos.
* Ingresa el `name` y el `description`, toma como referncia los valores que tienes en la celda de arriba.


### Configurar Athena

Necesitamos crear un bucket al que podamos apuntar nuestras queries en 
Amazon Athena. Recuerda poner un nombre único a tu bucket. En este caso puedes
poner `arquitectura-athena-queries-MINOMBRE`

In [None]:
s3.create_bucket(Bucket="arquitectura-athena-queries-thmrrudolf")

{'ResponseMetadata': {'RequestId': 'A5AJADP81SQEQFC1',
  'HostId': 'MTClDEQ+qxPYz/rkMRv+Q+/fiVz5d+HupfSFJWYu1bqd3tLCHLG6o0BcD0YzeVu31YEvTb6awxg=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'MTClDEQ+qxPYz/rkMRv+Q+/fiVz5d+HupfSFJWYu1bqd3tLCHLG6o0BcD0YzeVu31YEvTb6awxg=',
   'x-amz-request-id': 'A5AJADP81SQEQFC1',
   'date': 'Thu, 20 Mar 2025 03:03:09 GMT',
   'location': '/arquitectura-athena-queries-thmrrudolf',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': '/arquitectura-athena-queries-thmrrudolf'}