# Proyecto 1: Ingestión y Almacenamiento

Este notebook implementa la ingestión y almacenamiento de datos de inspecciones de establecimientos de comida de Chicago.

In [1]:
!pip install pandas sodapy boto3 pyyaml



In [39]:
import pandas as pd
import yaml
import pickle
from datetime import date
from sodapy import Socrata
from botocore.exceptions import ClientError
import yaml
import boto3
from botocore.exceptions import ClientError
from datetime import date

In [20]:
# Cargamos la configuración del YAML. Esto nos ahorra tener que escribir las credenciales directamente en el código.
with open('credentials.yaml', 'r') as f:
    config = yaml.safe_load(f)

token = config['api_chicago']['app_token']
username = config['api_chicago']['username']
password = config['api_chicago']['password']
matricula = config['iexe']['matricula']

# Este es el ID del dataset de inspecciones de comida de Chicago
chicago_dataset = "4ijn-s7e5"

## 1. Función get_client

In [4]:
def get_client():
    # Creamos un cliente para la API de Chicago. Esto es como pedir permiso para acceder a sus datos.
    return Socrata("data.cityofchicago.org", 
                   token,
                   username=username,
                   password=password)

## 2. Ingesta Inicial

In [5]:
def ingesta_inicial(chicago_dataset, client, limit):
    # Aquí hacemos la primera carga grande de datos. Es como llenar un camión con toda la información disponible.
    return client.get(chicago_dataset, limit=limit, offset=0, order='inspection_date')

client = get_client()
print("Iniciando la carga inicial de datos...")
datasets = ingesta_inicial(chicago_dataset, client, 300000)
print(f"Se obtuvieron {len(datasets)} registros en la carga inicial.")

Iniciando la carga inicial de datos...
Se obtuvieron 280438 registros en la carga inicial.


## 3. Almacenamiento de Ingesta Inicial

In [38]:
# Leer el archivo de credenciales
with open('credentials.yaml', 'r') as file:
    credentials = yaml.safe_load(file)

# Imprimir información relevante sin comprometer la seguridad
print("Información de API Chicago:")
print(f"Username: {credentials['api_chicago']['username']}")
print(f"App Token: {'*' * len(credentials['api_chicago']['app_token'])}")

print("\nInformación de AWS S3:")
print(f"AWS Access Key ID: {'*' * len(credentials['s3']['aws_access_key_id'])}")
print(f"AWS Secret Access Key: {'*' * len(credentials['s3']['aws_secret_access_key'])}")
print(f"AWS Session Token: {'*' * len(credentials['s3']['aws_session_token']) if credentials['s3']['aws_session_token'] else 'No proporcionado'}")

print(f"\nMatrícula IEXE: {credentials['iexe']['matricula']}")

# Configurar el cliente de boto3 con las credenciales
boto3.setup_default_session(
    aws_access_key_id=credentials['s3']['aws_access_key_id'],
    aws_secret_access_key=credentials['s3']['aws_secret_access_key'],
    aws_session_token=credentials['s3']['aws_session_token']
)

# Definir la región (ajusta según sea necesario)
AWS_REGION = "us-east-1"

def crear_bucket_si_no_existe():
    s3 = boto3.resource('s3', region_name=AWS_REGION)
    bucket_name = f"aplicaciones-cd-1-{credentials['iexe']['matricula']}"
    try:
        if AWS_REGION == "us-east-1":
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': AWS_REGION}
            )
        print(f"Bucket '{bucket_name}' creado exitosamente en la región {AWS_REGION}.")
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'BucketAlreadyOwnedByYou':
            print(f"El bucket '{bucket_name}' ya existe y te pertenece.")
        elif error_code == 'BucketAlreadyExists':
            print(f"El bucket '{bucket_name}' ya existe, pero no te pertenece. Usando un nombre alternativo.")
            bucket_name = f"{bucket_name}-{date.today().strftime('%Y%m%d')}"
            s3.create_bucket(Bucket=bucket_name)
            print(f"Bucket alternativo '{bucket_name}' creado exitosamente.")
        else:
            print(f"Error al crear el bucket: {e}")
            return None
    return bucket_name

def guardar_ingesta(bucket, bucket_path, dataset):
    s3 = boto3.resource('s3', region_name=AWS_REGION)
    try:
        s3.Object(bucket, bucket_path).put(Body=pickle.dumps(dataset))
        print(f"Datos guardados exitosamente en {bucket}/{bucket_path}")
        return True
    except ClientError as e:
        print(f"Error al guardar los datos: {e}")
        return False

# Código principal
TODAY = date.today()
bucket_name = crear_bucket_si_no_existe()

if bucket_name:
    print(f"Bucket '{bucket_name}' listo para usar.")
    print("Guardando la carga inicial en S3...")
    key_inicial = f"ingesta/inicial/inspecciones-historicas-{TODAY}.pkl"
    if 'datasets' in globals():
        if guardar_ingesta(bucket_name, key_inicial, datasets):
            print(f"Datos guardados exitosamente en {bucket_name}/{key_inicial}")
        else:
            print("No se pudieron guardar los datos en S3.")
    else:
        print("Error: 'datasets' no está definido. Asegúrate de tener datos para guardar.")
else:
    print("No se pudo proceder con el guardado de datos debido a problemas con el bucket.")

print(f"Región AWS utilizada: {AWS_REGION}")

Información de API Chicago:
Username: mcda24a004_gerardo_mayel@iexe.edu.mx
App Token: *************************

Información de AWS S3:
AWS Access Key ID: ********************
AWS Secret Access Key: ****************************************
AWS Session Token: *********************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************

## 4. Ingestas Consecutivas

In [40]:
def ingesta_consecutiva(chicago_dataset, client, fecha, limit):
    # Esto es para cargas posteriores. Sólo tomamos los datos nuevos desde la última vez que revisamos.
    return client.get(chicago_dataset, limit=limit, where=f"inspection_date>='{fecha}'")

print("Realizando una carga consecutiva de ejemplo...")
new_dataset = ingesta_consecutiva(chicago_dataset, client, '2023-01-01', 1000)
print(f"Se obtuvieron {len(new_dataset)} registros en la carga consecutiva.")

Realizando una carga consecutiva de ejemplo...
Se obtuvieron 1000 registros en la carga consecutiva.


## 5. Almacenamiento de Ingestas Consecutivas

In [41]:
print("Guardando la carga consecutiva en S3...")
key_consecutiva = f"ingesta/consecutiva/inspecciones-consecutivas-{TODAY}.pkl"
guardar_ingesta(bucket_name, key_consecutiva, new_dataset)
print(f"Nuevos datos guardados en {bucket_name}/{key_consecutiva}")

Guardando la carga consecutiva en S3...
Datos guardados exitosamente en aplicaciones-cd-1-mcda24a004/ingesta/consecutiva/inspecciones-consecutivas-2024-10-20.pkl
Nuevos datos guardados en aplicaciones-cd-1-mcda24a004/ingesta/consecutiva/inspecciones-consecutivas-2024-10-20.pkl


## 6. Verificación de Carga (Plus para demostrar entendimeinto del problema y dar un valor agregado a la solución)

In [42]:
def verificar_carga(bucket, bucket_path):
    # Esta función es como hacer un conteo rápido para asegurarnos de que todo lo que cargamos está ahí.
    s3 = get_s3_resource()
    obj = s3.Object(bucket, bucket_path)
    data = pickle.loads(obj.get()['Body'].read())
    return len(data)

print("Verificando la integridad de la carga inicial...")
registros_cargados = verificar_carga(bucket_name, key_inicial)
print(f"Se verificaron {registros_cargados} registros en la carga inicial en S3.")

print("Verificando la integridad de la carga consecutiva...")
registros_consecutivos = verificar_carga(bucket_name, key_consecutiva)
print(f"Se verificaron {registros_consecutivos} registros en la carga consecutiva en S3.")

print("Proceso de ingesta y almacenamiento completado.")

Verificando la integridad de la carga inicial...
Se verificaron 280438 registros en la carga inicial en S3.
Verificando la integridad de la carga consecutiva...
Se verificaron 1000 registros en la carga consecutiva en S3.
Proceso de ingesta y almacenamiento completado.
