# ML Pipeline using scikit-learn

In [0]:
# Make jeromeaymon_lakehouse / ml_sandbox the current catalog and schema for this Spark session.
spark.sql("USE CATALOG jeromeaymon_lakehouse")
spark.sql("USE SCHEMA ml_sandbox")

In [0]:
from pyspark.sql.functions import col

# Read the training CSV (header row, types inferred by Spark)
data_path = "/Volumes/jeromeaymon_lakehouse/ml_sandbox/data/train.csv"
train_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Normalise column types: the passenger id becomes a string, while the
# VIP / CryoSleep / Transported flags are cast to int.
column_casts = {
    "PassengerId": "string",
    "VIP": "int",
    "CryoSleep": "int",
    "Transported": "int",
}
for column_name, target_type in column_casts.items():
    train_df = train_df.withColumn(column_name, col(column_name).cast(target_type))

display(train_df)

## Pandas & scikit-learn pipeline

In [0]:
import pandas as pd
# Convert the Spark DataFrame into a pandas DataFrame for the scikit-learn steps below.
train = train_df.toPandas()

# Preview the first rows
train.head()

In [0]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder

# Step 1: Define transformers for different column types
# Numerical features: missing values are replaced by the column mean.
numerical_cols = ['Age', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']
numeric_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="mean"))]
)

# Categorical features: one-hot encoded.
# handle_unknown='ignore' makes a category that was not present at fit time
# encode as all zeros instead of raising, so the fitted preprocessor can be
# applied to data (a CV fold, new rows at inference) containing unseen values.
# It does not change the encoding of the data the encoder was fitted on.
categorical_cols = ['HomePlanet', 'Destination', 'VIP', 'CryoSleep']
categorical_transformer = Pipeline(
    steps=[
        ('encoder', OneHotEncoder(handle_unknown='ignore'))
])

# Step 2: Create a ColumnTransformer that applies the transformations to the columns
# Columns not listed in either group are dropped from the output.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numerical_cols),
        ('cat', categorical_transformer, categorical_cols)
    ],
    remainder='drop' 
)

# Step 3: Assemble the preprocessing pipeline
preprocessing_pipeline = Pipeline([
    ('preprocessor', preprocessor)
])

# Fit and transform the DataFrame
X_preprocessed = preprocessing_pipeline.fit_transform(train)

# Display the fitted pipeline as the cell output
preprocessing_pipeline

In [0]:
# Fetch the fitted one-hot encoder so its generated column names can label the output
fitted_preprocessor = preprocessing_pipeline.named_steps['preprocessor']
fitted_encoder = fitted_preprocessor.named_transformers_['cat'].named_steps['encoder']
onehot_encoder_feature_names = list(fitted_encoder.get_feature_names_out())

# The 'num' transformer is listed before 'cat', so numeric columns come first
column_order = [*numerical_cols, *onehot_encoder_feature_names]

# Wrap the transformed array in a DataFrame that keeps the original row index
X_preprocessed = pd.DataFrame(X_preprocessed, columns=column_order, index=train.index)
y = train['Transported']

X_preprocessed.head()

## Hyperparameter tuning of a Decision Tree Classifier 

We use Optuna to tune the hyperparameters of a decision tree classifier.

In [0]:
import optuna
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import cross_val_score

# Shared estimator; every trial overwrites its tunable hyperparameters in place.
model = DecisionTreeClassifier(criterion='entropy', random_state=42)


def objective(trial):
    """Score one Optuna trial.

    Samples ``max_depth``, ``min_samples_split`` and ``min_samples_leaf`` from
    ``trial``, applies them to the module-level ``model`` and returns the mean
    5-fold cross-validated accuracy on ``X_preprocessed`` / ``y``.
    """
    # Sample the search space (all three on a log scale)
    max_depth = trial.suggest_int('max_depth', 3, 40, log=True)
    min_samples_split = trial.suggest_float('min_samples_split', 1e-6, 1e-3, log=True)
    min_samples_leaf = trial.suggest_float('min_samples_leaf', 1e-6, 1e-3, log=True)

    model.set_params(
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        min_samples_leaf=min_samples_leaf,
    )

    fold_scores = cross_val_score(model, X_preprocessed, y, cv=5, scoring='accuracy')
    return fold_scores.mean()

# Seed the sampler so the search proposes the same sequence of trials on every
# fresh run of the notebook; without an explicit seed the tuning result (and
# everything logged from it) changes from run to run.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=100)


# Report the best hyperparameters and their cross-validated accuracy
print("--------------------------------------")
print("best_params =", study.best_params, "with cross_validation_score =", study.best_value)

## MLflow

In [0]:
import mlflow
from mlflow import MlflowClient
from mlflow.models.signature import infer_signature

# Track runs in the Databricks workspace and use the "databricks-uc" model registry
mlflow.set_tracking_uri("databricks")
mlflow.set_registry_uri("databricks-uc")

# Client handle for the MLflow APIs
client = MlflowClient()

# Features are every column except the target
X = train.drop(columns=['Transported'])
y = train['Transported']

with mlflow.start_run():
    # Decision tree configured with the best hyperparameters found by the study
    model = DecisionTreeClassifier(criterion='entropy', random_state=42, **study.best_params)

    # Chain preprocessing and classifier so the logged model takes the raw columns
    model_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model),
    ])
    model_pipeline.fit(X, y)

    # Record the tuned hyperparameters
    mlflow.log_params(study.best_params)

    # Record the best cross-validation accuracy reported by the study
    mlflow.log_metric("accuracy", study.best_value)

    # Tag the run with a short reminder of its purpose
    mlflow.set_tag("Training Info", "Simple Decision Tree Classifier")

    # Derive the input/output schema from the training features and predictions
    signature = infer_signature(X, model_pipeline.predict(X))

    # Log the fitted pipeline and register it under "decision_tree_model"
    model_info = mlflow.sklearn.log_model(
        sk_model=model_pipeline,
        signature=signature,
        registered_model_name="decision_tree_model",
        artifact_path="decision_tree_model"
    )

## Advanced Optuna: Distributed Hyperparameter Tuning

We'll now use Optuna with **distributed parallelization** across Spark executors to accelerate hyperparameter tuning.

**Key Features:**
* **MlflowStorage**: Uses MLflow Tracking Server as the storage backend for sharing trial results
* **MlflowSparkStudy**: Distributes trials across Spark executors for parallel execution
* **n_jobs parameter**: Controls the number of parallel trials to run simultaneously

This approach is ideal for computationally expensive models and large search spaces.

In [0]:
%pip install mlflow>=3.0.0 optuna==4.1.0 --upgrade --quiet
dbutils.library.restartPython()

In [0]:
# Re-run necessary setup after kernel restart
from pyspark.sql.functions import col
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
import pandas as pd

# Select the catalog and schema again (session state is rebuilt after the restart)
for use_statement in ("USE CATALOG jeromeaymon_lakehouse", "USE SCHEMA ml_sandbox"):
    spark.sql(use_statement)

# Reload the training CSV and re-apply the same column casts as before
data_path = "/Volumes/jeromeaymon_lakehouse/ml_sandbox/data/train.csv"
train_df = spark.read.csv(data_path, header=True, inferSchema=True)

column_casts = {
    "PassengerId": "string",
    "VIP": "int",
    "CryoSleep": "int",
    "Transported": "int",
}
for column_name, target_type in column_casts.items():
    train_df = train_df.withColumn(column_name, col(column_name).cast(target_type))

# Collect into pandas for scikit-learn
train = train_df.toPandas()

# Recreate preprocessor
# Numerical features: missing values are replaced by the column mean.
numerical_cols = ['Age', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']
numeric_transformer = Pipeline(
    steps=[('imputer', SimpleImputer(strategy="mean"))]
)

# Categorical features: one-hot encoded.
# handle_unknown='ignore' matters here because this preprocessor is fitted
# inside cross-validation: a category that is absent from a training fold but
# present in its validation fold is encoded as all zeros instead of raising.
categorical_cols = ['HomePlanet', 'Destination', 'VIP', 'CryoSleep']
categorical_transformer = Pipeline(
    steps=[('encoder', OneHotEncoder(handle_unknown='ignore'))]
)

# Columns not listed in either group are dropped from the output.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numerical_cols),
        ('cat', categorical_transformer, categorical_cols)
    ],
    remainder='drop'
)

print("Variables restored: train, preprocessor")

In [0]:
import mlflow
from mlflow.optuna.storage import MlflowStorage
from mlflow.pyspark.optuna.study import MlflowSparkStudy
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
import optuna

# Look up the MLflow experiment whose name is this notebook's workspace path
notebook_path = (
    dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
)
experiment = mlflow.get_experiment_by_name(notebook_path)

# get_experiment_by_name returns None when no experiment has that name; fail with
# an actionable message instead of an AttributeError on `.experiment_id`.
if experiment is None:
    raise RuntimeError(
        f"No MLflow experiment found for notebook path {notebook_path!r}. "
        "Log at least one MLflow run from this notebook (or create the experiment) "
        "before configuring MlflowStorage."
    )
experiment_id = experiment.experiment_id

# Create MLflow storage backend for distributed optimization
mlflow_storage = MlflowStorage(experiment_id=experiment_id)

print(f"Using experiment ID: {experiment_id}")
print("MLflow storage configured for distributed optimization")

In [0]:
def distributed_objective(trial):
    """
    Evaluate one Optuna trial for the decision-tree pipeline.

    Samples six tree hyperparameters from ``trial``, builds a
    preprocessing + classifier pipeline around them and returns the mean
    5-fold cross-validated accuracy on the module-level ``train`` DataFrame.
    """

    # Sample the search space; the order of the suggest calls is kept fixed
    max_depth = trial.suggest_int('max_depth', 3, 40, log=True)
    min_samples_split = trial.suggest_float('min_samples_split', 1e-6, 1e-3, log=True)
    min_samples_leaf = trial.suggest_float('min_samples_leaf', 1e-6, 1e-3, log=True)
    min_impurity_decrease = trial.suggest_float('min_impurity_decrease', 0.0, 0.5)
    max_features = trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
    ccp_alpha = trial.suggest_float('ccp_alpha', 0.0, 0.1)

    # Fresh classifier per trial, so parallel trials never share an estimator
    classifier = DecisionTreeClassifier(
        criterion='entropy',
        random_state=42,
        max_depth=max_depth,
        min_samples_split=min_samples_split,
        min_samples_leaf=min_samples_leaf,
        min_impurity_decrease=min_impurity_decrease,
        max_features=max_features,
        ccp_alpha=ccp_alpha,
    )

    # Preprocessing is part of the pipeline that is cross-validated
    candidate_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', classifier),
    ])

    # Features are every column except the target
    features = train.drop(columns=['Transported'])
    target = train['Transported']

    # n_jobs=-1 lets cross_val_score use all available cores for the folds
    fold_scores = cross_val_score(
        candidate_pipeline,
        features,
        target,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
    )

    return fold_scores.mean()

In [0]:
import time

# Search budget, defined once so the banner below and the optimize() call
# cannot drift apart.
N_TRIALS = 200          # total number of trials to run
N_PARALLEL_JOBS = 4     # number of parallel threads (runs on driver node)

# Create Optuna study with MLflow storage backend
# Note: Using standard Optuna study instead of MlflowSparkStudy because
# MlflowSparkStudy requires a multi-node cluster with workers.
# This approach still logs to MLflow and parallelizes using threads on the driver.
# Use unique study name to avoid conflicts with existing studies
study = optuna.create_study(
    study_name=f"decision_tree_tuning_{int(time.time())}",
    storage=mlflow_storage,
    direction="maximize",
    load_if_exists=False
)

# Run optimization with thread-based parallelization
print("Starting hyperparameter optimization with MLflow tracking...")
print(f"Total trials: {N_TRIALS}")
print(f"Parallel threads: {N_PARALLEL_JOBS}")
print("\nThis will parallelize using threads on the driver node.\n")

study.optimize(
    distributed_objective,
    n_trials=N_TRIALS,
    n_jobs=N_PARALLEL_JOBS,
    # A trial that raises is recorded as failed and the study keeps running
    # instead of aborting the whole optimization.
    catch=(Exception,)
)

# Summarise the outcome; trials in any state other than COMPLETE are
# reported as failed.
unfinished_trials = [t for t in study.trials if t.state != optuna.trial.TrialState.COMPLETE]

print("\n" + "="*60)
print("Optimization Complete!")
print("="*60)
print(f"Best parameters: {study.best_params}")
print(f"Best cross-validation score: {study.best_value:.4f}")
print(f"Total trials completed: {len(study.trials)}")
print(f"Failed trials: {len(unfinished_trials)}")

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# Export the study's trials to a DataFrame
trials_df = study.trials_dataframe()

# Table of the ten highest-scoring trials
print("Top 10 Trials by Accuracy:")
summary_columns = [
    'number', 'value', 'params_max_depth', 'params_min_samples_split',
    'params_min_samples_leaf', 'params_max_features'
]
top_trials = trials_df.nlargest(10, 'value')[summary_columns]
print(top_trials.to_string(index=False))

# Two side-by-side panels: score per trial, and score against max_depth
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
history_ax, depth_ax = axes

# Left panel: score of each trial, with the best score as a dashed reference line
history_ax.plot(trials_df['number'], trials_df['value'], alpha=0.6, marker='o', markersize=3)
history_ax.axhline(y=study.best_value, color='r', linestyle='--', label=f'Best: {study.best_value:.4f}')
history_ax.set_xlabel('Trial Number')
history_ax.set_ylabel('Cross-Validation Accuracy')
history_ax.set_title('Optimization History')
history_ax.legend()
history_ax.grid(True, alpha=0.3)

# Right panel: score of each trial against its sampled max_depth
depth_ax.scatter(trials_df['params_max_depth'], trials_df['value'], alpha=0.5)
depth_ax.set_xlabel('max_depth')
depth_ax.set_ylabel('Cross-Validation Accuracy')
depth_ax.set_title('Impact of max_depth on Performance')
depth_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [0]:
from mlflow.models.signature import infer_signature

# Features are every column except the target
X = train.drop(columns=['Transported'])
y = train['Transported']

# Everything below is recorded under a single run named "optimized_decision_tree"
with mlflow.start_run(run_name="optimized_decision_tree"):

    # Rebuild the classifier from the best hyperparameters found by the study
    best_model = DecisionTreeClassifier(criterion='entropy', random_state=42, **study.best_params)

    # Chain preprocessing and classifier so the logged model takes the raw columns
    final_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', best_model),
    ])

    # Fit on all rows of the training frame
    final_pipeline.fit(X, y)

    # Record the tuned hyperparameters, then how the search was run
    mlflow.log_params(study.best_params)
    for param_name, param_value in (
        ("optimization_method", "optuna_with_mlflow_storage"),
        ("total_trials", len(study.trials)),
        ("parallel_jobs", 4),
    ):
        mlflow.log_param(param_name, param_value)

    # Best cross-validation accuracy reported by the study
    mlflow.log_metric("cv_accuracy", study.best_value)

    # Descriptive tags for finding this run later
    for tag_key, tag_value in (
        ("Training Info", "Decision Tree with Optuna Optimization"),
        ("optimization_type", "optuna_mlflow_storage"),
    ):
        mlflow.set_tag(tag_key, tag_value)

    # Derive the input/output schema from the training features and predictions
    signature = infer_signature(X, final_pipeline.predict(X))

    # Log the fitted pipeline and register it as "decision_tree_optimized"
    model_info = mlflow.sklearn.log_model(
        sk_model=final_pipeline,
        signature=signature,
        registered_model_name="decision_tree_optimized",
        artifact_path="decision_tree_model"
    )

    banner = "=" * 60
    print("\n" + banner)
    print("Final Model Logged Successfully!")
    print(banner)
    print(f"Model URI: {model_info.model_uri}")
    print(f"Registered Model: decision_tree_optimized")
    print(f"Best CV Accuracy: {study.best_value:.4f}")