Este notebook forma parte del trabajo final del curso de MLOps en el Diplomado de Data Analytics de ESAN y aborda la fase de monitoreo y mantenimiento de un modelo en producción. Su objetivo es evaluar el desempeño del modelo con datos nuevos y detectar posibles cambios en la distribución de las variables (data drift) que puedan afectar su precisión.

Para ello, se cargan datos históricos y recientes, verificando la consistencia de las columnas clave. Se analiza el data drift mediante estadísticas descriptivas y el test de Kolmogorov-Smirnov. Luego, se evalúa el modelo en los nuevos datos usando la métrica AUC (Area Under the Curve).Finalmente, los resultados se almacenan en MLflow, y se establece un sistema de alertas para identificar caídas en la precisión y determinar si es necesario reentrenar o ajustar el modelo. Este proceso es clave en MLOps para asegurar la confiabilidad del modelo en producción. 

### 1. Inicialización del entorno y librerías
En este primer paso, se importan las librerías necesarias para la sesión de Spark, MLflow y el análisis de data drift.
Se inicia la sesión de Spark y se imprime un mensaje para indicar que el notebook ha sido cargado correctamente.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import mlflow
import mlflow.spark
import time

# Librerías opcionales para data drift
from pyspark.ml.stat import KolmogorovSmirnovTest

spark = SparkSession.builder.getOrCreate()

print("Notebook 06 - Monitoreo y Mantenimiento - iniciado.")


Notebook 06 - Monitoreo y Mantenimiento - iniciado.


### 2. Carga de datos históricos y datos recientes
Se cargan dos conjuntos de datos:
df_train: Datos de entrenamiento usados previamente en la fase de modelado.
df_new: Un subconjunto aleatorio de df_train, simulado para representar nuevos datos post-producción.
Se comparan los tamaños de ambos datasets.

In [0]:
# =========================================================
# 2) Cargar datos de entrenamiento y nuevo dataset simulado
# =========================================================

# 2.1 Cargar df_train 
#     Aquí lo simulamos con la tabla de antes:
df_train = spark.table("mining.safety_analysis.cleaned_mining_data")

# 2.2 Cargar un df_new que simula los datos "recientes" 
#     (post-producción). 
#     Aquí creamos un subset al azar para simularlo:
df_new = df_train.sample(fraction=0.1, seed=999).cache()

print("df_train registros:", df_train.count())
print("df_new (simulados) registros:", df_new.count())

display(df_new.limit(5))


df_train registros: 10000
df_new (simulados) registros: 1006


Accident_Occurred,Hours_Worked,Weather_Risk_Index,Machine_Age_Years,Employee_Experience_Years,Safety_Violations,Inspection_Frequency,Job_Risk_Level,Shift_Type,Area,Employee_Age,Noise_Level_dB,Temperature_C,features,scaled_features,Moving_Avg_Noise,Moving_Avg_Temperature,Diff_Safety_Violations,Diff_Inspection_Frequency,Age_Group
0,7,0.7876088143823133,1,9,9,11,2,0,0,18,97.82037145789188,32.299454658105226,"Map(vectorType -> dense, length -> 10, values -> List(7.0, 0.7876088143823133, 1.0, 9.0, 9.0, 11.0, 2.0, 18.0, 97.82037145789187, 32.299454658105226))","Map(vectorType -> dense, length -> 10, values -> List(-0.8772547561736581, 0.9920654263915089, -1.6235212602055429, -0.6389558183766151, 1.5662941927316953, 1.5835148358060784, 0.7448507855134497, -1.7034203426534413, 1.5786643044219437, 0.8490793014224517))",89.86964217028178,29.798136636343173,4,10,Joven
0,9,0.5691529759659067,13,25,3,11,1,0,1,18,93.17319493801875,30.35915084752571,"Map(vectorType -> dense, length -> 10, values -> List(9.0, 0.5691529759659067, 13.0, 25.0, 3.0, 11.0, 1.0, 18.0, 93.17319493801877, 30.35915084752571))","Map(vectorType -> dense, length -> 10, values -> List(0.3020888697907859, 0.24198032379971338, 0.5640705476574821, 1.2098078067139635, -0.5164515092296965, 1.5835148358060784, -0.7392113494007705, -1.7034203426534413, 1.2570722285583302, 0.6246834933390406))",81.84400007944413,30.19145795891655,1,2,Joven
1,8,0.947168511578586,10,16,7,5,1,0,0,18,74.67027611156388,24.00643389630324,"Map(vectorType -> dense, length -> 10, values -> List(8.0, 0.9471685115785861, 10.0, 16.0, 7.0, 5.0, 1.0, 18.0, 74.67027611156388, 24.006433896303236))","Map(vectorType -> dense, length -> 10, values -> List(-0.2875829431914361, 1.539926049886722, 0.017172595691725853, 0.16987826760051303, 0.8720456254112313, -0.3160950919843262, -0.7392113494007705, -1.7034203426534413, -0.023359459261018917, -0.11000714944694003))",76.46893671186487,28.215117972436804,7,1,Joven
0,9,0.6671909163791936,15,23,4,4,2,0,1,18,54.9824605115324,20.689619082630813,"Map(vectorType -> dense, length -> 10, values -> List(9.0, 0.6671909163791936, 15.0, 23.0, 4.0, 4.0, 2.0, 18.0, 54.982460511532395, 20.689619082630813))","Map(vectorType -> dense, length -> 10, values -> List(0.3020888697907859, 0.5786012130643776, 0.9286691823013197, 0.9787123535776412, -0.16932722556946456, -0.6326967466160603, 0.7448507855134497, -1.7034203426534413, -1.3857879060735043, -0.4935962226213912))",58.89418221459263,20.69038495613166,-5,-7,Joven
0,7,0.2283405981832442,4,23,6,8,1,0,1,18,61.86593677005129,22.174688437542187,"Map(vectorType -> dense, length -> 10, values -> List(7.0, 0.22834059818324426, 4.0, 23.0, 6.0, 8.0, 1.0, 18.0, 61.86593677005129, 22.174688437542187))","Map(vectorType -> dense, length -> 10, values -> List(-0.8772547561736581, -0.9282254679373826, -1.0766233082397867, 0.9787123535776412, 0.5249213417509994, 0.6337098719108761, -0.7392113494007705, -1.7034203426534413, -0.9094402980864851, -0.32184820247205315))",71.33539614618614,29.76576504290782,-3,1,Joven


### 3. Verificación de columnas relevantes
Antes de analizar los datos, se aseguran de que las columnas clave del modelo estén presentes en df_new.
Si faltan columnas esenciales, se lanza un error.

In [0]:
# =========================================================
# 3) Verificar columnas relevantes
# =========================================================
# Suponemos que las 8 features y la label 
feature_cols = [
    "Shift_Type",
    "Weather_Risk_Index",
    "Job_Risk_Level",
    "Hours_Worked",
    "Employee_Experience_Years",
    "Safety_Violations",
    "Inspection_Frequency",
    "Temperature_C"
]
label_col = "Accident_Occurred"

print("Columnas en df_new:", df_new.columns)

# Asegurarnos que df_new contenga esas columnas
missing_cols = [c for c in feature_cols + [label_col] if c not in df_new.columns]
if missing_cols:
    raise ValueError(f"df_new no tiene columnas: {missing_cols}")


Columnas en df_new: ['Accident_Occurred', 'Hours_Worked', 'Weather_Risk_Index', 'Machine_Age_Years', 'Employee_Experience_Years', 'Safety_Violations', 'Inspection_Frequency', 'Job_Risk_Level', 'Shift_Type', 'Area', 'Employee_Age', 'Noise_Level_dB', 'Temperature_C', 'features', 'scaled_features', 'Moving_Avg_Noise', 'Moving_Avg_Temperature', 'Diff_Safety_Violations', 'Diff_Inspection_Frequency', 'Age_Group']


### 4. Análisis de Data Drift con Kolmogorov-Smirnov
Se compara la distribución de una variable clave (Weather_Risk_Index) entre df_train y df_new usando el test de Kolmogorov-Smirnov.
Si la diferencia de medias es significativa, se alerta sobre posible data drift.

In [0]:
# =========================================================
# 4) Data Drift (básico) - Comparar distribuciones
# =========================================================
# Usamos KolmogorovSmirnovTest para comparar la distribución
# de una sola columna en 'df_train' vs 'df_new'.
# Como ejemplo, medimos drift en 'Weather_Risk_Index'.

col_drift = "Weather_Risk_Index"

# Nota: KolmogorovSmirnovTest requiere un RDD o DataFrame 
# con una sola columna, sin nulos:
train_col_df = df_train.select(col_drift).dropna()
new_col_df   = df_new.select(col_drift).dropna()

# Convertir a double si no lo es
train_col_df = train_col_df.withColumn(col_drift, F.col(col_drift).cast("double"))
new_col_df   = new_col_df.withColumn(col_drift, F.col(col_drift).cast("double"))

# Para KSTest, Spark ML 3.0+ hace:
# KolmogorovSmirnovTest.test() -> solo sirve con un DataFrame con 2 col: "features", "label"
# Lo ideal es transformar en "features" la variable.
# Simplificaremos -> un approach typical es recabar stats y comparar manualmente.

train_stats = train_col_df.describe().collect()
new_stats   = new_col_df.describe().collect()

print("Estadísticas (train):", train_stats)
print("Estadísticas (new):", new_stats)

# EJEMPLO: Compare la media
# Convertir a float:
def get_mean_from_describe(describe_rows):
    # describe_rows es una lista de Row, buscaremos row donde summary='mean'
    for r in describe_rows:
        if r["summary"] == "mean":
            return float(r[col_drift])
    return None

mean_train = get_mean_from_describe(train_stats)
mean_new   = get_mean_from_describe(new_stats)

drift_diff = abs(mean_new - mean_train)
print(f"Mean drift en {col_drift}: {mean_train} -> {mean_new}, diff={drift_diff:.3f}")

# Si drift_diff es mayor a cierto umbral, consideras que hay data drift
drift_threshold = 1.0  # Ejemplo
if drift_diff > drift_threshold:
    print(f"⚠️ DRIFT DETECTADO en {col_drift}: cambio de media {drift_diff:.2f}")
else:
    print("No se detectó drift significativo (mean).")


Estadísticas (train): [Row(summary='count', Weather_Risk_Index='10000'), Row(summary='mean', Weather_Risk_Index='0.4986782867162684'), Row(summary='stddev', Weather_Risk_Index='0.2912414040241146'), Row(summary='min', Weather_Risk_Index='2.334951493544457E-4'), Row(summary='max', Weather_Risk_Index='0.9999300425196784')]
Estadísticas (new): [Row(summary='count', Weather_Risk_Index='1006'), Row(summary='mean', Weather_Risk_Index='0.4952895500112074'), Row(summary='stddev', Weather_Risk_Index='0.2847381397932316'), Row(summary='min', Weather_Risk_Index='0.004117857299994809'), Row(summary='max', Weather_Risk_Index='0.9978868110445832')]
Mean drift en Weather_Risk_Index: 0.4986782867162684 -> 0.4952895500112074, diff=0.003
No se detectó drift significativo (mean).


### 5. Evaluación del modelo en datos recientes
Se carga el modelo de ML desde MLflow y se evalúa su desempeño en df_new usando el AUC como métrica de referencia.

In [0]:
# =========================================================
# 5) Cargar la pipeline y evaluar performance en df_new
# =========================================================
# Asumiendo df_new también tiene la label 'Accident_Occurred' 
# (que llegó con retraso, por ejemplo).
# Usaremos la pipeline final. 
# Ajustar 'models:/AccidentPrediction2025/1' o la stage 
# si ya está en Production.

model_uri = "models:/AccidentPrediction2025/1"
pipeline_model = mlflow.spark.load_model(model_uri)

# Hacemos predicciones
df_pred_new = pipeline_model.transform(df_new)

# Evaluar AUC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc_new = evaluator.evaluate(df_pred_new)
print(f"AUC en df_new: {auc_new:.4f}")


  """The sequence number of this run attempt for a triggered job run. The initial attempt of a run
  """The sequence number of this run attempt for a triggered job run. The initial attempt of a run
  """The sequence number of this run attempt for a triggered job run. The initial attempt of a run
2025/03/04 13:51:59 INFO mlflow.spark: 'models:/AccidentPrediction2025/1' resolved as 'abfss://unity-catalog-storage@dbstorageyp3hv5lr3xzh6.dfs.core.windows.net/1781258311325241/models/f0b5bfa6-b545-4a3a-8f96-eddccdc04e61/versions/74acc5b5-5c4b-414d-b811-03fa453d36b7'


AUC en df_new: 0.9829


### 6. Guardado de resultados y monitoreo continuo
Se almacena el resultado del AUC en MLflow y se implementa una lógica básica de alerta si el rendimiento del modelo cae por debajo de un umbral.

In [0]:
# =========================================================
# 6) Log de performance en MLflow. 
#    Decidir reentrenar si cae < threshold
# =========================================================

with mlflow.start_run(run_name="Monitoring_in_Production"):
    mlflow.log_metric("AUC_new", auc_new)
    # mlflow.log_metric("drift_mean_diff", drift_diff)
    
    # Si la AUC es demasiado baja, se gatilla un "Retrain"
    # Lo simulamos:
    performance_threshold = 0.90  # Ejemplo
    if auc_new < performance_threshold:
        print("⚠️ AUC < threshold, disparar un reentrenamiento.")
        mlflow.set_tag("ACTION_NEEDED", "Retrain")
        # Podrías invocar un job que reejecute notebook 03, etc.
    else:
        print("✅ Performance aceptable, no se requiere reentrenamiento.")
        mlflow.set_tag("ACTION_NEEDED", "None")


✅ Performance aceptable, no se requiere reentrenamiento.


2025/03/04 13:52:47 INFO mlflow.tracking._tracking_service.client: 🏃 View run Monitoring_in_Production at: adb-1781258311325241.1.azuredatabricks.net/ml/experiments/1516674257214703/runs/65f10f057f0f4932b210df1a49d8d1ee.
2025/03/04 13:52:47 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: adb-1781258311325241.1.azuredatabricks.net/ml/experiments/1516674257214703.
