In [0]:
%sql
CREATE CATALOG IF NOT EXISTS pizza_factory;
USE CATALOG pizza_factory;

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS pizza_factory.bronze;
CREATE SCHEMA IF NOT EXISTS pizza_factory.silver;
CREATE SCHEMA IF NOT EXISTS pizza_factory.gold;

In [0]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# 1. Generate Raw Data
np.random.seed(42)
data_size = 1000

data = {
    "order_id": range(1001, 1001 + data_size),
    "customer_id": np.random.randint(5000, 6000, size=data_size),
    "pizza_type": np.random.choice(['Margherita', 'Pepperoni', 'Veggie', 'BBQ Chicken'], data_size),
    "distance_km": np.round(np.random.uniform(1.0, 10.0, size=data_size), 2),
    "traffic_level": np.random.choice(['Low', 'Medium', 'High'], data_size, p=[0.4, 0.4, 0.2]),
    "order_timestamp": [datetime(2023, 10, 1) + timedelta(minutes=i*15) for i in range(data_size)],
    "prep_time_mins": np.random.randint(10, 25, size=data_size) 
}

df_raw = pd.DataFrame(data)



In [0]:
# 2. Save as Bronze Table (Raw format)
spark_df = spark.createDataFrame(df_raw)
spark_df.write.format("delta").mode("overwrite").saveAsTable("pizza_factory.bronze.bronze_orders")

print("Bronze table created successfully!")

Bronze table created successfully!


In [0]:
%sql
select * from pizza_factory.bronze.bronze_orders;

order_id,customer_id,pizza_type,distance_km,traffic_level,order_timestamp,prep_time_mins
1001,5102,Margherita,7.35,Medium,2023-10-01T00:00:00.000Z,10
1002,5435,Margherita,9.33,Medium,2023-10-01T00:15:00.000Z,15
1003,5860,Veggie,2.63,Low,2023-10-01T00:30:00.000Z,10
1004,5270,BBQ Chicken,6.11,High,2023-10-01T00:45:00.000Z,14
1005,5106,Margherita,9.24,Low,2023-10-01T01:00:00.000Z,18
1006,5071,Pepperoni,1.31,Medium,2023-10-01T01:15:00.000Z,21
1007,5700,Margherita,7.28,High,2023-10-01T01:30:00.000Z,23
1008,5020,Pepperoni,3.68,Medium,2023-10-01T01:45:00.000Z,14
1009,5614,BBQ Chicken,9.32,High,2023-10-01T02:00:00.000Z,15
1010,5121,Veggie,9.74,Low,2023-10-01T02:15:00.000Z,17


In [0]:
from pyspark.sql.functions import col, when, hour, dayofweek

# Load the Bronze data
bronze_df = spark.read.table("pizza_factory.bronze.bronze_orders")

# Logic: Calculate Actual Delivery Time 
# (Prep Time + Travel Time based on distance and traffic)
silver_df = bronze_df.withColumn(
    "travel_time_mins", 
    (col("distance_km") * when(col("traffic_level") == "Heavy", 6)
                         .when(col("traffic_level") == "Moderate", 4)
                         .otherwise(2.5))
).withColumn(
    "total_delivery_time", col("prep_time_mins") + col("travel_time_mins")
)

# Rule: If > 30 mins, refund_eligible = 1
silver_df = silver_df.withColumn(
    "is_late_refund", when(col("total_delivery_time") > 30, 1).otherwise(0)
)

# Feature Engineering for ML: Extract Hour and Day
silver_df = silver_df.withColumn("order_hour", hour(col("order_timestamp"))) \
                     .withColumn("day_of_week", dayofweek(col("order_timestamp")))

# Save as Silver
silver_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("pizza_factory.silver.silver_orders_cleaned")

print("Table 'pizza_factory.silver_orders_cleaned' created with refund logic.")

Table 'pizza_factory.silver_orders_cleaned' created with refund logic.


In [0]:
%sql
select * from pizza_factory.silver.silver_orders_cleaned;

order_id,customer_id,pizza_type,distance_km,traffic_level,order_timestamp,prep_time_mins,travel_time_mins,total_delivery_time,is_late_refund,order_hour,day_of_week
1001,5102,Margherita,7.35,Medium,2023-10-01T00:00:00.000Z,10,18.375,28.375,0,0,1
1002,5435,Margherita,9.33,Medium,2023-10-01T00:15:00.000Z,15,23.325,38.325,1,0,1
1003,5860,Veggie,2.63,Low,2023-10-01T00:30:00.000Z,10,6.574999999999999,16.575,0,0,1
1004,5270,BBQ Chicken,6.11,High,2023-10-01T00:45:00.000Z,14,15.275,29.275,0,0,1
1005,5106,Margherita,9.24,Low,2023-10-01T01:00:00.000Z,18,23.1,41.1,1,1,1
1006,5071,Pepperoni,1.31,Medium,2023-10-01T01:15:00.000Z,21,3.2750000000000004,24.275,0,1,1
1007,5700,Margherita,7.28,High,2023-10-01T01:30:00.000Z,23,18.2,41.2,1,1,1
1008,5020,Pepperoni,3.68,Medium,2023-10-01T01:45:00.000Z,14,9.2,23.200000000000003,0,1,1
1009,5614,BBQ Chicken,9.32,High,2023-10-01T02:00:00.000Z,15,23.3,38.3,1,2,1
1010,5121,Veggie,9.74,Low,2023-10-01T02:15:00.000Z,17,24.35,41.35,1,2,1


In [0]:
%sql
CREATE OR REPLACE TABLE pizza_factory.gold.gold_refund_analysis AS
SELECT 
    pizza_type,
    traffic_level,
    COUNT(order_id) as total_orders,
    SUM(is_late_refund) as free_pizzas_given,
    ROUND((SUM(is_late_refund) / COUNT(order_id)) * 100, 2) as refund_rate_percentage,
    ROUND(AVG(total_delivery_time), 2) as avg_delivery_minutes,
    30 AS guarantee_limit
FROM pizza_factory.silver.silver_orders_cleaned
GROUP BY pizza_type, traffic_level
ORDER BY refund_rate_percentage DESC;

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from pizza_factory.gold.gold_refund_analysis

pizza_type,traffic_level,total_orders,free_pizzas_given,refund_rate_percentage,avg_delivery_minutes,total_refund_count,guarantee_limit
Pepperoni,High,46,31,67.39,32.82,31,30
Margherita,High,45,29,64.44,32.03,29,30
Margherita,Low,108,69,63.89,32.2,69,30
Veggie,Low,105,63,60.0,31.43,63,30
Veggie,High,62,36,58.06,33.12,36,30
Margherita,Medium,111,62,55.86,31.42,62,30
BBQ Chicken,High,52,29,55.77,31.36,29,30
Pepperoni,Low,96,53,55.21,30.18,53,30
BBQ Chicken,Medium,75,38,50.67,30.39,38,30
BBQ Chicken,Low,87,40,45.98,30.1,40,30


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Load Silver Data
data = spark.read.table("pizza_factory.silver.silver_orders_cleaned").toPandas()

# Select Features and Target
X = data[['distance_km', 'prep_time_mins', 'order_hour', 'day_of_week']]
y = data['is_late_refund']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

with mlflow.start_run(run_name="Pizza_Late_Predictor"):
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    
    # Log Metrics
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)
    
    # Log Model
    mlflow.sklearn.log_model(model, "random-forest-pizza-model")
    
    print(f"Model trained with accuracy: {accuracy}")



Model trained with accuracy: 0.98


In [0]:
import mlflow
from pyspark.sql.functions import struct, col

# 1. Load the model as a UDF
# Make sure your last run was the successful training run
run_id = mlflow.last_active_run().info.run_id
model_uri = f"runs:/{run_id}/random-forest-pizza-model"
predict_late_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

# 2. Load the Silver data 
# (Using the table name from your screenshot error message)
silver_df = spark.read.table("pizza_factory.silver.silver_orders_cleaned")

# 3. Apply Prediction
# We wrap the features in struct() so the model receives them as one 'row'
features = ["distance_km", "prep_time_mins", "order_hour", "day_of_week"]

predictions_df = silver_df.withColumn(
    "predicted_is_late", 
    predict_late_udf(struct(*[col(c) for c in features]))
)

# 4. View Results
display(predictions_df.select("order_id", "total_delivery_time", "is_late_refund", "predicted_is_late"))

2026/01/31 17:36:13 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


order_id,total_delivery_time,is_late_refund,predicted_is_late
1001,28.375,0,0.0
1002,38.325,1,1.0
1003,16.575,0,0.0
1004,29.275,0,0.0
1005,41.1,1,1.0
1006,24.275,0,0.0
1007,41.2,1,1.0
1008,23.200000000000003,0,0.0
1009,38.3,1,1.0
1010,41.35,1,1.0
