# Title

Project website: https://docs.google.com/document/d/1GkK28wOyjTPZEZuOumKPU0z1NzVT_9sxETjPOcLPVYo/edit?tab=t.0#heading=h.qifoo7co6qtd

Professor website: https://malchiodi.di.unimi.it/teaching/AMD-DSE/2024-25/en

In [None]:
#!git clone https://github.com/chiesastefano/massiveData
import os
#os.chdir("massiveData")

In [None]:
# Install required packages from the requirements.txt file if not already installed
!pip install -r requirements.txt

In [None]:
import subprocess
import sys
from key import key_secret

# Set Your own Kaggle credentials
os.environ['KAGGLE_USERNAME'] = "ilchurch"
os.environ['KAGGLE_KEY'] = key_secret

# Ensure kaggle is installed
try:
    import kaggle
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "kaggle"])
    import kaggle

# Import and authenticate using KaggleApi
from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()

# Check if the dataset file already exists
if not os.path.exists('data/Books_rating.csv'):
    api.dataset_download_files('mohamedbakhet/amazon-books-reviews', path='data', unzip=True)
else:
    print("Dataset already exists.")

In [None]:
# Load the dataset with spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("JaccardSimilarity") \
    .master("local[*]") \
    .getOrCreate()
books_rating = spark.read.csv("data/Books_rating.csv", header=True, inferSchema=True)

In [None]:
# Display the first few rows of the books_rating dataset
books_rating.show(5)

# Create a subsample of the dataset for performance reasons
I will use a sample of the dataset as a test case. Neither my local machine nor Google Colab is powerful enough to handle the full dataset. I will use *sample* as a parameter so that it can be changed to the desired sample size.

In [None]:
sample = 0.01 # 1% of the dataset

In [None]:
books_rating_sample = books_rating.sample(fraction=sample, seed=42)
books_rating_sample.show(5)
sample_size = books_rating_sample.count()

In [None]:
sample_size

# Checking for null values in the 'review/text' column
Since we are using text data, we need to check for null values in the 'review/text' column to decide how to handle them.

In [None]:
# Check for null values in the 'review/text' column
null_rows_df = books_rating.filter(
    books_rating['review/text'].isNull() | (books_rating['review/text'].rlike('^\\s*$'))
)

null_rows_df.show(30)

The rows with a null 'review/text' value are more or less duplicates.

They share the same values in every column except the identifiers and the Title. Although the Title is not the same, a qualitative check shows that they are about the same book.

In [None]:
from pyspark.sql.functions import col
filtered_df = books_rating.filter(
    col("Title").contains("Lord of the Rings")
).select("Title", "review/text").limit(100)

# Show the result
filtered_df.show(100, truncate=False)

There are many other Lord of the Rings reviews. I think we should not discard the rows with missing review text; it is better to keep them, since the Title can also be used as a similarity check.

In [None]:
# Check for null values in the 'review/text' column
null_rows_df_sample = books_rating_sample.filter(
    books_rating_sample['review/text'].isNull() | (books_rating_sample['review/text'].rlike('^\\s*$'))
)

null_rows_df_sample.show(30)

There is a null value in the subsample.

# Jaccard Similarity

## Tokenization
Since the dataset is too big, I will first test the Jaccard Similarity approach on a subsample. I will then try to scale it to the full dataset.
In addition, I will use the MapReduce approach to parallelize the computation of the Jaccard Similarity.

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

In [None]:
tokenizer = RegexTokenizer(inputCol="review/text", outputCol="reviews/tokens", pattern="\\W") # \\W is a regex pattern that matches any non-word character

RegexTokenizer splits the text into words. I am trying this approach because I believe it will be faster than a neural-network-based approach, since it runs on Spark. It has some limitations, since it does not account for compound words, but I think it is good enough for a first approach.
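
To make the compound-word limitation concrete, here is a minimal pure-Python sketch of the same idea: lowercase the text and split on non-word characters. The `tokenize` helper is hypothetical and only approximates what RegexTokenizer does with the `\W` pattern; it is not the Spark implementation.

```python
import re

def tokenize(text):
    """Lowercase the text and split it on every non-word character."""
    # Adjacent separators produce empty strings, which are dropped here.
    return [tok for tok in re.split(r"\W", text.lower()) if tok]

# "well-written" and "couldn't" are each broken into two tokens.
print(tokenize("A well-written book; I couldn't put it down!"))
```

Hyphenated words and contractions are split apart, which is the limitation mentioned above.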

In [None]:
stopwords_remover = StopWordsRemover(inputCol="reviews/tokens", outputCol="tokens/nostopwords")

StopWordsRemover() uses a predefined list of stop words in English. It removes common words that do not carry much meaning, such as "the", "is", "in", etc. This is important, since the Jaccard Similarity is based on the number of common words between two texts. If we don't remove the stop words, the Jaccard Similarity will be biased by the number of stop words in the texts, which are not relevant for the meaning of the texts.
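
The bias can be seen on a toy example. The sketch below is plain Python, not Spark, and the `STOP` set is a tiny hypothetical stop-word list chosen only for illustration. Two reviews with no meaningful word in common still get a non-zero Jaccard similarity until the stop words are removed.

```python
def jaccard(a, b):
    """Jaccard similarity of two token collections, treated as sets."""
    a, b = set(a), set(b)
    if not a and not b:
        return 0.0  # convention chosen here for two empty sets
    return len(a & b) / len(a | b)

STOP = {"the", "is", "a", "in"}  # hypothetical, much shorter than Spark's list

r1 = "the plot is slow".split()
r2 = "the ending is brilliant".split()

with_stop = jaccard(r1, r2)
without_stop = jaccard(
    [t for t in r1 if t not in STOP],
    [t for t in r2 if t not in STOP],
)
print(with_stop, without_stop)
```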

In [None]:
def remove_numbers(tokens):
    return [token for token in tokens if not token.isdigit()]

Numbers are often not relevant to the meaning of a review. In our dataset, the score is stored in a separate column, so we can remove the numbers from the text.

In [None]:
remove_numbers_udf = udf(remove_numbers, ArrayType(StringType())) # Specify the return type as ArrayType(StringType), meaning an array/list of strings

Spark cannot apply a plain Python function to a DataFrame column directly, so we wrap it in a UDF (user-defined function) to use it in the pipeline.

In [None]:
tokenized_books_rating_sample = tokenizer.transform(books_rating_sample) # Apply the tokenizer to the dataset
tokenized_reviews_nostopwords_sample = stopwords_remover.transform(tokenized_books_rating_sample) # Remove the stop words from the tokenized dataset
tokenized_rw_nsw_nn_sample = tokenized_reviews_nostopwords_sample.withColumn("tokens/nostopwords/nonumbers", remove_numbers_udf(col("tokens/nostopwords"))) # Remove the numbers from the dataset

In [None]:
books_rating_sample.show(10)
tokenized_rw_nsw_nn_sample.show(10, truncate=True)

In [None]:
from pyspark.ml.feature import CountVectorizer

df_safe = tokenized_rw_nsw_nn_sample \
    .filter(col("review/text").isNotNull())

cv = CountVectorizer(
    inputCol="tokens/nostopwords/nonumbers",
    outputCol="features",
    vocabSize=5000,
    minDF=10
)

model = cv.fit(df_safe)
vocab = model.vocabulary


We need CountVectorizer() to create a vocabulary of the tokens. It will create a vocabulary of the most frequent tokens in the dataset. The vocabSize parameter is the maximum size of the vocabulary. The minDF parameter is the minimum number of documents that must contain a token for it to be included in the vocabulary. This is important, since we want to remove rare tokens that are not relevant for the meaning of the texts.
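
As a rough illustration of how the two parameters interact, the following pure-Python sketch builds a vocabulary from a toy corpus. The `build_vocabulary` helper is hypothetical, and the alphabetical tie-breaking is my own assumption rather than Spark's behavior.

```python
from collections import Counter

def build_vocabulary(docs, vocab_size, min_df):
    """Keep tokens found in at least min_df documents, most frequent first."""
    term_freq = Counter()  # total occurrences across the corpus
    doc_freq = Counter()   # number of documents containing the token
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    eligible = [t for t in term_freq if doc_freq[t] >= min_df]
    # Sort by descending corpus frequency, then alphabetically (assumed tie-break).
    eligible.sort(key=lambda t: (-term_freq[t], t))
    return eligible[:vocab_size]

docs = [
    ["great", "story", "great", "characters"],
    ["great", "story"],
    ["boring", "story"],
    ["typo"],
]
toy_vocab = build_vocabulary(docs, vocab_size=2, min_df=2)
print(toy_vocab)
```

Tokens that appear in a single document, such as "typo", never enter the vocabulary regardless of `vocab_size`.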

In [None]:
top30 = model.vocabulary[:30]
print("Top 30 tokens (approx. by frequency):")
for i, token in enumerate(top30, start=1):
    print(f"{i:2d}. {token}")

model.vocabulary is ordered by term frequency across the corpus, so its first 30 entries are the most frequent tokens in the vocabulary. However, it does not expose the counts themselves, so I compute them explicitly.

In [None]:
from pyspark.sql.functions import explode, col

exploded = df_safe.select(explode(col("tokens/nostopwords/nonumbers")).alias("token")) 

# Filter to only the top-20 tokens
topK_set = set(model.vocabulary[:20])  # top 20 from vocab
filtered = exploded.filter(col("token").isin(topK_set))

exact_counts = (filtered.groupBy("token").count().orderBy(col("count").desc()))
exact_counts.show(truncate=False)

In [None]:
threshold = 0.20 * sample_size # You can change the percentage of the sample size to set the threshold
print(threshold)

The next cells collect the tokens whose count exceeds the threshold into high_freq_tokens and define a function that keeps only the tokens that are not in that list.

In [None]:
high_freq_tokens = (exact_counts.filter(col("count") > threshold).select("token")
      .rdd.flatMap(lambda x: x)
      .collect()
)

In [None]:
high_freq_tokens

In [None]:
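high_freq_set = set(high_freq_tokens)  # a set gives constant-time membership checks

def remove_custom_stopwords(tokens):
    # Keep only the tokens that are not high-frequency custom stop words
    return [t for t in tokens if t not in high_freq_set]

remove_udf = udf(remove_custom_stopwords, ArrayType(StringType()))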

tokenized_sample_2 = tokenized_rw_nsw_nn_sample.withColumn(
    "tokens_clean",
    remove_udf(col("tokens/nostopwords/nonumbers"))
)

Filter out the tokens that are in the stopwords list.
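
The same thresholding logic can be sketched without Spark. In this hypothetical example, `high_frequency_tokens` counts token occurrences (as the explode and groupBy cells do) and flags those above a fraction of the number of documents. The toy data is invented for illustration.

```python
from collections import Counter

def high_frequency_tokens(docs, fraction):
    """Return tokens whose occurrence count exceeds fraction * number of docs."""
    threshold = fraction * len(docs)
    counts = Counter(tok for tokens in docs for tok in tokens)
    return {tok for tok, n in counts.items() if n > threshold}

docs = [
    ["book", "good"],
    ["book", "bad"],
    ["book", "long"],
    ["short", "read"],
    ["book", "book"],
]
frequent = high_frequency_tokens(docs, fraction=0.5)
cleaned = [[t for t in tokens if t not in frequent] for tokens in docs]
print(frequent, cleaned)
```

Note that the last review ends up with an empty token list after filtering, which is why empty lists have to be checked afterwards.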

In [None]:
from pyspark.sql.functions import size
df_safe = tokenized_sample_2.filter(col("review/text").isNotNull())

empty_tokens_count = df_safe.filter(
    (size(col("tokens_clean")) == 0) |
    (col("tokens_clean").isNull())
).count()

print(f"Number of records with empty tokens_clean: {empty_tokens_count}")

MinHashLSH, used later, requires each feature vector to have at least one non-zero entry, so rows with empty token lists must be removed before hashing. Here we use the size() and isNull() functions to count the rows whose list is empty or null.

In [None]:
cv = CountVectorizer(
    inputCol="tokens_clean",
    outputCol="features",
    vocabSize=5000,
    minDF=10
)

model2 = cv.fit(df_safe)
vocab2 = model2.vocabulary


In [None]:
top30 = model2.vocabulary[:30]
print("Top 30 tokens (approx. by frequency):")
for i, token in enumerate(top30, start=1):
    print(f"{i:2d}. {token}")

We verified that the top words are now different, since the previous ones were correctly filtered out.

## Locality-Sensitive Hashing

Since the project must be scalable, we can't compute the Jaccard similarity for every pair of reviews in the whole dataset, because the number of pairs grows quadratically with the number of reviews. Instead, we can use Locality-Sensitive Hashing (LSH) to find candidate pairs in a distributed way. LSH is a method that hashes similar input items into the same "buckets" with high probability, so only items that share a bucket need to be compared. It is used to find similar items in large datasets efficiently.
You can find more information about it in the book "Mining of Massive Datasets" by Jure Leskovec, Anand Rajaraman, and Jeffrey D. Ullman, free to download at http://infolab.stanford.edu/~ullman/mmds/book.pdf (Sections 3.3.4 and 3.4.1).
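
The idea behind MinHash, the hash family used for Jaccard similarity, can be sketched in a few lines of plain Python. This is an illustrative toy, not Spark's implementation: the helper names, the modulus `PRIME`, the use of `zlib.crc32` to turn tokens into integers, and the synthetic reviews are all assumptions. The fraction of positions where two signatures agree estimates the Jaccard similarity of the underlying sets.

```python
import random
import zlib

PRIME = 2_147_483_647  # Mersenne prime used as the hash modulus (assumption)

def make_hash_params(num_hashes, seed=42):
    """Draw (a, b) pairs for hash functions h(x) = (a * x + b) mod PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(num_hashes)]

def minhash_signature(tokens, hash_params):
    """For each hash function, keep the minimum hash over the (non-empty) token set."""
    ids = [zlib.crc32(tok.encode("utf-8")) for tok in set(tokens)]
    return [min((a * x + b) % PRIME for x in ids) for a, b in hash_params]

def estimate_jaccard(sig_a, sig_b):
    """Fraction of signature positions on which the two signatures agree."""
    return sum(1 for x, y in zip(sig_a, sig_b) if x == y) / len(sig_a)

params = make_hash_params(num_hashes=512)
review_a = [f"word{i}" for i in range(0, 30)]
review_b = [f"word{i}" for i in range(15, 45)]

exact = len(set(review_a) & set(review_b)) / len(set(review_a) | set(review_b))
estimate = estimate_jaccard(
    minhash_signature(review_a, params), minhash_signature(review_b, params)
)
print(f"exact={exact:.3f} estimate={estimate:.3f}")
```

The estimate gets closer to the exact value as the number of hash functions grows, at the cost of longer signatures.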


In [None]:
sim_threshold = 0.6 # Passed to approxSimilarityJoin as the maximum Jaccard distance (1 - similarity); change it as needed

In [None]:
df_safe.show(10)

In [None]:
# Filter out rows where tokens_clean is null or empty
df_nonempty = df_safe.filter((df_safe.tokens_clean.isNotNull()) & (size(df_safe.tokens_clean) > 0))

In [None]:
from pyspark.ml.feature import HashingTF

hashingTF = HashingTF(inputCol="tokens_clean", outputCol="features", numFeatures= 8192) 
df_featurized = hashingTF.transform(df_nonempty) 

The HashingTF documentation (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.HashingTF.html) advises using a power of two for numFeatures, because a simple modulo maps each hash to a column index. I used 8192, since it is the first power of 2 greater than 5000, the size of the vocabulary. This matters because a larger feature space reduces collisions in the hashing process, although it cannot eliminate them.
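
To get a feel for how the number of features affects collisions, here is a small stand-alone sketch of the hashing trick. It uses `zlib.crc32` as a stand-in hash (Spark uses a different hash function), and the synthetic 5000-token vocabulary and the helper names are hypothetical.

```python
import zlib

def bucket(token, num_features):
    """Map a token to a column index with a hash followed by a modulo."""
    return zlib.crc32(token.encode("utf-8")) % num_features

def occupied_buckets(vocabulary, num_features):
    """Number of distinct column indices used by the vocabulary."""
    return len({bucket(t, num_features) for t in vocabulary})

vocabulary = [f"token{i}" for i in range(5000)]  # synthetic stand-in vocabulary
for n in (1024, 8192, 65536):
    used = occupied_buckets(vocabulary, n)
    print(f"numFeatures={n}: {used} buckets used, "
          f"{len(vocabulary) - used} tokens share a bucket with another token")
```

Even when numFeatures exceeds the vocabulary size, some tokens typically still land in the same bucket, so a larger value only makes collisions rarer.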

In [None]:
df_featurized.show(10)

In [None]:
from pyspark.ml.feature import MinHashLSH

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10) # You can change the number of hash tables at your will
model = mh.fit(df_featurized)

The column "hashes" contains the MinHash values of the features. The numHashTables parameter is the number of hash tables to use. More hash tables lower the chance of missing truly similar pairs, but they also take more time to compute. I used 10 hash tables as a trade-off between accuracy and performance.
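
The effect of numHashTables can be approximated with a short calculation. The sketch below assumes that each hash table holds a single MinHash value and that a pair becomes a candidate when at least one table matches (OR-amplification). Under that assumption, a pair with Jaccard similarity s is found with probability 1 - (1 - s)^n for n tables. The function name is hypothetical.

```python
def candidate_probability(similarity, num_hash_tables):
    """Chance that at least one of the hash tables puts the pair in the same bucket."""
    return 1.0 - (1.0 - similarity) ** num_hash_tables

for tables in (1, 2, 5, 10):
    row = ", ".join(
        f"s={s:.1f}: {candidate_probability(s, tables):.3f}" for s in (0.2, 0.4, 0.8)
    )
    print(f"numHashTables={tables:2d} -> {row}")
```

More tables raise the probability for truly similar pairs, but also for dissimilar ones, so more candidate pairs must be checked exactly.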

In [None]:
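# threshold is a maximum Jaccard distance (1 - Jaccard similarity),
# so 0.6 keeps pairs whose similarity is above 0.4
similar_pairs = model.approxSimilarityJoin(
    df_featurized, df_featurized, threshold=sim_threshold, distCol="JaccardDistance"
).filter("datasetA.Id < datasetB.Id")  # to avoid duplicate/reverse pairs

# Descending order lists the largest distances (the least similar retained pairs) first;
# use ascending=True to see the most similar pairs first
similar_pairs = similar_pairs.orderBy("JaccardDistance", ascending=False)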
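
Because the join works with distances rather than similarities, it can help to spell out the conversion. The following plain-Python sketch uses hypothetical pair data and helper names; it converts a minimum similarity into a maximum distance, filters, and sorts so that the most similar pairs come first.

```python
def max_distance_for(min_similarity):
    """Jaccard distance is 1 - Jaccard similarity."""
    return 1.0 - min_similarity

# (id_a, id_b, jaccard_distance): invented values for illustration
pairs = [("a", "b", 0.55), ("c", "d", 0.10), ("e", "f", 0.30), ("g", "h", 0.75)]

limit = max_distance_for(0.4)  # keep pairs with similarity above 0.4
kept = [p for p in pairs if p[2] < limit]
most_similar_first = sorted(kept, key=lambda p: p[2])
print(most_similar_first)
```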

In [None]:
similar_pairs.select(
    "datasetA.Title", "datasetB.Title", "datasetA.Id", "datasetB.Id", "datasetA.review/text", "datasetB.review/text", "JaccardDistance"
).show(2, truncate=False)

In [None]:
first_pair = similar_pairs.first()

In [None]:
# Accessing the review text of the first similar pair
review_a = first_pair['datasetA']['review/text']
review_b = first_pair['datasetB']['review/text']

# Printing the reviews
print("Review 1 (datasetA):")
print(review_a)
print("\nReview 2 (datasetB):")
print(review_b)

It looks like it works, since the two reviews share the words "very happy with purchase".

In [None]:
# Print the text/review of the first similar pair   
similar_pairs.select("datasetA.review/text", "datasetB.review/text").show(2, truncate=False)