# In [1]:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# MLflow settings shared by every run this module starts.
# Point the URI at a remote MLflow server instead of the local file store if needed.
MLFLOW_TRACKING_URI = "file:///tmp/mlruns"
MLFLOW_EXPERIMENT = "Logistic_Regression_Experiment"

# Configure the tracking backend first, then select the experiment on it.
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(MLFLOW_EXPERIMENT)

def train_model():
    """Train a logistic-regression model on synthetic data and log it to MLflow.

    Builds 1000 samples of 5 uniform random features, labels a sample 1 when
    the sum of its first two features exceeds 1, holds out 20% of the rows for
    testing, fits a ``LogisticRegression`` and records the hyperparameters,
    the test accuracy and the fitted model in a new MLflow run.

    Returns:
        None. The accuracy is logged to MLflow and printed to stdout.
    """
    # A private RandomState seeded with 42 yields the same numbers as
    # np.random.seed(42) + np.random.rand(...), but does not reseed NumPy's
    # global generator for whatever else runs in this process.
    rng = np.random.RandomState(42)
    X = rng.rand(1000, 5)
    y = (X[:, 0] + X[:, 1] > 1).astype(int)

    # Hold out 20% of the samples for evaluation.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # One dict both configures the estimator and is logged, so the recorded
    # hyperparameters cannot drift from what the model was actually built with.
    hyperparams = {"solver": "lbfgs", "penalty": "l2"}

    with mlflow.start_run():
        # Train
        model = LogisticRegression(**hyperparams)
        model.fit(X_train, y_train)

        # Evaluate on the held-out split
        y_pred = model.predict(X_test)
        acc = accuracy_score(y_test, y_pred)

        # Record parameters, metric and the fitted model in the active run
        mlflow.log_params(hyperparams)
        mlflow.log_metric("accuracy", acc)
        mlflow.sklearn.log_model(model, "logistic_regression_model")

        print(f"Modelo registrado con precisión: {acc:.4f}")

# Airflow DAG definition: defaults applied to every task in the DAG.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2025, 3, 24),
    retries=1,
)

# Operators created inside the ``with`` block are attached to ``dag``.
with DAG(
    dag_id="mlflow_training_pipeline",
    description="Entrenamiento de un modelo ML con MLflow y Airflow",
    schedule_interval="@daily",  # runs once per day
    catchup=False,
    default_args=default_args,
) as dag:
    # Single task: call the training function.
    train_task = PythonOperator(
        task_id="train_model",
        python_callable=train_model,
    )

train_task


# Cell output: ModuleNotFoundError: No module named 'airflow'