### 🛠️ Day 13 Tasks:

1. Train 3 different models
2. Compare metrics in MLflow
3. Build Spark ML pipeline
4. Select best model

## Task 1: Train 3 Different Models

Load Data (Spark)

In [0]:
from pyspark.sql.functions import col

df = (
    spark.table("ecommerce_catalog.gold.daily_sales_features")
    .select(
        "total_orders",
        "is_weekend",
        "avg_order_value",
        "total_revenue",
    )
    .dropna()
)


Assemble Features

In [0]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["total_orders", "is_weekend", "avg_order_value"],
    outputCol="features"
)

data = assembler.transform(df).select("features", "total_revenue")


Train/Test Split

In [0]:
train_df, test_df = data.randomSplit([0.8, 0.2], seed=42)
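
`randomSplit` assigns rows to the splits by random sampling, so `[0.8, 0.2]` are target proportions rather than exact sizes, and the `seed` makes the assignment repeatable. The plain-Python sketch below illustrates that idea only; it is not Spark's implementation, and the helper name `bernoulli_split` and the toy row list are assumptions made for the example.

```python
import random

def bernoulli_split(rows, train_fraction=0.8, seed=42):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)  # fixed seed -> same assignment on every call
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

# Toy data standing in for DataFrame rows (hypothetical)
rows = list(range(1000))
train, test = bernoulli_split(rows)

# The sizes land near 800/200 but are not guaranteed to match exactly
print(len(train), len(test))
```

On the real DataFrames, `train_df.count()` and `test_df.count()` show the sizes actually produced.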


Define Models

In [0]:
from pyspark.ml.regression import (
    LinearRegression,
    DecisionTreeRegressor,
    RandomForestRegressor
)

lr = LinearRegression(labelCol="total_revenue")
dt = DecisionTreeRegressor(labelCol="total_revenue")
rf = RandomForestRegressor(labelCol="total_revenue", numTrees=50)


Train Models

In [0]:
lr_model = lr.fit(train_df)
dt_model = dt.fit(train_df)
rf_model = rf.fit(train_df)


## Task 2: Compare Metrics in MLflow

Create / Choose a UC Volume

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.mlflow_tmp;


Configure MLflow to use UC Volume

In [0]:

import os

os.environ["MLFLOW_DFS_TMP"] = "/Volumes/workspace/ecommerce/mlflow_tmp"


Import Required Libraries

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml.evaluation import RegressionEvaluator


Set MLflow Experiment

In [0]:
mlflow.set_experiment("/Day13_Model_Comparison")


<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/4152753210750153', creation_time=1768998744986, experiment_id='4152753210750153', last_update_time=1768999998338, lifecycle_stage='active', name='/Day13_Model_Comparison', tags={'mlflow.experiment.sourceName': '/Day13_Model_Comparison',
 'mlflow.experimentKind': 'custom_model_development',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'karthika2738@gmail.com',
 'mlflow.ownerId': '78119440703336'}>

Create Evaluator (RMSE & R²)

In [0]:
evaluator = RegressionEvaluator(
    labelCol="total_revenue",
    predictionCol="prediction"
)
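
To make the two metrics concrete, here is a minimal plain-Python sketch of the standard RMSE and R² definitions. It is for illustration only and is not how `RegressionEvaluator` is implemented; the function names `rmse` and `r2` and the sample numbers are assumptions.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_residual / SS_total."""
    mean_actual = sum(actual) / len(actual)
    ss_residual = sum((y - y_hat) ** 2 for y, y_hat in zip(actual, predicted))
    ss_total = sum((y - mean_actual) ** 2 for y in actual)
    return 1.0 - ss_residual / ss_total

# Hypothetical values, used only to exercise the functions
actual = [100.0, 200.0, 300.0]
predicted = [110.0, 190.0, 310.0]

print(rmse(actual, predicted))  # every error is 10, so the RMSE is 10.0
print(r2(actual, predicted))
```

A lower RMSE means smaller errors in the units of `total_revenue`, while an R² closer to 1 means the model explains more of the variance in the label.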


Prepare Models Dictionary

In [0]:
models = {
    "LinearRegression": lr_model,
    "DecisionTree": dt_model,
    "RandomForest": rf_model
}


Create Input Example

In [0]:
input_example = test_df.limit(1)


Log Each Model to MLflow

In [0]:
def vector_to_list(value):
    """Convert a Spark ML vector (or any other iterable) to a plain Python list."""
    if hasattr(value, "toArray"):
        return value.toArray().tolist()
    if hasattr(value, "__iter__"):
        return list(value)
    return value

# MLflow stores the input example as JSON, so the vector column is converted to lists
input_example_pd = input_example.toPandas()
if "features" in input_example_pd.columns:
    input_example_pd["features"] = input_example_pd["features"].apply(vector_to_list)

for model_name, model in models.items():
    # 1. Generate predictions on the held-out test set
    predictions = model.transform(test_df)

    # 2. Calculate RMSE
    evaluator.setMetricName("rmse")
    rmse = evaluator.evaluate(predictions)

    # 3. Calculate R2
    evaluator.setMetricName("r2")
    r2 = evaluator.evaluate(predictions)

    # 4. Start an MLflow run named after the model so it is easy to find in the UI
    with mlflow.start_run(run_name=model_name):
        # Log parameters
        mlflow.log_param("model_type", model_name)
        # Log metrics
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2_score", r2)
        # Log Spark ML model WITH input example
        mlflow.spark.log_model(
            model,
            artifact_path="model",
            input_example=input_example_pd
        )


Output (truncated in the original): the cell logs an `ERROR` from `pyspark.sql.connect.logging` ("GRPC Error received") with the message:

requirement failed: Column features must be of type class org.apache.spark.ml.linalg.VectorUDT:struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually class org.apache.spark.sql.types.ArrayType:array<double>.

The message points at the input example rather than at the models: `input_example_pd` stores `features` as a plain list, which Spark reads as `array<double>`, while the fitted models expect a `VectorUDT` column. This most likely occurs when MLflow scores the input example to infer a model signature. Omitting `input_example` for these vector-input models, or logging the Task 3 pipeline (whose inputs are plain numeric columns), should avoid it.

Verify in MLflow UI

1. Click the 🧪 Experiments icon (right sidebar).
2. Open the experiment `Day13_Model_Comparison`.
3. You should see 3 runs:
   - LinearRegression
   - DecisionTree
   - RandomForest

Each run will show:

- Parameters ✔
- RMSE ✔
- R² ✔
- Spark model artifact ✔

## Task 3: Build Spark ML Pipeline

Import Required Classes

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


Define Feature Columns

In [0]:
feature_cols = ["total_orders", "is_weekend", "avg_order_value"]


Create VectorAssembler (Feature Stage)

In [0]:

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)


Define the Model Stage

In [0]:
lr = LinearRegression(
    featuresCol="features",
    labelCol="total_revenue"
)


Create the Pipeline


In [0]:
pipeline = Pipeline(stages=[
    assembler,
    lr
])


Train the Pipeline

In [0]:
df = spark.table("ecommerce_catalog.gold.daily_sales_features").dropna()
pipeline_model = pipeline.fit(df)




Generate Predictions Using Pipeline

In [0]:
pipeline_predictions = pipeline_model.transform(df)
pipeline_predictions.select(
    "total_orders",
    "is_weekend",
    "avg_order_value",
    "total_revenue",
    "prediction"
).display()


total_orders,is_weekend,avg_order_value,total_revenue,prediction
1189507,0,300.6065023828165,357573538.82987684,363073252.7025794
1125950,0,301.5065523779545,339481302.6499579,345660427.9444033
1415671,0,299.2876306855224,423692819.4202042,428833152.8777374
1329047,1,297.7144504971646,395676497.2899051,397123422.3814404
1317309,1,301.39099258424864,397025067.050164,400983207.02472806
1198695,0,296.5300084675094,355449038.4999612,357635877.43608296
1365036,0,279.01953442247566,380871709.1899185,372661294.5832708
1342556,0,282.8748700390103,379775354.0200936,373631838.0694982
1281337,0,290.3646107934492,372054919.30024576,370208986.3050447
1492836,0,282.5372595984001,421781792.4698372,418413489.47436976
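
Because the pipeline was fit on the same `df` it scores here, the rows above are in-sample predictions. As a quick sanity check, the sketch below computes the absolute percentage error for the ten displayed rows only; the helper name `absolute_percentage_errors` is an assumption, and ten rows are not a substitute for the test-set metrics from Task 2.

```python
# (total_revenue, prediction) pairs copied from the ten displayed rows
rows = [
    (357573538.82987684, 363073252.7025794),
    (339481302.6499579, 345660427.9444033),
    (423692819.4202042, 428833152.8777374),
    (395676497.2899051, 397123422.3814404),
    (397025067.050164, 400983207.02472806),
    (355449038.4999612, 357635877.43608296),
    (380871709.1899185, 372661294.5832708),
    (379775354.0200936, 373631838.0694982),
    (372054919.30024576, 370208986.3050447),
    (421781792.4698372, 418413489.47436976),
]

def absolute_percentage_errors(pairs):
    """Return |prediction - actual| / actual for each (actual, prediction) pair."""
    return [abs(predicted - actual) / actual for actual, predicted in pairs]

errors = absolute_percentage_errors(rows)
mape = sum(errors) / len(errors)

print(f"mean absolute percentage error: {mape:.2%}")
print(f"largest error: {max(errors):.2%}")
```

Every displayed prediction lands within about 3 percent of the actual revenue.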


## Task 4: Select Best Model

Linear Regression was selected as the final model because it had the lowest RMSE and the highest R² of the three runs.
The strong linear relationship between total orders and revenue most likely made the simpler model
more effective than the tree-based approaches on this dataset.
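
The selection rule above (lowest RMSE, then highest R²) can be sketched as a small helper. This is a minimal illustration: the function name `select_best_model`, the model labels, and the metric values are placeholders and are not the results of this notebook.

```python
def select_best_model(metrics_by_model):
    """Return the name with the lowest RMSE, breaking ties by the highest R²."""
    return min(
        metrics_by_model,
        key=lambda name: (
            metrics_by_model[name]["rmse"],
            -metrics_by_model[name]["r2_score"],
        ),
    )

# Placeholder numbers for illustration only (NOT the notebook's metrics)
example_metrics = {
    "model_a": {"rmse": 12.0, "r2_score": 0.91},
    "model_b": {"rmse": 9.5, "r2_score": 0.95},
    "model_c": {"rmse": 9.5, "r2_score": 0.97},
}

best = select_best_model(example_metrics)
print(best)  # model_c: tied with model_b on RMSE, higher R²
```

In practice the dictionary would be filled with the `rmse` and `r2_score` values logged for each run in Task 2, for example by reading them from the MLflow UI or from `mlflow.search_runs`.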
