## Loading the data

In [1]:
import numpy as np
import pandas as pd
import ast
import os
import zipfile

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml.feature import MinHashLSH, HashingTF
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

In [None]:

os.environ['KAGGLE_USERNAME'] = "aidanaakkaziyeva"
os.environ['KAGGLE_KEY'] = "609e0e320a0900d9d1865319a498c843"
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0


In [5]:
# Define the zip file name
zip_file = "amazon-books-reviews.zip"

# Extract all contents
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    zip_ref.extractall("amazon-books-reviews")  # Extracts into a folder

In [6]:
os.listdir("amazon-books-reviews")

['books_data.csv', 'Books_rating.csv']

In [3]:
from pyspark.sql import SparkSession

In [4]:
# os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11"  # Replace with your Java installation path

In [5]:
# os.environ["PYSPARK_PYTHON"] = r"C:\Users\ACER\anaconda3\envs\pyspark310\python.exe"
# os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\ACER\anaconda3\envs\pyspark310\python.exe"

In [6]:
spark = SparkSession.builder \
    .appName("BooksReviewSimilarity") \
    .config("spark.driver.memory", "5g") \
    .master("local[*]") \
    .getOrCreate()

In [7]:
#books_data = spark.read.csv("amazon-books-reviews/books_data.csv", header=True, inferSchema=True)
books_rating = spark.read.csv("amazon-books-reviews/Books_rating.csv", header=True, inferSchema=True)

## Preprocessing the Books Data

In [8]:
#books_data.show(5)

In [9]:
# how many rows and columns in the dataset
#books_data.count(), len(books_data.columns)

In [10]:
# keep only 1,3,7 columns
#books_data = books_data.select("Title", "authors")

In [11]:
books_rating.show(5)

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|1882931173|Its Only Art If I...| NULL| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|
|0826414346|Dr. Seuss: Americ...| NULL|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724800|   Really Enjoyed It|I don't care much...|
|0826414346|Dr. Seuss: Americ...| NULL|A3UH4UZ4RSVO82|        John Granger|             10/11|         5.0| 1078790400|Essential for eve...|"If people become...|
|0826414346|Dr. Seuss: Ameri

In [12]:
books_rating.count(), len(books_rating.columns)

(3000000, 10)

In [13]:
# keep only 2,4,10 columns
books_rating = books_rating.select("Title", "User_id", "review/text")

In [14]:
# subsample the dataset (1%)
books_rating = books_rating.sample(0.001, seed=42)
books_rating.count(), len(books_rating.columns)

(3048, 3)

In [15]:
books_rating.show(5)

+--------------------+--------------+--------------------+
|               Title|       User_id|         review/text|
+--------------------+--------------+--------------------+
|MITTEE A GROVE OF...|A2VKWLCNZF4ZVB|Water Street is a...|
|       Powerful Golf|A3S4Z8LOQJKO3D|My 19 year old so...|
|          Foundation| AZD3ZGELAE1CU|one would be hard...|
|Even A Daughter I...| AFUMAMEJKZYNF|"My expectations ...|
|Caught Up Into Pa...| AAT5IG3E8QS9T|"First, I want to...|
+--------------------+--------------+--------------------+
only showing top 5 rows



## clean

In [16]:
import re

In [17]:
# remove null values in all columns
books_rating = books_rating.na.drop()


ensuring that a user only has one entry per book.

In [18]:
books_rating = books_rating.dropDuplicates(
    subset=["Title", "User_id"]
)

In [19]:
#books_rating.count(), len(books_rating.columns)

In [20]:
from pyspark.sql.functions import lower, regexp_replace

In [21]:
df_clean = books_rating.withColumn(
    "clean_text",
    regexp_replace(lower(col("review/text")), r'[^\w\s]', '')
)

In [22]:
# # 1. Clean and preprocess the text data
# def clean_text(text):
#     if text is None:
#         return ""
#     # Basic cleaning - remove special chars, lowercase, etc.
#     text = re.sub(r'[^\w\s]', '', text.lower())
#     return text.strip()

# clean_udf = udf(clean_text, StringType())
# df_clean = books_rating.withColumn("clean_text", clean_udf(col("review/text")))

In [23]:
df_clean.show(5)

+--------------------+--------------------+--------------------+--------------------+
|               Title|             User_id|         review/text|          clean_text|
+--------------------+--------------------+--------------------+--------------------+
|"""Forget Not Lov...|      A3ETZH64WMTD84|Well written, thi...|well written this...|
|       """Stand back| ""I'm going to s...|          1097452800|          1097452800|
|"""The General Da...|      A2521E9PPDSKJF|This is much more...|this is much more...|
|"Alice""s Adventu...|      A2D6DW7DQT9ANV|This is an abridg...|this is an abridg...|
|"Called Together:...|      A3B14FGZJV95LW|This is an excell...|this is an excell...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [24]:
# 2. Create shingles (n-grams)
def create_shingles(text, n=5):
    if len(text) < n:
        return [text]
    return [text[i:i+n] for i in range(len(text) - n + 1)]

shingle_udf = udf(create_shingles, ArrayType(StringType()))
df_shingled = df_clean.withColumn("shingles", shingle_udf(col("clean_text")))

In [25]:
df_shingled.columns

['Title', 'User_id', 'review/text', 'clean_text', 'shingles']

In [26]:
# 3. Create feature vectors (using hashing trick)
from pyspark.ml.feature import HashingTF

hashing_tf = HashingTF(inputCol="shingles", outputCol="rawFeatures", numFeatures=1024)
featurized_data = hashing_tf.transform(df_shingled)

In [27]:
# show columns
featurized_data.columns

['Title', 'User_id', 'review/text', 'clean_text', 'shingles', 'rawFeatures']

In [28]:
from pyspark.sql.functions import monotonically_increasing_id

df_with_id = featurized_data.withColumn("Review_ID", monotonically_increasing_id())

In [29]:
# 4. Apply MinHash
mh = MinHashLSH(inputCol="rawFeatures", outputCol="hashes", numHashTables=5)
model = mh.fit(df_with_id)

In [30]:
# 5. Find similar reviews
similar_reviews = model.approxSimilarityJoin(
    df_with_id, df_with_id, 0.6, distCol="JaccardDistance"
)

In [31]:
filtered_df = similar_reviews.filter(col("datasetA.Review_ID") != col("datasetB.Review_ID"))

In [32]:
# keeping only one direction of each pair
filtered_df = filtered_df.filter(col("datasetA.review_ID") < col("datasetB.review_ID"))

In [33]:
top3 = (
    filtered_df
    .orderBy(col("JaccardDistance").asc())
    .limit(3)
)

In [34]:
top3.select(
    col("datasetA.review/text").alias("review/text_A"),
    col("datasetA.Title").alias("Title_A"),
    col("datasetB.review/text").alias("review/text_B"),
    col("datasetB.Title").alias("Title_B"),
    col("JaccardDistance")
).show(3, truncate=50)

Py4JJavaError: An error occurred while calling o153.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 117) (100.65.113.22 executor driver): java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
