In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [2]:
os.environ['KAGGLE_USERNAME'] = "elahezohdi"
os.environ['KAGGLE_KEY'] = "b4d81db0a12258d41bbdde98d1131209"

!kaggle datasets download -d mohamedbakhet/amazon-books-reviews
!unzip -q amazon-books-reviews.zip -d data

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
 96% 1.02G/1.06G [00:04<00:00, 297MB/s]
100% 1.06G/1.06G [00:04<00:00, 230MB/s]


In [3]:
df = spark.read.options(header=True, inferSchema=True).csv("data/Books_rating.csv")

df.show()

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|1882931173|Its Only Art If I...| null| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|
|0826414346|Dr. Seuss: Americ...| null|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724800|   Really Enjoyed It|I don't care much...|
|0826414346|Dr. Seuss: Americ...| null|A3UH4UZ4RSVO82|        John Granger|             10/11|         5.0| 1078790400|Essential for eve...|"If people become...|
|0826414346|Dr. Seuss: Ameri

In [4]:
# Print schema: column names and data types
df.printSchema()

# Show a few rows to understand the content
df.show(5, truncate=False)

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)

+----------+------------------------------+-----+--------------+--------------------------------------+------------------+------------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [5]:
df = df.select("User_id", "Title")

In [6]:
from pyspark.sql.functions import col, count, when

# Count nulls per column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+-------+-----+
|User_id|Title|
+-------+-----+
| 562250|  208|
+-------+-----+



In [7]:
# Drop rows with missing values in either column
df = df.dropna(subset=["User_id", "Title"])

# Drop exact duplicate reviews
df = df.dropDuplicates(["User_id", "Title"])


In [8]:
print("Unique users:", df.select("User_id").distinct().count())
print("Unique titles:", df.select("Title").distinct().count())

Unique users: 1008423
Unique titles: 206640


In [9]:
# Show examples of similar book titles
from pyspark.sql.functions import lower

df_lower = df.withColumn("Title", lower(col("Title")))

In [10]:
df.describe()

summary,User_id,Title
count,2114559,2114559
mean,23.35,2071.738021978022
stddev,33.92977175153278,2403.9374661206234
min,""""" Film acting """"""",""""""" Film technique"
max,AZZZZW74AAX75,xBase Programming...


In [11]:
df_sample = df.sample(False, 0.01, seed=42)
print("Sample size:", df_sample.count())
df_sample.show(5, truncate=False)

Sample size: 21280
+--------------+---------------------------------------------------------------------------------+
|User_id       |Title                                                                            |
+--------------+---------------------------------------------------------------------------------+
|ARD5I31J12UU8 |The Mayor of Casterbridge (Signet classics)                                      |
|A3N8RZSXKN1OR2|Sword and sorceress xiii                                                         |
|A3IUPJ2SSLG7RH|The Complete Book of Chinese Health Balls: Background and Use of the Health Balls|
|A1VQEAFW83OTG9|Rabbit, run                                                                      |
|AJHPZ48YOZK4H |Zen and the Art of Motorcycle Maintenance                                        |
+--------------+---------------------------------------------------------------------------------+
only showing top 5 rows



In [12]:
from pyspark.sql.functions import collect_set

# Group titles into baskets per user
df_baskets = df.groupBy("User_id").agg(collect_set("Title").alias("Basket"))

# Show a few examples of user baskets
df_baskets.show(5, truncate=False)

# Count total number of baskets

+---------------------+-------------------------------------------------------------------------------+
|User_id              |Basket                                                                         |
+---------------------+-------------------------------------------------------------------------------+
|A06765203K4HQ4HP9CW9A|[The Power of the Actor]                                                       |
|A1005YJDO9VCIY       |[Choose Peace & Happiness: A 52-Week Guide, The Gift of the Acorn]             |
|A1006V961PBMKA       |[DAVE GROHL, Fighting The Forces: What's At Stake In Buffy The Vampire Slayer?]|
|A100DWG2GUBPF2       |[Purple America.]                                                              |
|A100HWDN5JMK8G       |[Jake's Gold]                                                                  |
+---------------------+-------------------------------------------------------------------------------+
only showing top 5 rows



In [13]:
# Sample 1% of the baskets for faster testing (disable later for full run)
df_sampled_baskets = df_baskets.sample(fraction=0.01, seed=42)

# Check how many baskets are in the sample
print("Sample size:", df_sampled_baskets.count())

# Show a few sample baskets
df_sampled_baskets.show(5, truncate=False)


Sample size: 10191
+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
|User_id       |Basket                                                                                                                                            |
+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
|A168DC570805X |[Cosmic Voyage: True Evidence of Extraterrestrials Visiting Earth]                                                                                |
|A17MHHEGHJ7GMO|[The Night Before Christmas, The (Wee Books for Wee Folk), A visit from Santa Claus, Twas the night before Christmas;: A visit from St. Nicholas,]|
|A19W94F9XQKV0L|[A Biblical Defense Guide for Gays, Lesbians and Those Who Love Them]                                                                            

In [14]:
from pyspark.ml.fpm import FPGrowth

# Retry with lower minSupport
fpGrowth = FPGrowth(itemsCol="Basket", minSupport=0.001, minConfidence=0.1)
model = fpGrowth.fit(df_sampled_baskets)

# Get frequent itemsets and rules
freq_itemsets = model.freqItemsets
assoc_rules = model.associationRules

print("Frequent Itemsets:")
freq_itemsets.show(10, truncate=False)

print("Association Rules:")
assoc_rules.show(10, truncate=False)


Frequent Itemsets:
+------------------------------------------------------------------------------------------------------------------+----+
|items                                                                                                             |freq|
+------------------------------------------------------------------------------------------------------------------+----+
|[The Hobbit There and Back Again]                                                                                 |40  |
|[The Hobbit]                                                                                                      |40  |
|[The Hobbit, The Hobbit There and Back Again]                                                                     |40  |
|[The Hobbit or There and Back Again]                                                                              |40  |
|[The Hobbit or There and Back Again, The Hobbit]                                                                  |40  |
|[The

In [37]:
df_clean = df.dropna(subset=["User_id", "Title"]).dropDuplicates(["User_id", "Title"])


In [38]:
# Sample 1% of your cleaned DataFrame (adjust sample size as needed)
df_sample = df_clean.sample(withReplacement=False, fraction=0.01, seed=42)

In [39]:
df_sample.show(3)

+--------------+--------------------+
|       User_id|               Title|
+--------------+--------------------+
| ARD5I31J12UU8|The Mayor of Cast...|
|A3N8RZSXKN1OR2|Sword and sorcere...|
|A3IUPJ2SSLG7RH|The Complete Book...|
+--------------+--------------------+
only showing top 3 rows



In [40]:
from pyspark.sql.functions import concat_ws

# Convert baskets to a single comma-separated string
baskets_df = df_baskets.withColumn("basket_str", concat_ws(", ", "Basket")).select("basket_str")

# Convert to RDD of strings
rdd = baskets_df.rdd.map(lambda row: row['basket_str'])

# Split into list of book titles
baskets = rdd.map(lambda line: line.split(", "))

# You can now work with baskets as RDD[list of strings]




In [45]:
# 1. Flatten function defined properly at the top-level
def identity(x):
    return x

# 2. Hashing vocabulary: flat and distinct items
flat_items_rdd = baskets.flatMap(identity).distinct().zipWithIndex()

# 3. Safe collect (this is what Nicolaus did)
hash_index = dict(flat_items_rdd.collect())

# 4. Hashing the baskets
def hashing(basket):
    return {hash_index[item] for item in basket if item in hash_index}

hashed_baskets = baskets.map(hashing)


Traceback (most recent call last):
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 72, in dumps
    cp.dump(obj)
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 630, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 503, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/

PicklingError: Could not serialize object: IndexError: tuple index out of range

In [48]:
from itertools import combinations

def a_priori(baskets_rdd, support_threshold):
    print("=== A-Priori Algorithm ===")

    # Pass 1: Count singletons
    singleton_counts = baskets_rdd.flatMap(lambda basket: [(item, 1) for item in basket]) \
                                  .reduceByKey(lambda a, b: a + b) \
                                  .filter(lambda x: x[1] >= support_threshold)

    singletons = singleton_counts.map(lambda x: (x[0],)).collect()
    print("Frequent singletons:", len(singletons))

    k = 2
    current_frequents = set(singletons)

    while True:
        print(f"\nGenerating itemsets of size {k}...")
        candidate_counts = baskets_rdd.flatMap(
            lambda basket: [(tuple(sorted(c)), 1) for c in combinations(basket, k)
                            if all(tuple(sorted(sub)) in current_frequents for sub in combinations(c, k-1))]
        ).reduceByKey(lambda a, b: a + b).filter(lambda x: x[1] >= support_threshold)

        count = candidate_counts.count()
        if count == 0:
            print(f"No frequent itemsets of size {k}. Stopping.")
            break

        current_frequents = set(candidate_counts.map(lambda x: x[0]).collect())
        print(f"Frequent itemsets of size {k}: {count}")
        k += 1


In [49]:
# Adjust the support threshold to something meaningful
support_threshold = 50  # try 20 or 10 for testing
a_priori(baskets, support_threshold)


=== A-Priori Algorithm ===


Traceback (most recent call last):
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 72, in dumps
    cp.dump(obj)
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 630, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/pyspark/cloudpickle/cloudpickle_fast.py", line 503, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.1.1-bin-hadoop3.2/python/

PicklingError: Could not serialize object: IndexError: tuple index out of range