In [2]:
# --- 0. Imports / setup ---
import os, zipfile, glob
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_set, size
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.fpm import FPGrowth

# --- PROJECT CONFIGURATION ---
# USE_SUBSAMPLE_FOR_WORDS: when True, the word-basket step works on a random
# sample of the reviews instead of all of them.
# Set this to False to attempt running on the whole dataset (if resources allow)
USE_SUBSAMPLE_FOR_WORDS = True 
# SUBSAMPLE_FRACTION: share of reviews kept when sampling (0.03 == 3%).
SUBSAMPLE_FRACTION = 0.03


### 1. Kaggle authentication and data download

In [3]:
os.environ['KAGGLE_USERNAME'] = "******"
os.environ['KAGGLE_KEY'] = "******"

DATASET_ID = "mohamedbakhet/amazon-books-reviews"
DATA_DIR = "data"
CSV_NAME = "Books_rating.csv"   # file we will read after unzip

os.makedirs(DATA_DIR, exist_ok=True)

# Download from Kaggle into data/
!kaggle datasets download -d $DATASET_ID -p $DATA_DIR -o

# Unzip the dataset into data/
for zpath in glob.glob(f"{DATA_DIR}/*.zip"):
    with zipfile.ZipFile(zpath, "r") as z:
        z.extractall(DATA_DIR)

!ls -lh $DATA_DIR

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to data
 93%|████████████████████████████████████▍  | 0.99G/1.06G [00:01<00:00, 954MB/s]
100%|███████████████████████████████████████| 1.06G/1.06G [00:01<00:00, 946MB/s]
total 8166296
-rw-r--r--@ 1 menimalina  staff   2.7G Oct 27 18:54 Books_rating.csv
-rw-r--r--@ 1 menimalina  staff   1.1G Sep 13  2022 amazon-books-reviews.zip
-rw-r--r--@ 1 menimalina  staff   173M Oct 27 18:54 books_data.csv


In [4]:
# 2. Start Spark

# Drop any inherited SPARK_HOME before findspark runs
# (presumably so findspark locates the Spark installation on its own).
os.environ.pop("SPARK_HOME", None)

findspark.init()

# Stop a session left over from an earlier run of this cell so the builder
# below does not hand back the old session.
# Do NOT call getOrCreate() just to stop it again: on a fresh kernel that
# launches the driver JVM with default settings, and spark.driver.memory
# cannot be changed once the JVM is running.
# getActiveSession() requires PySpark >= 3.0.
_stale_session = SparkSession.getActiveSession()
if _stale_session is not None:
    _stale_session.stop()

DRIVER_MEMORY = "32g"   # adjust to the RAM available on this machine (e.g. "12g")

spark = (
    SparkSession.builder
    .appName("MarketBasketAmazonBooks")
    .master("local[*]")
    .config("spark.driver.memory", DRIVER_MEMORY)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/27 18:54:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 3. Load data (full dataset, read with Spark)

In [5]:
# 3. Load the Books_rating.csv file with Spark

ratings_path = f"{DATA_DIR}/{CSV_NAME}"

# Explicit schema (identical to what schema inference reported for this file).
# Declaring it avoids the extra full pass over the CSV that inferSchema=True
# needs just to guess the column types.
# Columns are matched by position, so update this if the dataset layout changes.
RATINGS_SCHEMA = (
    "Id STRING, Title STRING, Price DOUBLE, User_id STRING, profileName STRING, "
    "`review/helpfulness` STRING, `review/score` DOUBLE, `review/time` INT, "
    "`review/summary` STRING, `review/text` STRING"
)

df = (
    spark.read
    .option("header", True)
    .option("multiLine", True)   # reviews can span multiple lines
    .option("escape", '"')       # handle quotes
    .schema(RATINGS_SCHEMA)
    .csv(ratings_path)
)

df.printSchema()
df.show(5, truncate=100)


                                                                                

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: double (nullable = true)
 |-- review/time: integer (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)

+----------+------------------------------+-----+--------------+----------------------------------+------------------+------------+-----------+-----------------------------------------------+----------------------------------------------------------------------------------------------------+
|        Id|                         Title|Price|       User_id|                       profileName|review/helpfulness|review/score|review/time|                                 review/summary|                                                                                         revi

In [6]:
# 4. Keep only the columns used downstream, under snake_case names

BOOK_COL = "Id"            # source column: book identifier
USER_COL = "User_id"       # source column: reviewer identifier
TEXT_COL = "review/text"   # source column: raw review text

# (source column, new name) pairs, in the order they appear in df_clean
_renamed_columns = [
    F.col(source_name).alias(new_name)
    for source_name, new_name in (
        (BOOK_COL, "book_id"),
        (USER_COL, "user_id"),
        (TEXT_COL, "review_text"),
    )
]
df_clean = df.select(*_renamed_columns)

df_clean.show(5, truncate=100)


+----------+--------------+----------------------------------------------------------------------------------------------------+
|   book_id|       user_id|                                                                                         review_text|
+----------+--------------+----------------------------------------------------------------------------------------------------+
|1882931173| AVCGYZL8FQQTD|This is only for Julie Strain fans. It's a collection of her photos -- about 80 pages worth with ...|
|0826414346|A30TK6U7DNS82R|I don't care much for Dr. Seuss but after reading Philip Nel's book I changed my mind--that's a g...|
|0826414346|A3UH4UZ4RSVO82|If people become the books they read and if "the child is father to the man," then Dr. Seuss (The...|
|0826414346|A2MVUWT453QH61|Theodore Seuss Geisel (1904-1991), aka &quot;Dr. Seuss,&quot; was one of the most influential wri...|
|0826414346|A22X4XUPKF66MR|Philip Nel - Dr. Seuss: American IconThis is basically an academic ove

In [7]:
# 5. Build baskets of books per user

user_book_baskets = (
    df_clean
    .select("user_id", "book_id")
    # a basket needs both a known user and a known book
    .filter(F.col("user_id").isNotNull() & F.col("book_id").isNotNull())
    .groupBy("user_id")
    # collect_set de-duplicates: each book appears at most once per user
    .agg(F.collect_set("book_id").alias("items"))
    # single-item baskets cannot contribute to co-occurrence patterns
    .filter(F.size("items") >= 2)
    .cache()
)

# One count() is enough: it materialises the cache and yields the number we report.
n_user_baskets = user_book_baskets.count()

print("Number of user baskets:", n_user_baskets)
user_book_baskets.show(5, truncate=120)


                                                                                

Number of user baskets: 308676
+--------------+------------------------------------------------------------------------------------------------------------------------+
|       user_id|                                                                                                                   items|
+--------------+------------------------------------------------------------------------------------------------------------------------+
|A1005YJDO9VCIY|                                                                                                [0977390403, 1590030591]|
|A1006V961PBMKA|                                                                                                [B000N6OSCM, 0742516814]|
|A1018G2FPJBJ0S|                                                                                                [B000KW0HGK, B0006C2DE8]|
|A1036ICK1O363W|                                                                        [1559390921, 0553354566, 155939188X, 1932100385]|
|A1

In [8]:
# 6. Build baskets of words per review
if USE_SUBSAMPLE_FOR_WORDS:
    # The percent format spec avoids float noise such as "7.000000000000001%"
    # that `fraction * 100` produces for some fractions.
    print(f"Using a {SUBSAMPLE_FRACTION:.1%} subsample for word analysis.")
    # sample without replacement; fixed seed keeps the subsample reproducible
    df_clean_subset = df_clean.sample(False, SUBSAMPLE_FRACTION, seed=42)
else:
    print("Using the full dataset for word analysis (may require more resources).")
    df_clean_subset = df_clean

# Split on runs of non-word characters, then drop English stop words.
tok = RegexTokenizer(inputCol="review_text", outputCol="tokens", pattern="\\W+")
rm  = StopWordsRemover(inputCol="tokens", outputCol="filtered", locale="en_US")

# 1. Lowercase the review text
df_lower = df_clean_subset.withColumn("review_text", F.lower(F.col("review_text")))

# 2. Handle nulls and apply tokenizer/remover
#    (null reviews become empty strings before they reach the tokenizer)
df_no_null = df_lower.fillna({"review_text": ""})
df_tok = tok.transform(df_no_null)
df_filt = rm.transform(df_tok)

word_baskets = (
    df_filt
    # each word counts once per review
    .select(F.array_distinct("filtered").alias("items"))
    # reviews with fewer than two distinct words carry no co-occurrence information
    .filter(F.size("items") >= 2)
    .cache()
)

# One count() is enough: it materialises the cache and yields the number we report.
n_word_baskets = word_baskets.count()

print("Number of word baskets:", n_word_baskets)
word_baskets.show(5, truncate=120)


Using a 3.0% subsample for word analysis.


25/10/27 18:54:34 WARN StopWordsRemover: Default locale set was [en_RU]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
[Stage 16:>                                                         (0 + 1) / 1]

Number of word baskets: 90284
+------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                   items|
+------------------------------------------------------------------------------------------------------------------------+
|[self, published, book, want, know, read, paragraphs, 5, star, reviews, must, written, ms, haddon, family, friends, p...|
|[chance, read, book, sure, bothered, purchasing, store, sourdough, lore, interesting, order, make, seem, authenticall...|
|[book, worth, keep, collection, advise, sourdough, ruth, also, told, picture, past, 100, years, ago, alaska, stand, m...|
|             [gave, detailed, vision, day, life, commune, still, existence, today, interesting, quick, delightful, read]|
|[dr, baker, one, great, 20th, century, metaphysicians, like, emmet, fox, ernest, holmes, thomas, troward, un

                                                                                

In [9]:
# 7. Mine frequent itemsets and association rules using Spark FP-Growth

MIN_SUPPORT_USER_BOOKS = 0.01   # 1% of baskets
MIN_SUPPORT_WORDS      = 0.04   # 4% of baskets
MIN_CONFIDENCE         = 0.5    # confidence threshold for rules


def mine_baskets(baskets, min_support, min_confidence=MIN_CONFIDENCE):
    """Fit FP-Growth on one basket DataFrame and return its sorted results.

    Having a single code path for both basket types makes it impossible to
    pair one dataset with the other dataset's support threshold.

    Parameters
    ----------
    baskets : pyspark.sql.DataFrame
        Must contain an ``items`` array column (one basket per row).
    min_support : float
        Minimum fraction of baskets an itemset must appear in.
    min_confidence : float, optional
        Minimum confidence for generated rules; defaults to MIN_CONFIDENCE.

    Returns
    -------
    tuple
        ``(estimator, model, freq_itemsets, rules)`` where ``freq_itemsets`` is
        ordered by descending ``freq`` and ``rules`` by descending ``confidence``.
    """
    estimator = FPGrowth(
        itemsCol="items",
        minSupport=min_support,
        minConfidence=min_confidence,
    )
    model = estimator.fit(baskets)
    freq_itemsets = model.freqItemsets.orderBy(F.desc("freq"))
    rules = model.associationRules.orderBy(F.desc("confidence"))
    return estimator, model, freq_itemsets, rules


# A) user -> books
fp_user, model_user, freq_user, rules_user = mine_baskets(
    user_book_baskets, MIN_SUPPORT_USER_BOOKS
)

print("Frequent book itemsets:")
freq_user.show(20, truncate=False)

print("Book association rules:")
rules_user.show(20, truncate=False)

# B) review -> words
fp_word, model_word, freq_word, rules_word = mine_baskets(
    word_baskets, MIN_SUPPORT_WORDS
)

print("Frequent word itemsets:")
freq_word.show(20, truncate=False)

print("Word association rules:")
rules_word.show(20, truncate=False)


25/10/27 18:54:48 WARN FPGrowth: Input data is not cached.
                                                                                

Frequent book itemsets:
+------------------------------------------------+----+
|items                                           |freq|
+------------------------------------------------+----+
|[B000ILIJE0]                                    |3574|
|[B000NWU3I4]                                    |3562|
|[B000NWU3I4, B000ILIJE0]                        |3561|
|[B000PC54NG]                                    |3540|
|[B000PC54NG, B000ILIJE0]                        |3539|
|[B000NWQXBA]                                    |3535|
|[B000NWQXBA, B000PC54NG]                        |3535|
|[B000NWQXBA, B000PC54NG, B000ILIJE0]            |3535|
|[B000NWQXBA, B000ILIJE0]                        |3535|
|[B000PC54NG, B000NWU3I4]                        |3534|
|[B000PC54NG, B000NWU3I4, B000ILIJE0]            |3533|
|[B000NWQXBA, B000PC54NG, B000NWU3I4]            |3529|
|[B000NWQXBA, B000PC54NG, B000NWU3I4, B000ILIJE0]|3529|
|[B000NWQXBA, B000NWU3I4]                        |3529|
|[B000NWQXBA, B000NWU3I4

25/10/27 18:54:54 WARN FPGrowth: Input data is not cached.
                                                                                

Frequent word itemsets:


                                                                                

+-------------+-----+
|items        |freq |
+-------------+-----+
|[book]       |68339|
|[read]       |42670|
|[read, book] |34792|
|[one]        |33705|
|[one, book]  |26584|
|[like]       |23637|
|[story]      |21782|
|[time]       |20242|
|[good]       |20198|
|[great]      |19716|
|[like, book] |19494|
|[reading]    |18996|
|[books]      |18740|
|[one, read]  |18701|
|[well]       |18490|
|[first]      |17620|
|[many]       |16964|
|[much]       |16680|
|[good, book] |16618|
|[story, book]|16433|
+-------------+-----+
only showing top 20 rows

Word association rules:


[Stage 45:>                 (0 + 1) / 1][Stage 46:>                 (0 + 1) / 1]

+---------------------+----------+------------------+------------------+--------------------+
|antecedent           |consequent|confidence        |lift              |support             |
+---------------------+----------+------------------+------------------+--------------------+
|[good, like, read]   |[book]    |0.9097744360902256|1.2019209410142075|0.045567320898498076|
|[first, like, read]  |[book]    |0.9065442020665901|1.1976534151711324|0.04372867839262771 |
|[think, good]        |[book]    |0.9047261815453863|1.1952515924237062|0.04007354570023482 |
|[think, one, read]   |[book]    |0.9041095890410958|1.194436999912002 |0.04093748615480041 |
|[much, like, read]   |[book]    |0.9040537299112497|1.1943632033144655|0.04174604581099641 |
|[reading, like, read]|[book]    |0.9025206335043497|1.192337799430877 |0.04481414204067166 |
|[people, one, read]  |[book]    |0.9016357688113413|1.1911687872424697|0.04578884409197643 |
|[many, like, read]   |[book]    |0.9011152416356877|1.19048

                                                                                

In [10]:
# Export the first 50 rows of each result table; target file name -> Spark DataFrame.
_csv_exports = {
    "freq_user_itemsets.csv": freq_user,
    "rules_user_books.csv": rules_user,
    "freq_word_itemsets.csv": freq_word,
    "rules_word_reviews.csv": rules_word,
}

for _csv_name, _spark_df in _csv_exports.items():
    _spark_df.limit(50).toPandas().to_csv(_csv_name, index=False)

print("\nSaved CSVs:")
for _csv_name in _csv_exports:
    print(f"- {_csv_name}")

[Stage 57:>                                                         (0 + 1) / 1]


Saved CSVs:
- freq_user_itemsets.csv
- rules_user_books.csv
- freq_word_itemsets.csv
- rules_word_reviews.csv


                                                                                

In [11]:
import pandas as pd
import matplotlib.pyplot as plt

# Shared matplotlib defaults applied to every figure created after this cell.
plt.rcParams.update({
    "figure.figsize": (8, 4),   # default size
    "axes.grid": True,
    "grid.linestyle": ":",
    "grid.alpha": 0.4,
    "font.size": 11,
})

In [12]:
# 1. Spark -> Pandas: pull the first 20 rows of each (already sorted) result table
freq_user_pd, freq_word_pd, rules_user_pd, rules_word_pd = (
    spark_df.limit(20).toPandas()
    for spark_df in (freq_user, freq_word, rules_user, rules_word)
)

def plot_top_itemsets(df, title, filename=None):
    """Draw a horizontal bar chart of itemset frequencies.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs an ``items`` column (one collection of items per row) and a
        numeric ``freq`` column. Rows are drawn in the given order, first row
        at the top.
    title : str
        Figure title.
    filename : str, optional
        When given, the figure is saved there at 300 dpi and closed;
        otherwise it is shown.
    """
    def _label(items):
        # Join any non-string iterable, not just list/tuple: array columns may
        # arrive from Spark as numpy arrays (depends on the conversion path),
        # and those would otherwise be rendered via their raw repr.
        if isinstance(items, str) or not hasattr(items, "__iter__"):
            return str(items)
        return ", ".join(str(item) for item in items)

    labels = [_label(items) for items in df["items"]]
    positions = list(range(len(labels)))

    fig, ax = plt.subplots()
    ax.barh(positions, df["freq"])
    ax.set_yticks(positions)
    ax.set_yticklabels(labels)
    ax.invert_yaxis()   # first row on top
    ax.set_xlabel("Frequency (count of baskets)")
    ax.set_title(title)
    fig.tight_layout()
    if filename:
        fig.savefig(filename, dpi=300)
        plt.close(fig)   # release the figure once it is on disk
    else:
        plt.show()

def plot_top_rules_by_confidence(df, title, filename=None, top_n=20):
    """Draw a horizontal bar chart of the most confident association rules.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``antecedent`` and ``consequent`` columns (collections of items)
        and a numeric ``confidence`` column.
    title : str
        Figure title.
    filename : str, optional
        When given, the figure is saved there at 300 dpi and closed;
        otherwise it is shown.
    top_n : int, optional
        Number of rules to display after sorting by confidence (default 20).
    """
    df_sorted = (
        df.sort_values(by="confidence", ascending=False)
        .head(top_n)
        .reset_index(drop=True)
    )
    # Build labels with a plain loop rather than DataFrame.apply(axis=1):
    # on an empty frame apply() returns a DataFrame, whose iteration yields
    # column names and makes the tick-label call fail.
    rule_labels = [
        f"{{{', '.join(map(str, lhs))}}} → {{{', '.join(map(str, rhs))}}}"
        for lhs, rhs in zip(df_sorted["antecedent"], df_sorted["consequent"])
    ]
    positions = list(range(len(rule_labels)))

    fig, ax = plt.subplots()
    ax.barh(positions, df_sorted["confidence"])
    ax.set_yticks(positions)
    ax.set_yticklabels(rule_labels)
    ax.invert_yaxis()   # most confident rule on top
    ax.set_xlabel("Confidence")
    ax.set_title(title)
    fig.tight_layout()
    if filename:
        fig.savefig(filename, dpi=300)
        plt.close(fig)   # release the figure once it is on disk
    else:
        plt.show()

def plot_confidence_vs_lift(df, title, filename=None):
    """Scatter rules as lift (x) against confidence (y), marker area scaled by support.

    The five rows with the highest confidence are annotated with their
    "antecedent → consequent" text. With ``filename`` the figure is saved at
    300 dpi and closed; without it the figure is shown.
    """
    fig, ax = plt.subplots()
    marker_sizes = df["support"] * 4000
    ax.scatter(df["lift"], df["confidence"], s=marker_sizes, alpha=0.6)
    ax.set_xlabel("Lift")
    ax.set_ylabel("Confidence")
    ax.set_title(title)

    # Label only the five most confident rules to keep the plot readable.
    most_confident = df.sort_values(by="confidence", ascending=False).head(5)
    for lift, confidence, lhs, rhs in zip(
        most_confident["lift"],
        most_confident["confidence"],
        most_confident["antecedent"],
        most_confident["consequent"],
    ):
        ax.annotate(
            f"{','.join(lhs)} → {','.join(rhs)}",
            (lift, confidence),
            xytext=(5, 5),
            textcoords="offset points",
            fontsize=8,
        )

    fig.tight_layout()
    if not filename:
        plt.show()
        return
    fig.savefig(filename, dpi=300)
    plt.close(fig)

# Make + save plots: one (plot function, data, title, output file) entry per figure,
# rendered in the listed order.
_figure_jobs = [
    (plot_top_itemsets, freq_user_pd, "Top Frequent Book Itemsets (User–Book baskets)", "fig_user_itemsets.png"),
    (plot_top_itemsets, freq_word_pd, "Top Frequent Word Itemsets (Word–Review baskets)", "fig_word_itemsets.png"),
    (plot_top_rules_by_confidence, rules_user_pd, "Top Association Rules (User–Book baskets)", "fig_user_rules.png"),
    (plot_top_rules_by_confidence, rules_word_pd, "Top Association Rules (Word–Review baskets)", "fig_word_rules.png"),
    (plot_confidence_vs_lift, rules_user_pd, "Confidence vs Lift (User–Book rules)", "fig_user_scatter.png"),
    (plot_confidence_vs_lift, rules_word_pd, "Confidence vs Lift (Word–Review rules)", "fig_word_scatter.png"),
]
for _plot_fn, _frame, _fig_title, _png_name in _figure_jobs:
    _plot_fn(_frame, _fig_title, _png_name)

print("Saved figures: fig_user_itemsets.png, fig_word_itemsets.png, fig_user_rules.png, fig_word_rules.png, fig_user_scatter.png, fig_word_scatter.png")


  plt.tight_layout()


Saved figures: fig_user_itemsets.png, fig_word_itemsets.png, fig_user_rules.png, fig_word_rules.png, fig_user_scatter.png, fig_word_scatter.png
