In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS workspace.retail;

CREATE VOLUME IF NOT EXISTS workspace.retail.retail_data;


In [0]:
%sh
cd /Volumes/workspace/retail/retail_data

kaggle datasets download -d ulrikthygepedersen/online-retail-dataset


Dataset URL: https://www.kaggle.com/datasets/ulrikthygepedersen/online-retail-dataset
License(s): Attribution 4.0 International (CC BY 4.0)
Downloading online-retail-dataset.zip to /Volumes/workspace/retail/retail_data


  0%|          | 0.00/7.38M [00:00<?, ?B/s] 14%|█▎        | 1.00M/7.38M [00:00<00:01, 4.96MB/s]100%|██████████| 7.38M/7.38M [00:00<00:00, 13.9MB/s]





In [0]:
%sh
cd /Volumes/workspace/retail/retail_data

unzip -o online-retail-dataset.zip
rm -f online-retail-dataset.zip
ls -lh


Archive:  online-retail-dataset.zip
  inflating: online_retail.csv       
total 47M
-rwxrwxrwx 1 spark-acf9c6a9-3395-4f14-996a-21 nogroup 47M Jan 26 18:57 online_retail.csv


In [0]:
df = spark.read.csv(
    "/Volumes/workspace/retail/retail_data/online_retail.csv",
    header=True,
    inferSchema=True
)

df.printSchema()


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
df.write.mode("overwrite").saveAsTable("workspace.retail.online_retail")


In [0]:
%sql
SELECT * 
FROM workspace.retail.online_retail
LIMIT 5;


InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000Z,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000Z,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000Z,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000Z,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000Z,3.39,17850.0,United Kingdom


In [0]:
from pyspark.sql.functions import current_timestamp

bronze_df = spark.table("workspace.retail.online_retail") \
    .withColumn("ingestion_ts", current_timestamp())

bronze_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.retail.bronze_online_retail")


In [0]:
from pyspark.sql.functions import col, to_date

silver_df = spark.table("workspace.retail.bronze_online_retail") \
    .filter(col("Quantity") > 0) \
    .filter(col("UnitPrice") > 0) \
    .filter(col("CustomerID").isNotNull()) \
    .withColumn("InvoiceDate", to_date("InvoiceDate")) \
    .dropDuplicates()

silver_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.retail.silver_online_retail")


In [0]:
from pyspark.sql.functions import sum, countDistinct, max, datediff, current_date

gold_df = spark.table("workspace.retail.silver_online_retail")

customer_metrics = gold_df.groupBy("CustomerID").agg(
    countDistinct("InvoiceNo").alias("total_orders"),
    sum(col("Quantity") * col("UnitPrice")).alias("total_revenue"),
    max("InvoiceDate").alias("last_purchase_date")
)

customer_metrics = customer_metrics.withColumn(
    "recency_days",
    datediff(current_date(), col("last_purchase_date"))
)

customer_metrics.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.retail.gold_customer_metrics")


#ML-Workflow

In [0]:
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
import pandas as pd

df = spark.table("workspace.retail.gold_customer_metrics").toPandas()

features = ["total_orders", "recency_days"]
target = "total_revenue"

df = df[features + [target]].dropna()

X = df[features]
y = df[target]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

mlflow.set_experiment("/Users/reddysaad@gmail.com/retail_customer_value")

with mlflow.start_run(run_name="linear_regression_model"):

    model = LinearRegression()
    model.fit(X_train, y_train)

    r2 = model.score(X_test, y_test)

    mlflow.log_param("model", "LinearRegression")
    mlflow.log_metric("r2_score", r2)

    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        input_example=X_train.iloc[:5]
    )

print("R2 Score:", r2)


2026/01/30 18:07:58 INFO mlflow.tracking.fluent: Experiment with name '/Users/reddysaad@gmail.com/retail_customer_value' does not exist. Creating a new experiment.


R2 Score: 0.2153898963030445


#ML workflow

In [0]:
from pyspark.sql import functions as F

# Load table
df = spark.table("workspace.retail.online_retail")

# Clean data
df_clean = df.filter(
    (F.col("Quantity") > 0) &
    (F.col("UnitPrice") > 0) &
    (F.col("CustomerID").isNotNull())
)

# Customer-level features
customer_features = df_clean.groupBy("CustomerID").agg(
    F.countDistinct("InvoiceNo").alias("total_orders"),
    F.sum("Quantity").alias("total_quantity"),
    F.avg("UnitPrice").alias("avg_unit_price"),
    F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("total_spent")
)

display(customer_features.limit(5))


CustomerID,total_orders,total_quantity,avg_unit_price,total_spent
17850.0,34,1733,3.960370370370371,5391.209999999999
13047.0,10,1391,3.9320348837209314,3237.54
12583.0,15,5060,3.103603238866397,7281.38
13748.0,5,439,3.996428571428571,948.25
15100.0,3,80,10.949999999999998,876.0


In [0]:
# Convert to Pandas for sklearn
pdf = customer_features.toPandas()

# Features & target
X = pdf[["total_orders", "total_quantity", "avg_unit_price"]]
y = pdf["total_spent"]


In [0]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)




In [0]:
import mlflow
import mlflow.sklearn
from sklearn.linear_model import LinearRegression
mlflow.set_experiment("/Users/reddysaad@gmail.com/OnlineRetail_CustomerSpend")


2026/01/30 18:59:44 INFO mlflow.tracking.fluent: Experiment with name '/Users/reddysaad@gmail.com/OnlineRetail_CustomerSpend' does not exist. Creating a new experiment.


<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/3637287640498647', creation_time=1769799584846, experiment_id='3637287640498647', last_update_time=1769799584846, lifecycle_stage='active', name='/Users/reddysaad@gmail.com/OnlineRetail_CustomerSpend', tags={'mlflow.experiment.sourceName': '/Users/reddysaad@gmail.com/OnlineRetail_CustomerSpend',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'reddysaad@gmail.com',
 'mlflow.ownerId': '73277983628516'}>

In [0]:
with mlflow.start_run(run_name="LinearRegression_CustomerSpend"):

    # Model
    model = LinearRegression()
    model.fit(X_train, y_train)

    # Evaluation
    r2 = model.score(X_test, y_test)

    # Log params
    mlflow.log_param("model_type", "LinearRegression")
    mlflow.log_param("features", "total_orders,total_quantity,avg_unit_price")

    # Log metric
    mlflow.log_metric("r2_score", r2)

    # Log model
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        input_example=X_train.iloc[:5]
    )

    print(f"R² Score: {r2:.4f}")




R² Score: 0.8123


In [0]:
import pandas as pd

coefficients = pd.DataFrame({
    "Feature": X.columns,
    "Impact": model.coef_
})

coefficients


Unnamed: 0,Feature,Impact
0,total_orders,48.26746
1,total_quantity,1.641612
2,avg_unit_price,1.538761


In [0]:
# Predict
pdf["predicted_spend"] = model.predict(X)

# Convert back to Spark
predictions_spark = spark.createDataFrame(pdf)

# Save as Gold ML table
predictions_spark.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.retail.gold_customer_spend_predictions")


In [0]:
%sql
SELECT CustomerID,
       total_spent,
       predicted_spend
FROM workspace.retail.gold_customer_spend_predictions
ORDER BY predicted_spend DESC
LIMIT 10;


CustomerID,total_spent,predicted_spend
14646.0,280206.02,326674.4906076904
14911.0,143825.06000000008,141361.91828055237
16446.0,168472.5,132953.80800057133
12415.0,124914.53000000004,127925.29363907114
12346.0,77183.6,121771.17211888652
17450.0,194550.78999999995,117015.94100064933
18102.0,259657.30000000005,108058.77208659648
17511.0,91062.38,107353.3207988178
13694.0,65039.62,106238.59313808416
14298.0,51527.30000000001,97791.70774471755
