In [1]:
# Para que funciones, todos nuestros scripts debemos exportar las siguientes variables de entorno
%env AWS_ACCESS_KEY_ID=minio   
%env AWS_SECRET_ACCESS_KEY=minio123 
%env MLFLOW_S3_ENDPOINT_URL=http://localhost:9000
%env AWS_ENDPOINT_URL_S3=http://localhost:9000

env: AWS_ACCESS_KEY_ID=minio
env: AWS_SECRET_ACCESS_KEY=minio123
env: MLFLOW_S3_ENDPOINT_URL=http://localhost:9000
env: AWS_ENDPOINT_URL_S3=http://localhost:9000


### Cargamos los datos con las columnas preprocesadas y separamos en features y target

In [2]:
import awswrangler as wr
import json

s3_base_path = "s3://data/chicago/crimes/2024"

data_preprocessed_path = f"{s3_base_path}/chicago_crimes_2024_preprocessed.csv"
dataset = wr.s3.read_csv(data_preprocessed_path)
target_col = "fbi_code"

# Separamos los features y el target
X = dataset.drop(columns=target_col)
y = dataset[target_col]


Tomamos una muestra e imprimimos los valores que nos servirán para probar la API y comparar el resultado.

In [3]:
idx = X.sample(1).index[0]
x_data = X.loc[idx]
print(json.dumps({"features": x_data.to_dict()}, indent=2))

print(f"True value: {y.loc[idx]}")

{
  "features": {
    "iucr": "0460",
    "primary_type": "BATTERY",
    "description": "SIMPLE",
    "location_description": "SIDEWALK",
    "arrest": false,
    "domestic": false,
    "beat": 132,
    "district": 1,
    "ward": 3,
    "community_area": 33,
    "latitude": 41.86168923,
    "longitude": -87.620814811,
    "mes": 8,
    "dia_mes": 9,
    "dia_semana": 4,
    "hora": 10
  }
}
True value: 08B


### Prueba del endpoint con cache intermedia

Probamos el endpoint a partir de un ejemplo al azar.  
Utilizamos cache en redis para agilizar las consultas.  
Si la predicción no se encuentra, se resuelve con una llamada a la API y se guarda en redis.

In [4]:
import requests
import redis
import hashlib

In [5]:
# Conectamos al servidor valkey
r = redis.Redis(host='localhost', port=6380, password='password', decode_responses=True)

In [6]:
# Endpoint de la API
api_predict_endpoint = "http://localhost:8800/predict"
headers = {'Content-type': 'application/json', 'Accept': 'application/json'}

In [7]:
def get_predict(dict_data):
    # Hasheamos la info, primero buscamos en redis, si hay versión cacheada
    # En nuestro caso no omitimos problemas que puedan aparecer por la representación numérica, pero deberíamos tenerlo en cuenta
    hashed = hashlib.sha256(json.dumps(dict_data).encode()).hexdigest()

    # Chequeamos si el hash esta almacenado en redis
    cached = r.get(hashed)

    if (cached is None):
        # No se encontró la predicción cacheada, la obtenemos mediante la API y la guardamos

        # Realizamos la llamada a la api con los datos obtenidos
        response = requests.post(url=api_predict_endpoint,
                                headers=headers,
                                json={"features": dict_data})

        # Validamos la respuesta
        if (response.status_code != 200):
            raise(Exception(f"Error al consultar API: {response.status_code} - {response.reason}.\n{response.text}"))

        prediction = json.dumps(response.json(), indent=2)
        r.set(hashed, prediction)
    else:
        prediction = cached

    return prediction, not (cached is None)

In [8]:
idx = X.sample(1).index[0]
# idx = X.sample(1, random_state=42).index[0]
data_sample = X.loc[idx].to_dict()

prediction, from_cache = get_predict(data_sample)

print(f"Predicción obtenida (desde {'cache' if from_cache else 'API'}):\n{prediction}")

Predicción obtenida (desde cache):
{
  "dtc_fbi_code_output": "14",
  "lda_fbi_code_output": "14"
}
