# ELT

Crea la base de datos en AWS Glue y carga los datos en ella a través de Athena.

## Librerías

In [6]:
import yaml
import boto3
import awswrangler as wr

## Parámetros de configuración

In [8]:
with open('config.yaml') as f:
    config = yaml.safe_load(f)

BUCKET_NAME = config["aws"]["bucket_name"]
DATABASE_NAME = config["aws"]["database_name"]
REGION = config["aws"]["region"]
ATHENA_OUTPUT = config["aws"]["athena_output"]

## Crear la base de datos en AWS Glue

In [None]:
# Crea el cliente para glue
glue = boto3.client('glue', region_name=REGION)

database_input = {
    'Name': DATABASE_NAME,
    'Description': 'Base de datos de indicadores económicos'
}

In [4]:
try:
    glue.create_database(DatabaseInput=database_input)
    print(f"La base de datos '{DATABASE_NAME}' ha sido creada exitosamente.")
except ClientError as e:
    # Si la base de datos ya existe
    if e.response['Error']['Code'] == 'AlreadyExistsException':
        print(f"La base de datos '{DATABASE_NAME}' ya existe.")
    else:
        print(f"Error al crear la base de datos: {e}")

La base de datos 'econ' ha sido creada exitosamente.


## Crear las tablas dentro de la base de datos con Athena

In [15]:
# Construir la ruta de salida de Athena
athena_output = f's3://{BUCKET_NAME}/{ATHENA_OUTPUT}'
print(f"La ruta de salida de Athena es: {athena_output}")

La ruta de salida de Athena es: s3://itam-analytics-ferlango/athena_results


### Tipo de cambio

In [16]:
raw_tipo_de_cambio = f"s3://{BUCKET_NAME}/{DATABASE_NAME}/raw/tipo_de_cambio/"

ddl_tipo_de_cambio = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS tipo_de_cambio (
    timestamp date,
    tipo_de_cambio double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{raw_tipo_de_cambio}'
TBLPROPERTIES ('classification' = 'csv', "skip.header.line.count"="1");
"""

wr.athena.start_query_execution(
    sql=ddl_tipo_de_cambio,
    database=DATABASE_NAME,
    s3_output=athena_output
)
print("Tabla 'tipo_de_cambio' creada en la base de datos", DATABASE_NAME)

Tabla 'tipo_de_cambio' creada en la base de datos econ


### Tasa de interés

In [17]:
raw_tasa_de_interes = f"s3://{BUCKET_NAME}/{DATABASE_NAME}/raw/tasa_de_interes/"

ddl_tasa_de_interes = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS tasa_de_interes (
    timestamp date,
    tasa_de_interes double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{raw_tasa_de_interes}'
TBLPROPERTIES ('classification' = 'csv', "skip.header.line.count"="1");
"""

wr.athena.start_query_execution(
    sql=ddl_tasa_de_interes,
    database=DATABASE_NAME,
    s3_output=athena_output
)

print("Tabla 'tasa_de_interes' creada en la base de datos", DATABASE_NAME)

Tabla 'tasa_de_interes' creada en la base de datos econ


### Inflación

In [18]:
raw_inflacion = f"s3://{BUCKET_NAME}/{DATABASE_NAME}/raw/inflacion/"

ddl_inflacion = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS inflacion (
    timestamp date,
    inflacion double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{raw_inflacion}'
TBLPROPERTIES ('classification' = 'csv', "skip.header.line.count"="1");
"""

wr.athena.start_query_execution(
    sql=ddl_inflacion,
    database=DATABASE_NAME,
    s3_output=athena_output
)

print("Tabla 'inflacion' creada en la base de datos", DATABASE_NAME)

Tabla 'inflacion' creada en la base de datos econ


### Mensual

In [26]:
processed_mensual = f"s3://{BUCKET_NAME}/{DATABASE_NAME}/processed/mensual/"

# Eliminar los objetos en S3 para asegurar que se actualicen
s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)
bucket.objects.filter(Prefix=f"{DATABASE_NAME}/processed/mensual/").delete()

# Elimina la tabla si existe para asegurar que se actualice
drop_query = "DROP TABLE IF EXISTS mensual"
wr.athena.start_query_execution(
    sql=drop_query,
    database=DATABASE_NAME,
    s3_output=f"s3://{BUCKET_NAME}/{ATHENA_OUTPUT}/")

print("Tabla 'mensual' eliminada de la base de datos", DATABASE_NAME)

Tabla 'mensual' eliminada de la base de datos econ


In [27]:
# Crea la tabla materializada con CTAS
ddl_mensual = f"""
CREATE TABLE IF NOT EXISTS mensual
WITH (
    format = 'PARQUET',
    external_location = '{processed_mensual}'
) AS
SELECT 
    i.timestamp AS date,
    tc.avg_tipo_de_cambio AS tipo_de_cambio,
    ti.avg_tasa_de_interes AS tasa_de_interes,
    i.inflacion
FROM (
    SELECT timestamp, inflacion, year(timestamp) AS year, month(timestamp) AS month
    FROM inflacion
) i
INNER JOIN (
    SELECT year(timestamp) AS year, month(timestamp) AS month, AVG(tipo_de_cambio) AS avg_tipo_de_cambio
    FROM tipo_de_cambio
    GROUP BY year(timestamp), month(timestamp)
) tc
    ON i.year = tc.year AND i.month = tc.month
INNER JOIN (
    SELECT year(timestamp) AS year, month(timestamp) AS month, AVG(tasa_de_interes) AS avg_tasa_de_interes
    FROM tasa_de_interes
    GROUP BY year(timestamp), month(timestamp)
) ti
    ON i.year = ti.year AND i.month = ti.month
"""

wr.athena.start_query_execution(
    sql=ddl_mensual,
    database=DATABASE_NAME,
    s3_output=f"s3://{BUCKET_NAME}/{ATHENA_OUTPUT}/"
)

print("Tabla 'mensual' creada en formato Parquet con datos consolidados mensualmente.")

Tabla 'mensual' creada en formato Parquet con datos consolidados mensualmente.
