In [1]:
# AWS Glue Catalog and Athena Tables Creation
# Este script crea las definiciones de tablas en AWS Glue para los datos económicos
# subidos al bucket S3, permitiendo consultas con Amazon Athena

import boto3
import time
from botocore.exceptions import ClientError

In [2]:
# Definimos las constantes
BUCKET_NAME = "itam-analytics-ragp"
S3_RAW_DIRECTORY = "raw/"
PROFILE_NAME = "datascientist"
DATABASE_NAME = "econ"


In [3]:
def conectar_glue():
    """
    Establece conexión con AWS Glue y Athena usando el perfil especificado.
    """
    try:
        # Creamos una sesión usando el perfil
        session = boto3.Session(profile_name=PROFILE_NAME)
        
        # Creamos el cliente Glue
        glue_client = session.client('glue')
        
        # Creamos el cliente Athena
        athena_client = session.client('athena')
        
        print(f"✅ Conexión establecida con AWS Glue y Athena.")
        return glue_client, athena_client
    
    except Exception as e:
        print(f"❌ Error al conectar con AWS: {str(e)}")
        return None, None


In [4]:
def crear_base_datos(glue_client):
    """
    Crea la base de datos en AWS Glue Catalog si no existe.
    """
    try:
        # Verificamos si la base de datos ya existe
        try:
            glue_client.get_database(Name=DATABASE_NAME)
            print(f"ℹ️ Base de datos '{DATABASE_NAME}' ya existe.")
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                # Creamos la base de datos
                glue_client.create_database(
                    DatabaseInput={
                        'Name': DATABASE_NAME,
                        'Description': 'Indicadores económicos de México: tipo de cambio, inflación y tasas de interés',
                    }
                )
                print(f"✅ Base de datos '{DATABASE_NAME}' creada exitosamente.")
                return True
            else:
                raise
    
    except Exception as e:
        print(f"❌ Error al crear la base de datos: {str(e)}")
        return False


In [5]:
def crear_tabla_tipo_cambio(glue_client):
    """
    Crea la tabla para los datos de tipo de cambio.
    """
    tabla_nombre = "tipo_cambio"
    s3_ruta = f"s3://{BUCKET_NAME}/{S3_RAW_DIRECTORY}{tabla_nombre}"
    
    try:
        # Verificamos si la tabla ya existe
        try:
            glue_client.get_table(DatabaseName=DATABASE_NAME, Name=tabla_nombre)
            print(f"ℹ️ Tabla '{tabla_nombre}' ya existe.")
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                # Configuración de la tabla
                tabla_input = {
                    'Name': tabla_nombre,
                    'Description': 'Tipo de cambio FIX (peso/dólar) de Banxico',
                    'StorageDescriptor': {
                        'Columns': [
                            {'Name': 'tipo_cambio', 'Type': 'double'},
                            {'Name': 'fecha', 'Type': 'date'}
                        ],
                        'Location': s3_ruta,
                        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                        'SerdeInfo': {
                            'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                            'Parameters': {
                                'field.delim': ','
                            }
                        }
                    },
                    'TableType': 'EXTERNAL',
                    'Parameters': {
                        'classification': 'csv',
                        'skip.header.line.count': '1'
                    }
                }
                
                # Creamos la tabla
                glue_client.create_table(
                    DatabaseName=DATABASE_NAME,
                    TableInput=tabla_input
                )
                
                print(f"✅ Tabla '{tabla_nombre}' creada exitosamente.")
                return True
            else:
                raise
    
    except Exception as e:
        print(f"❌ Error al crear la tabla '{tabla_nombre}': {str(e)}")
        return False

In [6]:
def crear_tabla_inflacion(glue_client):
    """
    Crea la tabla para los datos de inflación.
    """
    tabla_nombre = "inflacion"
    s3_ruta = f"s3://{BUCKET_NAME}/{S3_RAW_DIRECTORY}{tabla_nombre}"
    
    try:
        # Verificamos si la tabla ya existe
        try:
            glue_client.get_table(DatabaseName=DATABASE_NAME, Name=tabla_nombre)
            print(f"ℹ️ Tabla '{tabla_nombre}' ya existe.")
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                # Configuración de la tabla
                tabla_input = {
                    'Name': tabla_nombre,
                    'Description': 'Datos de inflación mensual (INPC) del INEGI',
                    'StorageDescriptor': {
                        'Columns': [
                            {'Name': 'fecha', 'Type': 'date'},
                            {'Name': 'inflacion_mensual', 'Type': 'double'}
                        ],
                        'Location': s3_ruta,
                        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                        'SerdeInfo': {
                            'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                            'Parameters': {
                                'field.delim': ','
                            }
                        }
                    },
                    'TableType': 'EXTERNAL',
                    'Parameters': {
                        'classification': 'csv',
                        'skip.header.line.count': '1'
                    }
                }
                
                # Creamos la tabla
                glue_client.create_table(
                    DatabaseName=DATABASE_NAME,
                    TableInput=tabla_input
                )
                
                print(f"✅ Tabla '{tabla_nombre}' creada exitosamente.")
                return True
            else:
                raise
    
    except Exception as e:
        print(f"❌ Error al crear la tabla '{tabla_nombre}': {str(e)}")
        return False


In [7]:

def crear_tabla_tasas_interes(glue_client):
    """
    Crea la tabla para los datos de tasas de interés.
    """
    tabla_nombre = "tasas_interes"
    s3_ruta = f"s3://{BUCKET_NAME}/{S3_RAW_DIRECTORY}{tabla_nombre}"
    
    try:
        # Verificamos si la tabla ya existe
        try:
            glue_client.get_table(DatabaseName=DATABASE_NAME, Name=tabla_nombre)
            print(f"ℹ️ Tabla '{tabla_nombre}' ya existe.")
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                # Configuración de la tabla
                tabla_input = {
                    'Name': tabla_nombre,
                    'Description': 'Tasas de interés CETES 28 días de Banxico',
                    'StorageDescriptor': {
                        'Columns': [
                            {'Name': 'tasa_cetes28', 'Type': 'double'},
                            {'Name': 'fecha', 'Type': 'date'}
                        ],
                        'Location': s3_ruta,
                        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                        'SerdeInfo': {
                            'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                            'Parameters': {
                                'field.delim': ','
                            }
                        }
                    },
                    'TableType': 'EXTERNAL',
                    'Parameters': {
                        'classification': 'csv',
                        'skip.header.line.count': '1'
                    }
                }
                
                # Creamos la tabla
                glue_client.create_table(
                    DatabaseName=DATABASE_NAME,
                    TableInput=tabla_input
                )
                
                print(f"✅ Tabla '{tabla_nombre}' creada exitosamente.")
                return True
            else:
                raise
    
    except Exception as e:
        print(f"❌ Error al crear la tabla '{tabla_nombre}': {str(e)}")
        return False


In [11]:
def crear_tabla_consolidada(glue_client, athena_client):
    """
    Crea una tabla consolidada con los datos de las tres tablas (tipo de cambio,
    inflación y tasas de interés) usando Athena CTAS (Create Table As Select).
    
    La tabla resultante tendrá las columnas:
    - date: fecha en formato YYYY-MM-DD
    - tasa_de_interes: valor de CETES a 28 días
    - inflacion: porcentaje de inflación mensual
    - tipo_de_cambio: valor del tipo de cambio FIX
    """
    tabla_nombre = "indicadores_economicos_consolidados"
    s3_resultados = f"s3://arquitectura-athena-queries-ragp/"
    
    try:
        # Verificamos si la tabla ya existe
        try:
            glue_client.get_table(DatabaseName=DATABASE_NAME, Name=tabla_nombre)
            print(f"ℹ️ Tabla consolidada '{tabla_nombre}' ya existe.")
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                # Consulta CTAS para crear la tabla
                consulta = f"""
                CREATE TABLE {DATABASE_NAME}.{tabla_nombre}
                WITH (
                    format = 'PARQUET',
                    external_location = 's3://{BUCKET_NAME}/processed/{tabla_nombre}/',
                    bucketed_by = ARRAY['date'],
                    bucket_count = 1
                )
                AS
                SELECT 
                    tc.fecha AS date,
                    CAST(t.tasa_cetes28 AS DOUBLE) AS tasa_de_interes,
                    CAST(i.inflacion_mensual AS DOUBLE) AS inflacion,
                    CAST(tc.tipo_cambio AS DOUBLE) AS tipo_de_cambio
                FROM 
                    {DATABASE_NAME}.tipo_cambio tc
                JOIN 
                    {DATABASE_NAME}.inflacion i 
                    ON DATE_TRUNC('month', tc.fecha) = DATE_TRUNC('month', i.fecha)
                JOIN 
                    {DATABASE_NAME}.tasas_interes t 
                    ON DATE_TRUNC('month', tc.fecha) = DATE_TRUNC('month', t.fecha)
                ORDER BY 
                    tc.fecha;
                """
                
                # Ejecutamos la consulta CTAS
                response = athena_client.start_query_execution(
                    QueryString=consulta,
                    QueryExecutionContext={
                        'Database': DATABASE_NAME
                    },
                    ResultConfiguration={
                        'OutputLocation': s3_resultados,
                    }
                )
                
                query_execution_id = response['QueryExecutionId']
                print(f"✅ Creación de tabla consolidada iniciada con ID: {query_execution_id}")
                
                # Esperamos a que la consulta se complete
                estado = 'RUNNING'
                print("⏳ Creando tabla consolidada (esto puede tomar un tiempo)...")
                while estado in ['RUNNING', 'QUEUED']:
                    time.sleep(5)  # Esperamos 5 segundos antes de verificar de nuevo
                    response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
                    estado = response['QueryExecution']['Status']['State']
                
                # Verificamos el resultado final
                if estado == 'SUCCEEDED':
                    print(f"✅ Tabla consolidada '{tabla_nombre}' creada exitosamente.")
                    
                    # Verificamos los datos con una consulta simple
                    consulta_verificacion = f"""
                    SELECT 
                        date, 
                        tasa_de_interes, 
                        inflacion, 
                        tipo_de_cambio
                    FROM 
                        {DATABASE_NAME}.{tabla_nombre}
                    ORDER BY date
                    LIMIT 5;
                    """
                    
                    response = athena_client.start_query_execution(
                        QueryString=consulta_verificacion,
                        QueryExecutionContext={
                            'Database': DATABASE_NAME
                        },
                        ResultConfiguration={
                            'OutputLocation': s3_resultados,
                        }
                    )
                    
                    query_execution_id = response['QueryExecutionId']
                    
                    # Esperamos a que la consulta se complete
                    estado = 'RUNNING'
                    while estado in ['RUNNING', 'QUEUED']:
                        time.sleep(2)
                        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
                        estado = response['QueryExecution']['Status']['State']
                    
                    if estado == 'SUCCEEDED':
                        resultados = athena_client.get_query_results(QueryExecutionId=query_execution_id)
                        
                        # Imprimimos la cabecera
                        cabecera = [col['Label'] for col in resultados['ResultSet']['ResultSetMetadata']['ColumnInfo']]
                        print("\nMuestra de datos en la tabla consolidada:")
                        print(f"| {' | '.join(cabecera)} |")
                        print("|" + "-" * (len(" | ".join(cabecera)) + 2) + "|")
                        
                        # Imprimimos las primeras filas
                        for fila in resultados['ResultSet']['Rows'][1:]:  # Omitimos la primera fila que es la cabecera
                            valores = [item.get('VarCharValue', 'NULL') for item in fila['Data']]
                            print(f"| {' | '.join(valores)} |")
                    
                    return True
                else:
                    error_info = response['QueryExecution']['Status'].get('StateChangeReason', 'Desconocido')
                    print(f"❌ La creación de la tabla consolidada falló: {estado} - {error_info}")
                    return False
            else:
                raise
    
    except Exception as e:
        print(f"❌ Error al crear la tabla consolidada: {str(e)}")
        return False

In [8]:
"""
Función principal que orquesta la creación de bases de datos y tablas.
"""
print("\n🔍 Iniciando creación de catálogo y tablas en AWS Glue/Athena")

# Conectamos con Glue y Athena
glue_client, athena_client = conectar_glue()
if not glue_client or not athena_client:
    print("❌ No se pudo establecer conexión con AWS. Abortando proceso.")



🔍 Iniciando creación de catálogo y tablas en AWS Glue/Athena
✅ Conexión establecida con AWS Glue y Athena.


In [9]:

# Creamos la base de datos
if not crear_base_datos(glue_client):
    print("❌ No se pudo crear la base de datos. Abortando proceso.")

    

✅ Base de datos 'econ' creada exitosamente.


In [10]:
# Creamos las tablas
exito_tipo_cambio = crear_tabla_tipo_cambio(glue_client)
exito_inflacion = crear_tabla_inflacion(glue_client)
exito_tasas = crear_tabla_tasas_interes(glue_client)

if exito_tipo_cambio and exito_inflacion and exito_tasas:
    print("\n✅ Todas las tablas han sido creadas exitosamente.")

✅ Tabla 'tipo_cambio' creada exitosamente.
✅ Tabla 'inflacion' creada exitosamente.
✅ Tabla 'tasas_interes' creada exitosamente.

✅ Todas las tablas han sido creadas exitosamente.


In [12]:
# Creamos la tabla consolidada
print("\n🔄 Creando tabla consolidada con los tres indicadores...")
crear_tabla_consolidada(glue_client, athena_client)


🔄 Creando tabla consolidada con los tres indicadores...
✅ Creación de tabla consolidada iniciada con ID: 241eab29-8a7d-4374-8ee4-e2467f07148b
⏳ Creando tabla consolidada (esto puede tomar un tiempo)...
✅ Tabla consolidada 'indicadores_economicos_consolidados' creada exitosamente.

Muestra de datos en la tabla consolidada:
| date | tasa_de_interes | inflacion | tipo_de_cambio |
|-----------------------------------------------------|
| 2004-12-01 | 8.93 | 5.19 | 11.2041 |
| 2005-01-01 | 8.97 | 0.0 | 11.2607 |
| 2005-02-01 | 9.47 | 0.34 | 11.1367 |
| 2005-03-01 | 9.78 | 0.79 | 11.1427 |
| 2005-04-01 | 10.01 | 1.15 | 11.1163 |


True