# This is my (Antonio Focella) attempt at Project 1: "Finding similar items"

The task is to implement a detector of pairs of similar book reviews.
The idea behind this task is that the computational complexity of finding pairs of similar documents in domains like e-commerce requires algorithms that go beyond naive comparisons and can operate at scale.

As far as I know this solution is not based on one published on Kaggle or on any other preexisting project, but I did follow the theory seen in the course as closely as I could, so I don't expect my project to be particularly original.

I will mainly employ Shingling, MinHashing, and Locality-Sensitive Hashing (LSH). Such a pipeline avoids comparing all O(N^2) pairs: the cost becomes roughly linear in N, plus the number of candidate pairs that LSH produces.

This code takes about an hour to execute using 2 CPU cores.

I start by installing the dependencies and Spark features I'll use for the project.

In [1]:
#IMPORTANT: Remember to insert Kaggle credentials

In [2]:
#libraries

!pip install -q pyspark findspark

#standard imports
import os
import pandas as pd
import numpy as np
import re
from itertools import combinations
from tqdm import tqdm
import random
from functools import reduce

#Spark imports
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lower, col, length, size, least
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import HashingTF, MinHashLSH
from pyspark.ml.functions import vector_to_array
from pyspark.storagelevel import StorageLevel


Quick hardware check.

It looks like I have two cores to work with.

In [3]:
#print the number of CPU cores available
print(os.cpu_count())

2


We have a dataset of Amazon book reviews, so we are working with natural-language text of variable length.

We need to identify pairs that are very similar in a scalable way. Checking every pair would be prohibitively slow (O(N^2) pairs).

To overcome this limitation I will use the aforementioned techniques to compress each review into a compact signature that preserves similarity, and then use hashing to find likely pairs without resorting to brute force.
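To give a feeling for the O(N^2) problem, here is a minimal sketch that counts the unordered pairs a brute-force comparison would have to check. The helper name `count_pairs` is hypothetical and only used for illustration.

```python
from math import comb


def count_pairs(n_reviews: int) -> int:
    """Number of unordered pairs among n_reviews items: n * (n - 1) / 2."""
    return comb(n_reviews, 2)


# 100,000 is the subsample size used later in this notebook
for n in (1_000, 100_000):
    print(f"{n:>9,} reviews -> {count_pairs(n):>16,} pairs")
```

Even on the 100,000-review subsample this is about 5 billion comparisons, which is why the rest of the notebook tries to look only at pairs that are likely to be similar.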

First I download the dataset from Kaggle using my username and key (write your own username and key!).

In [4]:
###-WRITE YOUR CREDENTIALS-###

os.environ["KAGGLE_USERNAME"] = "Antonio Focella"
os.environ["KAGGLE_KEY"] = "arciduca"

###-WRITE YOUR CREDENTIALS-###

#I use the credentials above to download the dataset from Kaggle
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews


Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
 97% 1.03G/1.06G [00:11<00:00, 86.5MB/s]
100% 1.06G/1.06G [00:11<00:00, 99.7MB/s]


I unzip the archive and check what files are in the current working directory.

In [5]:
#check data
!ls

#unzip dataset so spark can read it
!unzip -q amazon-books-reviews.zip

#confirm they have been extracted
!ls



amazon-books-reviews.zip  sample_data
amazon-books-reviews.zip  books_data.csv  Books_rating.csv  sample_data


I will use Spark for this project, as it can handle massive operations. Here I create a new session and make sure I use all CPU cores.

In [6]:
#initialize findspark
findspark.init()

#stop already existing Spark sessions to avoid issues. If nothing defined, nothing to stop
try:
    spark.stop()
except NameError:
    pass

#create new Spark session, local[*] should configure it to use all CPU cores (I think). Spark's default is 200 shuffle partitions, I override this below with 4 partitions per core
#We simulate a distributed cluster, the Map and Reduce operations are parallelized across the cores
#will need it for LSH which involves hashing millions of items independently
spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.driver.memory", "8g") #6g better, 8g I think is the limit, I need 8 if I want to cache memory with 2^17 features but you might get kernel restarts
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
         .getOrCreate())

#I really want that my CPU remains fully utilized, but maybe should drop this
cores = spark.sparkContext.defaultParallelism
spark.conf.set("spark.sql.shuffle.partitions", str(cores * 4))

#print session info
spark

Let's read the CSV into a Spark dataframe and check the data. I rename the column Id (which is actually the book id and not a review id, so a bit misleading) to book_id and create a new unique id column, which is less confusing.

Here I can take just a subsample to work faster with the code, if necessary.

In [7]:
reviews_df = spark.read.csv("Books_rating.csv", header=True, escape="\"", quote="\"", multiLine=True)

#if too slow take just a fraction of reviews
reviews_df = reviews_df.limit(100000)

#I rename the Id column and create a new one cause original dataset is confusing
reviews_df = reviews_df.withColumnRenamed("Id", "book_id")
reviews_df = reviews_df.withColumn("id", pyspark.sql.functions.monotonically_increasing_id())

#print first rows
for row in reviews_df.take(5):
    print(f"review: {row[0]}: {row[4]}\n")

#also in tabular form for quick human check
reviews_df.show(5)

review: 1882931173: Jim of Oz "jim-of-oz"

review: 0826414346: Kevin Killian

review: 0826414346: John Granger

review: 0826414346: Roy E. Perry "amateur philosopher"

review: 0826414346: D. H. Richards "ninthwavestore"

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+---+
|   book_id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text| id|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+---+
|1882931173|Its Only Art If I...| NULL| AVCGYZL8FQQTD|Jim of Oz "jim-of...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|  0|
|0826414346|Dr. Seuss: Americ...| NULL|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724

I write a function to generate 10-shingles from a text string (I initially tried k=5, but it appears to be too permissive: unrelated documents share many shingles, which also slows down the code).

A "shingle" is a substring of length k found within the document. This transformation is crucial as it converts the "similarity" problem into a "set intersection" problem, which we can handle.

I use sets so that each review's shingles are unique, consistent with the set-based definition of Jaccard similarity. I then apply the function to our data. I also filter out empty reviews and those shorter than the chosen shingle length.
This solution appears to be fast enough and feasible.
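As a small illustration of why shingling turns similarity into set intersection, the sketch below computes character shingles and the exact Jaccard similarity for two toy strings. It uses k=3 only to keep the sets readable (the notebook uses k=10), and the helper names `char_shingles` and `jaccard` are hypothetical.

```python
def char_shingles(text: str, k: int) -> set:
    """Set of all lowercase character k-shingles of text."""
    s = text.lower()
    return {s[i:i + k] for i in range(len(s) - k + 1)}


def jaccard(a: set, b: set) -> float:
    """|A intersect B| / |A union B|, defined here as 0.0 when both sets are empty."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0


a = char_shingles("abcde", 3)  # {"abc", "bcd", "cde"}
b = char_shingles("abcdf", 3)  # {"abc", "bcd", "cdf"}
print(jaccard(a, b))           # 2 shared shingles out of 4 distinct ones -> 0.5
```

A string shorter than k yields an empty set, which is the reason for filtering out very short reviews before shingling.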

In [8]:
#generate shingles of desired length from input text. If no text returns an empty list, also lowercases the text to make shingling case insensitive
shin_len = 10 #I think 10 is the upper bound, more implies shingles become whole sentences

def generate_shingles(text):
  if text is None:
    return []

  s = text.lower() #force lowercase
  shingles = set()

  for i in range(len(s) - shin_len + 1):
    shingles.add(s[i:i+shin_len]) #I "slide a window" of length k across the string

  return list(shingles) #the set has already removed duplicates, I convert it to a list so Spark can store it as an array

#I create a UDF that applies the function above to a string column, returning the shingles
shingle_udf = udf(generate_shingles, ArrayType(StringType()))

#remove null & obviously too short text
reviews_df = reviews_df.filter(col("review/text").isNotNull()).filter(length(col("review/text")) >= shin_len)

#create a new column "shingles" by applying the shingling UDF to the reviews text
reviews_df = reviews_df.withColumn("shingles", shingle_udf(col("review/text")))

#display the first rows
reviews_df.select("shingles").show(5)


ALTERNATIVE

I tried this alternative in Spark without a Python UDF but it appears to be much slower. I keep it as legacy code (I am not sure why it's slower, would have expected the opposite to be true).

In [9]:
# #generate shingles of desired length from input text. If no text returns an empty list, also lowercases the text to make shingling case insensitive
# shin_len = 10

# text_col = pyspark.sql.functions.col("`review/text`") #backticks needed because the column name has "/"
# s = pyspark.sql.functions.lower(text_col)
# n = pyspark.sql.functions.length(s)

# #build a list of starting positions for each possible k-character substring
# positions = pyspark.sql.functions.sequence(pyspark.sql.functions.lit(1), n - pyspark.sql.functions.lit(shin_len) + pyspark.sql.functions.lit(1))

# #extract the k-character substring at each starting position (note: duplicates are not removed here)
# shingles = pyspark.sql.functions.transform(positions, lambda i: pyspark.sql.functions.substring(s, i, shin_len))

# #prepare an empty array of strings, it draws from this if the text is too short
# empty_arr = pyspark.sql.functions.expr("array()").cast("array<string>")

# #if text is long enough, store it
# reviews_df = reviews_df.withColumn("shingles", pyspark.sql.functions.when(n >= shin_len, shingles).otherwise(empty_arr))

# reviews_df.select("shingles").show(5, truncate=False)

The set of all possible k-shingles in the English language is immense, so I must reduce the dimension of the "space" I work with.

The next step is thus to create a MinHash signature for each set.
MinHashing is a technique to compress a set into a small signature (i.e. a vector of integers); it uses random hash functions to simulate random permutations instead of actually permuting the data.
The key property of MinHash is that the fraction of components in which two signatures "agree" provides us with an unbiased estimate of Jaccard similarity.
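The property can be checked on toy data with a minimal pure-Python sketch. This is not Spark's implementation: the hash family h(x) = (a*x + b) mod PRIME, the prime itself, the seeds, and all function names are assumptions chosen for illustration.

```python
import random

PRIME = 2_147_483_647  # 2^31 - 1, chosen for this sketch (an assumption)


def make_hash_funcs(n: int, seed: int = 0) -> list:
    """Draw n random (a, b) pairs defining h(x) = (a * x + b) mod PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(n)]


def minhash_signature(items: set, funcs: list) -> list:
    """For each hash function, keep the minimum hash value over the set."""
    return [min((a * x + b) % PRIME for x in items) for a, b in funcs]


def estimate_jaccard(sig_a: list, sig_b: list) -> float:
    """Fraction of signature components on which the two signatures agree."""
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)


# two toy sets of integer "shingle ids" sharing 80 of 120 distinct elements
rng = random.Random(42)
universe = rng.sample(range(1_000_000), 120)
set_a, set_b = set(universe[:100]), set(universe[20:])

funcs = make_hash_funcs(256)
true_j = len(set_a & set_b) / len(set_a | set_b)
est = estimate_jaccard(minhash_signature(set_a, funcs), minhash_signature(set_b, funcs))
print(f"true Jaccard = {true_j:.3f}, MinHash estimate = {est:.3f}")
```

The estimate should land close to the true value of 80/120, and it gets tighter as the number of hash functions grows.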

Spark has built-in MinHash functionality to handle this efficiently, but first I need to convert each review's shingle set into a binary feature vector, which I do using HashingTF (with the "binary" option on, to adhere to what we saw in the course and to reduce work/memory).
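The idea behind this conversion (the "hashing trick") can be sketched in a few lines of plain Python. This only illustrates the principle: `zlib.crc32` is a stand-in chosen for this sketch, Spark's HashingTF uses its own hash function (MurmurHash3), and `hashed_indices` is a hypothetical helper.

```python
import zlib


def hashed_indices(shingles, num_features: int) -> set:
    """Map each shingle to an index in [0, num_features) and keep the distinct
    indices, i.e. the nonzero positions of a binary feature vector."""
    return {zlib.crc32(s.encode("utf-8")) % num_features for s in shingles}


shingles = {"the book w", "he book wa", "e book was"}
idx = hashed_indices(shingles, 1 << 20)
print(sorted(idx))
print(len(idx) <= len(shingles))  # collisions can only merge indices, never add new ones
```

With 2^20 possible indices and roughly a thousand shingles per review, two different shingles rarely land on the same index, so the binary vector stays a good proxy for the shingle set.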

In [10]:
#I use HashingTF to convert shingles to sparse binary feature vectors

num_dim = 1<<20 #this is 2^20 features. The average number of shingles is about 1000 so should be ok, low enough collisions. If you want to cache you have to decrease it (in my case to 2^18 using 8g memory)

#each shingle is hashed into an index in [0, numFeatures) in the feature vector
hashingTF = HashingTF(inputCol="shingles", outputCol="features", numFeatures=num_dim, binary=True) #here important to set binary=True

#I filter out rows where the 'shingles' array is empty (This prevents exceptions in MinHashLSH for empty feature vectors) before transforming
#I also keep only the columns needed downstream to speed up the code
features_df = hashingTF.transform(reviews_df.filter(size(col("shingles")) > 0)).select("id","features")

#I store and materialize the cache (because features_df is reused)
#features_df = features_df.persist(StorageLevel.MEMORY_AND_DISK)
#materialize_cache = features_df.count()

As expected, each vector is very sparse.

The MinHash LSH model hashes each vector into a smaller signature and uses LSH to find candidate pairs that are likely to have Jaccard distance below a certain threshold.

mh.fit "trains" the LSH model on the data, which for MinHash just means drawing the random coefficients of the n hash functions, while transform computes the signatures by taking the (sparse) feature vector and applying the n hash functions.

Every book review is now represented by an array of n integers.

Since I want to catch pairs with a similarity of about 0.8 or more and we know that the "threshold" of banding is roughly (1/b)^(1/r), I set the number of hash tables accordingly (with b = 6 and r = 16 the threshold is about 0.89; I keep the number of bands low as I need very efficient code, having just 2 CPU cores).
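The effect of b and r can be checked numerically. The sketch below evaluates the approximate threshold (1/b)^(1/r) and the standard banding curve 1 - (1 - s^r)^b, which gives the probability that a pair with Jaccard similarity s becomes a candidate, assuming independent MinHash components. The function names are hypothetical.

```python
def banding_threshold(b: int, r: int) -> float:
    """Approximate similarity at which the candidate probability rises steeply."""
    return (1.0 / b) ** (1.0 / r)


def candidate_probability(s: float, b: int, r: int) -> float:
    """P(at least one of b bands of r rows agrees entirely) = 1 - (1 - s^r)^b."""
    return 1.0 - (1.0 - s ** r) ** b


b, r = 6, 16  # the values used in this notebook
print(f"threshold ~ {banding_threshold(b, r):.3f}")
for s in (0.7, 0.8, 0.9, 0.95, 1.0):
    print(f"s = {s:.2f} -> P(candidate) = {candidate_probability(s, b, r):.3f}")
```

Lowering r or raising b moves the threshold down, so more moderately similar pairs become candidates, at the price of more candidate pairs to verify.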

In [11]:
#signature length. If we want to be more precise we can increase the number of Hash Tables, but that would slow us down (can also use 6&8)
#I experimented a bit and noticed I don't need too long signature length to get accurate results, and using low values improves speed a lot
#as the error should be proportional to 1/sqrt(n), a short signature is a bit noisy (if true Jaccard similarity is 0.5, StD is about 0.07 with 48 minhash values and about 0.05 with the 96 used here)
b = 6 #number of bands
r = 16 #rows per band
n_sig = b*r #total number of minhash values per row (so signature length I use at this stage)

#this is necessary to avoid the code becoming too slow

#I use MinHash LSH model to simulate permutations, adding the output column for hash keys
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=n_sig)
model = mh.fit(features_df) #this step prepares the random coefficients

#I compute the MinHash signature for the features (hashes is an array of 1D vectors so convert to an array for later)
sig = model.transform(features_df).select("id", "hashes").select("id", pyspark.sql.functions.transform("hashes", lambda v: vector_to_array(v)[0].cast("long")).alias("sig")).persist(StorageLevel.MEMORY_AND_DISK)
materialize_cache_2 = sig.count()



We still have the problem of finding pairs. To focus on pairs likely to be similar I use Locality-Sensitive Hashing (LSH).

The idea is to divide the signature matrix into bands, and for each band to hash the portion of the signature to a bucket. Two reviews are a candidate pair if they hash to the same bucket in at least one band.

BandKey is a hash that represents the content of the band. If two reviews have identical values in one band, they will end up in the same bucket for that band.

I set a filter for overly large buckets in order to improve performance (a bucket with k items generates about k^2/2 pairs).
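The banding step can be sketched on toy signatures in plain Python. Here the bucket key is simply the tuple (band index, band content) rather than an xxhash64 value, and the helper names `band_buckets` and `drop_large_buckets` are hypothetical.

```python
from collections import defaultdict


def band_buckets(signatures: dict, b: int, r: int) -> dict:
    """Group ids by (band index, band content); signatures maps id -> list of b*r ints."""
    buckets = defaultdict(list)
    for doc_id, sig in signatures.items():
        for band in range(b):
            key = (band, tuple(sig[band * r:(band + 1) * r]))
            buckets[key].append(doc_id)
    return dict(buckets)


def drop_large_buckets(buckets: dict, max_bucket: int) -> dict:
    """Discard buckets holding more than max_bucket ids."""
    return {k: ids for k, ids in buckets.items() if len(ids) <= max_bucket}


sigs = {
    0: [1, 2, 3, 4],
    1: [1, 2, 9, 9],  # agrees with id 0 on band 0 only
    2: [7, 7, 7, 7],
}
buckets = band_buckets(sigs, b=2, r=2)
shared = {k: ids for k, ids in buckets.items() if len(ids) > 1}
print(shared)  # {(0, (1, 2)): [0, 1]}
```

Ids 0 and 1 become a candidate pair because they agree on every row of band 0, even though their full signatures differ.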

In [12]:
#split signatures into b bands of size r, hash each band to a bandKey
#I basically take sig, slice out the band, join it into a single string, and hash it with xxhash64: this is my "bucket id" for that band
band_rows = []
for band in range(b):
    start = band * r + 1  #if band=0 start is 1 (Spark slice is 1-based, not 0-based), if band=1 start is r+1, etc.
    band_rows.append(
        sig.select(
            "id",
            pyspark.sql.functions.lit(band).alias("band"), #add column to store the band index
            pyspark.sql.functions.xxhash64(pyspark.sql.functions.concat_ws("_", pyspark.sql.functions.slice("sig", start, r))).alias("bandKey")
        )
    )


#union all rows (id, band, bandKey), each review appears b times (once per band) with a different bandKey each time
#then repartition (re-shuffle) so that rows with the same band & bandKey end up together, next operations should be faster
buckets = reduce(lambda a, d: a.unionByName(d), band_rows).repartition(spark.sparkContext.defaultParallelism * 4, "band", "bandKey")


#candidate pairs are those sharing a bandKey in any band. Since a few large buckets can slow down the code a lot, I add a bucket-size filter to drop huge buckets
bucket_sizes = buckets.groupBy("band", "bandKey").count() #count how many reviews are in each bucket
MAX_BUCKET = 200  #lower = faster, but we may miss some true matches. If more than 200 I drop it
buckets_small = (buckets.join(bucket_sizes.filter(pyspark.sql.functions.col("count") <= MAX_BUCKET), on=["band", "bandKey"],how="inner"))
buckets=buckets_small #here I substitute if I want to speed up the code

I now keep only pairs where idA < idB, which removes duplicates and self-matches. In the following cells I estimate the Jaccard similarity of each candidate pair, keep only those above the chosen threshold, and check a sample of the results.
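The same logic as the self-join can be written in a few lines of plain Python, which may make the idA < idB condition easier to see. The bucket contents below are made up for illustration and `candidate_pairs` is a hypothetical helper.

```python
from itertools import combinations


def candidate_pairs(buckets: dict) -> set:
    """All unordered pairs (a, b) with a < b that share at least one bucket."""
    pairs = set()
    for ids in buckets.values():
        # sorting guarantees a < b, so (3, 5) and (5, 3) cannot both appear
        for a, b in combinations(sorted(set(ids)), 2):
            pairs.add((a, b))
    return pairs


toy_buckets = {
    (0, "k1"): [5, 3, 8],
    (1, "k2"): [3, 5],  # same pair again in another band
    (1, "k3"): [9],     # a singleton bucket produces no pair
}
print(sorted(candidate_pairs(toy_buckets)))  # [(3, 5), (3, 8), (5, 8)]
```

The pair (3, 5) shows up in two bands but is kept once, which is what the `distinct()` call does in the Spark version.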

In [13]:
#generate candidate pairs: join the table with itself on the same band and bandKey, remove self-pairs & duplicates
cand = (buckets.alias("a")
        .join(buckets.alias("b"), on=["band", "bandKey"])
        .where(pyspark.sql.functions.col("a.id") < pyspark.sql.functions.col("b.id")) #remove duplicate
        .select(pyspark.sql.functions.col("a.id").alias("idA"), pyspark.sql.functions.col("b.id").alias("idB")) #keep only id of the pairs
        .distinct()) #want to make sure we don't have duplicates

In [14]:
#join back the corresponding signatures so we can estimate their Jaccard similarity by comparing signature components
#I select the useful output columns: first id, second id, first MinHash signature, second MinHash signature
pairs = (cand
         .join(sig.alias("sa"), cand.idA == pyspark.sql.functions.col("sa.id"))
         .join(sig.alias("sb"), cand.idB == pyspark.sql.functions.col("sb.id"))
         .select("idA", "idB", pyspark.sql.functions.col("sa.sig").alias("sigA"), pyspark.sql.functions.col("sb.sig").alias("sigB")))



In [15]:
#here we compute a MinHash based estimate of Jaccard similarity between the two reviews in each candidate pair
pairs = pairs.withColumn(
    "jacc_est", #create new column
    (pyspark.sql.functions.aggregate( #compare sigA and sigB element-by element and produce an array of 1/0
        pyspark.sql.functions.zip_with("sigA", "sigB", lambda x, y: pyspark.sql.functions.when(x == y, pyspark.sql.functions.lit(1)).otherwise(pyspark.sql.functions.lit(0))),
        pyspark.sql.functions.lit(0), #start from 0
        lambda acc, x: acc + x #accumulate the ones
    ) / pyspark.sql.functions.size(pyspark.sql.functions.col("sigA")) #divide by signature length
    )
  )

Finally, I keep only pairs whose estimated Jaccard similarity is above the chosen threshold.

We have achieved the near-duplicate detection goal.
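Since the estimate is a fraction of matching signature components, it carries sampling noise. Treating the n components as independent, its standard deviation is about sqrt(s(1-s)/n) for a true similarity s. The small sketch below evaluates this for a few signature lengths; `minhash_std` is a hypothetical helper.

```python
from math import sqrt


def minhash_std(s: float, n: int) -> float:
    """Standard deviation of the MinHash estimate for true similarity s
    and signature length n, assuming independent components."""
    return sqrt(s * (1.0 - s) / n)


for n in (48, 96):
    for s in (0.5, 0.9):
        print(f"n = {n:>3}, s = {s:.1f} -> StD ~ {minhash_std(s, n):.3f}")
```

Pairs whose true similarity sits close to the chosen threshold can therefore fall on either side of it, which is worth keeping in mind when reading the results.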

In [16]:
threshold=0.9 #change this if you want

similar = pairs.where(pyspark.sql.functions.col("jacc_est") >= threshold).select("idA", "idB", "jacc_est")

In [17]:
#quick check
similar.show(10, truncate=False)

+----+----+--------+
|idA |idB |jacc_est|
+----+----+--------+
|262 |267 |1.0     |
|666 |667 |0.85    |
|2761|2766|1.0     |
|3600|3608|1.0     |
|4427|4439|1.0     |
|4790|4796|1.0     |
|5891|5892|0.8875  |
|6250|6514|1.0     |
|4293|6636|1.0     |
|4299|6642|1.0     |
+----+----+--------+
only showing top 10 rows


coalesce(1) makes Spark write a single CSV part file (inside an output directory named similar_review_pairs.csv).

In [18]:
#similar.write.mode("overwrite").option("header", True).csv("similar_pairs_out")
similar.coalesce(1).write.csv("similar_review_pairs.csv", header=True, mode="overwrite")

If needed, we can check whether specific reviews are actually similar.

In [20]:
#If needed, just to check some reviews are actually the same
selected = reviews_df.where("id IN (5891, 5892)")
selected.show(truncate=False)


A quick human check confirms that the pairs are indeed very similar.
The solution scales to large datasets because shingling, hashing, and joining are done in a distributed manner by Spark across the cluster.

I declare that this material, which I now submit for assessment, is entirely my own work and has not been taken from the work of others, save and to the extent that such work has been cited and acknowledged within the text of my work, and including any code produced using generative AI systems. I understand that plagiarism, collusion, and copying are grave and serious offences in the university and accept the penalties that would be imposed should I engage in plagiarism, collusion or copying. This assignment, or any part of it, has not been previously submitted by me or any other person for assessment on this or any other course of study.