### **Importing the dataset from Kaggle**

In [2]:
import os
os.environ['KAGGLE_USERNAME'] = "xxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxx"
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
100% 1.06G/1.06G [00:05<00:00, 254MB/s]
100% 1.06G/1.06G [00:05<00:00, 227MB/s]


### **Unzipping the files**

In [3]:
!unzip amazon-books-reviews.zip -d /content/amazon_books/

Archive:  amazon-books-reviews.zip
  inflating: /content/amazon_books/Books_rating.csv  
  inflating: /content/amazon_books/books_data.csv  


### **Installing the dipendencies and importing them**

In [4]:
!pip install python-dotenv kaggle pandas swifter nltk matplotlib

Collecting swifter
  Downloading swifter-1.4.0.tar.gz (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: swifter
  Building wheel for swifter (setup.py) ... [?25l[?25hdone
  Created wheel for swifter: filename=swifter-1.4.0-py3-none-any.whl size=16505 sha256=309f954eadbb03d8441a26ab24f93dad0ee28d9179956921a1c71d81c549f370
  Stored in directory: /root/.cache/pip/wheels/d9/31/ff/ff51141a088571a9f672449e5aad5ea8bb35ca5d95ba135f30
Successfully built swifter
Installing collected packages: swifter
Successfully installed swifter-1.4.0


In [1]:
import pandas as pd
from itertools import combinations, count
from collections import Counter, defaultdict
import hashlib
import re
import swifter
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
import numpy as np
import matplotlib.pyplot as plt
import os

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### **Set up of Spark**

In [6]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz

In [7]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PATH"] += ":/content/spark-3.5.3-bin-hadoop3/bin"

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SON Algorithm") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

sc = spark.sparkContext
print("Spark version:", spark.version)

Spark version: 3.5.1


### **Loading the dataset and exploring it**

In [3]:
data = pd.read_csv("/content/amazon_books/Books_rating.csv")
print(data.head())
print(data.info())


           Id                           Title  Price         User_id  \
0  1882931173  Its Only Art If Its Well Hung!    NaN   AVCGYZL8FQQTD   
1  0826414346        Dr. Seuss: American Icon    NaN  A30TK6U7DNS82R   
2  0826414346        Dr. Seuss: American Icon    NaN  A3UH4UZ4RSVO82   
3  0826414346        Dr. Seuss: American Icon    NaN  A2MVUWT453QH61   
4  0826414346        Dr. Seuss: American Icon    NaN  A22X4XUPKF66MR   

                          profileName review/helpfulness  review/score  \
0               Jim of Oz "jim-of-oz"                7/7           4.0   
1                       Kevin Killian              10/10           5.0   
2                        John Granger              10/11           5.0   
3  Roy E. Perry "amateur philosopher"                7/7           4.0   
4     D. H. Richards "ninthwavestore"                3/3           4.0   

   review/time                                   review/summary  \
0    940636800           Nice collection of Julie Strai

# **Reviews as Baskets and Words as Items**

### **Data Cleaning**

In [4]:
reviews_data_clean = data.copy()

reviews_data_clean = reviews_data_clean[['review/text']]

print("Numbers of NaN for each column:")
print(reviews_data_clean.isna().sum())
print(reviews_data_clean.info())

reviews_data_clean = reviews_data_clean.dropna()
print("Rows remaning after the cleaning:", len(reviews_data_clean))

Numbers of NaN for each column:
review/text    8
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3000000 entries, 0 to 2999999
Data columns (total 1 columns):
 #   Column       Dtype 
---  ------       ----- 
 0   review/text  object
dtypes: object(1)
memory usage: 22.9+ MB
None
Rows remaning after the cleaning: 2999992


### **Setting the Global Variables**

In [5]:
USE_SUBSAMPLE = True
SUBSAMPLE_FRACTION = 0.15
RELATIVE_SUPPORT = 0.1
MAX_K = 3

### **Subsampling the Data**

In [6]:
df_reviews = reviews_data_clean.copy()

if USE_SUBSAMPLE:
    df_reviews = df_reviews.sample(frac=SUBSAMPLE_FRACTION, random_state=42)
    print(f"Subsample of the dataset ({len(df_reviews)}, {SUBSAMPLE_FRACTION*100:.1f}%)")
else:
    print(f"Entire dataset ({len(df_reviews)} )")

Subsample of the dataset (449999, 15.0%)


### **Tokenization**

In [7]:
def tokenize_text(text):
    text = text.lower()
    tokens = text.split()
    tokens = [t for t in tokens if t not in stop_words and t.strip() != '']
    tokens = [re.sub(r'[^\w\s]', '', t) for t in tokens]
    tokens = [t for t in tokens if t != '']
    return tokens


stop_words = set(stopwords.words('english'))

reviews_data_clean_sub = df_reviews.copy()

reviews_data_clean_sub['tokens'] = reviews_data_clean_sub['review/text'].astype(str).swifter.apply(tokenize_text)
print(reviews_data_clean_sub.head())

Pandas Apply:   0%|          | 0/449999 [00:00<?, ?it/s]

                                               review/text  \
2920649  This book is a prime example of how horribly w...   
2352586  Extremely disappointed by the SHORT length and...   
908220   I will openly admit that I have never seen the...   
1542097  I loved this book so much that I selected it a...   
2297506  Perhaps the best book on the market for passin...   

                                                    tokens  
2920649  [book, prime, example, horribly, written, deep...  
2352586  [extremely, disappointed, short, length, curso...  
908220   [openly, admit, never, seen, punch, judy, stor...  
1542097  [loved, book, much, selected, first, choice, b...  
2297506  [perhaps, best, book, market, passing, pmp, ex...  


### **Creation of the Baskets**

In [8]:
baskets_rdd = spark.sparkContext.parallelize(
   reviews_data_clean_sub['tokens'].tolist()).map(lambda toks: set(toks))

total_baskets = baskets_rdd.count()
support_threshold = int(total_baskets * RELATIVE_SUPPORT)

print(f"Number of total baskets: {total_baskets}")
print(f"Relative support: {RELATIVE_SUPPORT*100:.1f}% ")

Number of total baskets: 449999
Relative support: 10.0% 


### **The Algorithm of Savasere, Omiecinski, and Navathe**

In [9]:
def apriori_partition(partition, support_threshold, total_baskets, max_k=3):
    baskets = list(partition)
    if not baskets:
        return []

    p = len(baskets) / total_baskets
    local_support = max(1, int(support_threshold * p))

    item_counts = Counter()
    for basket in baskets:
        item_counts.update(basket)
    freq_items = {frozenset([i]) for i, c in item_counts.items() if c >= local_support}

    all_frequents = list(freq_items)
    current_freq = freq_items
    k = 2

    while current_freq and k <= max_k:
        candidates = set([
            i1 | i2 for i1 in current_freq for i2 in current_freq
            if len(i1 | i2) == k
        ])
        candidate_counts = Counter()
        for basket in baskets:
            for cand in candidates:
                if cand.issubset(basket):
                    candidate_counts[cand] += 1
        current_freq = {c for c, count in candidate_counts.items() if count >= local_support}
        all_frequents.extend(current_freq)
        k += 1

    return all_frequents


candidates = (baskets_rdd
    .mapPartitions(lambda partition: apriori_partition(partition, support_threshold, total_baskets, MAX_K))
    .distinct()
    .collect()
)


def count_candidates(basket, candidates):
    basket_counts = []
    for cand in candidates:
        if cand.issubset(basket):
            basket_counts.append((cand, 1))
    return basket_counts

candidate_counts = (baskets_rdd
    .flatMap(lambda basket: count_candidates(basket, candidates))
    .reduceByKey(lambda x, y: x + y)
    .filter(lambda x: x[1] >= support_threshold)
)


### **Experimental Results**

In [10]:
num_frequents = candidate_counts.count()

print(f"Total baskets: {total_baskets}")
print(f"Number of candidates {len(candidates)}")
print(f"Relative support: {RELATIVE_SUPPORT*100:.1f}%")
print(f"Total frequent itemsets: {num_frequents}\n")

sample_results = candidate_counts.take(20)
print("First 20 frequent itemsets")
for itemset, count in sample_results:
    rel_sup = count / total_baskets
    print(f"{set(itemset)} : {count}")

output_path = f"/content/output_frequent_itemsets/{SUBSAMPLE_FRACTION}_{RELATIVE_SUPPORT}"
candidate_counts.saveAsTextFile(output_path)
print(f"All results are saved in: {output_path}")

Total baskets: 449999
Number of candidates 100
Relative support: 10.0%
Total frequent itemsets: 100

First 20 frequent itemsets
{'one'} : 162744
{'read', 'first'} : 50901
{'could'} : 60566
{'find'} : 56159
{'book', 'love'} : 50522
{'new'} : 51706
{'well'} : 79682
{'really', 'book'} : 60922
{'it'} : 93586
{'books', 'read', 'book'} : 47207
{'even'} : 73307
{'book', 'like'} : 93253
{'many', 'read'} : 45002
{'recommend'} : 48774
{'little'} : 51473
{'read'} : 205383
{'book', 'like', 'one'} : 46523
{'time', 'read'} : 54375
{'books', 'one'} : 53887
{'would', 'one'} : 50829
{'way'} : 69626
{'would', 'read'} : 55724
{'like', 'read'} : 61317
{'get', 'book'} : 58065
{'reading'} : 91692
{'good'} : 97629
{'much', 'book'} : 66641
{'people', 'book'} : 56971
{'book', 'like', 'read'} : 53240
{'time', 'one'} : 49352
{'book', 'many'} : 68532
{'book', 'time'} : 76910
{'book', 'written'} : 47743
{'books', 'read'} : 56798
{'great'} : 96730
{'book', 'also'} : 64536
{'book', 'would', 'read'} : 49050
{'think'}

# **Users as Baskets and Books as Items**

### **Data Cleaning**

In [11]:
book_data_clean = data.copy()

book_data_clean = book_data_clean[['Title', 'User_id']]

print("Numbers of NaN for each column:")
print(book_data_clean.isna().sum())
print(book_data_clean.info())

book_data_clean = book_data_clean.dropna()
print("Rows remaning after the cleaning:", len(book_data_clean))

Numbers of NaN for each column:
Title         208
User_id    561787
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3000000 entries, 0 to 2999999
Data columns (total 2 columns):
 #   Column   Dtype 
---  ------   ----- 
 0   Title    object
 1   User_id  object
dtypes: object(2)
memory usage: 45.8+ MB
None
Rows remaning after the cleaning: 2438018


### **Normalization of the Title**

In [12]:
book_data_clean['Title'] = book_data_clean['Title'].str.lower()
book_data_clean['Title'] = book_data_clean['Title'].str.replace(r'[^\w\s]', '', regex=True)
book_data_clean['Title'] = book_data_clean['Title'].str.replace(' ', '')
print(book_data_clean.head())
print(book_data_clean.info())

                     Title         User_id
0  itsonlyartifitswellhung   AVCGYZL8FQQTD
1      drseussamericanicon  A30TK6U7DNS82R
2      drseussamericanicon  A3UH4UZ4RSVO82
3      drseussamericanicon  A2MVUWT453QH61
4      drseussamericanicon  A22X4XUPKF66MR
<class 'pandas.core.frame.DataFrame'>
Index: 2438018 entries, 0 to 2999998
Data columns (total 2 columns):
 #   Column   Dtype 
---  ------   ----- 
 0   Title    object
 1   User_id  object
dtypes: object(2)
memory usage: 55.8+ MB
None


### **Setting the Global Variables**

In [13]:
B1 = 1_000_003
B2 = 999_983
HASH1_SEED = 1
HASH2_SEED = 2

### **Creation of the Baskets**

In [14]:
df = book_data_clean.copy()

user_baskets = (
    df.groupby('User_id')['Title']
      .apply(lambda s: set(s))
      .tolist()
)

n_baskets = len(user_baskets)
print(f"Unique baskets created {n_baskets}")

Unique baskets created 1008961


### **Utility Functions Definition**

In [15]:
def stable_bucket(tuple_key: bytes, num_buckets: int, seed: int) -> int:
    h = hashlib.blake2b(tuple_key, digest_size=8, person=f"ms{seed:02d}".encode())
    return int.from_bytes(h.digest(), byteorder="big") % num_buckets


def tuple_key(items: tuple) -> bytes:
    return "||".join(items).encode("utf-8")

### **The Multistage Algorithm**

In [16]:
def multistage_k_itemsets(user_baskets, min_support, k_max=4):

    s_abs = max(1, int(min_support * n_baskets))
    print(f"Multistage Algorithm (min_support={min_support}, s_abs={s_abs})")


    item_counts = Counter()
    for basket in user_baskets:
        for it in basket:
            item_counts[it] += 1
    frequent_itemsets = set(frozenset([it]) for it, c in item_counts.items() if c >= s_abs)

    all_results = []
    k = 2


    while k <= k_max and frequent_itemsets:
        print(f"Computation for k={k}")

        prev_L = sorted(list(frequent_itemsets), key=lambda fs: tuple(sorted(fs)))
        prev_set = set(prev_L)

        candidate_sets = set()
        len_prev = len(prev_L)
        for i in range(len_prev):
            for j in range(i+1, len_prev):
                l1 = tuple(sorted(prev_L[i]))
                l2 = tuple(sorted(prev_L[j]))

                if l1[:k-2] == l2[:k-2]:
                    cand = frozenset(prev_L[i] | prev_L[j])
                    if len(cand) == k:

                        subsets = (cand - {x} for x in cand)
                        if all(frozenset(sub) in prev_set for sub in subsets):
                            candidate_sets.add(cand)
                else:
                    break

        if not candidate_sets:
            print("0 generated candidates")
            break


        bucket_counts_1 = np.zeros(B1, dtype=np.int32)
        for basket in user_baskets:
            fitems = [it for it in basket if frozenset([it]) in frequent_itemsets]
            if len(fitems) < k:
                continue
            for comb in combinations(sorted(fitems), k):
                hk = tuple_key(comb)
                bidx = stable_bucket(hk, B1, HASH1_SEED)
                bucket_counts_1[bidx] += 1
        bitmap1 = bucket_counts_1 >= s_abs


        bucket_counts_2 = np.zeros(B2, dtype=np.int32)
        for basket in user_baskets:
            fitems = [it for it in basket if frozenset([it]) in frequent_itemsets]
            if len(fitems) < k:
                continue
            for comb in combinations(sorted(fitems), k):
                hk = tuple_key(comb)
                if bitmap1[stable_bucket(hk, B1, HASH1_SEED)]:
                    bidx2 = stable_bucket(hk, B2, HASH2_SEED)
                    bucket_counts_2[bidx2] += 1
        bitmap2 = bucket_counts_2 >= s_abs


        counts = defaultdict(int)
        for basket in user_baskets:
            fitems = [it for it in basket if frozenset([it]) in frequent_itemsets]
            if len(fitems) < k:
                continue
            for comb in combinations(sorted(fitems), k):
                hk = tuple_key(comb)
                if bitmap1[stable_bucket(hk, B1, HASH1_SEED)] and bitmap2[stable_bucket(hk, B2, HASH2_SEED)]:
                    counts[frozenset(comb)] += 1


        new_frequent = set(fs for fs, c in counts.items() if c >= s_abs)


        result_df = pd.DataFrame([
            (tuple(sorted(list(fs))), counts[fs], counts[fs] / n_baskets, k)
            for fs in new_frequent
        ], columns=["itemset", "count", "support", "k"])


        print(f" Number of frequent itemsets {len(new_frequent)} with dimension {k}.")
        all_results.append(result_df)

        frequent_itemsets = new_frequent
        k += 1

    if not all_results:
        return pd.DataFrame(columns=["itemset", "count", "support", "k"])
    return pd.concat(all_results, ignore_index=True)


### **Experimental Results**

In [17]:
support_thresholds = [0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05]
results_summary = []

for sup in support_thresholds:
    df_res = multistage_k_itemsets(user_baskets, sup, k_max=4)
    counts_by_k = df_res.groupby("k").size().to_dict()
    results_summary.append({
        "min_support": sup,
        "k=2": counts_by_k.get(2, 0),
        "k=3": counts_by_k.get(3, 0),
        "k=4": counts_by_k.get(4, 0)
    })

summary_df = pd.DataFrame(results_summary)
print("Summary Table")
print(summary_df)

Multistage Algorithm (min_support=0.0005, s_abs=504)
Computation for k=2
 Number of frequent itemsets 315 with dimension 2.
Computation for k=3
 Number of frequent itemsets 0 with dimension 3.
Multistage Algorithm (min_support=0.001, s_abs=1008)
Computation for k=2
 Number of frequent itemsets 78 with dimension 2.
Computation for k=3
 Number of frequent itemsets 0 with dimension 3.
Multistage Algorithm (min_support=0.002, s_abs=2017)
Computation for k=2
 Number of frequent itemsets 7 with dimension 2.
Computation for k=3
 Number of frequent itemsets 0 with dimension 3.
Multistage Algorithm (min_support=0.005, s_abs=5044)
Multistage Algorithm (min_support=0.01, s_abs=10089)
Multistage Algorithm (min_support=0.02, s_abs=20179)
Multistage Algorithm (min_support=0.05, s_abs=50448)
Summary Table
   min_support  k=2  k=3  k=4
0       0.0005  315    0    0
1       0.0010   78    0    0
2       0.0020    7    0    0
3       0.0050    0    0    0
4       0.0100    0    0    0
5       0.0200    