In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from datetime import datetime
from pyspark.sql.functions import year, month, to_date, col, sum, count, lag, round, when, struct, max
from pyspark.sql.window import Window

# **Landing Zone**

_Orders_ _Data_

In [0]:
orders_schema = StructType([
    StructField("index",StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("qty_ordered", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("value", DoubleType(), True),
    StructField("discount_amount", DoubleType(), True),
    StructField("total", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("cust_id", StringType(), True),
    StructField("final_status", StringType(), True)
])


In [0]:
file_paths = ["/Volumes/workspace/default/customer_and_orders_volume/orders_01_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_02_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_03_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_04_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_05_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_06_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_07_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_08_21.csv",
              "/Volumes/workspace/default/customer_and_orders_volume/orders_09_21.csv"
]
# Reading the first file with schema
orders_df = spark.read.csv(file_paths[0], header=True, schema=orders_schema)
display(orders_df.head(5))

index,order_id,order_date,item_id,qty_ordered,price,value,discount_amount,total,category,payment_method,cust_id,final_status
296,100438643,21/01/21,718698,2,6979.2,6979.2,0.0,6979.2,Mobiles & Tablets,cod,6702,cancelled
297,100439395,24/01/21,719994,2,6979.2,6979.2,0.0,6979.2,Mobiles & Tablets,cod,6702,received
298,100439912,25/01/21,721015,2,255.9,255.9,0.0,255.9,Men's Fashion,cod,6702,cancelled
407,100437996,18/01/21,717518,2,1894.6,1894.6,0.0,1894.6,Mobiles & Tablets,Easypay_MA,55731,cancelled
606,100430519,01/01/21,703609,2,49.9,49.9,0.0,49.9,Appliances,easypay_voucher,44445,cancelled


In [0]:
print(orders_df.count())

17212


In [0]:
# Reading and union the rest
for file in file_paths[1:]:
    df_part = spark.read.csv(file, header=True, schema=orders_schema)
    orders_df = orders_df.union(df_part)
orders_df.head(5).show(truncate=False)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-6828330260292727>, line 5[0m
[1;32m      3[0m     df_part [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mcsv(file, header[38;5;241m=[39m[38;5;28;01mTrue[39;00m, schema[38;5;241m=[39morders_schema)
[1;32m      4[0m     orders_df [38;5;241m=[39m orders_df[38;5;241m.[39munion(df_part)
[0;32m----> 5[0m orders_df[38;5;241m.[39mhead([38;5;241m5[39m)[38;5;241m.[39mshow(truncate[38;5;241m=[39m[38;5;28;01mFalse[39;00m)

[0;31mAttributeError[0m: 'list' object has no attribute 'show'

_Customers Data_

In [0]:
customers_schema = StructType([
    StructField("City", StringType(), True),
    StructField("County", StringType(), True),
    StructField("Customer Since", DateType(), True),
    StructField("E Mail", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Place Name", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Zip", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("cust_id", StringType(), True),
    StructField("full_name", StringType(), True)
])



In [0]:
customers_df = spark.read.json("/Volumes/workspace/default/customer_and_orders_volume/customers.json")

# Cleaning column names: removal of spaces, special chars, making to lowercase
def clean_column_names(df):
    for col_name in df.columns:
        new_col_name = col_name.strip().lower().replace(" ", "_").replace("-", "_")
        df = df.withColumnRenamed(col_name, new_col_name)
    return df

# Apply cleaning
customers_df_cleaned = clean_column_names(customers_df)
customers_df_cleaned.show()



# **Staging Layer**
#### Cleaned and Partitioned Data
Now we clean, transform, and optimize this orders for downstream querying.

In [0]:
# Add partitioning columns
orders_landing_df = orders_df
orders_stage_df = orders_landing_df.withColumn("order_date_parsed", to_date("order_date", "dd/MM/yy")).withColumn("order_year", year("order_date_parsed")).withColumn("order_month", month("order_date_parsed"))
orders_stage_df.show()



We now clean customers and add in staging layer
Loading from landing

In [0]:

# customers_landing_df = spark.read.format("delta").load("/mnt/data_lake/landing/Customers_data")

# Cleaning date and email
# customers_stage_df = customers_landing_df.withColumnRenamed("Customer Since", "customer_since").withColumnRenamed("E Mail", "email")
customers_stage_df = customers_df_cleaned.withColumnRenamed("Customer Since", "customer_since").withColumnRenamed("E Mail", "email")

# Writing to staging zone
# customers_stage_df.write.format("delta").mode("overwrite").save("/mnt/data_lake/staging/Customers_data")
customers_stage_df.show()




In [0]:
# Performing JOIN and aggregation
result_dataframe = orders_stage_df.alias("o").join(customers_stage_df.alias("c"), on="cust_id", how="inner").groupBy("c.full_name", "c.e_mail").agg(count("o.order_id").alias("total_orders"),sum("o.total").alias("total_spent")).orderBy("total_spent", ascending=False)

# Showing results
result_dataframe.show(10)



# **Curated Zone**

**1. Order Volume and Revenue by Year, Month, Category**

In [0]:
order_summary_dataframe = orders_stage_df.groupBy("order_year","order_month","category").agg(count("order_id").alias("total_orders"),sum("total").alias("total_revenue"))
order_summary_dataframe.show(70)



**2. Revenue growth by categories from previous month to this month.**

In [0]:
# Filtering for received orders only
df_filtered = orders_stage_df.filter(col("final_status") == "received")

revenue_df = df_filtered.groupBy(
    year("order_date_parsed").alias("year"),
    month("order_date_parsed").alias("month"),
    "category"
).agg(sum("total").alias("revenue"))
revenue_df.show(50)



In [0]:

# Defining window to get previous month's revenue per category
window_spec = Window.partitionBy("category").orderBy("year", "month")

# Add previous month's revenue column
revenue_with_lag = revenue_df.withColumn(
    "prev_month_revenue", lag("revenue").over(window_spec)
)
revenue_with_lag.show()




In [0]:
# Calculating growth percentage
revenue_growth = revenue_with_lag.withColumn(
    "growth_percent",
    when(col("prev_month_revenue").isNotNull(),
           (col("revenue") - col("prev_month_revenue")) / col("prev_month_revenue")) * 100
          )
revenue_growth.show()




In [0]:
# Getting the latest year and month
latest_month = revenue_growth.agg(
    max(struct("year", "month")).alias("latest")
).collect()[0]["latest"]

# Filtering data for the latest month
growth_latest = revenue_growth.filter(
    (col("year") == latest_month["year"]) &
    (col("month") == latest_month["month"])
)

# Showing the revenue growth for latest month
growth_latest.select(
    "category", "year", "month",
    "revenue", "prev_month_revenue", "growth_percent"
).show()




##### 3. Which top 5 categories have a maximum number of orders and maximum number of cancellations in the month.

In [0]:
latest_month = orders_stage_df.agg(
    max(struct("order_year", "order_month")).alias("latest")
).collect()[0]["latest"]

filtered_df3 = orders_stage_df.filter(
    (col("order_year") == latest_month["order_year"]) &
    (col("order_month") == latest_month["order_month"])
)
filtered_df3.show()



In [0]:
# Top 5 Orders

top_orders = (
    filtered_df3.groupBy("category")
    .agg(count("*").alias("total_orders"))
    .orderBy(col("total_orders").desc())
    .limit(5)
)
top_orders.show()



In [0]:
# Top 5 Cancellations

top_cancellations = (
    filtered_df3.filter(col("final_status") == "cancelled")
    .groupBy("category")
    .agg(count("*").alias("cancelled_orders"))
    .orderBy(col("cancelled_orders").desc())
    .limit(5)
)
top_cancellations.show()



In [0]:
# Combined result

from pyspark.sql.functions import coalesce, lit

orders = (
    filtered_df3.groupBy("category")
    .agg(count("*").alias("total_orders"))
)

cancellations = (
    filtered_df3.filter(col("final_status") == "cancelled")
    .groupBy("category")
    .agg(count("*").alias("cancelled_orders"))
)

combined = (
    orders.join(cancellations, on="category", how="outer")
    .fillna(0)
    .orderBy(col("total_orders").desc())
)
combined.show()




##### 4 . Segment customers by age: 0-20 as young, 20-35 adults, 35-55 middle-ages and >55 Old. Find the total spend (in percentage of total spend of categories)

In [0]:
customers_segmented = customers_stage_df.withColumn(
    "age_group",
    when(col("age") <= 20, "Young")
    .when((col("age") > 20) & (col("age") <= 35), "Adults")
    .when((col("age") > 35) & (col("age") <= 55), "Middle-Age")
    .otherwise("Old"))

customers_segmented.show()



In [0]:
orders_cat_distinct = orders_stage_df.select("cust_id", "category", "total").distinct()
customers_expanded = customers_segmented.join(
    orders_cat_distinct,
    on="cust_id",
    how="left"
)
customers_expanded.show()



In [0]:
from pyspark.sql import functions as F

# Step 1: Total spend per category
total_spend_per_cat = (
    customers_expanded
    .groupBy("category")
    .agg(F.sum("total").alias("total_spend"))
)

# Step 2: Spend per age group per category
spend_by_age_cat = (
    customers_expanded
    .groupBy("category", "age_group")
    .agg(F.sum("total").alias("age_group_spend"))
)

# Step 3: Pivot age group into columns
spend_pivot = (
    spend_by_age_cat
    .groupBy("category")
    .pivot("age_group", ["Young", "Adults", "Middle-Age", "Old"])
    .sum("age_group_spend")
)

# Step 4: Join with total spend
result = (
    total_spend_per_cat
    .join(spend_pivot, on="category", how="left")
)

# Step 5: Add percentage columns
result_pct = (
    result
    .withColumn("young_%",       (F.col("Young") / F.col("total_spend") * 100).cast("double"))
    .withColumn("adult_%",       (F.col("Adults") / F.col("total_spend") * 100).cast("double"))
    .withColumn("middle_age_%",  (F.col("Middle-Age") / F.col("total_spend") * 100).cast("double"))
    .withColumn("old_%",         (F.col("Old") / F.col("total_spend") * 100).cast("double"))
)

# Step 6: Select & order columns
final_df = result_pct.select(
    "category", "total_spend",
    "young_%", "adult_%", "middle_age_%", "old_%"
)

final_df.show(truncate=False)




In [0]:
revenue_df.display()



In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType  # optional in older versions

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import pandas as pd
from statsmodels.tsa.api import SimpleExpSmoothing
from statsmodels.tsa.holtwinters import SimpleExpSmoothing
import mlflow
import mlflow.pyfunc
from mlflow.models.signature import infer_signature

alpha = 0.6

# Output schema
schema = StructType([
    StructField("category", StringType()),
    StructField("year", IntegerType()),
    StructField("month", IntegerType()),
    StructField("revenue", DoubleType()),
    StructField("forecast", DoubleType()),
    StructField("abs_error", DoubleType()),
    StructField("pct_error", DoubleType())
])
mlflow.set_tracking_uri("databricks")      # or "http://127.0.0.1:5000" if local MLflow
mlflow.set_registry_uri("databricks-uc")      # or "databricks-uc" if Unity Catalog is enabled

def forecast_ses(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.sort_values(by=["year", "month"])
    
    pdf["year"] = pdf["year"].astype(float) # Cast to float for compatibility with MLflow signature
    pdf["month"] = pdf["month"].astype(float)

    if len(pdf) < 9:
        pdf["forecast"] = None
        pdf["abs_error"] = None
        pdf["pct_error"] = None
        return pdf

    ses_model = SimpleExpSmoothing(pdf["revenue"]).fit(
        smoothing_level=alpha, optimized=False
    )
    forecast = ses_model.fittedvalues

    pdf["forecast"] = forecast.shift(1)
    pdf["abs_error"] = (pdf["revenue"] - pdf["forecast"]).abs()
    pdf["pct_error"] = pdf["abs_error"] / pdf["revenue"] * 100

    return pdf


# Apply grouped forecasting
forecast_result = (
    revenue_df.groupBy("category")
    .applyInPandas(forecast_ses, schema=schema)
)

forecast_result.show()




In [0]:
input_example = revenue_df.limit(5).toPandas()

# Ensure serializable dtypes
input_example["year"] = input_example["year"].astype(float)
input_example["month"] = input_example["month"].astype(float)
input_example["revenue"] = input_example["revenue"].astype(float)

# Handle NaNs and objects
input_example = input_example.fillna(0)
for col in input_example.select_dtypes(include=["object"]).columns:
    input_example[col] = input_example[col].astype(str)

# Infer signature using cleaned input
signature = infer_signature(
    input_example,
    forecast_ses(input_example)
)

print("Cleaned input_example ready for MLflow logging")



In [0]:

import mlflow
from mlflow.models.signature import infer_signature

# Ensure any active run is ended
if mlflow.active_run():
    mlflow.end_run()

signature = infer_signature(input_example, forecast_ses(input_example))

class SESForecastModel(mlflow.pyfunc.PythonModel):
    def __init__(self, alpha=0.6):
        self.alpha = alpha

    def predict(self, context, model_input: pd.DataFrame) -> pd.DataFrame:
        return forecast_ses(model_input)
    
with mlflow.start_run() as run:
    # ✅ Save model to DBFS (no S3 / UC involved)
    model_info = mlflow.pyfunc.log_model(
        artifact_path="SES_model_updated",
        python_model=SESForecastModel(alpha=0.6),
        input_example=input_example.head(1),
        signature=signature
    )

run_id = run.info.run_id
print("Run ID:", run_id)
print("Model path (DBFS):", model_info.model_uri)



In [0]:
# Load model from DBFS path
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

# Run predictions
predictions = loaded_model.predict(input_example)
print(predictions)




Register model in model registry. 
registered_model_name and mlflow.register_model() must be removed → they force Unity Catalog.
Databricks free workspace does not have Unity Catalog (UC) or S3 permissions.

When we call mlflow.register_model, MLflow tries to push artifacts to S3-backed UC storage, which we don’t have access to → hence the AccessDenied (s3:PutObject) errors.

That means:
✅ We can log models locally (DBFS).
❌ We cannot register them into Unity Catalog in free edition.

In [0]:
%python
print("Model URI:", model_info.model_uri)



Databricks Free Edition you can register models into the Workspace Model Registry (what you already did — your ses_forecasting_model with versions 1 and 2 is proof ✅).

What you cannot do in Free Edition:

Deploy models to a serving endpoint (real-time REST API).

Use Model Monitoring & advanced governance features.

Unity Catalog–backed registry (only Pro/Premium).

So in Free:

✅ You can train and log models with MLflow.

✅ You can register versions of the model in the Workspace registry.

✅ You can load models back from the registry inside your notebooks or jobs and call .predict().

❌ You cannot create a REST-serving endpoint with w.serving_endpoints.create.

👉 In other words:

Registering = Allowed