In [0]:
features_df = spark.table("sales_databricks_workspace.gold.sales_details")

In [0]:
features_df.display()

ITEM_ID,BRANCH_ID,ORDER_DATE,DAILY_UNITS_SOLD,ROLLING_7D_UNITS,ROLLING_30D_UNITS,LAG_1D_UNITS
13563,180-OS1,2023-01-16,1,1.5,1.5,2.0
13563,180-OS1,2023-05-01,2,1.6666666666666667,1.6666666666666667,1.0
13563,180-OS1,2023-08-05,8,3.25,3.25,2.0
13563,223-EL2,2022-09-27,6,5.75,5.75,7.0
13563,223-EL2,2023-05-11,1,4.8,4.8,6.0
13563,716-BU4,2022-05-25,8,2.5,2.4,2.0
13563,716-BU4,2022-06-05,1,2.375,2.272727272727273,8.0
13563,716-BU4,2022-08-01,2,2.5,2.25,1.0
13563,716-BU4,2022-08-26,4,2.875,2.3846153846153846,2.0
13563,716-BU4,2022-09-05,2,3.0,2.357142857142857,4.0


In [0]:
target_col = "DAILY_UNITS_SOLD"
feature_cols = [
    "ROLLING_7D_UNITS",
    "ROLLING_30D_UNITS",
    "LAG_1D_UNITS"
]



In [0]:
ml_df = features_df.dropna(subset=feature_cols + [target_col])


In [0]:
ml_df.display()

ITEM_ID,BRANCH_ID,ORDER_DATE,DAILY_UNITS_SOLD,ROLLING_7D_UNITS,ROLLING_30D_UNITS,LAG_1D_UNITS
12754,11-AD2,2021-04-06,3,2.0,2.0,1
12754,11-AD2,2021-06-06,1,1.6666666666666667,1.6666666666666667,3
12754,11-AD2,2021-07-14,7,3.0,3.0,1
12754,11-AD2,2021-08-10,8,4.0,4.0,7
12754,11-AD2,2022-04-03,5,4.166666666666667,4.166666666666667,8
12754,131-HA3,2021-03-28,7,6.0,6.0,5
12754,131-HA3,2021-05-18,1,4.333333333333333,4.333333333333333,7
12754,131-HA3,2021-11-28,3,4.0,4.0,1
12754,131-HA3,2022-04-14,3,3.8,3.8,3
12754,132-IS1,2021-11-07,2,4.5,4.5,7


In [0]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

ml_ready_df = assembler.transform(ml_df).select("features", target_col)


In [0]:
ml_ready_df.display()

features,DAILY_UNITS_SOLD
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""2.0"",""2.0"",""1.0""]}",3
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""1.6666666666666667"",""1.6666666666666667"",""3.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""3.0"",""3.0"",""1.0""]}",7
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""4.0"",""4.0"",""7.0""]}",8
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""4.166666666666667"",""4.166666666666667"",""8.0""]}",5
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""6.0"",""6.0"",""5.0""]}",7
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""4.333333333333333"",""4.333333333333333"",""7.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""4.0"",""4.0"",""1.0""]}",3
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""3.8"",""3.8"",""3.0""]}",3
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""4.5"",""4.5"",""7.0""]}",2


In [0]:
train_df, test_df = ml_ready_df.randomSplit([0.8, 0.2], seed=42)


In [0]:
import mlflow
import mlflow.spark

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator



In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS sales_databricks_workspace.ml; 
CREATE VOLUME IF NOT EXISTS sales_databricks_workspace.ml.volumes;


In [0]:
import os

os.environ["MLFLOW_DFS_TMP"] = "/Volumes/sales_databricks_workspace/ml/volumes"


In [0]:
from mlflow.models.signature import infer_signature

with mlflow.start_run(run_name="daily_sales_forecast_rf"):
    
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol=target_col,
        numTrees=50,
        maxDepth=10,
        seed=42
    )
    
    model = rf.fit(train_df)
    predictions = model.transform(test_df)
    
    evaluator = RegressionEvaluator(
        labelCol=target_col,
        predictionCol="prediction",
        metricName="rmse"
    )
    
    rmse = evaluator.evaluate(predictions)
    
    mlflow.log_param("numTrees", 50)
    mlflow.log_param("maxDepth", 10)
    mlflow.log_metric("rmse", rmse)
    
    input_example = ml_df.select(feature_cols).limit(5).toPandas()
    signature = infer_signature(input_example, predictions.select("prediction").limit(5).toPandas())
    
    mlflow.spark.log_model(
        model,
        "daily_sales_forecast_rf",
        input_example=input_example,
        signature=signature
    )


{"ts": "2026-02-08 18:03:33.006", "level": "ERROR", "logger": "pyspark.sql.connect.logging", "msg": "GRPC Error received", "context": {}, "exception": {"class": "_MultiThreadedRendezvous", "msg": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"[FIELD_NOT_FOUND] No such struct field `features` in `ROLLING_7D_UNITS`, `ROLLING_30D_UNITS`, `LAG_1D_UNITS`. SQLSTATE: 42704\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer  {created_time:\"2026-02-08T18:03:33.005683932+00:00\", grpc_status:13, grpc_message:\"[FIELD_NOT_FOUND] No such struct field `features` in `ROLLING_7D_UNITS`, `ROLLING_30D_UNITS`, `LAG_1D_UNITS`. SQLSTATE: 42704\"}\"\n>", "stacktrace": [{"class": null, "method": "_execute_and_fetch_as_iterator", "file": "/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", "line": "2019"}, {"class": null, "method": "__next__", "file": "<frozen _collections_abc>", "line": "356"}, {"class": null

In [0]:
from mlflow.models.signature import infer_signature

with mlflow.start_run(run_name="daily_sales_forecast_rf"):
    
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol=target_col,
        numTrees=50,
        maxDepth=10,
        seed=42
    )
    
    model = rf.fit(train_df)
    predictions = model.transform(test_df)
    
    evaluator = RegressionEvaluator(
        labelCol=target_col,
        predictionCol="prediction",
        metricName="rmse"
    )
    
    rmse = evaluator.evaluate(predictions)
    
    mlflow.log_param("numTrees", 50)
    mlflow.log_param("maxDepth", 10)
    mlflow.log_metric("rmse", rmse)
    
    input_example = (
        ml_ready_df
        .select("features")
        .limit(5)
        .toPandas()
    )
    
    signature = infer_signature(
        input_example,
        predictions.select("prediction").limit(5).toPandas()
    )
    
    mlflow.spark.log_model(
        model,
        "daily_sales_forecast_rf",
        input_example=input_example,
        signature=signature
    )




INFO:py4j.clientserver:Received command c on object id p0


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:139)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:139)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
num_trees_list = [20, 55, 100]
max_depth_list = [5, 10, 15]


In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
from mlflow.models.signature import infer_signature

evaluator = RegressionEvaluator(
    labelCol=target_col,
    predictionCol="prediction",
    metricName="rmse"
)

for numTrees in num_trees_list:
    for maxDepth in max_depth_list:

        run_name = f"rf_trees_{numTrees}_depth_{maxDepth}"

        with mlflow.start_run(run_name=run_name):

            rf = RandomForestRegressor(
                featuresCol="features",
                labelCol=target_col,
                numTrees=numTrees,
                maxDepth=maxDepth,
                seed=42
            )

            model = rf.fit(train_df)
            predictions = model.transform(test_df)

            rmse = evaluator.evaluate(predictions)

            # log params & metric
            mlflow.log_param("numTrees", numTrees)
            mlflow.log_param("maxDepth", maxDepth)
            mlflow.log_metric("rmse", rmse)

            # log model (optional but good)
            input_example = ml_ready_df.select("features").limit(5).toPandas()
            signature = infer_signature(
                input_example,
                predictions.select("prediction").limit(5).toPandas()
            )

            mlflow.spark.log_model(
                model,
                "daily_sales_forecast_rf",
                input_example=input_example,
                signature=signature
            )

            print(f"Finished run: trees={numTrees}, depth={maxDepth}, rmse={rmse}")




[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-7673323432158509>, line 44[0m
[1;32m     38[0m input_example [38;5;241m=[39m ml_ready_df[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mfeatures[39m[38;5;124m"[39m)[38;5;241m.[39mlimit([38;5;241m5[39m)[38;5;241m.[39mtoPandas()
[1;32m     39[0m signature [38;5;241m=[39m infer_signature(
[1;32m     40[0m     input_example,
[1;32m     41[0m     predictions[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m)[38;5;241m.[39mlimit([38;5;241m5[39m)[38;5;241m.[39mtoPandas()
[1;32m     42[0m )
[0;32m---> 44[0m mlflow[38;5;241m.[39mspark[38;5;241m.[39mlog_model(
[1;32m     45[0m     model,
[1;32m     46[0m     [38;5;124m"[39m[38;5;124mdaily_sales_forecast_rf[39m[38;5;124m"[39m,
[1;32m     47[0m     input_example[38;5;241