In [0]:
import mlflow
model_name = "engine_rul_predictor"

# Resolve the current Databricks user so the MLflow experiment is created under their workspace folder.
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
username = notebook_context.userName().get()
mlflow.set_experiment(f'/Users/{username}/{model_name}')

In [0]:
# Catalog and schema used for every table read/written by this notebook.
catalog, database = 'demos', "engine_rul_predictor"
# Input training table and the output table written by the feature cell.
source_table, target_table = "nasa_data_train_test", "features"

In [0]:
# Make the project catalog, then the project schema, the defaults for unqualified table names.
for use_statement in (f"USE CATALOG {catalog}", f"USE {database}"):
    spark.sql(use_statement)

In [0]:
from pyspark.sql import functions as F

# Load the training data as `<schema>.<table>` (resolved in the current catalog) and preview it.
train_table_name = f"{database}.{source_table}"
train_df = spark.table(train_table_name)
display(train_df)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType

# Optional per-sensor engineered features. This logic was previously left commented out,
# so by default the feature table is an unchanged copy of the training data. Flip the
# flag to enable it.
ADD_SENSOR_FEATURES = False
N_SENSORS = 21


def add_sensor_features(df, n_sensors=N_SENSORS):
    """Append a 2-row rolling average and a first difference for each sensor column.

    For every i in 1..n_sensors, adds:
      * ``SensorMeasure{i}_rolling_avg`` - mean of the current and previous row,
      * ``SensorMeasure{i}_diff``        - current value minus previous row's value,
    both cast to FloatType and computed within each ``id`` partition in ``Cycle`` order.

    NOTE: ``_diff`` is null for the first ``Cycle`` of each ``id`` (no previous row).

    Args:
        df: Spark DataFrame with ``id``, ``Cycle`` and ``SensorMeasure1..n`` columns.
        n_sensors: number of ``SensorMeasure{i}`` columns to process.

    Returns:
        A new Spark DataFrame with the extra columns.
    """
    rolling_window = Window.partitionBy("id").orderBy("Cycle").rowsBetween(-1, 0)
    lag_window = Window.partitionBy("id").orderBy("Cycle")
    for i in range(1, n_sensors + 1):
        sensor = f"SensorMeasure{i}"
        df = df.withColumn(
            f"{sensor}_rolling_avg",
            F.avg(sensor).over(rolling_window).cast(FloatType()),
        ).withColumn(
            f"{sensor}_diff",
            (F.col(sensor) - F.lag(sensor, 1).over(lag_window)).cast(FloatType()),
        )
    return df


features_df = add_sensor_features(train_df) if ADD_SENSOR_FEATURES else train_df

# Save the features to a Delta table, replacing any previous contents.
features_df.write.mode("overwrite").saveAsTable(f"{database}.{target_table}")

In [0]:
import pandas as pd
import numpy as np
import mlflow
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import *
from sklearn.model_selection import GroupShuffleSplit

mlflow.spark.autolog()
mlflow.sklearn.autolog()

colLabel = 'RemainingUsefulLife'
# Engine identifier (the feature cell partitions its windows by this column).
group_col = 'id'

# Convert the Spark DataFrame to pandas (collects the whole table to the driver).
data = features_df.toPandas()

# Split by engine rather than by row: a random row split places cycles of the same
# engine in both train and test, leaking that engine's trajectory into evaluation and
# inflating test metrics. test_size here is the fraction of *engines* held out.
# Falls back to the previous row-level split if the id column is absent.
if group_col in data.columns:
    splitter = GroupShuffleSplit(n_splits=1, test_size=0.30, random_state=206)
    train_idx, test_idx = next(splitter.split(data, groups=data[group_col]))
    train, test = data.iloc[train_idx], data.iloc[test_idx]
else:
    train, test = train_test_split(data, test_size=0.30, random_state=206)

# Drop the label and the engine id from the inputs: the id identifies an engine,
# it is not a measurement the model should learn from.
non_feature_cols = [colLabel, group_col]
train_x = train.drop(columns=non_feature_cols, errors='ignore')
test_x = test.drop(columns=non_feature_cols, errors='ignore')
train_y = train[colLabel]
test_y = test[colLabel]

In [0]:
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import mlflow
import numpy as np

# Baseline: an untuned decision tree, scored on the held-out split and logged to MLflow.
with mlflow.start_run(run_name="RUL_DecisionTreeRegressor") as run:
    model = DecisionTreeRegressor(random_state=206)
    model.fit(train_x, train_y)
    predictions = model.predict(test_x)

    # Regression error metrics on the test split; RMSE is derived from MSE.
    mae = mean_absolute_error(test_y, predictions)
    mse = mean_squared_error(test_y, predictions)
    rmse, r2 = np.sqrt(mse), r2_score(test_y, predictions)

    mlflow.log_metrics({
        "mae": mae,
        "mse": mse,
        "rmse": rmse,
        "r2": r2,
    })
    # Logged under an explicit artifact path, in addition to what autologging records.
    mlflow.sklearn.log_model(model, "decision_tree_model")

In [0]:
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
from hyperopt.pyll import scope
import numpy as np
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error

# Hyperparameter search space over the same integer ranges as before
# (max_depth 1..19, max_leaf_nodes 4..127). `scope.int(hp.quniform(...))` yields
# integers while letting TPE exploit the ordering of values; with `hp.choice`, fmin
# reports the *index* of the best option instead of the value itself.
search_space = {
    'max_depth': scope.int(hp.quniform('max_depth', 1, 19, 1)),
    'max_leaf_nodes': scope.int(hp.quniform('max_leaf_nodes', 4, 127, 1)),
}

def train_model(params):
    """Hyperopt objective: fit a DecisionTreeRegressor with `params` and score it on the test split.

    Each evaluation is recorded as a nested MLflow run. sklearn autologging is enabled
    inside the function (presumably so it also applies where SparkTrials executes the
    trial - confirm). The test-set MSE is logged explicitly under the key "mse",
    because the best-run query orders runs by `metrics["mse"]` and autologging uses
    its own metric names.

    Args:
        params: dict of DecisionTreeRegressor keyword arguments sampled from `search_space`.

    Returns:
        dict with hyperopt's `status` and `loss` (test-set mean squared error; lower is better).
    """
    mlflow.sklearn.autolog()

    with mlflow.start_run(nested=True):
        # Fixed seed, matching the baseline run, so tie-breaking between splits is reproducible.
        model = DecisionTreeRegressor(random_state=206, **params)
        model.fit(train_x, train_y)
        predictions = model.predict(test_x)

        loss = mean_squared_error(test_y, predictions)
        mlflow.log_metric("mse", loss)

        return {'status': STATUS_OK, 'loss': loss}

from hyperopt import space_eval

# Run the search under one parent MLflow run; train_model opens a nested run per trial.
with mlflow.start_run(run_name='sklearn_hyperopt') as run:
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=20,
        trials=SparkTrials()
    )

# fmin returns raw search-space values (option indices for hp.choice, unrounded
# floats for quniform); map them back to the actual hyperparameter values.
best_params = space_eval(search_space, best_params)

# `run_uuid` is deprecated in MLflow; `run_id` is the supported attribute.
run_id = run.info.run_id
experiment_id = run.info.experiment_id

In [0]:
from pyspark.sql.functions import *

# Read all runs of the experiment as a Spark DataFrame.
experiment_Df = spark.read.format("mlflow-experiment").load(experiment_id)

# Pick the lowest-MSE run among those under the hyperopt parent run. Runs with no
# "mse" metric (possibly including the parent run itself) are excluded explicitly:
# Spark's ascending sort places nulls first, so they would otherwise be chosen.
best_row = (
  experiment_Df
    .filter(experiment_Df.tags["mlflow.rootRunId"] == run_id)
    .filter(experiment_Df.metrics["mse"].isNotNull())
    .orderBy(experiment_Df.metrics["mse"].asc())
    .first()
)
if best_row is None:
    raise ValueError(f"No runs with an 'mse' metric found under parent run {run_id}")
best_run = best_row['run_id']

In [0]:
# Show the run_id selected as the lowest-MSE hyperopt run.
display(best_run)

In [0]:
import mlflow
from mlflow.tracking.client import MlflowClient

# Model artifact of the best run; "model" is presumably the path used by sklearn autologging - confirm.
model_uri = f"runs:/{best_run}/model"
client = MlflowClient()

# Download the artifacts as a check that `model_uri` resolves before registering it.
local_path = mlflow.artifacts.download_artifacts(artifact_uri=model_uri)

# Register the artifact as a new version of `model_name`.
model_details = mlflow.register_model(model_uri=model_uri, name=model_name)

In [0]:
from mlflow import MlflowClient

# Point the "Production" alias at the model version just registered.
production_alias = "Production"
client = MlflowClient()
client.set_registered_model_alias(
    name=model_name,
    version=model_details.version,
    alias=production_alias,
)