In [0]:
# Databricks Notebook: 3_Product_Recommendation
# This notebook builds a simple recommendation model using product co-occurrence.
# We use PySpark MLlib to compute similarities between products based on co-purchase behavior.

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, explode
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.linalg import Vectors
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("ProductRecommendation").getOrCreate()

# Read cleaned sales data
df = spark.sql("SELECT order_id, product_id FROM ecommerce_sales_cleaned")
display(df)

order_id,product_id
O1001,P101
O1001,P102
O1001,P103
O1002,P102
O1002,P104
O1002,P105
O1002,P101
O1003,P103
O1003,P106
O1004,P101


In [0]:
# Create a basket of products per order
df_basket = df.groupBy("order_id").agg(collect_set("product_id").alias("products"))
display(df_basket)

# Explode the basket so we have a row per product per order
df_exploded = df_basket.withColumn("product", explode("products"))
display(df_exploded)

order_id,products
O1006,"List(P104, P103, P105)"
O1002,"List(P104, P101, P105, P102)"
O1004,"List(P101, P108, P107, P109)"
O1009,"List(P104, P106, P105, P107)"
O1007,"List(P106, P108, P107)"
O1001,"List(P103, P101, P102)"
O1011,"List(P104, P103, P101, P105, P102)"
O1012,"List(P106, P110, P108, P107, P109)"
O1003,"List(P103, P106)"
O1014,"List(P104, P106, P110, P102, P108)"


order_id,products,product
O1006,"List(P104, P103, P105)",P104
O1006,"List(P104, P103, P105)",P103
O1006,"List(P104, P103, P105)",P105
O1002,"List(P104, P101, P105, P102)",P104
O1002,"List(P104, P101, P105, P102)",P101
O1002,"List(P104, P101, P105, P102)",P105
O1002,"List(P104, P101, P105, P102)",P102
O1004,"List(P101, P108, P107, P109)",P101
O1004,"List(P101, P108, P107, P109)",P108
O1004,"List(P101, P108, P107, P109)",P107


In [0]:
# Compute co-occurrence: For each product, find other products in the same order
df_cooccurrence = df_exploded.alias("a").join(
    df_exploded.alias("b"),
    (F.col("a.order_id") == F.col("b.order_id")) & (F.col("a.product") != F.col("b.product")),
    "inner"
).select(F.col("a.product").alias("product"), F.col("b.product").alias("co_product"))

# Count the co-occurrence frequency
df_recommendation = df_cooccurrence.groupBy("product", "co_product").count().orderBy("product", F.desc("count"))
display(df_recommendation)

# For a given product (for demo purposes), list top 5 co-purchased items
product_of_interest = "P101"  # change to a valid product_id
recommendations = df_recommendation.filter(F.col("product") == product_of_interest).limit(5)
display(recommendations)


product,co_product,count
P101,P102,5
P101,P103,4
P101,P109,3
P101,P105,3
P101,P104,2
P101,P110,2
P101,P107,2
P101,P108,1
P102,P101,5
P102,P104,3


product,co_product,count
P101,P102,5
P101,P103,4
P101,P109,3
P101,P105,3
P101,P104,2
