In [1]:
!pip install pyspark
!pip install kaggle



# Market Basket Analysis with Spark (FP-Growth)
**Project 2:** Market-Basket Analysis

**Author:** Precious Prince

**Course:** Algorithms for Massive Data, Master's in Data Science for Economics, University of Milan

**Goal:** Recommend books to users based on co-occurrence patterns and rank those recommendations using average user ratings.


### Libraries

In [2]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.fpm import FPGrowth

### Kaggle API Setup and Dataset Download

In [3]:
# Set your own Kaggle credentials here; avoid committing real API keys to a shared notebook.
os.environ['KAGGLE_USERNAME'] = "<your-kaggle-username>"
os.environ['KAGGLE_KEY'] = "<your-kaggle-api-key>"
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews
!unzip amazon-books-reviews.zip -d amazon_books_reviews

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
100% 1.06G/1.06G [00:06<00:00, 169MB/s] 
Archive:  amazon-books-reviews.zip
  inflating: amazon_books_reviews/Books_rating.csv  
  inflating: amazon_books_reviews/books_data.csv  


### Initialize Spark Session

In [4]:
spark = SparkSession.builder.appName("MarketBasketAnalysis").getOrCreate()

# Load dataset files
ratings_df = spark.read.csv("amazon_books_reviews/Books_rating.csv", header=True, inferSchema=True, quote='"', escape='"', multiLine=True)
book_df = spark.read.csv("amazon_books_reviews/books_data.csv", header=True, inferSchema=True)

### Data Cleaning and Sampling

In [5]:
# Sample a fraction of the data for quicker local testing; adjust SAMPLE_FRACTION to change the sample size.
SAMPLE_FRACTION = 0.01

sample_df = ratings_df.sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=42)
sample_df = sample_df.dropna(subset=['User_id', 'Id', 'Title'])

### Basket Creation (User → List of Reviewed Books, Identified by ASIN)

In [6]:
# Each user represents a basket, and the items are books they reviewed.
# Filter only users who reviewed at least 2 books.
baskets_id_df = sample_df.groupBy("User_id").agg(F.collect_list("Id").alias("basket"))
baskets_id_df = baskets_id_df.withColumn("basket", F.array_distinct(F.col("basket")))
baskets_id_df = baskets_id_df.filter(F.size(F.col("basket")) >= 2)

### FP-Growth Model for Frequent Itemset Mining

In [None]:
# Support: fraction of baskets that contain the itemset.
MIN_SUPPORT = 0.001
# Confidence: fraction of baskets containing the antecedent that also contain the consequent.
MIN_CONFIDENCE = 0.5

fp_growth = FPGrowth(itemsCol="basket", minSupport=MIN_SUPPORT, minConfidence=MIN_CONFIDENCE)
model = fp_growth.fit(baskets_id_df)
association_rules = model.associationRules

I used minSupport = 0.001 so that an itemset must appear in at least 0.1% of baskets (users with two or more reviewed books in the sample) to count as frequent, which removes the rarest co-occurrences.

minConfidence = 0.5 was chosen to retain only strong associations, where the consequent appeared in at least half of the baskets containing the antecedent.
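
To make the two thresholds concrete, here is a minimal pure-Python sketch (not the Spark implementation) that computes support and confidence on a toy set of baskets. The basket contents and the helper names `support` and `confidence` are assumptions made up for illustration.

```python
# Toy baskets (hypothetical data, not taken from the dataset)
baskets = [
    {"A", "B"},
    {"A", "B", "C"},
    {"A", "C"},
    {"B", "C"},
]

def support(itemset, baskets):
    """Fraction of baskets that contain every item in `itemset`."""
    itemset = set(itemset)
    return sum(itemset <= b for b in baskets) / len(baskets)

def confidence(antecedent, consequent, baskets):
    """support(antecedent and consequent together) / support(antecedent)."""
    both = set(antecedent) | set(consequent)
    return support(both, baskets) / support(antecedent, baskets)

sup_ab = support({"A", "B"}, baskets)             # A and B co-occur in 2 of 4 baskets
conf_a_to_b = confidence({"A"}, {"B"}, baskets)   # 2 of the 3 baskets with A also contain B
print(sup_ab, conf_a_to_b)
```

With a minimum confidence of 0.5, the rule A → B in this toy example would be kept, because two thirds of the baskets containing A also contain B.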

In [None]:
# Count total number of association rules generated by FP-Growth
num_rules = association_rules.count()
print(f"Total number of association rules: {num_rules}")

📘 Total number of association rules: 15


In [None]:
unique_antecedents = association_rules.select("antecedent").distinct().count()
print(f"Unique antecedent itemsets: {unique_antecedents:,}")

### Add Titles to Association Rules for Readability

In [None]:
# Many books share the same title but have different ASINs (paperback, hardcover, different sellers, etc.).
# Checking ASINs alone would therefore let the same book through under another edition.
# To avoid recommending a title the rule already starts from, rules whose antecedent and consequent titles match are dropped.

# Map ASIN (Id) to Title
title_mapping = sample_df.select("Id", "Title").dropna().distinct()

# Join titles to association rules for better readability
rules_readable = (
    association_rules
    # extract the first ASIN from each item array (a multi-item antecedent is reduced to its first item)
    .withColumn("antecedent_asin", F.element_at(F.col("antecedent"), 1))
    .withColumn("consequent_asin", F.element_at(F.col("consequent"), 1))
    # join titles
    .join(
        title_mapping.withColumnRenamed("Id", "a_Id").withColumnRenamed("Title", "antecedent_title"),
        F.col("antecedent_asin") == F.col("a_Id"),
        "left"
    )
    .join(
        title_mapping.withColumnRenamed("Id", "c_Id").withColumnRenamed("Title", "consequent_title"),
        F.col("consequent_asin") == F.col("c_Id"),
        "left"
    )
    # remove duplicates where antecedent and consequent refer to same title
    .filter(F.lower(F.col("antecedent_title")) != F.lower(F.col("consequent_title")))
    # keep relevant columns
    .select(
        "antecedent_asin",
        "antecedent_title",
        "consequent_asin",
        "consequent_title",
        "confidence",
        "support"
    )
)

print(" Readable association rules created:", rules_readable.count())
rules_readable.show(10, truncate=False)

Since many books share the same title but differ in ASIN (e.g., paperback vs hardcover), I merged the FP-Growth results with the title mapping and filtered identical-title pairs to avoid redundant recommendations.
This cleaning step removes pairs whose titles match exactly (ignoring case). Editions whose titles differ, such as "Emma" and "Emma (Signet classics)", are still treated as distinct books.
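
The title check can be sketched in plain Python as follows. This is an illustration under stated assumptions rather than the notebook's Spark code: the rule tuples and the helper `is_distinct_pair` are hypothetical.

```python
# Hypothetical rules as (antecedent_title, consequent_title) pairs
rules = [
    ("Emma", "Emma (Signet classics)"),   # titles differ, so the pair is kept
    ("The Hobbit", "the hobbit"),         # same title up to case, so it is dropped
    ("Ulysses", None),                    # title missing after a left join
]

def is_distinct_pair(a_title, c_title):
    """Keep a rule only when both titles are known and differ ignoring case."""
    if a_title is None or c_title is None:
        # Mirrors SQL semantics: a comparison with NULL is not true, so the row is filtered out
        return False
    return a_title.lower() != c_title.lower()

kept = [rule for rule in rules if is_distinct_pair(*rule)]
print(kept)
```

Note that rules with a missing title are dropped as a side effect of the comparison, which may or may not be the intended behaviour.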

### Compute Average Rating per Book

In [None]:
# We'll use this to rank recommendations by quality.
# Compute average rating for each book (from sampled data)
avg_rating_df = (
    sample_df
    .groupBy("Id")
    .agg(F.avg("review/score").alias("avg_rating"))
)

# Join average rating on the consequent (recommended) books
rules_with_rating = (
    rules_readable
    .join(
        avg_rating_df.withColumnRenamed("Id", "c_Id"),
        F.col("consequent_asin") == F.col("c_Id"),
        "left"
    )
    .drop("c_Id")
)

print("✅ Rules enriched with average ratings.")
rules_with_rating.select("antecedent_title", "consequent_title", "confidence", "avg_rating").show(10, truncate=False)

To prioritize higher-quality recommendations, each consequent book was enriched with its average user rating, computed from the sampled reviews. This allows recommendations to be sorted by rating and inspected alongside each rule's confidence.

In [None]:
print(f"Total number of rules after removing same-title pairs: {rules_with_rating.count()}")

📘 Total number of rules after same titles and ASINs: 12


### Generate Personalized Recommendations per User

In [None]:
user_suggestions = (
    baskets_id_df.alias("u")
    # Join rules whose antecedent is present in the user's basket
    .join(
        rules_with_rating.alias("r"),
        F.array_contains(F.col("u.basket"), F.col("r.antecedent_asin")),
        "inner"
    )
    .select(
        F.col("u.User_id").alias("user_id"),
        F.col("r.antecedent_asin"),
        F.col("r.antecedent_title"),
        F.col("r.consequent_asin"),
        F.col("r.consequent_title"),
        F.col("r.confidence"),
        F.col("r.support"),
        F.col("r.avg_rating"),
        F.col("u.basket")
    )
)

print("✅ User-level suggestions created:", user_suggestions.count())
user_suggestions.show(5, truncate=False)

For each user basket, the FP-Growth rules whose antecedent books appear in the basket were joined to generate personalized recommendations. The resulting table lists, for each user, the recommended book titles, their confidence, and average rating scores.

### Filter Out Already Read or Duplicate Books

In [None]:
#  Remove books the user already reviewed
user_suggestions_filtered = user_suggestions.filter(
    ~F.array_contains(F.col("basket"), F.col("consequent_asin"))
)

#  Remove duplicates: same user & same title (different ASIN)
user_suggestions_unique = (
    user_suggestions_filtered
    .dropDuplicates(["user_id", "consequent_title"])
)

print("Cleaned user recommendations:", user_suggestions_unique.count())
user_suggestions_unique.show(5, truncate=False)

After generating preliminary recommendations, two filtering steps were applied:
- Books already present in the user’s basket were excluded to avoid redundancy;
- Duplicates with identical titles but different ASINs were removed.

This ensured that final recommendations were novel and unique for each user.
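
The two filtering steps can be illustrated with a small pure-Python sketch. The basket, the candidate list, and the ASIN-like strings are invented for the example. Unlike Spark's `dropDuplicates`, which keeps an arbitrary row per key, this sketch keeps the first candidate seen for each title.

```python
# Hypothetical user basket (ASINs already reviewed) and candidate recommendations
basket = {"X1", "X2"}
candidates = [
    {"asin": "X2", "title": "Book Two"},     # already reviewed by the user
    {"asin": "Y1", "title": "Book Three"},
    {"asin": "Y2", "title": "Book Three"},   # same title, different edition
]

# Step 1: drop books that are already in the user's basket
novel = [c for c in candidates if c["asin"] not in basket]

# Step 2: keep one candidate per title (the first one encountered)
unique_by_title = {}
for c in novel:
    unique_by_title.setdefault(c["title"], c)

result = list(unique_by_title.values())
print(result)
```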

### Sort Recommendations by Average Rating (Descending)

In [None]:
# Rank recommendations by average rating (descending)
windowSpec = Window.partitionBy("user_id").orderBy(F.desc("avg_rating"))

user_recommendations_sorted = (
    user_suggestions_unique
    .withColumn("rank", F.row_number().over(windowSpec))
    .groupBy("user_id")
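    .agg(
        # collect_list alone does not guarantee order; sort_array orders the structs by their first field (rank)
        F.sort_array(
            F.collect_list(
                F.struct(
                    F.col("rank"),
                    F.col("consequent_asin"),
                    F.col("consequent_title"),
                    F.col("avg_rating"),
                    F.col("confidence"),
                    F.col("support")
                )
            )
        ).alias("recommended_books_sorted")
    )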
)

# Display sample output for a few users
print("Top book recommendations generated per user.")
user_recommendations_sorted.show(10, truncate=False)

+--------------+-----------------------------------------------------------------------------------------------------------------------------------+
|user_id       |recommended_books_sorted                                                                                                           |
+--------------+-----------------------------------------------------------------------------------------------------------------------------------+
|A1D2C0WDCSHUWZ|[{1, 1581730470, Emma, 4.428571428571429, 0.5, 0.0014903129657228018}]                                                             |
|A1NT7ED5TATUAM|[{1, 0520050894, A Connecticut Yankee in King Arthur's Court (Mark Twain Library), 2.3333333333333335, 0.5, 0.0014903129657228018}]|
|A201GXWKSPNE4C|[{1, 0451515242, Emma (Signet classics), 4.428571428571429, 0.6666666666666666, 0.0014903129657228018}]                            |
|A319KYEIAZ3SON|[{1, 0760700435, Sense and sensibility, 5.0, 0.5, 0.0014903129657228018}]                 

To personalize results, recommendations were ranked per user using a window function ordered by average rating, and all ranked items were then aggregated into user-level lists. Each user therefore receives distinct book suggestions derived from frequent co-review patterns and ordered by average rating.
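
As a rough illustration of the per-user ranking, the sketch below does the equivalent of `row_number()` over a window partitioned by user and ordered by rating, using plain Python. The tuples in `suggestions` are hypothetical and do not come from the dataset.

```python
from collections import defaultdict

# Hypothetical (user_id, title, avg_rating) suggestions
suggestions = [
    ("u1", "Book A", 4.5),
    ("u1", "Book B", 3.0),
    ("u2", "Book C", 5.0),
    ("u1", "Book D", 4.8),
]

# Partition by user
per_user = defaultdict(list)
for user, title, rating in suggestions:
    per_user[user].append((title, rating))

# Order each partition by rating (descending) and assign ranks starting at 1
ranked = {
    user: [
        (rank, title, rating)
        for rank, (title, rating) in enumerate(
            sorted(items, key=lambda item: item[1], reverse=True), start=1
        )
    ]
    for user, items in per_user.items()
}
print(ranked["u1"])
```

Ties in the rating are broken here by input order; in Spark the order among tied rows is not guaranteed unless a second sort key is added to the window.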

### How Many Books Is Each User Recommended?

In [None]:
# Count how many recommendations each user received
user_recommendation_count = (
    user_recommendations_sorted
    .select(
        "user_id",
        F.size("recommended_books_sorted").alias("num_recommendations")
    )
    .orderBy(F.desc("num_recommendations"))
)

print("Recommendation count per user:")
user_recommendation_count.show(20, truncate=False)

# Find the maximum number of recommended books for any user
max_recommendations = (
    user_recommendation_count
    .agg(F.max("num_recommendations").alias("max_recommendations"))
    .collect()[0]["max_recommendations"]
)

print(f"Maximum number of books recommended to a single user: {max_recommendations}")

+--------------+-------------------+
|user_id       |num_recommendations|
+--------------+-------------------+
|A1D2C0WDCSHUWZ|1                  |
|A1NT7ED5TATUAM|1                  |
|A201GXWKSPNE4C|1                  |
|A319KYEIAZ3SON|1                  |
|A39IY0JU5JI69G|1                  |
|A3DICRPEYWH9K5|1                  |
|A3MVAN55WKCGR7|1                  |
|A3QZCA4LTTVGAD|1                  |
|ADFEVWV4EF2IF |1                  |
|AHD101501WCN1 |1                  |
|AOUBCAH5BI4FU |1                  |
|ARFCORBCTKX1J |1                  |
|AUT5ESMCYGBCE |1                  |
+--------------+-------------------+

📚 The maximum number of books suggested to a user is: 1


### Top 10 Most Reviewed Books

In [13]:
most_reviewed_books = (
    sample_df.groupBy("Id", "Title")
    .agg(F.count("User_id").alias("num_reviews"))
    .orderBy(F.desc("num_reviews"))
)

print("\nTop 10 Most Reviewed Books:")
most_reviewed_books.show(10, truncate=False)


🔥 Top 10 Most Reviewed Books:
+----------+----------------------------------------------------------------+-----------+
|Id        |Title                                                           |num_reviews|
+----------+----------------------------------------------------------------+-----------+
|B000PC54NG|The Hobbit                                                      |50         |
|B000GQG7D2|The Hobbit                                                      |44         |
|B000IEZE3G|Harry Potter and The Sorcerer's Stone                           |43         |
|B000GQG5MA|The Hobbit; Or, There and Back Again                            |39         |
|B000NWQXBA|The Hobbit                                                      |39         |
|B000NWU3I4|The Hobbitt, or there and back again; illustrated by the author.|37         |
|B000Q032UY|The Hobbit or There and Back Again                              |37         |
|B000NDSX6C|The Hobbit                                               

### Top 10 Most Recommended Books (Appearing in rules)

In [14]:
most_recommended_books = (
    rules_with_rating.groupBy("consequent_asin", "consequent_title")
    .agg(F.count("antecedent_asin").alias("times_recommended"))
    .orderBy(F.desc("times_recommended"))
)

print("\nTop 10 Most Recommended Books:")
most_recommended_books.show(10, truncate=False)


🏆 Top 10 Most Recommended Books:
+---------------+----------------------------------------------------------------+-----------------+
|consequent_asin|consequent_title                                                |times_recommended|
+---------------+----------------------------------------------------------------+-----------------+
|0451515242     |Emma (Signet classics)                                          |2                |
|B0007EJ04G     |Leaves of Grass (The Illustrated Modern Library)                |1                |
|1581730470     |Emma                                                            |1                |
|0520050894     |A Connecticut Yankee in King Arthur's Court (Mark Twain Library)|1                |
|0613175719     |Ulysses                                                         |1                |
|B000JJVHZE     |To Kill A Mockingbird                                           |1                |
|B00087CHVU     |Leaves of grass                         

### How Often Each Book Was Recommended (Spark FP-Growth)

In [None]:
most_recommended_books_spark = (
    user_suggestions_unique
    .groupBy("consequent_asin", "consequent_title")
    .agg(F.countDistinct("user_id").alias("num_users_recommended"))
    .orderBy(F.desc("num_users_recommended"))
)

print("Top 10 most recommended books (Spark FP-Growth):")
most_recommended_books_spark.show(10, truncate=False)


### Summary



In this project, FP-Growth was used to mine association rules from a 1% sample of the Amazon Books Reviews dataset. Each user's reviewed books were treated as a basket, enabling the discovery of frequently co-reviewed books. Based on these patterns, user-level book recommendations were generated while avoiding duplicates caused by identical titles with different ASINs (e.g., paperback vs. hardcover). Recommendations were ranked by the average review score of the suggested books to prioritize quality. Finally, summaries of the most-reviewed and most-recommended books gave additional insight into user preferences and overall book popularity.

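# Professor's Suggestions

Custom FP-Growth-style implementation (from scratch using PySpark)

**Goal:** Replicate FP-Growth outcomes (association rules) without using Spark's FPGrowth class. The implementation below counts item pairs directly rather than building an FP-tree, so it covers rules with a single-item antecedent and a single-item consequent only.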

### Libraries

In [71]:
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

### Spark Session Setup

In [72]:
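# getOrCreate() returns the session that is already running, so the earlier session is reused here.
spark = SparkSession.builder.appName("Custom_FP_Growth_Implementation").getOrCreate()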

### Create item frequency table

In [73]:
item_freq = (
    baskets_id_df
    .withColumn("item", F.explode("basket"))
    .groupBy("item")
    .count()
    .withColumnRenamed("count", "support_count")
)


In [None]:
total_baskets = baskets_id_df.count()
min_support = 0.001  # same as used in Spark's FP-Growth
min_support_count = int(total_baskets * min_support)

# min_support_count is not applied here because it was judged too restrictive;
# a fixed minimum of 3 baskets per item is used instead.
MIN_ITEM_COUNT = 3
frequent_items = item_freq.filter(F.col("support_count") >= MIN_ITEM_COUNT)


In [60]:
#frequent_items.show()

+----------+-------------+
|      item|support_count|
+----------+-------------+
|B000Q032UY|            6|
|B000HH4MYQ|            6|
|1901768945|            7|
|B000PGI7QI|            6|
|0786135034|            6|
|8188280046|            6|
|B000JQXNSQ|            6|
|1566190932|            7|
|B0007C10MS|            9|
|0141804459|            6|
|B000HLFD4K|            6|
|B000GQG7D2|            8|
|B000ILIJE0|            9|
|B000NDSX6C|            7|
|B000JJVHZE|            6|
|B000NWU3I4|            6|
|B000GQG5MA|            6|
|0613175719|            7|
+----------+-------------+



In [58]:
#min_support_count

6

### UDF1
Takes one user's basket (the list of books that the user has reviewed) and returns all unique pairs of items that occur together in that basket.

In [None]:
def generate_pairs(basket):
    """
    Generate all unique, sorted item pairs from a user's basket.
    Example:
        ['A', 'B', 'B', 'C'] -> [('A','B'), ('A','C'), ('B','C')]
    """
    # Defensive checks
    if basket is None:
        return []
    
    # Remove duplicates and sort for consistency
    basket = sorted(set(basket))
    
    # Generate pair combinations
    pairs = []
    for i in range(len(basket)):
        for j in range(i + 1, len(basket)):
            pairs.append((basket[i], basket[j]))
    
    return pairs

### Generate Item Pairs from Each Basket (Custom Step)

In [None]:
# Define schema for UDF output (array of structs)
pair_schema = ArrayType(
    StructType([
        StructField("itemA", StringType(), True),
        StructField("itemB", StringType(), True)
    ])
)

# Register UDF
generate_pairs_udf = F.udf(generate_pairs, pair_schema)

# Apply UDF to each basket, explode, and extract item pairs
basket_pairs = (
    baskets_id_df
    .withColumn("pairs", generate_pairs_udf(F.col("basket")))
    .withColumn("pair", F.explode("pairs"))
    .select(
        F.col("pair.itemA").alias("itemA"),
        F.col("pair.itemB").alias("itemB")
    )
)

In [77]:
#basket_pairs.show()

+----------+----------+
|     itemA|     itemB|
+----------+----------+
|0883686422|B000KW0GVG|
|B0006ITZ4S|B000P4VYRO|
|0520062388|B000N4XSFC|
|0071426442|0844273112|
|0071426442|8432208078|
|0844273112|8432208078|
|B000MWC3FQ|B000PGI7QI|
|0786103523|0786193026|
|1586217135|1592281044|
|0020186509|B00087QGG2|
|0020186509|B000PDDG8A|
|B00087QGG2|B000PDDG8A|
|1593555563|B00086Q244|
|0195132653|B00086Q244|
|0195132653|B000HJNEYS|
|B00086Q244|B000HJNEYS|
|1877733075|1887010017|
|1404365656|B000KS5WG4|
|0670171913|B0006F2EZS|
|0520219112|1400060036|
+----------+----------+
only showing top 20 rows



### Compute support for each pair

In [78]:
pair_support = (basket_pairs.groupBy("itemA", "itemB").count().withColumnRenamed("count", "pair_support_count"))

After generating item pairs for each user basket, the frequency of each unique pair was computed. The resulting support counts represent the number of baskets in which both items co-occur, forming the foundation for association rule generation.
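
The pair-generation and counting steps can be mirrored in plain Python with `itertools.combinations` and `collections.Counter`. This is a sketch on made-up baskets, not the Spark pipeline; it is meant only to show what the support counts represent.

```python
from collections import Counter
from itertools import combinations

# Hypothetical baskets; the first one contains a duplicate item on purpose
baskets = [
    ["A", "B", "B", "C"],
    ["A", "B"],
    ["B", "C"],
]

pair_counts = Counter()
for basket in baskets:
    items = sorted(set(basket))                  # deduplicate and sort, as generate_pairs does
    pair_counts.update(combinations(items, 2))   # each unordered pair is counted once per basket

print(pair_counts)
```

Because items are sorted before pairing, each pair is stored once with the lexicographically smaller item first, which matters later when rules are derived in one direction only.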

In [79]:
# Keep pairs that co-occur in at least 2 baskets (a fixed count rather than one derived from min_support)
frequent_pairs = pair_support.filter(F.col("pair_support_count") >= 2)

In [65]:
#pair_support.orderBy(F.desc("pair_support_count")).show(20, truncate=False)


+----------+----------+------------------+
|itemA     |itemB     |pair_support_count|
+----------+----------+------------------+
|B000ILIJE0|B000NWQXBA|3                 |
|B0007EJ04G|B00087CHVU|2                 |
|B000GRMG5O|B000MS82OQ|2                 |
|0451518845|0582528259|2                 |
|B000BO2D6Y|B000O0AH22|2                 |
|B000JJVHZE|B000K7WNQW|2                 |
|0613175719|B000NQ9QF6|2                 |
|0520050894|B00086Q4RO|2                 |
|0451515242|0760700435|2                 |
|B000GQG7D2|B000Q032UY|2                 |
|B000GQG7D2|B000NWU3I4|2                 |
|1901768945|B000GDLGSG|2                 |
|0451515242|1581730470|2                 |
|B000GQG5MA|B000NWU3I4|2                 |
|0451518845|B000BO2D6Y|2                 |
|0533139759|B000MKYL0S|1                 |
|0872864243|1596911972|1                 |
|0879307455|0940352168|1                 |
|0970648219|B000OTS88I|1                 |
|0976251604|B000OTS88I|1                 |
+----------

### Generate Association Rules (A → B) with Support, Confidence, Lift

In [None]:
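rules = (
    frequent_pairs
    # Pairs are stored once with itemA < itemB, so only the rule itemA → itemB is produced here.
    # Join singleton supports for both items (items missing from frequent_items get a null support)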
    .join(
        frequent_items
        .withColumnRenamed("item", "itemA_support")
        .withColumnRenamed("support_count", "support_A"),
        F.col("itemA") == F.col("itemA_support"),
        "left"
    )
    .join(
        frequent_items
        .withColumnRenamed("item", "itemB_support")
        .withColumnRenamed("support_count", "support_B"),
        F.col("itemB") == F.col("itemB_support"),
        "left"
    )
    # Compute metrics
    .withColumn("support", F.col("pair_support_count") / F.lit(total_baskets))
    .withColumn("confidence", F.col("pair_support_count") / F.col("support_A"))
    .withColumn(
        "lift",
        (F.col("pair_support_count") / F.lit(total_baskets))
        / ((F.col("support_A") / F.lit(total_baskets)) * (F.col("support_B") / F.lit(total_baskets)))
    )
    # Keep clean columns
    .select("itemA", "itemB", "support", "confidence", "lift")
)



In [None]:
print("Association rules generated:", rules.count())
rules.orderBy(F.desc("confidence")).show(10, truncate=False)

+----------+----------+--------------------+------------------+------------------+
|     itemA|     itemB|             support|        confidence|              lift|
+----------+----------+--------------------+------------------+------------------+
|B000GQG7D2|B000Q032UY|0.001490312965722...|              0.25|55.916666666666664|
|B000GQG7D2|B000NWU3I4|0.001490312965722...|              0.25|55.916666666666664|
|B000GQG5MA|B000NWU3I4|0.001490312965722...|0.3333333333333333| 74.55555555555556|
+----------+----------+--------------------+------------------+------------------+



Association rules were derived by joining frequent pairs with single-item supports to compute key metrics:
- Support measures the relative frequency of co-occurrence.
- Confidence measures the conditional probability of observing item B given item A.
- Lift quantifies how much more likely items are to appear together than if they were independent.

Only rules with confidence ≥ 0.5 were retained for analysis.
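
The three metrics can be checked by hand with a short pure-Python sketch. The counts below are invented for illustration, and the function `rule_metrics` is a hypothetical helper, not part of the notebook. The sketch also computes confidence in the reverse direction (B → A), which generally differs from A → B even though support and lift are symmetric.

```python
def rule_metrics(pair_count, count_a, count_b, total_baskets):
    """Support, confidence (both directions) and lift for the item pair (A, B)."""
    support = pair_count / total_baskets
    conf_a_to_b = pair_count / count_a   # P(B | A)
    conf_b_to_a = pair_count / count_b   # P(A | B)
    lift = support / ((count_a / total_baskets) * (count_b / total_baskets))
    return support, conf_a_to_b, conf_b_to_a, lift

# Hypothetical counts: A in 4 baskets, B in 8 baskets, both together in 2, out of 100 baskets
support, conf_a_to_b, conf_b_to_a, lift = rule_metrics(2, 4, 8, 100)
print(support, conf_a_to_b, conf_b_to_a, lift)
```

In this toy case A → B reaches a confidence of 0.5 while B → A only reaches 0.25, so a threshold of 0.5 would keep one direction and drop the other.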

### Filter rules by confidence threshold (like minConfidence)

In [81]:
min_confidence = 0.5  # same threshold as MIN_CONFIDENCE in the Spark FP-Growth run
rules_filtered = rules.filter(F.col("confidence") >= min_confidence)

A confidence threshold of 0.5, consistent with the Spark FP-Growth experiment, was applied to retain only strong associations.

This filtering step keeps rules in which the consequent appears in at least half of the baskets containing the antecedent. Because the underlying pair counts are small, these rules should be read as indicative rather than conclusive.

In [None]:
print(f"Rules retained with confidence ≥ {min_confidence}: {rules_filtered.count()}")
rules_filtered.orderBy(F.desc("confidence")).show(10, truncate=False)

+----------+----------+--------------------+----------+------------------+
|     itemA|     itemB|             support|confidence|              lift|
+----------+----------+--------------------+----------+------------------+
|0451515242|0760700435|0.001490312965722...|       0.5|223.66666666666666|
|0451515242|1581730470|0.001490312965722...|       0.5|223.66666666666666|
+----------+----------+--------------------+----------+------------------+



### Make the rules readable by adding titles

In [None]:
rules_readable_prof = (
    rules_filtered
    # Join antecedent titles
    .join(
        title_mapping.alias("tm1"),
        F.col("itemA") == F.col("tm1.Id"),
        "left"
    )
    .withColumnRenamed("Title", "antecedent_title")
    # Join consequent titles
    .join(
        title_mapping.alias("tm2"),
        F.col("itemB") == F.col("tm2.Id"),
        "left"
    )
    .withColumnRenamed("Title", "consequent_title")
    # Keep relevant columns
    .select(
        "itemA", "antecedent_title",
        "itemB", "consequent_title",
        "support", "confidence", "lift"
    )
)

### Custom Ranking and Filtering Logic

In [None]:
rules_final_prof = (
    rules_readable_prof
    # Remove redundant self-pairs (same title in A and B)
    .filter(F.col("antecedent_title") != F.col("consequent_title"))
    # Weighted ranking metric: combines confidence & lift
    .withColumn("custom_score", 0.6 * F.col("confidence") + 0.4 * F.col("lift"))
    .orderBy(F.desc("custom_score"))
)

In [None]:
print("✅ Final ranked rules:", rules_final_prof.count())
rules_final_prof.select(
    "antecedent_title", "consequent_title", "confidence", "lift", "custom_score"
).show(10, truncate=False)
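
A quick arithmetic check of the weighted score, using the confidence and lift values that appear in the recorded rule output (0.5 and 223.67), suggests that lift dominates the ranking because it is unbounded while confidence never exceeds 1. The helper `custom_score` below simply restates the formula; its name and default weights mirror the cell above and are otherwise an assumption.

```python
def custom_score(confidence, lift, w_conf=0.6, w_lift=0.4):
    """Weighted sum of confidence and lift, as used for custom_score."""
    return w_conf * confidence + w_lift * lift

confidence = 0.5
lift = 223.66666666666666

score = custom_score(confidence, lift)
lift_share = (0.4 * lift) / score   # fraction of the score contributed by the lift term
print(round(score, 4), round(lift_share, 4))
```

If confidence is meant to influence the ordering, one option would be to rescale lift (for example with a logarithm) before combining the two; that is a design suggestion, not something the notebook does.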

### Generate user recommendations

In [None]:
# Match each user's basket with antecedents to get consequents as recommendations

user_suggestions_prof = (
    baskets_id_df.alias("b")
    # Join where user's basket contains the antecedent item
    .join(
        rules_final_prof.alias("r"),
        F.expr("array_contains(b.basket, r.itemA)"),
        "inner"
    )
    .select(
        F.col("b.User_id").alias("user_id"),
        F.col("r.itemB").alias("recommended_asin"),
        F.col("r.consequent_title").alias("recommended_title"),
        F.col("r.confidence"),
        F.col("r.lift"),
        F.col("r.custom_score")
    )
)


In [86]:
# Remove duplicates per user
user_suggestions_unique_prof = user_suggestions_prof.dropDuplicates(["user_id", "recommended_asin"])

# Rank recommendations per user
windowSpec_prof = Window.partitionBy("user_id").orderBy(F.desc("custom_score"))  # Earlier I used the average rating to sort the recommended books

user_recommendations_sorted_prof = (
  user_suggestions_unique_prof
  .withColumn("rank", F.row_number().over(windowSpec_prof))
  .groupBy("user_id")
  .agg(
      F.collect_list(
        F.struct(
          F.col("rank"),
          F.col("recommended_asin"),
          F.col("recommended_title"),  # consequent_title was aliased to recommended_title in user_suggestions_prof
          F.col("confidence"),
          F.col("lift"),
          F.col("custom_score"))).alias("recommended_books_sorted")))

In [87]:
user_recommendations_sorted_prof.show(10, truncate=False)

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id       |recommended_books_sorted                                                                                                                               |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|A1D2C0WDCSHUWZ|[{1, 0760700435, Sense and sensibility, 0.5, 223.66666666666666, 89.76666666666667}, {2, 1581730470, Emma, 0.5, 223.66666666666666, 89.76666666666667}]|
|A319KYEIAZ3SON|[{1, 0760700435, Sense and sensibility, 0.5, 223.66666666666666, 89.76666666666667}, {2, 1581730470, Emma, 0.5, 223.66666666666666, 89.76666666666667}]|
|A3DICRPEYWH9K5|[{1, 0760700435, Sense and sensibility, 0.5, 223.66666666666666, 89.76666666666667}, {2, 1581730470, Emma, 0.5, 223.66666666666666, 89.7666

### Top 10 Most Recommended Books

In [None]:
top_recommended_books = (
    user_suggestions_prof
    .groupBy("recommended_asin", "recommended_title")
    .agg(F.countDistinct("user_id").alias("num_users_recommended"))
    .orderBy(F.desc("num_users_recommended"))
)

print("Top 10 most frequently recommended books:")
top_recommended_books.show(10, truncate=False)

I declare that this material, which I now submit for assessment, is entirely my own work and has not been taken from the work of others, save and to the extent that such work has been cited and acknowledged within the text of my work, and including any code produced using generative AI systems. I understand that plagiarism, collusion, and copying are grave and serious offences in the university and accept the penalties that would be imposed should I engage in plagiarism, collusion or copying. This assignment, or any part of it, has not been previously submitted by me/us or any other person for assessment on this or any other course of study.

 Precious Prince
