In [1]:
from pyspark.sql.functions import avg, sum, count, datediff, col, round, when, lit, median, approx_percentile

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 3, Finished, Available, Finished)

In [2]:
# Read tables in the Gold Lakehouse
Gold_dim_sellers = spark.table("dim_sellers")
Gold_dim_dates = spark.table("dim_dates")
Gold_fact_items = spark.table("fact_order_items")
Gold_fact_orders = spark.table("fact_orders")
Gold_fact_reviews = spark.table("fact_reviews")

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 4, Finished, Available, Finished)

###### Create Vendor Metrics Table

In [3]:
# Sellers join to Order Items
seller_performance = Gold_dim_sellers.alias("s").join(
    Gold_fact_items.alias("oi"),
    col("s.seller_id") == col("oi.seller_id"),
    "left"
).select(
    # Select key columns for grouping and calculation
    col("s.seller_id"),
    col("s.seller_city"),
    col("s.seller_state"),
    col("oi.order_id"),
    col("oi.price")
)

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 5, Finished, Available, Finished)

In [4]:
# Join to Orders (for timestamps) and Reviews
seller_performance = seller_performance.join(
    Gold_fact_orders.alias("o"),
    col("oi.order_id") == col("o.order_id"),
    "left"
).join(
    Gold_fact_reviews.alias("r"),
    col("oi.order_id") == col("r.order_id"),
    "left"
)

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 6, Finished, Available, Finished)

In [10]:
# Group and Calculate Metrics
vendor_metrics = seller_performance.groupBy("s.seller_id", "s.seller_city", "s.seller_state").agg(
    # Sales Metrics
    round(sum(col("oi.price")), 2).alias("Total_GMV"),
    count(col("oi.order_id")).alias("Total_Orders_Count"),

    # Rating Metrics
    round(avg(col("r.review_score")), 4).alias("Avg_Review_Score"),
    count(col("r.review_id")).alias("Total_Reviews_Count"),

    # Delivery Metrics
    round(avg(
        datediff(col("o.order_delivered_carrier_date"), col("o.order_purchase_timestamp"))
    ), 2).alias("Avg_Days_to_Handover"),

    # On-Time Delivery Rate
    round(sum(
        when(col("o.order_delivered_customer_date") <= col("o.order_estimated_delivery_date"), 1).otherwise(0)
    ) * 100.0 / count(col("o.order_id")), 2).alias("On_Time_Delivery_Rate_Pct")
).filter(col("Total_Orders_Count") > 0) # Remove sellers with zero orders

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 12, Finished, Available, Finished)

In [11]:
# Write to Gold Lakehouse as Vendor_Metrics_Tbl
vendor_metrics.write.format("delta").mode("overwrite").saveAsTable("Vendor_Metrics_Tbl")

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 13, Finished, Available, Finished)

###### Create Vendor Category Table

In [13]:
vendor_metrics_df = spark.table("Vendor_Metrics_Tbl")

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 15, Finished, Available, Finished)

In [14]:
# Calculate Median Sales (GMV: Gross Merchandise Value) and Average Rating for the entire dataset
# Using approx_percentile (0.5) for median is best practice in Spark for large data
thresholds = vendor_metrics_df.agg(
    approx_percentile(col("Total_GMV"), lit(0.5)).alias("sales_threshold"),
    avg(col("Avg_Review_Score")).alias("rating_threshold")
).collect()[0]

SALES_THRESHOLD = thresholds["sales_threshold"]
RATING_THRESHOLD = thresholds["rating_threshold"]

print(f"Calculated Sales Threshold (Median GMV): {SALES_THRESHOLD}")
print(f"Calculated Rating Threshold (Overall Avg): {RATING_THRESHOLD}")

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 16, Finished, Available, Finished)

Calculated Sales Threshold (Median GMV): 832.41
Calculated Rating Threshold (Overall Avg): 3.975321754613143


In [17]:
# Apply categorisation logic
# High sales + high ratings → “top vendors”
# High sales + low ratings → “at risk” vendors
# Low sales + high ratings → “growth potential” vendors
# Low sales + low ratings --> "Low priority" vendors

vendor_category_df = vendor_metrics_df.withColumn(
    "Vendor_Category",
    when(
        (col("Total_GMV") >= SALES_THRESHOLD) & (col("Avg_Review_Score") >= RATING_THRESHOLD), lit("1 - Top Vendors")
    ).when(
        (col("Total_GMV") >= SALES_THRESHOLD) & (col("Avg_Review_Score") < RATING_THRESHOLD), lit("2 - At Risk Vendors")
    ).when(
        (col("Total_GMV") < SALES_THRESHOLD) & (col("Avg_Review_Score") >= RATING_THRESHOLD), lit("3 - Growth Potential Vendors")
    ).otherwise(
        lit("4 - Low Priority Vendors")
    )
)

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 19, Finished, Available, Finished)

In [18]:
# Write to Gold Lakehouse as Vendor_Category_Tbl

vendor_category_df.write.format("delta").mode("overwrite").saveAsTable("Vendor_Category_Tbl")

StatementMeta(, e1f67b50-9756-4e10-9c63-73d8201057c5, 20, Finished, Available, Finished)