In [None]:
!pip install kagglehub pyspark

Collecting datasketch
  Downloading datasketch-1.6.5-py3-none-any.whl.metadata (5.8 kB)
Downloading datasketch-1.6.5-py3-none-any.whl (89 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.2/89.2 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: datasketch
Successfully installed datasketch-1.6.5


In [None]:
import kagglehub

# Download the Kaggle dataset; the call returns the location of the dataset files.
dataset_handle = "mohamedbakhet/amazon-books-reviews"
path = kagglehub.dataset_download(dataset_handle)

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/amazon-books-reviews


In [None]:
import pandas as pd
from collections import defaultdict
import hashlib
import re
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import pandas_udf


In [None]:
#import data
# Read both CSVs from the directory reported by kagglehub (`path`) instead of
# repeating a hard-coded absolute location that only exists on some hosts.
df_data_raw = pd.read_csv(f"{path}/books_data.csv")
df_rating_raw = pd.read_csv(f"{path}/Books_rating.csv")

# The link/image columns are dropped right after loading.
df_data_raw = df_data_raw.drop(columns=["image", "previewLink", "infoLink"])

In [None]:
# Peek at rows 1-4 (positional slice) of the book metadata.
df_data_raw.iloc[1:5]

Unnamed: 0,Title,description,authors,publisher,publishedDate,categories,ratingsCount
1,Dr. Seuss: American Icon,Philip Nel takes a fascinating look into the k...,['Philip Nel'],A&C Black,2005-01-01,['Biography & Autobiography'],
2,Wonderful Worship in Smaller Churches,This resource includes twelve principles in un...,['David R. Ray'],,2000,['Religion'],
3,Whispers of the Wicked Saints,Julia Thomas finds her life spinning out of co...,['Veronica Haddon'],iUniverse,2005-02,['Fiction'],
4,"Nation Dance: Religion, Identity and Cultural ...",,['Edward Long'],,2003-03-01,,


In [None]:
# Show the raw review text stored under index label 1.
df_rating_raw.loc[1, 'review/text']

"I don't care much for Dr. Seuss but after reading Philip Nel's book I changed my mind--that's a good testimonial to the power of Rel's writing and thinking. Rel plays Dr. Seuss the ultimate compliment of treating him as a serious poet as well as one of the 20th century's most interesting visual artists, and after reading his book I decided that a trip to the Mandeville Collections of the library at University of California in San Diego was in order, so I could visit some of the incredible Seuss/Geisel holdings they have there.There's almost too much to take in, for, like William Butler Yeats, Seuss led a career that constantly shifted and metamoprhized itself to meet new historical and political cirsumstances, so he seems to have been both a leftist and a conservative at different junctures of his career, both in politics and in art. As Nel shows us, he was once a cartoonist for the fabled PM magazine and, like Andy Warhol, he served his time slaving in the ad business too. All was in

In [None]:
#Preprocessing
def clean_text(text):
    """Normalise a review for shingling.

    The text is lower-cased, every character that is neither a word
    character nor whitespace is removed, runs of whitespace are collapsed
    to a single space, and leading/trailing whitespace is stripped.

    Args:
        text: The raw review. Non-string values are converted with ``str``.
            ``None`` and float NaN (how pandas represents a missing cell)
            are treated as "no text".

    Returns:
        The cleaned string; ``''`` for missing input.
    """
    # Without this guard str(NaN) / str(None) would become the literal words
    # "nan" / "none" and be treated as real review content.
    if text is None or (isinstance(text, float) and text != text):
        return ''
    text = str(text).lower()
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

# Work on an explicit copy of the subset: assigning a new column to a bare
# slice of the original frame is chained assignment (SettingWithCopyWarning).
# NOTE(review): the slice starts at 1, so the first review is excluded, and
# re-running this cell slices the already-sliced frame again - confirm intent.
df_rating_raw = df_rating_raw[1:400000].copy()
df_rating_raw['cleaned_text'] = df_rating_raw['review/text'].apply(clean_text)


In [None]:
# Keep only the two columns the similarity pipeline needs, as an independent frame.
cleaned_df = df_rating_raw.loc[:, ['Title', 'cleaned_text']].copy()


In [None]:
# The index is written as an unnamed first column; the Spark cells below read
# it back as `_c0` and use it as the review id.
cleaned_df.to_csv("cleaned_df.csv", index=True)

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Read the file from the same relative path `cleaned_df.to_csv` wrote it to,
# rather than a host-specific absolute directory.
# pandas escapes a quote inside a quoted field by doubling it, whereas Spark's
# CSV reader defaults to backslash escaping, so set escape='"' explicitly.
spark_df = spark.read.csv("cleaned_df.csv", header=True, escape='"')

@udf(ArrayType(IntegerType()))
def spark_shingle_hash(text):
    """Hash every 5-word shingle of ``text`` to a non-negative 31-bit integer.

    Args:
        text: Whitespace-separated text; ``None`` or ``''`` yields ``[]``.

    Returns:
        One hash per window of five consecutive words, in document order.
        Texts with fewer than five words yield an empty list.
    """
    if not text:
        return []
    words = text.split()
    hashes = []
    for start in range(len(words) - 4):
        shingle = ' '.join(words[start:start + 5])
        digest = hashlib.sha1(shingle.encode()).digest()
        # The first four digest bytes form an unsigned 32-bit value, which can
        # exceed what the declared IntegerType (signed 32-bit) can represent.
        # Masking off the top bit keeps every hash within 0 .. 2**31 - 1.
        hashes.append(int.from_bytes(digest[:4], 'big') & 0x7FFFFFFF)
    return hashes


spark_df = spark_df.withColumn("hashed_shingles_5", spark_shingle_hash(col("cleaned_text")))

In [None]:
# cleaned_df = pd.read_csv("new_data.csv")
# cleaned_df = cleaned_df.reset_index()

In [None]:
# Sort each review's shingle hashes as an optimization step for MinHash
# cleaned_df = cleaned_df.sort_values('hashed_shingles_5')
@pandas_udf(ArrayType(IntegerType()))
def sort_shingles_udf(shingles_series: pd.Series) -> pd.Series:
    """Return each row's shingle hashes in ascending order.

    Rows that are ``None`` or have no elements map to an empty list.
    """
    def _sorted_or_empty(shingles):
        if shingles is None or len(shingles) == 0:
            return []
        return sorted(shingles)

    return shingles_series.apply(_sorted_or_empty)


spark_df = spark_df.withColumn(
    "sorted_shingles",
    sort_shingles_udf(col("hashed_shingles_5")),
)


In [None]:
# The unsorted hash column is dropped; only `sorted_shingles` is used from here on.
spark_df = spark_df.drop("hashed_shingles_5")
# count() is a Spark action, so it executes the transformations defined so far
# and displays the number of rows.
spark_df.count()

399999

In [None]:
# Minhash signature implementation
# num_perm is a trade-off between accuracy and computational cost: according to the graph at https://ekzhu.com/datasketch/minhash.html, 110 permutations is a balanced choice between MinHash accuracy and MinHash performance
# Only referenced by the commented-out datasketch prototype visible in this notebook.
review_minhashes = {}

class CustomMinHash:
    """MinHash signatures built from ``num_perm`` hash functions h(x) = (a*x + b) % p.

    Attributes:
        num_perm: Number of hash functions, i.e. the signature length.
        max_hash: The modulus ``p``. It is the Mersenne prime 2**31 - 1, so
            the modulus really is prime and every signature value fits a
            signed 32-bit integer (Spark ``IntegerType``).
        seed: Seed used to draw the (a, b) coefficients.
        permutations: List of ``(a, b)`` pairs as plain Python ints.
    """

    def __init__(self, num_perm=110, seed=42):
        self.num_perm = num_perm
        self.max_hash = (1 << 31) - 1
        self.seed = seed
        self.permutations = self._generate_permutations()
        # Coefficient vectors used by the vectorised signature computation.
        self._a = np.array([a for a, _ in self.permutations], dtype=np.int64)
        self._b = np.array([b for _, b in self.permutations], dtype=np.int64)

    def _generate_permutations(self):
        """Generate random hash functions of form h(x) = (a*x + b) % prime.

        A private ``RandomState`` is used so that constructing the object does
        not reseed NumPy's global generator. Coefficients are returned as
        Python ints so later arithmetic on them cannot overflow.
        """
        rng = np.random.RandomState(self.seed)
        a = rng.randint(1, self.max_hash, size=self.num_perm, dtype=np.int64)
        b = rng.randint(0, self.max_hash, size=self.num_perm, dtype=np.int64)
        return [(int(a_i), int(b_i)) for a_i, b_i in zip(a, b)]

    def _hash_func(self, x, a, b):
        """Universal hash function, evaluated with arbitrary-precision ints."""
        return (int(a) * int(x) + int(b)) % self.max_hash

    def compute_signature(self, hashed_shingles):
        """Compute MinHash signature for a list of hashed shingles.

        Args:
            hashed_shingles: Iterable of integer shingle hashes. ``None``
                entries are skipped and duplicates are evaluated only once
                (neither affects a minimum).

        Returns:
            A list of ``num_perm`` Python ints, each in ``[0, max_hash)``.
            If no shingles are supplied, every slot holds ``max_hash``, a
            value no real hash can take.
        """
        modulus = self.max_hash
        # Reducing x modulo p first does not change (a*x + b) % p, and it keeps
        # a*x + b below 2**63 so the int64 arithmetic below is exact.
        reduced = {int(s) % modulus for s in hashed_shingles if s is not None}
        if not reduced:
            return [modulus] * self.num_perm

        xs = np.fromiter(reduced, dtype=np.int64, count=len(reduced))
        signature = np.full(self.num_perm, modulus, dtype=np.int64)
        chunk = 4096  # bounds the size of the temporary (chunk x num_perm) matrix
        for start in range(0, len(xs), chunk):
            column = xs[start:start + chunk, None]
            hashed = (column * self._a + self._b) % modulus
            np.minimum(signature, hashed.min(axis=0), out=signature)

        return signature.tolist()

custom_minhash = CustomMinHash(num_perm=110)

@udf(ArrayType(IntegerType()))
def compute_minhash_signature(hashed_shingles):
    """Return the MinHash signature of one review, or ``None`` without shingles.

    A shared placeholder signature is deliberately not used: identical
    placeholders would put every shingle-less review into the same LSH bucket
    in every band and flag all of them as near-duplicates of each other.
    """
    if not hashed_shingles:
        return None
    return custom_minhash.compute_signature(hashed_shingles)

# Reviews without any shingle have nothing to compare, so they are excluded
# before signatures are computed and never reach the LSH index.
spark_df = spark_df.filter("size(sorted_shingles) > 0").withColumn(
    "minhash_signature",
    compute_minhash_signature("sorted_shingles")
)

# Truncate the printed cells: a full signature is far too wide to read.
spark_df.select("_c0", "minhash_signature").show(5, truncate=80)

+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Compute LSH with n = 110 permutations split into b = 22 bands of r = 5 rows -> threshold t of about 0.53; false negatives are not a concern here
# LSH Banding Implementation
# from datasketch import MinHashLSH

# lsh = MinHashLSH(threshold=0.53, num_perm=110)  # num_perm = b*r = 22*5 = 110

# for review, mh in review_minhashes.items():
#     lsh.insert(review, mh)

# candidate_pairs = set()
# for review in review_minhashes:
#     matches = lsh.query(review_minhashes[review])
#     for match in matches:
#         if match != review:
#             candidate_pairs.add(tuple(sorted((review, match))))



In [None]:
#Create LSH from scratch
class CustomLSH:
    """Banded LSH index over fixed-length MinHash signatures.

    A signature is cut into ``b`` bands of ``r`` consecutive values. Two
    documents become candidates when at least one band has the same hash.

    Attributes:
        num_perm: Expected signature length.
        b: Number of bands.
        r: Rows (signature values) per band; ``b * r == num_perm``.
        hash_tables: One ``{band_hash: set(doc_id)}`` table per band.
    """

    def __init__(self, threshold=0.53, num_perm=110):
        if num_perm < 1:
            raise ValueError("num_perm must be a positive integer")
        self.num_perm = num_perm
        self.b, self.r = self._optimal_bands(threshold)
        self.hash_tables = [defaultdict(set) for _ in range(self.b)]

    def _optimal_bands(self, threshold):
        """Calculate optimal bands (b) and rows (r) for target threshold.

        Only factorisations ``b * r == num_perm`` are considered, so the whole
        signature is always used. Among them the pair whose approximate
        threshold ``(1/b) ** (1/r)`` is closest to ``threshold`` is returned.
        For the defaults (0.53, 110) this is ``(22, 5)``.
        """
        best_gap, best_b, best_r = None, None, None
        for rows in range(1, self.num_perm + 1):
            if self.num_perm % rows:
                continue
            bands = self.num_perm // rows
            gap = abs((1.0 / bands) ** (1.0 / rows) - threshold)
            if best_gap is None or gap < best_gap:
                best_gap, best_b, best_r = gap, bands, rows
        return best_b, best_r

    def _get_band_hashes(self, signature):
        """Split signature into bands and compute band hashes.

        Raises:
            ValueError: If the signature is shorter than ``b * r``. Slicing a
                short signature would otherwise yield truncated or empty
                bands that collide with each other.
        """
        needed = self.b * self.r
        if len(signature) < needed:
            raise ValueError(
                f"signature has {len(signature)} values, expected at least {needed}"
            )
        bands = []
        for band_idx in range(self.b):
            start = band_idx * self.r
            band = tuple(signature[start:start + self.r])
            bands.append((band_idx, hash(band)))
        return bands

    def insert(self, doc_id, signature):
        """Insert document into LSH index (one bucket entry per band)."""
        for band_idx, band_hash in self._get_band_hashes(signature):
            self.hash_tables[band_idx][band_hash].add(doc_id)

    def query(self, signature):
        """Find candidate matches for a document.

        Returns:
            The set of inserted doc ids sharing at least one band hash with
            ``signature``. A document that was itself inserted with this
            signature is part of the result.
        """
        candidates = set()
        for band_idx, band_hash in self._get_band_hashes(signature):
            # .get() avoids creating empty buckets in the defaultdict on lookup.
            candidates.update(self.hash_tables[band_idx].get(band_hash, set()))
        return candidates

custom_lsh = CustomLSH(threshold=0.53, num_perm=110)

# Bring (review id, signature) rows to the driver and index each of them.
signature_rows = spark_df.select('_c0', 'minhash_signature').collect()
for row in signature_rows:
    doc_id, signature = row['_c0'], row['minhash_signature']
    custom_lsh.insert(doc_id, signature)




Found 136984 candidate pairs


In [None]:
# Find all candidate pairs
# Two documents are candidates exactly when they share a bucket in some band,
# so the pairs can be read straight from the index. This avoids collecting the
# DataFrame a second time: it is not cached, so another collect() would re-run
# the whole shingle/MinHash UDF pipeline.
candidate_pairs = set()
for table in custom_lsh.hash_tables:
    for bucket in table.values():
        if len(bucket) < 2:
            continue
        members = list(bucket)
        for position, doc_id in enumerate(members):
            for match in members[position + 1:]:
                candidate_pairs.add(frozenset({doc_id, match}))

print(f"Found {len(candidate_pairs)} candidate pairs")

In [None]:
# Preview of up to 100 candidate pairs, in the set's iteration order.
# NOTE(review): the variable name says 20 but 100 pairs are taken - confirm which is intended.
first_20_pairs = [pair for _, pair in zip(range(100), candidate_pairs)]
first_20_pairs

[frozenset({'103134', '94993'}),
 frozenset({'297646', '94595'}),
 frozenset({'379261', '57594'}),
 frozenset({'294075', '388275'}),
 frozenset({'315497', '349664'}),
 frozenset({'204389', '282851'}),
 frozenset({'93658', '93864'}),
 frozenset({'103066', '389260'}),
 frozenset({'29314', '29316'}),
 frozenset({'115224', '82166'}),
 frozenset({'115232', '234535'}),
 frozenset({'10252', '7308'}),
 frozenset({'71964', '82818'}),
 frozenset({'103207', '290899'}),
 frozenset({'387051', '387282'}),
 frozenset({'115256', '361412'}),
 frozenset({'313322', '77596'}),
 frozenset({'115252', '388274'}),
 frozenset({'136736', '136848'}),
 frozenset({'134068', '294848'}),
 frozenset({'102203', '1033'}),
 frozenset({'103325', '95144'}),
 frozenset({'159713', '368912'}),
 frozenset({'159810', '357223'}),
 frozenset({'220365', '286205'}),
 frozenset({'368069', '368609'}),
 frozenset({'290359', '94470'}),
 frozenset({'220360', '31344'}),
 frozenset({'255525', '82050'}),
 frozenset({'104249', '294075'}),
