# Deployment

In [None]:
import os

import numpy as np
import pyspark
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, Normalizer, PCA, IDFModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, desc, broadcast, concat, lit, to_timestamp, concat_ws, row_number, from_json, collect_list, collect_set
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.window import Window

# The Kafka connector packages must match the installed Spark version; a hard-coded
# connector version fails at runtime on any other PySpark release.
# NOTE(review): Scala 2.12 is assumed (PySpark 3.x default) -- use 2.13 for Spark 4.x.
SCALA_BINARY_VERSION = "2.12"
KAFKA_PACKAGES = ",".join(
    f"org.apache.spark:{artifact}_{SCALA_BINARY_VERSION}:{pyspark.__version__}"
    for artifact in ("spark-streaming-kafka-0-10", "spark-sql-kafka-0-10")
)
# Must be set before the first SparkSession is created, because it is read when the JVM starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {KAFKA_PACKAGES} pyspark-shell'

# Initialize Spark Session
# Set Driver & Executor Memory
spark = SparkSession.builder \
    .appName("Content-basedRecommendationSystem") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8") \
    .getOrCreate()

In [None]:
# Colab-only helpers: `drive` mounts Google Drive, `files` provides browser uploads
from google.colab import drive
from google.colab import files

drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
# Interactive Colab upload widget. The uploaded file is saved under its own name in the
# working directory (see the cell output), which is where the next cell reads it from.
# NOTE(review): this makes "Run All" interactive; reading the CSV from the mounted
# Drive would keep the notebook reproducible.
uploaded = files.upload()

Saving transactions_data.csv to transactions_data.csv


#### Load Dataset and Select Relevant Columns

In [None]:
# Load transactions data (header row used for column names; no schema options are given)
transactions_data = spark.read.csv("transactions_data.csv", header=True)

# Select distinct products and build one text feature from product_name,
# product_description and category.
# concat_ws skips NULL parts, whereas concat turns the whole feature into NULL when any
# part is missing, and the Tokenizer below would then likely fail on the NULL input.
# For rows without NULLs both produce the same "name description category" string.
df = (
    transactions_data
    .select(
        col("product_name"),
        col("product_description"),
        col("category"),
        concat_ws(" ", col("product_name"), col("product_description"), col("category"))
            .alias("full_product_description"),
    )
    .distinct()
)

# Show data
df.show(10, truncate=0)

+-------------+-------------------------------------------------+--------------------+----------------------------------------------------------------------+
|product_name |product_description                              |category            |full_product_description                                              |
+-------------+-------------------------------------------------+--------------------+----------------------------------------------------------------------+
|Paper Towels |Ultra-absorbent paper towels.                    |Household Essentials|Paper Towels Ultra-absorbent paper towels. Household Essentials       |
|Apples       |Fresh organic apples, rich in fiber and vitamins.|Fresh Produce       |Apples Fresh organic apples, rich in fiber and vitamins. Fresh Produce|
|Almonds      |Roasted almonds, a healthy snack.                |Snacks & Sweets     |Almonds Roasted almonds, a healthy snack. Snacks & Sweets             |
|Milk         |Whole organic milk, high in calcium a

#### Optimizing TF-IDF with caching

In [None]:
# Split the combined product text into lower-cased, whitespace-separated tokens
tokenizer = Tokenizer().setInputCol("full_product_description").setOutputCol("words")
wordsData = tokenizer.transform(df)

# Drop stop words using the remover's default stop-word list
remover = StopWordsRemover().setInputCol("words").setOutputCol("filtered")
filteredData = remover.transform(wordsData)

# Term frequencies hashed into a fixed 1000-dimensional vector space
hashingTF = HashingTF().setInputCol("filtered").setOutputCol("rawFeatures").setNumFeatures(1000)
featurizedData = hashingTF.transform(filteredData)

# Re-weight term frequencies by inverse document frequency
idf = IDF().setInputCol("rawFeatures").setOutputCol("features")
idfModel = idf.fit(featurizedData)

# Cache the TF-IDF output: PCA fitting, PCA transform and later steps all reuse it
rescaledData = idfModel.transform(featurizedData).cache()

rescaledData.show(10, truncate=0)

+-------------+-------------------------------------------------+--------------------+----------------------------------------------------------------------+-----------------------------------------------------------------------------------+----------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_name |product_description                              |category            |full_product_description                                              |words                                                                              |filtered                                                                    |rawFeatures                         

#### Using PCA to Reduce Features

In [None]:
# Dimensionality reduction: project the TF-IDF vectors onto 5 principal components
pca = PCA().setK(5).setInputCol("features").setOutputCol("pcaFeatures")
pcaModel = pca.fit(rescaledData)
pcaData = pcaModel.transform(rescaledData)

pcaData.select("product_name", "full_product_description", "pcaFeatures").show(10, truncate=0)

+-------------+----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------+
|product_name |full_product_description                                              |pcaFeatures                                                                                          |
+-------------+----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------+
|Paper Towels |Paper Towels Ultra-absorbent paper towels. Household Essentials       |[0.4227433601435146,-2.8527575735066795,0.7815012058048423,2.8513577127913905,0.5166529130973001]    |
|Apples       |Apples Fresh organic apples, rich in fiber and vitamins. Fresh Produce|[-0.25785556886651245,-0.31818330959050894,0.6848604781956902,-1.2392067275061827,0.4111066758039602]|
|Almonds      |Almonds Roasted almonds, a healthy snack

#### Calculate Cosine Similarity

In [None]:
# Rescale every PCA vector to unit L2 length
normalizer = Normalizer(p=2.0).setInputCol("pcaFeatures").setOutputCol("normFeatures")
normalizedData = normalizer.transform(pcaData)


# Cosine similarity between two vectors (wrapped as a Spark UDF below)
def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two equal-length vectors as a Python float.

    Args:
        vec1, vec2: Array-likes of numbers (e.g. Spark ML vectors or lists).

    Returns:
        dot(vec1, vec2) / (|vec1| * |vec2|), or 0.0 when either vector has zero length.
        Without that guard the division yields NaN, and Spark orders NaN above every
        other double, so degenerate vectors would be ranked as the most similar products.
    """
    a = np.asarray(vec1, dtype=float)
    b = np.asarray(vec2, dtype=float)
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0.0:
        return 0.0
    return float(np.dot(a, b) / denom)

# Declare the return type explicitly: udf() defaults to StringType, which would make
# "similarity" a string column (ordered lexicographically, compared via implicit casts).
cosine_similarity_udf = udf(cosine_similarity, "double")


# Compute similarity for every ordered pair of distinct products. This is a non-equi join,
# so the number of pairs grows quadratically with the number of products; the left side is
# broadcast on the assumption that the distinct-product table is small.
product_similarity = (
    broadcast(normalizedData).alias("a")
    .join(
        normalizedData.alias("b"),
        col("a.full_product_description") != col("b.full_product_description"),
    )
    .withColumn(
        "similarity", cosine_similarity_udf(col("a.normFeatures"), col("b.normFeatures"))
    )
)

product_similarity \
    .select("a.product_name", col("b.product_name").alias("Recommended Product"), "similarity") \
    .filter(col("similarity") > 0.1) \
    .orderBy(desc("similarity")) \
    .show(10, truncate=0)

+-----------------+-------------------+------------------+
|product_name     |Recommended Product|similarity        |
+-----------------+-------------------+------------------+
|Hand Sanitizer   |Paper Towels       |0.9995161918791986|
|Paper Towels     |Hand Sanitizer     |0.9995161918791986|
|Frozen Dumplings |Frozen Berries     |0.999463674735494 |
|Frozen Berries   |Frozen Dumplings   |0.999463674735494 |
|Frozen Dumplings |Frozen Vegetables  |0.9982418587964978|
|Frozen Vegetables|Frozen Dumplings   |0.9982418587964978|
|Paper Towels     |Dish Soap          |0.9982229337381391|
|Dish Soap        |Paper Towels       |0.9982229337381391|
|Frozen Vegetables|Frozen Berries     |0.9979322566596058|
|Frozen Berries   |Frozen Vegetables  |0.9979322566596058|
+-----------------+-------------------+------------------+
only showing top 10 rows



#### Make Recommendations

In [None]:
def get_similar_products(product_name, top_n=5, min_similarity=0.001):
    """Return the products most similar to ``product_name``.

    Args:
        product_name: Exact value of the ``product_name`` column to look up in ``df``.
        top_n: Maximum number of recommendations to return.
        min_similarity: Only pairs with similarity strictly above this value are kept.

    Returns:
        A Spark DataFrame with columns "Recommended Products" and "Product Similarity",
        ordered by similarity (highest first), or an empty list when ``product_name``
        is not present in ``df``.
    """
    # Step 1: Look up the product's full description. Reuse the column already built on df
    # so the key always matches the one used to build product_similarity.
    # If several rows share this product_name, whichever row .first() returns is used.
    product_record = (
        df.filter(col("product_name") == product_name)
        .select("full_product_description")
        .first()
    )

    if product_record is None:
        print(f"Product {product_name} not found.")
        return []

    full_product_description = product_record["full_product_description"]

    # Step 2: Rank the other products. Cast to double so filtering and ordering are numeric
    # even if the similarity column was produced as a string.
    similarity = col("similarity").cast("double")
    return (
        product_similarity
        .filter(
            (col("a.full_product_description") == full_product_description) &
            (similarity > min_similarity)
        )
        .select(
            col("b.product_name").alias("Recommended Products"),
            similarity.alias("Product Similarity"),
        )
        .orderBy(desc("Product Similarity"))
        .limit(top_n)
    )




# Example: top-10 recommendations for one product
product_name = "Paper Towels"
print(f"\nRECOMMENDED PRODUCTS FOR {product_name}: \n")
similar_items = get_similar_products(product_name, top_n=10)

# An unknown product yields an empty list (nothing to show); otherwise display the DataFrame
if similar_items:
    similar_items.show(truncate=0)


RECOMMENDED PRODUCTS FOR Paper Towels: 

+--------------------+-------------------+
|Recommended Products|Product Similarity |
+--------------------+-------------------+
|Hand Sanitizer      |0.9995161918791986 |
|Dish Soap           |0.9982229337381391 |
|Sponges             |0.9957304697616048 |
|Toilet Paper        |0.9957299834964556 |
|Laundry Detergent   |0.987398455551052  |
|Skincare Set        |0.39804739531150873|
|Avocados            |0.386317627580931  |
|Sugar               |0.30563929934847534|
|Peanut Butter       |0.209196460321398  |
|Baguette            |0.19746187715196314|
+--------------------+-------------------+



#### Get Customers' Latest Purchase

In [None]:
# Combine the date and time string columns into a single timestamp
df_with_ts = transactions_data.withColumn(
    "timeStamp",
    to_timestamp(concat_ws(" ", col("date"), col("time")), "yyyy-MM-dd HH:mm:ss"),
)

# Window per customer, most recent transaction first. NULL timestamps are placed last,
# and product_name is a secondary key so transactions tied on timestamp are resolved
# consistently instead of arbitrarily (row_number over ties is nondeterministic).
window_spec = Window.partitionBy("customer_id").orderBy(
    col("timeStamp").desc_nulls_last(), col("product_name")
)

# Add row number to identify most recent transaction
df_ranked = df_with_ts.withColumn("row_num", row_number().over(window_spec))

# Keep only the latest transaction per customer
latest_products = df_ranked.filter(col("row_num") == 1).select(
    "customer_id", "product_name", "category", "product_description"
)

#### Optimize Customers' Latest Purchase DataFrame with ".repartition()"

In [None]:
# Hash-partition the latest purchases by customer_id for downstream per-customer work.
# NOTE(review): latest_products comes from a window partitioned on customer_id, so this
# extra shuffle may not add much -- worth measuring.
customer_latest_purchase = latest_products.repartition(col("customer_id"))

# Preview the result
customer_latest_purchase.show(5, truncate=False)

+-----------+-------------+--------------+---------------------------------------------------------+
|customer_id|product_name |category      |product_description                                      |
+-----------+-------------+--------------+---------------------------------------------------------+
|C1001      |Energy Drinks|Beverages     |Caffeinated energy drinks for an energy boost.           |
|C1002      |Bagels       |Bakery        |Classic New York-style bagels, perfect with cream cheese.|
|C1003      |Bananas      |Fresh Produce |Ripe bananas, perfect for snacking or smoothies.         |
|C1004      |Peanut Butter|Pantry Staples|Creamy peanut butter, protein-packed.                    |
|C1005      |Croissants   |Bakery        |Buttery and flaky croissants, fresh from the bakery.     |
+-----------+-------------+--------------+---------------------------------------------------------+
only showing top 5 rows



#### Model Persistence for Production Deployment

In [None]:
# Save the fitted IDF model (the IDF stage of the TF-IDF featurization), replacing any
# previous copy at the same path.
# NOTE(review): only the IDF stage is persisted; scoring new text would also need the
# Tokenizer/StopWordsRemover/HashingTF settings (and PCA/Normalizer for similarity).
# Persisting a fitted Pipeline would capture all of them.
idfModel.write().overwrite().save("model/tfidf")

In [None]:
# Load the persisted IDF model back from the path written above
loaded_model = IDFModel.load("model/tfidf")

In [None]:
# Smoke test: apply the reloaded model to five already-hashed rows (rawFeatures -> features)
loaded_model.transform(featurizedData.limit(5)).show()

+------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+
|product_name| product_description|            category|full_product_description|               words|            filtered|         rawFeatures|            features|
+------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+
|Paper Towels|Ultra-absorbent p...|Household Essentials|    Paper Towels Ultr...|[paper, towels, u...|[paper, towels, u...|(1000,[24,149,690...|(1000,[24,149,690...|
|      Apples|Fresh organic app...|       Fresh Produce|    Apples Fresh orga...|[apples, fresh, o...|[apples, fresh, o...|(1000,[63,74,212,...|(1000,[63,74,212,...|
|     Almonds|Roasted almonds, ...|     Snacks & Sweets|    Almonds Roasted a...|[almonds, roasted...|[almonds, roasted...|(1000,[64,166,174...|(1000,[64,166,174...|
|   

#### Batch Recommendation Deployment

In [None]:
# Rank the candidate recommendations for each source product (a.product_name), best first.
# Similarity is cast to double so ranking is numeric even if it was produced as a string,
# and b.product_name breaks ties so the chosen recommendation is deterministic.
window_spec = Window.partitionBy("a.product_name").orderBy(
    col("similarity").cast("double").desc(), col("b.product_name")
)

# Keep only the top recommendation per product (row_num == 1) and rename columns for clarity:
# original product and its top recommended product
product_recommendations = (
    product_similarity
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .select(
        col("a.product_name").alias("product_name"),
        col("b.product_name").alias("recommended_product"),
        "similarity",
    )
)

# Generate recommendations in batch: each customer's latest purchase -> its top match.
# Cached because the result is written twice (here and to Google Drive in the next cell);
# without it the full similarity pipeline is recomputed for every action.
recommendations = (
    customer_latest_purchase
    .join(broadcast(product_recommendations), on="product_name")
    .select(
        col("customer_id"),
        col("product_name").alias("last_purchased_product"),
        col("recommended_product"),
        col("similarity"),
    )
    .cache()
)

# Write output to storage
recommendations.write.mode("overwrite").parquet("recommendations/batch/")

In [None]:
# Write batch results to storage (Google Drive, mounted at /content/drive earlier),
# replacing any previous output at that path
recommendations.write.mode("overwrite").parquet("/content/drive/MyDrive/recommendations/batch/")

In [None]:
# Preview the first five batch recommendations without truncating column values
recommendations.show(5, truncate=0)

+-----------+----------------------+-------------------+------------------+
|customer_id|last_purchased_product|recommended_product|similarity        |
+-----------+----------------------+-------------------+------------------+
|C1001      |Energy Drinks         |Trail Mix          |0.9316971324338094|
|C1002      |Bagels                |Tomatoes           |0.8879802136652006|
|C1003      |Bananas               |Apples             |0.9206631795760941|
|C1004      |Peanut Butter         |Sugar              |0.9550014965147042|
|C1005      |Croissants            |Peanut Butter      |0.9176584437509188|
+-----------+----------------------+-------------------+------------------+
only showing top 5 rows



#### Real-time Streaming Deployment

In [None]:
# Run this in a shell to create the Kafka topic:

# docker exec -it demo-kafka-1 kafka-topics --create \
#   --topic purchase \
#   --bootstrap-server localhost:9092 \
#   --partitions 1 \
#   --replication-factor 1


# JSON schema of each purchase event published to the topic
schema = StructType([
    StructField("customer_id", StringType()),
    StructField("product_name", StringType()),
    StructField("timestamp", StringType())
])

# Subscribe to the customer purchase topic
purchase_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "purchase")
    .load()
)


# Run this in a shell to push events to the Kafka topic:

# echo '{"customer_id": "c1001", "product_name": "Paper Towels", "timestamp": "2025-04-05T12:00:00"}' | \
# docker exec -i demo-kafka-1 kafka-console-producer \
#   --broker-list localhost:9092 \
#   --topic purchase

# Cast the Kafka message value to a string and parse it as JSON with the schema above.
# Events whose product_name ends up NULL are dropped by the inner join below.
purchase_events = (
    purchase_df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Join each purchase with the static per-product recommendations and collect, per customer,
# the recommended products. collect_set (not collect_list) so repeat purchases of the same
# product do not duplicate its recommendation in the running aggregate.
updated_recommendations = (
    purchase_events
    .join(broadcast(product_recommendations), on="product_name")
    .groupBy("customer_id")
    .agg(collect_set("recommended_product").alias("recommended_products"))
)


In [None]:
# Optional bound (in seconds) on how long this cell blocks; None waits until the query ends
# or the kernel is interrupted (so "Run All" will not reach later cells on its own).
STREAM_TIMEOUT_SECONDS = None

# Output to sink: print the whole aggregated table to the console ("complete" output mode)
query = (
    updated_recommendations.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", False)
    .start()
)
try:
    query.awaitTermination(STREAM_TIMEOUT_SECONDS)
finally:
    # Stop the query even when the wait times out or the cell is interrupted; otherwise
    # it keeps running in the background of the kernel.
    query.stop()

In [None]:
# Finally, stop the Spark Session. This only runs once the streaming cell above has returned.
spark.stop()