## Consumer con una api de inferencia (con FastApi)

In [1]:
from fastapi import FastAPI, HTTPException
from confluent_kafka import Consumer
from sqlalchemy import create_engine, Column, Integer, String, Float, MetaData, Table
from sqlalchemy.orm import sessionmaker
import mlflow
import pandas as pd

In [2]:
# Inicializar FastAPI
app = FastAPI()

# Configuración de Kafka
kafka_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'fraud_detection_group',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(kafka_conf)
topic = 'fraud_transactions'
consumer.subscribe([topic])

### Conexion con postgre local

In [3]:
import psycopg2

# Configuración de conexión
DATABASE_URL = "postgresql://postgres:admin@localhost:5433/transactions_db"

# Crear la conexión
conn = psycopg2.connect(DATABASE_URL)
print("Conexión exitosa a la base de datos")

# Crear un cursor para interactuar con la base de datos
cursor = conn.cursor()


Conexión exitosa a la base de datos


In [None]:
import psycopg2

# Conexión a la base de datos
conn = psycopg2.connect(
    dbname="transactions_db",
    user="admin",
    password="admin",
    host="localhost",
    port="5433"
)
cursor = conn.cursor()

# 1️⃣ SELECT antes del INSERT
print("Estado inicial de la tabla:")
cursor.execute("SELECT * FROM transaction.transaction_table;")
rows = cursor.fetchall()
for row in rows:
    print(row)

# 2️⃣ INSERT con parámetros seguros
sql = "INSERT INTO transaction.transaction_table (id, transaction_id, amount, prediction) VALUES (%s, %s, %s, %s);"
values = (4, "TXT", 100, "FRAUD")  # Asegúrate de que 'id' no exista ya
cursor.execute(sql, values)
conn.commit()  # 🔹 Importante: guardar cambios en la BD

# 3️⃣ SELECT después del INSERT
print("\nEstado después del INSERT:")
cursor.execute("SELECT * FROM transaction.transaction_table;")
rows = cursor.fetchall()
for row in rows:
    print(row)

# Cerrar cursor y conexión
cursor.close()
conn.close()
print("\nConexión cerrada")


Estado inicial de la tabla:
(1, '1', 10, '1')
(4, 'T', 100, 'F')


UniqueViolation: llave duplicada viola restricción de unicidad «transaction_table_pkey»
DETAIL:  Ya existe la llave (id)=(4).


In [None]:
# Cerrar el cursor y la conexión
conn.close()
print("Conexión cerrada")

# Configuración de MLflow

In [None]:
import subprocess
import mlflow
from mlflow.models import infer_signature

In [None]:

import subprocess

# Ejecutar MLFlow en segundo plano
# Simula ejecutar el comando "mlflow server --host 127.0.0.1 --port 8080"
process = subprocess.Popen(
    ["mlflow", "server", "--host", "127.0.0.1", "--port", "8080", "--backend-store-uri", "sqlite:///C:/Users/camil/OneDrive/Escritorio/Tesis/proyectos/main/mlflowfiles/mlflow.db", "--default-artifact-root", "file:///C:/Users/camil/OneDrive/Escritorio/Tesis/proyectos/main/mlflowfiles/artifacts"], 
    stdout=subprocess.PIPE,  # Captura la salida estándar
    stderr=subprocess.PIPE   # Captura errores
)

print(f"Servidor MLflow iniciado con PID {process.pid}")



Servidor MLflow iniciado con PID 10968


In [None]:
import mlflow
from mlflow.models import infer_signature

# Configurar SQLite como backend store
mlflow.set_tracking_uri("sqlite:///C:/Users/camil/OneDrive/Escritorio/Tesis/proyectos/main/mlflowfiles/mlflow.db")

# Definir la ubicación de los artefactos
mlflow.set_registry_uri("file:///C:/Users/camil/OneDrive/Escritorio/Tesis/proyectos/main/mlflowfiles/artifacts")

experiment_name = "your_experiment_name"
mlflow.create_experiment(experiment_name, artifact_location="file:///C:/Users/camil/OneDrive/Escritorio/Tesis/proyectos/main/mlflowfiles/artifacts")
mlflow.set_experiment(experiment_name)


In [None]:
model_name = "logistic-regression-experiment"
model_stage = "1"

In [None]:
# Función para cargar el modelo desde MLflow
def load_model():
    model_uri = f"models:/{model_name}/{model_stage}"
    model = mlflow.pyfunc.load_model(model_uri)
    return model


model = load_model()

### Definicion de rutas 

In [None]:
# Ruta principal
@app.get("/")
def read_root():
    return {"message": "API de Inferencia con Kafka y MLflow"}

In [None]:
# Función para consumir datos de Kafka y realizar predicciones
@app.get("/consume")
def consume_data():
    try:
        msg = consumer.poll(1.0)

        if msg is None:
            raise HTTPException(status_code=404, detail="No hay mensajes disponibles en Kafka.")
        
        if msg.error():
            raise HTTPException(status_code=500, detail=f"Error en Kafka: {msg.error()}")

        # Procesar el mensaje de Kafka
        message_value = msg.value().decode('utf-8')
        print(f"Mensaje recibido: {message_value}")

        # Transformar el mensaje (ejemplo: convertir a un DataFrame)
        data = pd.DataFrame([eval(message_value)])  # Asegúrate de recibir un mensaje compatible con eval()

        # Realizar predicción con el modelo de MLflow
        prediction = model.predict(data)
        prediction_label = "fraudulento" if prediction[0] == 1 else "no fraudulento"

        # Guardar resultado en PostgreSQL
        with engine.connect() as conn:
            insert_stmt = transactions_table.insert().values(
                transaction_id=data['transaction_id'][0],
                amount=data['amount'][0],
                prediction=prediction_label
            )
            conn.execute(insert_stmt)

        return {"transaction_id": data['transaction_id'][0], "prediction": prediction_label}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error al procesar: {str(e)}")

In [None]:
# Ruta para detener el consumidor
@app.on_event("shutdown")
def shutdown_event():
    consumer.close()
    print("Consumidor de Kafka cerrado.")