In [1]:
import numpy as np
import os

# Ask PySpark to pull the MongoDB Spark connector package when it launches the JVM.
# PySpark reads this variable at JVM launch, so it has to be set before the SparkSession
# in the next cell is built.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
  '--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 '
  'pyspark-shell'
)

# Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum, when, udf, row_number

# ML pipelines & features
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer,
    OneHotEncoder,
    VectorAssembler,
    MinMaxScaler,
    Word2Vec
)

# ALS recommendation
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Clustering
from pyspark.ml.clustering import KMeans

# Linear algebra types
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT


In [2]:
# Read the MongoDB connection string from the environment instead of hardcoding it in the
# notebook: the URI embeds a username and password, and anything saved in a notebook leaks
# through version control and shared copies.
# NOTE(review): the password that was previously hardcoded here should be rotated.
MONGO_URI = os.environ.get("MONGO_URI")
if not MONGO_URI:
    raise RuntimeError(
        "MONGO_URI environment variable is not set; export the MongoDB connection string "
        "(mongodb+srv://<user>:<password>@<host>/<db>.<collection>) before running this notebook."
    )

# Build or retrieve a Spark session named "Recommendation" with tuned resources, parallelism,
# and Kryo serialization. The MongoDB URI is registered for both reads and writes.
# NOTE(review): with master local[*] there is no separate executor process, so the
# spark.executor.* settings are presumably ignored — confirm and drop them if unused.
spark = (
    SparkSession.builder
    .appName("Recommendation")
    .master("local[*]")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.cores", "8")
    .config("spark.default.parallelism", "8")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.3")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .getOrCreate()
)

# Only log error-level messages to reduce console verbosity
spark.sparkContext.setLogLevel("ERROR")

25/04/17 22:01:57 WARN Utils: Your hostname, Ilker-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 172.20.10.2 instead (on interface en0)
25/04/17 22:01:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/ilkeryasincakir/.ivy2/cache
The jars for the packages stored in: /Users/ilkeryasincakir/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-098784b9-0288-48fe-8a12-2bb880026b05;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.2 in central


:: loading settings :: url = jar:file:/Users/ilkeryasincakir/Developer/Spark/spark-3.5.3-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 99ms :: artifacts dl 4ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-098784b9-0288-48fe-8a12-2bb88002

In [3]:
def read_from_mongodb(spark, database="RetailDB", collection="transactions"):
    """Load a MongoDB collection into a Spark DataFrame via the Spark MongoDB connector.

    The host and credentials are not passed here; presumably they come from the session's
    ``spark.mongodb.input.uri`` setting. ``database`` and ``collection`` are applied as
    read options on top of that URI.

    Args:
        spark: Active SparkSession with the MongoDB connector on its classpath.
        database: MongoDB database to read from (default ``"RetailDB"``).
        collection: Collection to read (default ``"transactions"``).

    Returns:
        The loaded DataFrame (not cached).
    """
    print("Reading data from MongoDB...")

    df = (
        spark.read
        .format("mongo")
        .option("database", database)
        .option("collection", collection)
        .load()
    )

    # count() scans the whole collection once just to report the size; later actions
    # re-read from MongoDB because the DataFrame is not cached.
    print(f"Successfully read {df.count()} records from MongoDB")
    return df

In [4]:
# Pull the transactions collection from MongoDB into a Spark DataFrame, then print
# its schema (column names and inferred types) to confirm the load worked.
df = read_from_mongodb(spark)
df.printSchema()

Reading data from MongoDB...




Successfully read 301171 records from MongoDB
root
 |-- Age: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- Customer_ID: double (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Feedback: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Income: double (nullable = true)
 |-- Month: string (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- Product_Brand: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Ratings: double (nullable = true)
 |-- Shipping_Method: string (nullable = true)
 |-- Total_Purchases: double (nullable = true)
 |-- Transaction_ID: integer (nullable = true)
 |-- Year: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- products: string (nullable = true)



                                                                                

In [5]:
# Columns to label-encode with StringIndexer. Customer_ID and Transaction_ID are numeric in
# the source schema; StringIndexer casts numeric inputs to strings before indexing them.
string_columns = [
    "Customer_ID", "Country", "Gender", "Customer_Segment",
    "Product_Category", "Product_Brand", "Product_Type",
    "Shipping_Method", "Payment_Method", "products", "Transaction_ID",
]

# One indexer per column, each writing "<name>_index". handleInvalid="keep" puts labels unseen
# during fit into an extra index instead of raising.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    for name in string_columns
]
pipeline = Pipeline(stages=indexers)
indexed_df = pipeline.fit(df).transform(df)

# Driver-side lookup: numeric product index -> original product name.
product_index_rows = indexed_df.select("products_index", "products").distinct().collect()
product_id_to_name = {int(r["products_index"]): r["products"] for r in product_index_rows}

# Swap each string column for its index. 'products' is the exception: its string column is
# kept and the index is exposed as 'product_id'.
for name in string_columns:
    if name != "products":
        indexed_df = indexed_df.drop(name).withColumnRenamed(f"{name}_index", name)
    else:
        indexed_df = indexed_df.withColumnRenamed("products_index", "product_id")

# Ordinal score for the textual feedback; any other value (or a missing one) becomes null.
feedback_scale = [("Bad", 1.0), ("Average", 2.0), ("Good", 3.0), ("Excellent", 4.0)]
feedback_rating = None
for label, score in feedback_scale:
    is_label = col("Feedback") == label
    if feedback_rating is None:
        feedback_rating = when(is_label, score)
    else:
        feedback_rating = feedback_rating.when(is_label, score)

indexed_df = (
    indexed_df
    .withColumn("Feedback_Rating", feedback_rating.otherwise(None))
    .drop("Feedback")
)

# Spot-check the transformed rows.
indexed_df.show(5)


                                                                                

+----+------+-------+-------+---------------+------+--------------------+--------------------+-----------+-------+------+----------------+----------------+-------------+------------+---------------+--------------+----------+--------------+---------------+
| Age|Income|  Month|Ratings|Total_Purchases|  Year|                 _id|            products|Customer_ID|Country|Gender|Customer_Segment|Product_Category|Product_Brand|Product_Type|Shipping_Method|Payment_Method|product_id|Transaction_ID|Feedback_Rating|
+----+------+-------+-------+---------------+------+--------------------+--------------------+-----------+-------+------+----------------+----------------+-------------+------------+---------------+--------------+----------+--------------+---------------+
|42.0|   2.2|January|    5.0|            5.0|2024.0|{680144a6a7ade10f...|             Science|    12573.0|    4.0|   1.0|             2.0|             3.0|          9.0|         2.0|            0.0|           0.0|      34.0|      27

In [6]:
# Month_Index: 1–12 for either the month name ("January") or its numeric string form ("1.0");
# anything else becomes 0.0. The mixed int/float literals make the resulting column a double.
month_names = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]

month_index = None
for month_number, month_name in enumerate(month_names, start=1):
    matches = (col("Month") == month_name) | (col("Month") == f"{month_number}.0")
    if month_index is None:
        month_index = when(matches, month_number)
    else:
        month_index = month_index.when(matches, month_number)

# Replace the raw Month column with the numeric index.
indexed_df = (
    indexed_df
    .withColumn("Month_Index", month_index.otherwise(0.0))
    .drop("Month")
)


In [None]:
# Per-column null counts (isNull -> 0/1, summed) to gauge data completeness.
# F.sum is the same function as the bare `sum` imported from pyspark.sql.functions above.
null_counts = indexed_df.select(
    *[F.sum(col(name).isNull().cast("int")).alias(name) for name in indexed_df.columns]
)
null_counts.show()

# Numeric types expected by the modelling steps, applied in this order.
numeric_casts = {
    "Ratings": "Double",
    "Age": "Int",
    "Year": "Int",
    "Income": "Double",
    "Customer_ID": "Int",
    "product_id": "Int",
}
for name, spark_type in numeric_casts.items():
    indexed_df = indexed_df.withColumn(name, col(name).cast(spark_type))

# Weight given to the explicit Ratings; Feedback_Rating receives (1 - alpha).
alpha = 0.5

# Weighted blend of the two signals. The result is null whenever either input is null.
# NOTE(review): Feedback_Rating is on a 1–4 scale (Bad..Excellent); the Ratings scale is not
# checked here — confirm the two are comparable before averaging them unnormalised.
indexed_df = indexed_df.withColumn(
    "Combined_Rating",
    alpha * col("Ratings") + (1 - alpha) * col("Feedback_Rating"),
)

indexed_df.select("Ratings", "Feedback_Rating", "Combined_Rating").show(5)

[Stage 41:>                                                         (0 + 3) / 3]

In [None]:
# Final column layout: customer attributes, transaction context, product attributes, ratings.
# Columns not listed here (e.g. _id, product_id) are dropped by the select.
customer_cols = ["Customer_ID", "Age", "Gender", "Country", "Income", "Customer_Segment"]
transaction_cols = [
    "Transaction_ID", "Year", "Month_Index",
    "Payment_Method", "Shipping_Method", "Total_Purchases",
]
product_cols = ["Products", "Product_Category", "Product_Brand", "Product_Type"]
rating_cols = ["Ratings", "Feedback_Rating", "Combined_Rating"]

reordered_cols = customer_cols + transaction_cols + product_cols + rating_cols

indexed_df = indexed_df.select(reordered_cols)

# Confirm the new order.
indexed_df.show(5)

### Recommendation Phase 1

1. Generate Recommendations with Ratings

In [None]:
# Index the 'Products' column to numeric values, keeping unseen labels.
# Any existing Products_Index is dropped first: StringIndexer refuses to overwrite an existing
# output column, so without this the cell fails when it is re-run.
productIndexer = StringIndexer(
    inputCol="Products",
    outputCol="Products_Index",
    handleInvalid="keep",
)
indexed_df = indexed_df.drop("Products_Index")
indexed_df = productIndexer.fit(indexed_df).transform(indexed_df)

# Split the data into 80% training and 20% test sets (seed for reproducibility).
# train/test are shared by all three ALS models, so they are left unfiltered here.
train, test = indexed_df.randomSplit([0.8, 0.2], seed=42)

# ALS rejects null user/item IDs and cannot read null ratings, and RegressionEvaluator cannot
# score rows with a null label, so this model only sees rows where all three are present.
als_required_cols = ["Customer_ID", "Products_Index", "Ratings"]
train_ratings = train.na.drop(subset=als_required_cols)
test_ratings = test.na.drop(subset=als_required_cols)

# Configure an explicit-feedback ALS model on customer–product ratings
als = ALS(
    userCol="Customer_ID",         # customer identifier
    itemCol="Products_Index",      # product identifier (indexed)
    ratingCol="Ratings",           # explicit rating column
    rank=10,                       # number of latent factors
    maxIter=10,                    # number of optimization iterations
    regParam=0.1,                  # regularization parameter
    coldStartStrategy="drop"       # drop NaN predictions (unseen users/items) at transform time
)

# Train the ALS model on the training set
model = als.fit(train_ratings)

# Generate predicted ratings for the test set (consumed by the RMSE cell below)
predictions = model.transform(test_ratings)

# Produce top-5 recommendations for every user
userRecs = model.recommendForAllUsers(5)
userRecs.show(5, truncate=False)


In [None]:
# RMSE of the explicit-rating ALS predictions against the true Ratings.
evaluator = (
    RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("Ratings")
    .setPredictionCol("prediction")
)

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error (RMSE):", rmse)


2. Generate Recommendations with Feedback

In [None]:
# Separate ALS model trained on the feedback-derived score (Feedback_Rating, 1–4) instead of
# the explicit Ratings. It still runs in ALS's default explicit mode (implicitPrefs is not set),
# treating the feedback score as an explicit rating.

# Feedback_Rating is null for missing/unexpected feedback; ALS cannot read null ratings and the
# evaluator cannot score null labels, so keep only rows with IDs and a feedback score.
fb_required_cols = ["Customer_ID", "Products_Index", "Feedback_Rating"]
train_fb = train.na.drop(subset=fb_required_cols)
test_fb = test.na.drop(subset=fb_required_cols)

als_fb = ALS(
    userCol="Customer_ID",          # customer identifier
    itemCol="Products_Index",       # product identifier (indexed)
    ratingCol="Feedback_Rating",    # use the feedback-derived rating as the target
    rank=10,                        # number of latent factors
    maxIter=10,                     # iterations for matrix factorization
    regParam=0.1,                   # regularization strength
    coldStartStrategy="drop"        # drop NaN predictions (unseen users/items) at transform time
)

# Train the feedback-based ALS model on the filtered training rows
als_model_fb = als_fb.fit(train_fb)

# Generate top-5 recommendations per user based on feedback patterns
userRecs = als_model_fb.recommendForAllUsers(5)
userRecs.show(5, truncate=False)

# Create predictions on the test set to evaluate feedback-based model quality
predictions = als_model_fb.transform(test_fb)

In [None]:
# RMSE of the feedback-based ALS predictions against the true Feedback_Rating.
evaluator = (
    RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("Feedback_Rating")
    .setPredictionCol("prediction")
)

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error (RMSE) for feedback-based model:", rmse)

3. Generate Recommendations with Combined Ratings

In [None]:
# ALS model on the blended Combined_Rating (explicit Ratings + feedback score).

# Combined_Rating is null whenever Ratings or Feedback_Rating is null; ALS cannot read null
# ratings and the evaluator cannot score null labels, so keep only complete rows.
combined_required_cols = ["Customer_ID", "Products_Index", "Combined_Rating"]
train_combined = train.na.drop(subset=combined_required_cols)
test_combined = test.na.drop(subset=combined_required_cols)

als_combined = ALS(
    userCol="Customer_ID",           # customer identifier
    itemCol="Products_Index",        # product identifier (indexed)
    ratingCol="Combined_Rating",     # use the precomputed weighted rating
    rank=10,                         # number of latent factors
    maxIter=10,                      # optimization iterations
    regParam=0.1,                    # regularization strength to prevent overfitting
    coldStartStrategy="drop"         # drop NaN predictions (unseen users/items) at transform time
)

# Train the combined-rating ALS model on the filtered training rows
als_model_combined = als_combined.fit(train_combined)

# Generate top-5 product recommendations per user based on the combined model
userRecs = als_model_combined.recommendForAllUsers(5)
userRecs.show(5, truncate=False)

# Produce predictions on the test set to assess model performance
predictions = als_model_combined.transform(test_combined)

In [None]:
# RMSE of the combined-rating ALS predictions against the true Combined_Rating.
evaluator = (
    RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("Combined_Rating")
    .setPredictionCol("prediction")
)

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error (RMSE) for combined model:", rmse)

### Recommendation Phase 2

1. User Profile Vector

In [None]:
# Profile attributes that describe a customer.
# NOTE(review): Gender/Country/Customer_Segment are StringIndexer label indices, so they are
# treated as ordinal numbers here; one-hot encoding may be more appropriate — confirm intent.
user_feature_cols = ["Age", "Gender", "Country", "Income", "Customer_Segment"]

# One row per distinct (Customer_ID, profile attributes) combination.
# NOTE(review): if a customer's attributes vary across transactions, that customer gets
# several rows here — confirm attributes are constant per customer.
distinct_users = indexed_df.select("Customer_ID", *user_feature_cols).distinct()

# Assemble the profile attributes into one vector. Customer_ID stays as the join key but is
# not a feature: it is a StringIndexer label index, so its numeric value says nothing about
# the customer and would only inject arbitrary distance into similarity/clustering later.
user_profile_vector = VectorAssembler(
    inputCols=user_feature_cols,
    outputCol="user_profile_vector",
).transform(distinct_users)

# Preview the user IDs alongside their assembled feature vectors
user_profile_vector.select(
    "Customer_ID",
    "user_profile_vector"
).show(5, truncate=False)


2. Product Vector

In [None]:
# Product descriptors assembled into one vector per row of indexed_df.
# Transaction_ID is deliberately excluded: it identifies the purchase, not the product, so
# its (arbitrary) index value would make otherwise identical products look different.
# NOTE(review): all of these are label indices treated as ordinal values, and Products_Index
# is itself the product identifier — consider one-hot encoding instead.
product_feature_cols = [
    "Product_Category",  # categorical index for product category
    "Product_Brand",     # categorical index for product brand
    "Product_Type",      # categorical index for product type
    "Products_Index",    # numerical index for the actual product
]

product_vector = VectorAssembler(
    inputCols=product_feature_cols,
    outputCol="product_vector"
).transform(indexed_df)

# Show a few products alongside their assembled feature vectors
product_vector.select("Products", "product_vector").show(5, truncate=False)

3. Transaction Vector  

In [None]:
# Transaction-level features: when it happened and how it was rated.
# Transaction_ID and Customer_ID remain available as key columns on the DataFrame but are
# not features — their numeric values are arbitrary label indices.
transaction_feature_cols = [
    "Month_Index",       # numerical month value
    "Year",              # transaction year
    "Combined_Rating",   # blended rating metric
]

# Combined_Rating is null whenever Ratings or Feedback_Rating is null; with the default
# handleInvalid="error" VectorAssembler would fail on such rows, so they are skipped.
transaction_vector = VectorAssembler(
    inputCols=transaction_feature_cols,
    outputCol="transaction_vector",
    handleInvalid="skip",
).transform(indexed_df)

# Preview a sample of transaction vectors for validation
transaction_vector.select("Transaction_ID", "transaction_vector").show(5, truncate=False)

In [None]:
def _fit_min_max(frame, input_col, output_col):
    """Fit a MinMaxScaler (default [0, 1] range) on ``frame[input_col]``.

    Returns the (scaler, fitted model, transformed frame) triple.
    """
    scaler = MinMaxScaler(inputCol=input_col, outputCol=output_col)
    fitted = scaler.fit(frame)
    return scaler, fitted, fitted.transform(frame)


# User profiles -> [0, 1] per feature
scaler_user, scaler_model_user, user_profile_scaled = _fit_min_max(
    user_profile_vector, "user_profile_vector", "user_profile_scaled"
)
user_profile_scaled.select("Customer_ID", "user_profile_scaled").show(5, truncate=False)

# Product descriptors -> [0, 1] per feature
scaler_product, scaler_model_product, product_scaled = _fit_min_max(
    product_vector, "product_vector", "product_vector_scaled"
)
product_scaled.select("Products", "product_vector_scaled").show(5, truncate=False)

# Transaction features (time + combined rating) -> [0, 1] per feature
scaler_transaction, scaler_model_transaction, transaction_vector_scaled = _fit_min_max(
    transaction_vector, "transaction_vector", "transaction_vector_scaled"
)
transaction_vector_scaled.select("Transaction_ID", "transaction_vector_scaled").show(5, truncate=False)


In [None]:
# Narrow each scaled source to its key column(s) plus its scaled vector.
user_profile_scaled = user_profile_scaled.select("Customer_ID", "user_profile_scaled")
product_scaled = product_scaled.select("Transaction_ID", "Products", "product_vector_scaled")
transaction_scaled = transaction_vector_scaled.select(
    "Transaction_ID", "Customer_ID", "transaction_vector_scaled", "Combined_Rating"
)

# transactions ⋈ users on Customer_ID, then ⋈ products on Transaction_ID (both inner joins).
# NOTE(review): if a Transaction_ID occurs on more than one row, the second join multiplies rows.
user_transaction_joined = transaction_scaled.join(user_profile_scaled, on="Customer_ID", how="inner")
hybrid_joined = user_transaction_joined.join(product_scaled, on="Transaction_ID", how="inner")

# Concatenate user, transaction and product vectors (in that order) into "hybrid_vector".
hybrid_assembler = VectorAssembler(
    inputCols=["user_profile_scaled", "transaction_vector_scaled", "product_vector_scaled"],
    outputCol="hybrid_vector",
)
hybrid_df = hybrid_assembler.transform(hybrid_joined)

hybrid_df.select("Customer_ID", "Products", "hybrid_vector", "Combined_Rating").show(5, truncate=False)


In [None]:
# Collapse hybrid_df (one hybrid_vector per transaction row) into one mean vector per
# customer and one per product.

def average_vectors(vectors):
    """Element-wise mean of a list of Spark ML vectors as a list of floats; None if empty/None."""
    if not vectors:
        return None
    arrays = [np.array(v.toArray()) for v in vectors]
    return np.mean(arrays, axis=0).tolist()

average_vectors_udf = F.udf(average_vectors, ArrayType(DoubleType()))


def array_to_vector(arr):
    """Wrap a list of floats in a DenseVector; None passes through unchanged."""
    if arr is None:
        return None
    return DenseVector(arr)

array_to_vector_udf = F.udf(array_to_vector, VectorUDT())


def _mean_hybrid_vector_by(frame, key_col, out_col):
    """Group ``frame`` by ``key_col`` and average its ``hybrid_vector`` values into ``out_col``.

    The intermediate ``hybrid_vector_list`` and ``avg_vector_array`` columns are kept.
    """
    return (
        frame
        .groupBy(key_col)
        .agg(F.collect_list("hybrid_vector").alias("hybrid_vector_list"))
        .withColumn("avg_vector_array", average_vectors_udf("hybrid_vector_list"))
        .withColumn(out_col, array_to_vector_udf("avg_vector_array"))
    )


# One vector per customer. hybrid_vector embeds per-transaction features, so a distinct() on
# (Customer_ID, hybrid_vector) would keep one row per transaction. That lets the top-N ranking
# repeat the same product for a customer and gives one customer several KMeans clusters.
user_vectors = _mean_hybrid_vector_by(hybrid_df, "Customer_ID", "hybrid_vector").select(
    "Customer_ID", "hybrid_vector"
)

# One vector per product: mean over all transactions of that product.
vector_list_df = _mean_hybrid_vector_by(hybrid_df, "Products", "product_vector")
product_vectors = vector_list_df.select("Products", "product_vector")

# Show the resulting product vectors
product_vectors.show(truncate=False)


In [None]:
def _as_float_array(vec):
    """Convert a Spark ML vector (dense or sparse) or any numeric array-like to a float ndarray."""
    # Spark vectors expose toArray(); using it avoids numpy walking the vector element by
    # element through __getitem__, which is slow for SparseVector.
    if hasattr(vec, "toArray"):
        return np.asarray(vec.toArray(), dtype=float)
    return np.asarray(vec, dtype=float)


def cosine_similarity(vec1, vec2):
    """Cosine similarity between two numeric vectors.

    Args:
        vec1, vec2: Spark ML vectors or numeric array-likes of equal length.

    Returns:
        None if either input is None; 0.0 if either vector has zero L2 norm;
        otherwise dot(vec1, vec2) / (||vec1|| * ||vec2||) as a Python float.
    """
    if vec1 is None or vec2 is None:
        return None
    a = _as_float_array(vec1)
    b = _as_float_array(vec2)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    # Guard against division by zero for all-zero vectors.
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))

# Register the Python function as a Spark UDF that returns a DoubleType
cosine_sim_udf = udf(cosine_similarity, DoubleType())


In [None]:
# Number of recommendations to keep per customer
TOP_N = 5

# Score every customer–product pair by cosine similarity of their vectors
user_product_scores = (
    user_vectors
    .crossJoin(product_vectors)
    .withColumn("similarity", cosine_sim_udf("hybrid_vector", "product_vector"))
)

# One score per (customer, product). If user_vectors holds several vectors for a customer,
# the same product would otherwise be ranked several times and could fill that customer's
# top-N with duplicates; keep the best score (max ignores null similarities).
best_scores = (
    user_product_scores
    .groupBy("Customer_ID", "Products")
    .agg(F.max("similarity").alias("similarity"))
)

# Rank per customer by descending similarity; Products breaks ties so the result is deterministic
windowSpec = Window.partitionBy("Customer_ID").orderBy(F.desc("similarity"), F.asc("Products"))

top_n_recommendations = (
    best_scores
    .withColumn("rank", row_number().over(windowSpec))
    .filter(F.col("rank") <= TOP_N)
)

# Show user-wise top-N products with their similarity scores
top_n_recommendations.select("Customer_ID", "Products", "similarity").show(truncate=False)


In [None]:
# Cluster users into 5 segments based on their hybrid behavior vectors
kmeans = KMeans(
    k=5,
    seed=42,
    featuresCol="hybrid_vector",
    predictionCol="cluster"
)
kmeans_model = kmeans.fit(user_vectors)

# Attach cluster labels to each user vector
user_clusters = kmeans_model.transform(user_vectors)
user_clusters.select("Customer_ID", "cluster").show(10)

# Exactly one label per customer before joining back. If user_vectors holds several vectors
# for a customer, each can fall in a different cluster; joining them all would copy that
# customer's transactions into several clusters and skew the averages below.
# NOTE(review): in that case dropDuplicates keeps an arbitrary one of the labels.
customer_cluster = user_clusters.select("Customer_ID", "cluster").dropDuplicates(["Customer_ID"])

# Merge cluster assignments back into the full hybrid DataFrame
clustered_data = hybrid_df.join(customer_cluster, on="Customer_ID", how="inner")

# Average Combined_Rating of every product within every cluster
popular_by_cluster = (
    clustered_data
    .groupBy("cluster", "Products")
    .agg(F.avg("Combined_Rating").alias("avg_rating"))
    .orderBy("cluster", F.desc("avg_rating"))
)

# Keep the best few products *per cluster*. show(10) on popular_by_cluster alone would only
# display the first rows of the globally ordered frame, i.e. mostly the first cluster.
# NOTE(review): products rated only a handful of times can top these averages.
TOP_PER_CLUSTER = 5
cluster_rank = Window.partitionBy("cluster").orderBy(F.desc("avg_rating"), F.asc("Products"))
top_products_per_cluster = (
    popular_by_cluster
    .withColumn("rank", row_number().over(cluster_rank))
    .filter(F.col("rank") <= TOP_PER_CLUSTER)
    .orderBy("cluster", "rank")
)

# Display the top products per user cluster
top_products_per_cluster.show(TOP_PER_CLUSTER * kmeans.getK(), truncate=False)

### Time-Based Recommendations

In [48]:
# 1) Build each customer's chronologically ordered purchase sequence.
# Collect (Year, Month_Index, Transaction_ID, Products) structs per customer and sort them:
# sort_array compares struct fields left to right, giving the same Year -> Month_Index ->
# Transaction_ID order as a window would. This is a single aggregation per customer, instead
# of a running collect_list that materialises a growing prefix list on every row (quadratic
# in sequence length) followed by a max() over those arrays.
purchase_events = F.sort_array(
    F.collect_list(F.struct("Year", "Month_Index", "Transaction_ID", "Products"))
)

user_sequences = (
    indexed_df
      # collect_list("Products") would skip null products; filter them out up front so they
      # never enter the sequence here either
      .where(F.col("Products").isNotNull())
      .groupBy("Customer_ID")
      .agg(purchase_events.alias("purchase_events"))
      .withColumn("product_sequence", F.expr("transform(purchase_events, e -> e.Products)"))
      .drop("purchase_events")
      .withColumn("seq_length", F.size("product_sequence"))
      # at least one history item plus one held-out item for evaluation
      .filter(F.col("seq_length") >= 2)
)

# 2) Prepare for Word2Vec (expects an array of strings)
user_sequences = user_sequences.withColumn(
    "product_sequence_str",
    F.expr("transform(product_sequence, x -> cast(x as string))")
)

In [None]:
# 3) Train Word2Vec on each customer's history *without* its last purchase.
# The evaluation below holds out the last product (seq[-1]) and predicts it from seq[:-1].
# Training on the full sequences would let the model see the held-out item next to its
# history during training, which inflates the leave-one-out scores.
w2v_train_df = user_sequences.withColumn(
    "train_sequence_str",
    F.expr("slice(product_sequence_str, 1, size(product_sequence_str) - 1)"),
)

w2v_model = Word2Vec(
    vectorSize=50, minCount=1,
    inputCol="train_sequence_str",
    outputCol="product_embedding"
).fit(w2v_train_df)

product_embeddings = w2v_model.getVectors()  # columns: word, vector

# 4) Build product -> category map (Product_Category is its numeric index).
# NOTE(review): if a product appears under several categories, collectAsMap keeps one arbitrarily.
prod_cat_map = (
    indexed_df
      .select("Products", "Product_Category")
      .distinct()
      .rdd
      .map(lambda r: (r["Products"], r["Product_Category"]))
      .collectAsMap()
)

In [None]:
# 5) Leave-one-out evaluation counting same-category hits
top_k = 5
eval_results = []

# Bring the word -> embedding table to the driver once (one row per vocabulary product),
# instead of launching a filter-and-collect Spark job for every customer.
embedding_by_word = {
    r["word"]: r["vector"].toArray() for r in product_embeddings.collect()
}

for row in user_sequences.select("Customer_ID", "product_sequence").collect():
    user_id     = row["Customer_ID"]
    seq         = row["product_sequence"]
    train_seq   = seq[:-1]
    actual_item = seq[-1]
    actual_cat  = prod_cat_map[actual_item]

    # 5.1) Average embedding of the distinct history products in the vocabulary
    # (each product counted once, matching an isin-style lookup)
    history_words = dict.fromkeys(str(p) for p in train_seq)
    vecs = [embedding_by_word[w] for w in history_words if w in embedding_by_word]
    if not vecs:
        # Nothing to build a query from; np.mean([]) would yield a NaN vector.
        # Such customers are left out of the metrics.
        continue
    avg_vec = Vectors.dense(np.mean(vecs, axis=0).tolist())

    # 5.2) Top-K nearest products. findSynonymsArray returns (word, similarity) pairs in
    # ranked order without building a DataFrame and converting it to pandas.
    preds = [word for word, _ in w2v_model.findSynonymsArray(avg_vec, top_k)]  # strings

    # 5.3) Count how many predictions share the actual item's category.
    # (len() rather than sum(): `sum` is shadowed by pyspark.sql.functions.sum in this notebook.)
    pred_cats = [prod_cat_map[p] for p in preds]
    cat_hits  = len([c for c in pred_cats if c == actual_cat])

    eval_results.append((user_id, actual_item, actual_cat, preds, cat_hits))


In [None]:
# 6) Per-customer results as a DataFrame, then the aggregate category metrics.
eval_columns = ["Customer_ID", "actual_item", "actual_cat", "preds", "cat_hits"]
eval_df = spark.createDataFrame(eval_results, schema=eval_columns)

# Mean number of same-category predictions per customer, and that as a share of top_k.
avg_cat_hits = eval_df.select(F.avg("cat_hits")).collect()[0][0]
cat_hit_rate = avg_cat_hits / top_k

print(f"Avg. category hits in top-{top_k}: {avg_cat_hits:.2f}")
print(f"Category hit rate@{top_k}: {cat_hit_rate:.2%}")

# 7) A few individual rows for inspection.
eval_df.show(10, truncate=False)