<a href="https://colab.research.google.com/github/El-Mehdi-Oudal/AMD-Project/blob/main/El_Mehdi_Oudal_AMD_project_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import nltk
from nltk.corpus import stopwords

# Fetch the NLTK stop-word corpus; a later cell builds ENGLISH_STOPWORDS
# from stopwords.words('english'), which needs this corpus to be available.
nltk.download('stopwords')

In [None]:
# Libraries
import heapq
import math
import os
import pickle
import re
import time
import zipfile
from itertools import combinations
from multiprocessing import Pool, cpu_count

import nltk
import pandas as pd
import psutil
from nltk.corpus import stopwords
from pyspark.ml.feature import (
    Tokenizer,
    StopWordsRemover,
    HashingTF,
    MinHashLSH
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.types import StringType, BooleanType
from tqdm import tqdm

In [None]:

# Kaggle CLI credentials are supplied through environment variables.
# NOTE(review): the values below look like redacted placeholders; replace them
# locally and never commit real credentials to the notebook.
os.environ['KAGGLE_USERNAME'] = "xxxxxxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxxxxxx"
# IPython shell escape: download the dataset archive with the Kaggle CLI.
!kaggle datasets download -d'mohamedbakhet/amazon-books-reviews'

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
amazon-books-reviews.zip: Skipping, found more recently modified local copy (use --force to force download)


In [None]:
# Unpack every member of the downloaded Kaggle archive
with zipfile.ZipFile('amazon-books-reviews.zip', mode='r') as books_archive:
    books_archive.extractall()

In [None]:
# Similarity functions

def jaccard_similarity(tokens_a, tokens_b):
    """Return the Jaccard similarity |A ∩ B| / |A ∪ B| of two token collections.

    Both arguments are converted to sets first, so duplicates are ignored.
    If either collection is empty the similarity is defined as 0.0.
    """
    left = set(tokens_a)
    right = set(tokens_b)
    if not left or not right:
        return 0.0
    shared_count = len(left.intersection(right))
    combined_count = len(left.union(right))
    return float(shared_count / combined_count)


def clean_and_tokenize(text: str):
    """Lower-case a review, extract its word tokens and drop stop-words.

    Missing values (anything ``pd.isna`` reports as NA) yield an empty list.
    Tokens are the matches of the module-level ``TOKEN_PATTERN`` on the
    lower-cased text; matches found in ``ENGLISH_STOPWORDS`` are discarded.
    """
    if pd.isna(text):
        return []
    kept_tokens = []
    for candidate in TOKEN_PATTERN.findall(text.lower()):
        if candidate not in ENGLISH_STOPWORDS:
            kept_tokens.append(candidate)
    return kept_tokens


In [None]:
# Brute-force Jaccard benchmark
#
# Scores every unordered pair of the first N_SAMPLE_REVIEWS reviews and keeps
# the N_TOP_PAIRS most similar ones, recording wall-clock time and the change
# in resident memory of this process.

similarity_benchmarks = {"brute": []}

N_SAMPLE_REVIEWS   = 10000
N_TOP_PAIRS        = 10
reviews_csv_path   = r"Books_rating.csv"

current_process    = psutil.Process(os.getpid())
TOKEN_PATTERN      = re.compile(r"\b[a-z]{2,}\b")
ENGLISH_STOPWORDS  = set(stopwords.words('english'))

t0                 = time.time()
mem_before_mb      = current_process.memory_info().rss / (1024**2)

sample_reviews_pd = pd.read_csv(
    reviews_csv_path,
    usecols=['review/text'],
    nrows=N_SAMPLE_REVIEWS
)
sample_reviews_pd['tokens'] = sample_reviews_pd['review/text'].fillna('').map(clean_and_tokenize)

# Build each review's token set once, keyed by row index, so the pair loop
# does plain dict lookups instead of two DataFrame.loc calls per pair.
token_sets_by_index = dict(
    zip(sample_reviews_pd.index, map(set, sample_reviews_pd['tokens']))
)

# Number of unordered pairs, used only to give tqdm a total for its ETA.
n_reviews = len(sample_reviews_pd)
n_pairs   = n_reviews * (n_reviews - 1) // 2

# Score all pairs lazily: the generator yields one (i, j, score) tuple at a
# time, so the ~n^2/2 results are never held in memory together.
review_index_pairs = combinations(sample_reviews_pd.index, 2)
jaccard_scores = (
    (
        idx_i,
        idx_j,
        jaccard_similarity(token_sets_by_index[idx_i], token_sets_by_index[idx_j])
    )
    for idx_i, idx_j in tqdm(review_index_pairs, total=n_pairs)
)

# heapq.nlargest keeps only N_TOP_PAIRS candidates while streaming and is
# equivalent to sorted(..., key=..., reverse=True)[:N_TOP_PAIRS]; ties keep
# the order in which the pairs were generated.
top_jaccard_pairs = heapq.nlargest(N_TOP_PAIRS, jaccard_scores, key=lambda pair: pair[2])
similarity_benchmarks['brute'] = top_jaccard_pairs

elapsed_seconds   = time.time() - t0
mem_after_mb      = current_process.memory_info().rss / (1024**2)

print(f"Brute-force top-{N_TOP_PAIRS}:", similarity_benchmarks['brute'])
print(f"Resources Used: {elapsed_seconds:.2f}s, Δmem: {mem_after_mb - mem_before_mb:.1f}MiB")


49995000it [29:20, 28394.54it/s]


Brute-force top-10: [(253, 256, 1.0), (262, 267, 1.0), (422, 423, 1.0), (428, 429, 1.0), (535, 536, 1.0), (596, 597, 1.0), (724, 790, 1.0), (726, 727, 1.0), (776, 784, 1.0), (884, 889, 1.0)]
Resources Used: 1791.96s, Δmem: 5046.8MiB


In [None]:
# MinHashLSH pipeline

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window

# Create the Spark session used by the MinHashLSH pipeline.
# The memory settings are supplied on this first builder call because the
# first getOrCreate() is what starts the driver; builder calls made later in
# the notebook get this same session back and cannot resize its memory.
spark = (
    SparkSession.builder
    .appName("AMD Projecto")
    .config("spark.driver.memory",  "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

In [None]:
# Start the timer / memory baseline for the MinHash experiment.
t0 = time.time()
mem_before_mb = current_process.memory_info().rss / (1024**2)

# Spark session for MinHash experiment.
# getOrCreate() returns an already-running session when one exists; in that
# case start-up settings such as driver memory keep the values the session
# was originally launched with.
spark = (
    SparkSession.builder
    .appName("MinHashSimilarReviews")
    .config("spark.driver.memory",  "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

# Load and cleaning: keep only non-empty review texts and tag each with an id.
# Schema inference is not requested: it costs an extra pass over the whole
# file, and the only column kept is the review text, which is read as a
# string either way.
# NOTE(review): review text may contain embedded quotes or newlines; confirm
# that the default CSV quote/escape options parse this file correctly.
reviews_sdf = (
    spark.read
         .csv(reviews_csv_path, header=True)
         .withColumnRenamed("review/text", "text")
         .select("text")
         .filter(col("text").isNotNull() & (col("text") != ""))
         # ids are unique and increasing but not consecutive
         .withColumn("doc_id", monotonically_increasing_id())
)

In [None]:
# Normalize text: lowercase + strip punctuation
def _lowercase_and_strip_punctuation(raw_text):
    """Lower-case the text and delete every character that is neither a word
    character nor whitespace; falsy input (None / empty) becomes ""."""
    if not raw_text:
        return ""
    return re.sub(r"[^\w\s]", "", raw_text.lower())


normalize_text_udf = udf(_lowercase_and_strip_punctuation, StringType())
reviews_sdf = reviews_sdf.withColumn("clean_text", normalize_text_udf(col("text")))


In [None]:
# Tokenize: read "clean_text", write the resulting token array to "tokens"
review_tokenizer = Tokenizer(inputCol="clean_text", outputCol="tokens")
reviews_sdf = review_tokenizer.transform(reviews_sdf)

In [None]:
# Remove stop-words: read "tokens", write the kept tokens to "filtered_tokens"
stopword_filter = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
reviews_sdf = stopword_filter.transform(reviews_sdf)

In [None]:
# HashingTF vectorization: hash "filtered_tokens" into a sparse "features"
# vector with 2**16 (= 1 << 16) dimensions
token_hasher = HashingTF(
    inputCol="filtered_tokens", outputCol="features", numFeatures=1 << 16
)
reviews_sdf = token_hasher.transform(reviews_sdf)

In [None]:
# Drop all-zero vectors, then cache the surviving rows for the steps below
def _vector_is_nonempty(vec):
    """True when the feature vector has at least one non-zero entry."""
    return vec.numNonzeros() > 0


has_nonzero_features = udf(_vector_is_nonempty, BooleanType())
nonempty_reviews_sdf = reviews_sdf.filter(has_nonzero_features("features"))
reviews_sdf = nonempty_reviews_sdf.cache()


In [None]:
# MinHashLSH model: fit on the feature vectors, then add the "hashes" column
lsh_settings = dict(
    inputCol="features",
    outputCol="hashes",
    numHashTables=3,
    seed=42,  # fixed seed passed to the estimator
)
minhash_lsh = MinHashLSH(**lsh_settings)
minhash_model = minhash_lsh.fit(reviews_sdf)
hashed_reviews_sdf = minhash_model.transform(reviews_sdf)

In [None]:
# Approx similarity join (distance = 1 − Jaccard)
MIN_JACCARD_SIM  = 0.7
max_lsh_distance = 1.0 - MIN_JACCARD_SIM

# Self-join of the hashed reviews, thresholded at max_lsh_distance.
all_candidate_pairs_sdf = minhash_model.approxSimilarityJoin(
    hashed_reviews_sdf,
    hashed_reviews_sdf,
    max_lsh_distance,
    distCol="distance",
)

# Keep only rows whose left doc_id is smaller than the right one, which drops
# self-matches and the mirrored copy of every pair.
left_id_before_right_id = col("datasetA.doc_id") < col("datasetB.doc_id")
candidate_pairs_sdf = all_candidate_pairs_sdf.filter(left_id_before_right_id)

In [None]:
# Compute Jaccard, filter, select useful columns
pair_output_columns = [
    col("datasetA.doc_id").alias("docA"),
    col("datasetB.doc_id").alias("docB"),
    (1 - col("distance")).alias("jaccard"),       # similarity = 1 − distance
    col("datasetA.text").alias("review_content"),  # text of the left review
]
meets_similarity_threshold = col("jaccard") >= MIN_JACCARD_SIM

similar_reviews_sdf = (
    candidate_pairs_sdf
        .select(*pair_output_columns)
        .filter(meets_similarity_threshold)
)

In [None]:
# Results
# Two actions follow (the CSV write and show). Spark DataFrames are lazy, so
# without caching the whole plan, including the approximate similarity join,
# would be executed once per action. Mark the result for caching so the write
# materializes it and show() reuses the cached rows.
similar_reviews_sdf = similar_reviews_sdf.cache()

# Write all similar pairs as CSV files (with header row) under
# "similar_reviews_full", replacing any earlier output.
(
    similar_reviews_sdf
        .repartition(10)
        .write
        .mode("overwrite")
        .option("header", True)
        .csv("similar_reviews_full")
)

# Preview the first 20 pairs with untruncated column contents.
similar_reviews_sdf.show(20, truncate=False)

+-----+-----+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Stop the Spark session now that the results have been written and shown
spark.stop()