In [0]:
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import ParameterGrid
import mlflow
import mlflow.sklearn
import os

## Load training data

In [0]:
# Training set: read the "sensor_data_normal" table, keep only the three
# feature columns, and convert the result to a pandas DataFrame.
df_train = (
    spark.table("sensor_data_normal")
    .select("temperature", "vibration", "pressure")
    .toPandas()
)
# Plain NumPy matrix for scikit-learn; column order follows the select() above.
X_train = df_train.values

## Train the anomaly detection model

In [0]:
# Hyperparameter search space: every key is passed as a keyword argument to
# IsolationForest, and every list holds the candidate values for that argument.
param_grid = dict(
    n_estimators=[50, 100],
    contamination=[1e-4],
    max_samples=["auto"],
    random_state=[42],
)

# Expand the search space into a concrete list of parameter dicts, one per combination.
grid = list(ParameterGrid(param_grid))

In [0]:

# Point the model registry at Unity Catalog and group all tuning runs under
# a single experiment.
mlflow.set_registry_uri("databricks-uc")

mlflow.set_experiment("/Users/xuanang.leo.liu@gmail.com/predictive_maintenance")

# Initialise every "best so far" tracker up front. Previously best_model and
# best_run_id were only assigned inside the loop, so an empty grid (or no score
# ever beating -inf) surfaced later as a NameError in a different cell.
best_params = None
best_score = -np.inf
best_model = None
best_run_id = None

for params in grid:
    # One MLflow run per parameter combination.
    with mlflow.start_run() as run:
        # Train model
        model = IsolationForest(**params)
        model.fit(X_train)

        # Score on the training data itself (no separate scoring set is used
        # here); the selection metric is the mean decision_function value.
        scores = model.decision_function(X_train)
        mean_score = float(scores.mean())

        # Log parameters, the selection metric and the fitted model to MLflow
        mlflow.log_params(params)
        mlflow.log_metric("mean_anomaly_score", mean_score)
        mlflow.sklearn.log_model(sk_model=model, artifact_path="model", input_example=X_train[:5])

        # Track best model: keep the candidate with the highest mean score
        if mean_score > best_score:
            best_score = mean_score
            best_model = model
            best_params = params
            best_run_id = run.info.run_id

# Fail here, next to the cause, rather than in a downstream cell.
if best_model is None:
    raise RuntimeError(
        "No model was selected: the parameter grid is empty or no run "
        "produced a score greater than -inf."
    )

## Register the best model

In [0]:
# NOTE: model registration is currently disabled (everything below is commented out).
# Uncommenting these lines would register the model logged under the best run's
# "model" artifact path with the name given in registered_name.
# model_uri = f"runs:/{best_run_id}/model"
# registered_name = "genai_demo.isolation_forest_pm_model"

# mlflow.register_model(model_uri=model_uri, name=registered_name)

## Predict anomalies using test data

In [0]:
# mlflow is already imported in the notebook's first cell, so it is not
# re-imported here.

# Alternative: score with a previously logged run instead of the in-memory model.
# model_uri = 'runs:/175c622832c84abf959a2b2745065b1f/model'
# model = mlflow.sklearn.load_model(model_uri)

# Use the best model kept in memory by the tuning loop.
model = best_model

In [0]:
# Read the "sensor_data_test" table and convert it to pandas for scoring.
df_test_spark = spark.table("sensor_data_test")
df_test = df_test_spark.toPandas()

# Feed the model the same three columns, in the same order, as used for training.
feature_columns = ["temperature", "vibration", "pressure"]
pred = model.predict(df_test[feature_columns].values)

# Store the predictions next to the original rows.
df_test["pred"] = pred

In [0]:
# Fraction of rows whose 'inliner' value equals the model's prediction.
# NOTE(review): presumably 'inliner' is a ground-truth label using the same
# encoding as the predictions - confirm against the source table.
int((df_test['inliner'] == df_test['pred']).sum()) / len(df_test)

In [0]:
# Fraction of test rows for which the model predicted 1.
int((df_test['pred'] == 1).sum()) / len(df_test)

In [0]:
# Persist the scored test rows (including the "pred" column) as a Delta table,
# replacing any existing contents and allowing the schema to be merged.
scored_writer = (
    spark.createDataFrame(df_test)
    .write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
)
scored_writer.saveAsTable("sensor_data_test_scored")