<a href="https://colab.research.google.com/github/MarcoColan01/algo4massivedatasets/blob/main/AMD2425Colangelo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Finding similar reviews using Jaccard Similarity, Min-Hashing and Locality-Sensitive Hashing (LSH)**

This notebook contains the code written for the final project of the Algorithms for Massive Datasets course. Each code cell is preceded by a short text cell that explains which step is implemented, in particular the parameters chosen and any implementation choices. Further details can be found in the report associated with this project.


In [None]:
#!pip install kaggle
#!pip install pyspark

# Download of the dataset using Kaggle's API

In [None]:
import pyspark
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import time
import math
from google.colab import files
from collections import Counter
from pyspark.storagelevel import StorageLevel

files.upload()
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets list

!kaggle datasets download -d "mohamedbakhet/amazon-books-reviews" -p .
!unzip -o amazon-books-reviews.zip
!rm -f amazon-books-reviews.zip

start_time = time.time()
spark = pyspark.sql.SparkSession.builder.master("local[*]").appName("find_similar_reviews").getOrCreate()
sc = spark.sparkContext

books_rating_df = spark.read.csv(
    "Books_rating.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape="\""
)

Saving kaggle.json to kaggle.json

# First step: sample generation

The Amazon Books Reviews dataset consists of two tables:
- books_rating: contains approximately 3 million book reviews
- books_data: contains information on approximately 212000 books

**The task is to find similar reviews using Jaccard Similarity and a combination of MinHashing and LSH to reduce the computational complexity.**

In particular, the procedure that has been implemented is the following:
1. Pick a value of k and, for each document, construct its set of k-shingles.
2. Pick a length n for the Min-Hash signatures, and construct the signature matrix having dimensions $n \times D$, where D is the number of reviews.
3. Choose a threshold t that defines how similar two documents have to be in order for them to be considered as "candidates" for the actual similarity computation. Then, pick the values of b (bands) and r (rows within a band) such that $br=n$ and $t \approx (\frac{1}{b})^\frac{1}{r}$.
4. Construct the candidate pairs by applying LSH.
5. Examine each candidate pair's signatures and determine whether the fraction of components in which they agree is at least t.
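
As a reference point for the procedure above, the exact Jaccard similarity of two sets can be sketched in a few lines. The function name `jaccard`, the toy sets and the convention of returning 0 for two empty sets are assumptions made only for this illustration; they are not part of the notebook's pipeline.

```python
def jaccard(a: set, b: set) -> float:
    """Exact Jaccard similarity: size of the intersection over size of the union."""
    if not a and not b:
        return 0.0  # convention chosen here for two empty sets (an assumption)
    return len(a & b) / len(a | b)

# Toy sets (illustrative values, not taken from the dataset)
doc_a = {"great", "book", "loved", "it"}
doc_b = {"great", "book", "hated", "it"}

print(jaccard(doc_a, doc_b))  # 3 shared elements / 5 distinct elements = 0.6
```

Computing this value for every pair of D reviews requires on the order of $D^2$ comparisons, which is the cost that Min-Hashing and LSH are meant to reduce.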

For the work performed, only the *books_rating* table was considered because it is the one that contains the review texts.
After downloading the dataset using Kaggle, Books_rating.csv is available, which consists of 10 columns. At this point, the first steps are to generate a reasonably sized sample (since we don't have a real execution cluster) and retain only the columns necessary to perform the work. The sample is generated randomly: a fraction about 20% larger than needed is drawn and then capped at 30000 reviews. The following columns are kept:
- Id: the book ID
- Title: the book title
- User_id: the ID of the user who wrote the review
- review/text: the entire text of the review

Before sampling, a filter discards any review whose book ID, user ID or text is null, or whose text is empty.

In [None]:
total_time = time.time()
sample_size = 30000
books_rating_df.printSchema()

#Selecting only the columns needed
base_df = (
    books_rating_df
    .select("Id", "User_id", "Title", "review/text")
    .filter(
        F.col("Id").isNotNull() &
        F.col("User_id").isNotNull() &
        F.col("review/text").isNotNull() &
        (F.length(F.col("review/text")) > 0)
    )
)
#t0 = time.time()
total = base_df.count()
frac = min(1.0, (sample_size / max(total, 1)) * 1.20)

#creation of the subsample
sampled_df = (
    base_df
    .sample(withReplacement=False, fraction=frac)
    .limit(sample_size)
    .persist(StorageLevel.MEMORY_AND_DISK)
)

sampled_df.show(10, truncate=False)

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: double (nullable = true)
 |-- review/time: integer (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)


# Second step: normalization and shingling of the text

After the generation of the sample, the next step is to normalize the text (the review/text column) and create the shingles for each review.

A shingle is a k-gram, i.e. a sequence of k consecutive atomic units of the text, such as individual characters. For this work k=5 was chosen: the sample contains both rather short and rather long reviews, and a value of k suited to very long texts (for example k=9) would have penalized the shorter ones, so k=5 is a good compromise.
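
To make the definition concrete, here is a minimal sketch of character-level k-shingling on a short string. The helper name `char_shingles` and the example text are hypothetical and used only for illustration.

```python
def char_shingles(text: str, k: int) -> set:
    """Return the set of all contiguous substrings of length k."""
    return {text[i:i + k] for i in range(len(text) - k + 1)}

example = "good book"  # hypothetical, already-normalized text (9 characters)
shingles = char_shingles(example, 5)

# 9 - 5 + 1 = 5 shingles: 'good ', 'ood b', 'od bo', 'd boo', ' book'
print(len(shingles), sorted(shingles))
```

A text shorter than k produces an empty set, which is why very short reviews contribute no shingles at all.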

The text normalization phase involves two steps:
- Converting the entire text to lowercase
- Replacing punctuation marks and every other non-alphanumeric character with a space, then collapsing repeated whitespace into a single space.

Stop words are also removed from the review's text, because such words are very frequent and not very informative.
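
As a rough illustration of the effect of these steps on a single sentence, consider the sketch below. The names `demo_stopwords` and `demo_normalize` and the tiny stop-word list are assumptions made for this example only; the notebook uses its own, much longer list.

```python
import re

# Tiny stop-word list used only for this illustration (an assumption)
demo_stopwords = {"the", "is", "a", "this", "it"}

def demo_normalize(text: str) -> str:
    lowered = text.lower()
    # Replace every run of non-alphanumeric characters with one space
    cleaned = re.sub(r"[^a-z0-9]+", " ", lowered).strip()
    # Drop empty tokens and stop words, then rebuild the text
    kept = [tok for tok in cleaned.split(" ") if tok and tok not in demo_stopwords]
    return " ".join(kept)

print(demo_normalize("This is a GREAT book -- loved it!!!"))  # great book loved
```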

In [None]:
whitespaces = re.compile(r"\s+")
notalphachars = re.compile(r"[^a-z0-9]+")

#set containing the most common stopwords
stopwords = {
    "a","an","the","and","or","but","if","then","else","when","while","for","to","of","in","on","at","by","from",
    "as","with","about","into","over","after","before","between","through","during","without","within","against",
    "is","am","are","was","were","be","been","being","do","does","did","doing","have","has","had","having",
    "can","could","will","would","shall","should","may","might","must",
    "i","you","he","she","it","we","they","me","him","her","us","them","my","your","his","her","its","our","their",
    "this","that","these","those","there","here","where","who","whom","which","what","why","how",
    "not","no","nor","only","own","same","so","than","too","very","just","also","once","again","ever","never",
    "all","any","both","each","few","more","most","other","some","such","many","much","less","least","lot","lots",
    "up","down","out","off","over","under","above","below","into","onto","back","forward",
    "dont","doesnt","didnt","cant","couldnt","wont","wouldnt","shouldnt","isnt","arent","wasnt","werent","havent","hasnt","hadnt",
}

'''
This function takes the text of a review and removes from it non-alphanumeric characters,
multiple whitespaces and stopwords. It then returns the normalized text
Params: text (string)
Returns: string
'''
def normalize_text(text: str) -> str:
    if text is None:
        return ""
    t = text.lower()
    t = notalphachars.sub(" ", t)
    t = whitespaces.sub(" ", t).strip()
    if not t:
        return ""
    tokens = t.split(" ")
    tokens = [token for token in tokens if token and token not in stopwords]
    if not tokens:
        return ""
    return " ".join(tokens)

#this RDD contains pairs (K, normalized_text) where the key K is (book_id, user_id)
normalized_rdd = (
    sampled_df
    .select(F.col("Id"), F.col("User_id"), F.col("review/text"))
    .rdd
    .mapPartitions(lambda it: (
        ((row["Id"], row["User_id"]), normalize_text(row["review/text"]))
        for row in it
    ))
    .persist(StorageLevel.MEMORY_AND_DISK)
)

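In addition to generating the shingles, this step also encodes each shingle as a 64-bit integer, using an FNV-1a-style hash: for every character, the running value is XORed with the character code and then multiplied by a prime modulo $2^{64}$.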
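
For comparison, a minimal sketch of this kind of encoding is shown below, assuming the standard FNV-1a 64-bit parameters (the notebook defines its own `encode_offset` and `encode_prime`). The names `FNV_OFFSET_64`, `FNV_PRIME_64`, `MASK_64` and `fnv1a_64` are introduced only for this example. FNV-1a is defined on bytes; hashing character codes gives the same result for ASCII text, which is what remains after normalization.

```python
FNV_OFFSET_64 = 0xCBF29CE484222325  # standard FNV-1a 64-bit offset basis
FNV_PRIME_64 = 0x100000001B3        # standard FNV 64-bit prime
MASK_64 = (1 << 64) - 1             # keeps the value within 64 bits

def fnv1a_64(text: str) -> int:
    h = FNV_OFFSET_64
    for ch in text:
        h ^= ord(ch)                      # mix in the character code
        h = (h * FNV_PRIME_64) & MASK_64  # multiply by the prime modulo 2^64
    return h

# Encode a few 5-character shingles of the hypothetical text "good book"
codes = {fnv1a_64(s) for s in ("good ", "ood b", "od bo")}
print(len(codes), all(0 <= c <= MASK_64 for c in codes))
```

Storing integers instead of 5-character strings keeps the shingle sets compact and gives the Min-Hash functions a numeric input.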

In [None]:
shingles_size = 5
encode_offset = 1469598103934665603
encode_prime = 1099511628211

'''
This function takes a string (here, a single shingle) and returns its 64-bit encoding
Params: text (string)
Returns: int
'''
def encode_text(text: str) -> int:
    encode = encode_offset
    for char in text:
        encode ^= ord(char)
        encode = (encode * encode_prime) & ((1 << 64) - 1)
    return encode

'''
This function takes the normalized text of a review and the size of the shingles
and yields the 64-bit encoding of each of its k-shingles (nothing is yielded if
the text is shorter than k)
Params: text_norm (string), k (int)
Returns: generator (int)
'''
def generate_encoded_shingles(text_norm: str, k: int):
    text_length = len(text_norm)
    if text_length < k:
        return
    for i in range(text_length - k + 1):
        yield encode_text(text_norm[i:i+k])

#this RDD contains pairs (K, hashed_shingles) where K = (book_id, user_id)
#and hashed_shingles is a set containing the 64-bit encoded shingles of the
#normalized text of a review
shingles_rdd = (
    normalized_rdd
    .mapPartitions(lambda it: (
        (K, set(generate_encoded_shingles(text_norm, shingles_size)))
        for K, text_norm in it
    ))
    .persist(StorageLevel.MEMORY_AND_DISK)
)

# Third step: Min-Hashing
Min-hashing is a data compression method that allows us to efficiently estimate Jaccard similarity between sets without having to work directly with the full sets or characteristic matrix.

In this work, H = 128 independent hash functions have been defined, each of the form

$h_i(x) = (a_i \cdot x + b_i) \text{ mod } p$

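where $a_i$ and $b_i$ are random coefficients, x is the 64-bit encoding of a shingle and p is the prime number $2^{61}-1$. Note that the code evaluates this expression in unsigned 64-bit arithmetic, so the intermediate value $a_i \cdot x + b_i$ wraps around modulo $2^{64}$ before the reduction modulo p.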
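
As a rough, self-contained illustration of how the fraction of agreeing signature components estimates the Jaccard similarity, the sketch below uses plain Python integers (which do not overflow) instead of NumPy arrays. The names `P`, `N_HASHES`, `coeffs`, `signature` and `estimate`, the seed and the synthetic sets are all assumptions made for this example.

```python
import random

P = (1 << 61) - 1   # prime modulus 2^61 - 1
N_HASHES = 128      # signature length

rng = random.Random(0)  # fixed seed for reproducibility (arbitrary choice)
coeffs = [(rng.randrange(1, P), rng.randrange(0, P)) for _ in range(N_HASHES)]

def signature(items: set) -> list:
    # For each hash function keep the minimum hash value over the set
    return [min((a * x + b) % P for x in items) for a, b in coeffs]

def estimate(sig_a: list, sig_b: list) -> float:
    # Fraction of signature components on which the two signatures agree
    agree = sum(1 for u, v in zip(sig_a, sig_b) if u == v)
    return agree / len(sig_a)

# Two synthetic sets of 100 elements sharing 50: exact Jaccard = 50 / 150
universe = rng.sample(range(10**12), 150)
set_a, set_b = set(universe[:100]), set(universe[50:])

est = estimate(signature(set_a), signature(set_b))
print(f"estimated: {est:.3f}, exact: {50 / 150:.3f}")
```

With 128 hash functions the estimate typically deviates from the exact value by a few hundredths, which is the price paid for replacing each set with a fixed-length signature.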

The signature matrix has been constructed exploiting RDDs: each element of the RDD is a pair (K, signature) that holds the Min-Hash signature of one review.

In [None]:
minhash_h = 128

nonempty_rdd = shingles_rdd.filter(lambda x: len(x[1]) > 0).persist(StorageLevel.MEMORY_AND_DISK)

rng = np.random.RandomState(1337)
a_params = rng.randint(1, ((1 << 61) - 1), size=minhash_h, dtype=np.uint64)
b_params = rng.randint(0, ((1 << 61) - 1), size=minhash_h, dtype=np.uint64)

params = sc.broadcast((a_params, b_params, np.uint64((1 << 61) - 1), minhash_h))


'''
This function takes as input an iterator of tuples in the form (K, shingle_set)
where
  - K is the composite key of a review (book_id, user_id)
  - shingle_set is the set of encoded shingles of a review
and computes the corresponding minhash signature for each review.
Params: shingles_iterator (iterator)
Returns: generator of (K, signature) pairs
'''
def minhash(shingles_iterator):
    a, b, p, minhash_h = params.value
    for K, shingle_set in shingles_iterator:
        signature = np.full(minhash_h, p - 1, dtype=np.uint64)
        for i in shingle_set:
            h_i = (a * np.uint64(i) + b) % p
            signature = np.minimum(signature, h_i)
        yield (K, signature)

#this RDD contains pairs (K, signature) where K = (book_id, user_id) and
#signature is the minhash signature for a specific review
minhash_rdd = nonempty_rdd.mapPartitions(minhash).persist(StorageLevel.MEMORY_AND_DISK)

# Fourth step: Locality-Sensitive Hashing

To further reduce the computational cost, the LSH technique has been implemented. LSH (Locality-Sensitive Hashing) drastically reduces the number of document pairs (in this case, pairs of reviews) to compare by selecting only a subset of candidates that are highly likely to be similar.

This technique divides the signature matrix into b bands of r rows each: for this work, the chosen parameters are $b=32$ and $r=4$. The resulting minimum similarity threshold is approximately $0.42$, according to this simple system

$
\begin{cases}
br=n \\ t \approx (\frac{1}{b})^\frac{1}{r}
\end{cases}
$

where n is the length of the Min-Hash signatures, which in this case is 128.
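
The threshold for the chosen parameters can be checked numerically. The sketch below also evaluates the standard banding formula $1-(1-s^r)^b$, which gives the probability that two documents whose signatures agree on each component with probability s end up in the same bucket in at least one band. The function names `lsh_threshold` and `candidate_probability` are introduced only for this example.

```python
def lsh_threshold(bands: int, rows: int) -> float:
    """Approximate similarity at which the S-curve rises: (1/b)^(1/r)."""
    return (1.0 / bands) ** (1.0 / rows)

def candidate_probability(s: float, bands: int, rows: int) -> float:
    """Probability that a pair with similarity s agrees on at least one whole band."""
    return 1.0 - (1.0 - s ** rows) ** bands

b, r = 32, 4
print(round(lsh_threshold(b, r), 4))  # 0.4204

# Probability of becoming a candidate pair at a few similarity levels
for s in (0.2, 0.4, 0.6, 0.8):
    print(s, round(candidate_probability(s, b, r), 3))
```

Pairs well below the threshold are rarely selected, while pairs well above it are selected almost always, which is the behaviour expected from the banding scheme.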

In [None]:
lsh_bands = 32
lsh_rows = 4

max_bucket_size = 2000   #to avoid very large buckets
n_partitions = min(minhash_rdd.getNumPartitions() * 2, 1024)

'''
This function takes as input a sub-signature (one band of a minhash signature) and
returns a 64-bit bucket identifier
Params: subsig (array)
Returns: int
'''
def generate_bucket_id(subsig):
    acc = encode_offset
    for v in subsig:
        acc ^= (int(v) & ((1 << 64) - 1))
        acc = (acc * encode_prime) & ((1 << 64) - 1)
    return acc

'''
This function takes as input an iterator of tuples in the form (K, signature)
where
  - K is the composite key of a document (book_id, user_id)
  - signature is the minhash signature of length H
and yields, for every band, a tuple (bucket_key, document_id)
Params: iter_rows (iterator of ((book_id, user_id), signature))
Returns: generator of ((band_id, bucket_id), document_id)
'''
def emit_band_keys(iter_rows):
    for K, signature in iter_rows:
        for band_id in range(lsh_bands):
            start = band_id * lsh_rows
            subsig = signature[start:start+lsh_rows]
            key = (band_id, generate_bucket_id(subsig))
            yield (key, K)

#this RDD contains pairs ((band_id, bucket_id), K) where K = (book_id, user_id)
pairs_by_band = (
    minhash_rdd
    .mapPartitions(emit_band_keys)
    .partitionBy(n_partitions)
    .persist(StorageLevel.MEMORY_AND_DISK)
)

bucket_docs_rdd = (
    pairs_by_band
    .combineByKey(
        lambda v: [v],
        lambda acc, v: (acc.append(v) or acc),
        lambda a, b: (a.extend(b) or a),
        numPartitions=n_partitions
    )
    .persist(StorageLevel.MEMORY_AND_DISK)
)

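'''
This function takes as input the list of documents that share the same bucket
and yields the candidate pairs (a, b) with a < b. Only the first
max_bucket_size documents (in sorted order) of a bucket are paired.
Params: ids_list (list)
Returns: generator of (document_id, document_id)
'''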
def pairs_from_ids(ids_list):
    sorted_pairs = sorted(set(ids_list))
    if len(sorted_pairs) < 2:
        return
    m = len(sorted_pairs) if len(sorted_pairs) <= max_bucket_size else max_bucket_size
    for i in range(m):
        a = sorted_pairs[i]
        for j in range(i+1, m):
            b = sorted_pairs[j]
            yield (a, b)

candidate_pairs_rdd = (
    bucket_docs_rdd
    .flatMap(lambda kv: pairs_from_ids(kv[1]))
    .persist(StorageLevel.MEMORY_AND_DISK)
)

unique_candidate_pairs_rdd = (
    candidate_pairs_rdd
    .distinct()
    .persist(StorageLevel.MEMORY_AND_DISK)
)

pairs_by_band.unpersist()
candidate_pairs_rdd.unpersist()


PythonRDD[37] at RDD at PythonRDD.scala:53
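
The banding logic of the cell above can be checked on a toy example without Spark. The sketch below is a simplified, in-memory version under stated assumptions: it uses the band's tuple of values directly as the bucket key instead of a 64-bit hash, it does not cap the bucket size, and the name `lsh_candidates` and the toy signatures are hypothetical.

```python
from collections import defaultdict
from itertools import combinations

def lsh_candidates(signatures: dict, bands: int, rows: int) -> set:
    buckets = defaultdict(list)
    for doc_id, sig in signatures.items():
        for band in range(bands):
            chunk = tuple(sig[band * rows:(band + 1) * rows])
            # The band index is part of the key, so different bands never mix
            buckets[(band, chunk)].append(doc_id)
    pairs = set()
    for ids in buckets.values():
        # Every pair of distinct documents in the same bucket is a candidate
        pairs.update(combinations(sorted(set(ids)), 2))
    return pairs

toy = {
    "d1": [1, 2, 3, 4, 5, 6],
    "d2": [1, 2, 9, 9, 5, 6],  # agrees with d1 on bands 0 and 2
    "d3": [7, 7, 7, 7, 7, 7],  # shares no band with the others
}
print(lsh_candidates(toy, bands=3, rows=2))  # {('d1', 'd2')}
```

Because the pairs are collected in a set, a pair that collides in several bands is reported only once, which mirrors the role of `distinct()` in the Spark version.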

In [None]:
output_file = "similar_reviews.txt"
eps = 1e-12
min_similarity = 0.42

#Extracts the IDs of the candidate pairs and transforms them into key-value pairs
#for a join
app = unique_candidate_pairs_rdd.flatMap(lambda ab: [ab[0], ab[1]]).distinct()
pairs_ids = app.map(lambda k: (k, None))


needed_sigs_rdd = (
    minhash_rdd
    .join(pairs_ids)
    .mapValues(lambda v: v[0])
)

pairs_A = unique_candidate_pairs_rdd.map(lambda ab: (ab[0], ('A', ab)))
pairs_B = unique_candidate_pairs_rdd.map(lambda ab: (ab[1], ('B', ab)))


parts = (
    pairs_A.union(pairs_B)
    .join(needed_sigs_rdd)
    .map(lambda kv: (kv[1][0][1], (kv[1][0][0], kv[1][1])))
    .combineByKey(
        lambda v: {'A': v[1]} if v[0]=='A' else {'B': v[1]},
        lambda acc, v: (acc.update({'A': v[1]}) or acc) if v[0]=='A' else (acc.update({'B': v[1]}) or acc),
        lambda a, b: (a.update(b) or a)
    )
)

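'''
This function takes a candidate pair together with the signatures of its two
reviews and estimates their Jaccard similarity as the fraction of signature
components in which they agree. Pairs below min_similarity and pairs with
identical signatures (estimate equal to 1) are discarded.
Params: kv (((A, B), {'A': signature_A, 'B': signature_B}))
Returns: (A, B, jhat) or None
'''
def jaccard_evaluate(kv):
    (A, B), d = kv
    #.get() returns None when one of the two signatures is missing
    signature_A, signature_B = d.get('A'), d.get('B')
    if signature_A is None or signature_B is None:
        return None
    matches = int(np.sum(signature_A == signature_B))
    jhat = matches / minhash_h
    if (jhat + eps) >= min_similarity and (1.0 - jhat) > eps:
        return (A, B, jhat)
    return None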

jaccard_hits_rdd = parts.map(jaccard_evaluate).filter(lambda x: x is not None)

final_pairs = jaccard_hits_rdd.collect()

reviews_keys_set = {(str(A[0]), str(A[1])) for A, B, _ in final_pairs} | \
          {(str(B[0]), str(B[1])) for A, B, _ in final_pairs}
reviews_keys_df = spark.createDataFrame(list(reviews_keys_set), ["Id", "User_id"])

reviews_rdd = (
        sampled_df
        .select(F.col("Id"),
                F.col("User_id"),
                F.col("review/text"))
        .join(F.broadcast(reviews_keys_df), on=["Id", "User_id"], how="inner")
        .rdd.map(lambda r: ((r["Id"], r["User_id"]), r["review/text"]))
    )
results = dict(reviews_rdd.collect())

#Writing of the results in a txt file
with open(output_file, "w", encoding="utf-8") as f:
    for (review_A, review_B, jaccard) in final_pairs:
        review_A_key = (str(review_A[0]), str(review_A[1]))
        review_B_key = (str(review_B[0]), str(review_B[1]))
        review_A_text = results.get(review_A_key, "")
        review_B_text = results.get(review_B_key, "")
        f.write(f"Review 1: {review_A_text}\n")
        f.write(f"Review 2: {review_B_text}\n")
        f.write(f"Jaccard similarity: {jaccard:.4f}\n")
        f.write("\n" + "="*80 + "\n\n")

#Total execution time from just after downloading the dataset using Kaggle
print(f"Total time (in seconds): {time.time()-start_time} seconds")

Total time (in seconds): 318.41648173332214 seconds
