# IMPORTS

In [15]:
!pip install nltk pyspark
import os
import zipfile
import time
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, MinHashLSH, HashingTF
from pyspark.ml.linalg import Vectors, VectorUDT
import nltk
from nltk.stem import SnowballStemmer
nltk.download('punkt')



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

# SETUP AND DATA LOADING

In [4]:
# Kaggle credentials setup
os.environ['KAGGLE_USERNAME'] = 'fabiocasalingo'
os.environ['KAGGLE_KEY'] = 'f634e9d85346043bccbcb39b5cd6917e'

# Create directory for dataset
dataset_dir = "./kaggle_data"
os.makedirs(dataset_dir, exist_ok=True)

# File paths
file_name = "Books_rating.csv"
zip_path = os.path.join(dataset_dir, "amazon-books-reviews.zip")

# Download dataset
print("Download in corso...")
get_ipython().system('kaggle datasets download -d mohamedbakhet/amazon-books-reviews -p {dataset_dir} --force')
print("Download completato.")

file_path = os.path.join(dataset_dir, "Books_rating.csv")
print(f"File CSV utilizzato: {file_path}")

# Extract the zip file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(dataset_dir)
    print("Estrazione completata.")

# Configuration parameters
SAMPLE_SIZE = 10000
LSH_HASH_TABLES = 3
VOCAB_SIZE = 5000
JACCARD_THRESHOLD = 0.2
RANDOM_SEED = 42

print(f"Using optimized configuration:")
print(f"- Sample size: {SAMPLE_SIZE}")
print(f"- LSH hash tables: {LSH_HASH_TABLES}")
print(f"- Vocabulary size: {VOCAB_SIZE}")
print(f"- Jaccard threshold: {JACCARD_THRESHOLD}")

# Spark session initialization
spark = SparkSession.builder \
    .appName("BookRatingAnalysisWithLSH") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print("SparkSession creata con successo!")

# Load the dataset
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(file_path)

print("Dataset caricato correttamente.")
df.show(5)

Download in corso...
Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to ./kaggle_data
 99% 1.05G/1.06G [00:17<00:00, 125MB/s]
100% 1.06G/1.06G [00:17<00:00, 64.1MB/s]
Download completato.
File CSV utilizzato: ./kaggle_data/Books_rating.csv
Estrazione completata.
Using optimized configuration:
- Sample size: 10000
- LSH hash tables: 3
- Vocabulary size: 5000
- Jaccard threshold: 0.2
Spark version: 3.5.1
SparkSession creata con successo!
Dataset caricato correttamente.
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+--------

# DATA PREPROCESSING

In [5]:
# Filter empty or short reviews
df_clean = df.filter(
    (F.col("review/text").isNotNull()) &
    (F.col("review/text") != "") &
    (F.length(F.col("review/text")) >= 10)
).select(
    F.monotonically_increasing_id().alias("review_id"),
    F.col("review/text").alias("review_text")
).limit(SAMPLE_SIZE)

print(f"Clean dataset: {df_clean.count()} reviews")

# Install and import NLTK for stemming
stemmer = SnowballStemmer("english")

def stem_words(word_list):
    return [stemmer.stem(word) for word in word_list if len(word) > 2]

stem_udf = F.udf(stem_words, ArrayType(StringType()))

Clean dataset: 10000 reviews


# TEXT PROCESSING PIPELINE

In [6]:
print("Processing text...")

# Clean and normalize text
df_processed = df_clean.withColumn(
    "review_text_clean",
    F.regexp_replace(F.lower(F.col("review_text")), "[^a-z\\s]", " ")
).withColumn(
    "review_text_clean",
    F.regexp_replace(F.col("review_text_clean"), "\\s+", " ")
)

# Tokenize
tokenizer = RegexTokenizer(
    inputCol="review_text_clean",
    outputCol="words",
    pattern="\\s+",
    gaps=True
)
df_tokenized = tokenizer.transform(df_processed)

# Remove stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df_filtered = remover.transform(df_tokenized)

# Remove short words
df_filtered = df_filtered.withColumn(
    "filtered_words",
    F.expr("filter(filtered_words, x -> length(x) > 2)")
)

# Apply stemming
df_stemmed = df_filtered.withColumn(
    "stemmed_words",
    stem_udf("filtered_words")
)

# Remove duplicate words
df_final = df_stemmed.withColumn(
    "unique_words",
    F.array_distinct(F.col("stemmed_words"))
).withColumn(
    "num_words",
    F.size("unique_words")
).filter(
    F.col("num_words") >= 3
)

df_final.cache()
print(f"Final dataset: {df_final.count()} reviews")

Processing text...
Final dataset: 9949 reviews


# FEATURE ENGINEERING WITH HASHINGTF

In [7]:
print("Creating optimized feature vectors...")

hashingTF = HashingTF(
    inputCol="unique_words",
    outputCol="features",
    numFeatures=VOCAB_SIZE
)

df_with_features = hashingTF.transform(df_final)

# Remove empty vectors
nnz_udf = F.udf(lambda v: len(v.indices), IntegerType())

df_with_features = df_with_features.withColumn("nnz", nnz_udf("features")) \
    .filter(F.col("nnz") > 0) \
    .drop("nnz") \
    .cache()

print("Feature vectors created successfully!")
print(f"Reviews with non-zero features: {df_with_features.count()}")

Creating optimized feature vectors...
Feature vectors created successfully!
Reviews with non-zero features: 9949


# CREATE ARTIFICIAL DUPLICATES

In [8]:
print(f"\n{'='*60}")
print("CREATING ARTIFICIAL DUPLICATES FOR EVALUATION")
print(f"{'='*60}")

num_duplicates = 25
dup_source = df_final.orderBy(F.rand(seed=RANDOM_SEED)).limit(num_duplicates)

duplicated = dup_source.withColumn(
    "review_id",
    F.concat(F.lit("dup_"), F.col("review_id").cast("string"))
)

df_augmented = df_final.unionByName(duplicated)
print(f"Augmented dataset size: {df_augmented.count()}")
print(f"Added {num_duplicates} artificial duplicates")

df_aug_with_features = hashingTF.transform(df_augmented)

df_aug_with_features = df_aug_with_features.withColumn("nnz", nnz_udf("features")) \
    .filter(F.col("nnz") > 0) \
    .drop("nnz") \
    .cache()

print(f"Augmented reviews with non-zero features: {df_aug_with_features.count()}")


CREATING ARTIFICIAL DUPLICATES FOR EVALUATION
Augmented dataset size: 9974
Added 25 artificial duplicates
Augmented reviews with non-zero features: 9974


# LSH SIMILARITY DETECTION FUNCTION

In [9]:
def optimized_lsh_similarity_with_evaluation(df, df_augmented, threshold=0.5):
    print(f"Running LSH evaluation (threshold: {threshold})...")

    start_time = time.time()

    mh = MinHashLSH(
        inputCol="features",
        outputCol="hashes",
        numHashTables=LSH_HASH_TABLES,
        seed=RANDOM_SEED
    )

    print("Fitting LSH model on augmented data...")
    model = mh.fit(df_augmented)

    df_hashed = model.transform(df_augmented.select("review_id", "features"))

    print("Finding similar pairs...")
    distance_threshold = 1.0 - threshold

    similar_pairs = model.approxSimilarityJoin(
        df_hashed,
        df_hashed,
        distance_threshold,
        distCol="distance"
    ).filter(
        F.col("datasetA.review_id") < F.col("datasetB.review_id")
    ).withColumn(
        "similarity",
        1.0 - F.col("distance")
    ).select(
        F.col("datasetA.review_id").alias("review_id_1"),
        F.col("datasetB.review_id").alias("review_id_2"),
        F.col("similarity").alias("lsh_score")
    ).cache()

    elapsed_time = time.time() - start_time
    result_count = similar_pairs.count()

    print(f"LSH completed in {elapsed_time:.2f} seconds")
    print(f"Found {result_count} similar pairs")

    return similar_pairs, model

# METRIC CALCULATION FUNCTION

In [10]:
def calculate_evaluation_metrics(predicted_pairs, num_duplicates):
    print(f"\n{'='*60}")
    print("CALCULATING EVALUATION METRICS")
    print(f"{'='*60}")

    pred_pairs_for_eval = predicted_pairs.select("review_id_1", "review_id_2")

    true_positives = pred_pairs_for_eval.filter(
        F.col("review_id_2").startswith("dup_") &
        (F.col("review_id_2") == F.concat(F.lit("dup_"), F.col("review_id_1")))
    ).count()

    total_predictions = pred_pairs_for_eval.count()
    total_ground_truth = num_duplicates

    precision = true_positives / total_predictions if total_predictions > 0 else 0.0
    recall = true_positives / total_ground_truth if total_ground_truth > 0 else 0.0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    print(f"Evaluation Results:")
    print(f"{'─' * 40}")
    print(f"Ground truth duplicate pairs  : {total_ground_truth}")
    print(f"Total predicted pairs         : {total_predictions}")
    print(f"True positives (TP)          : {true_positives}")
    print(f"Precision                    : {precision:.3f}")
    print(f"Recall                       : {recall:.3f}")
    print(f"F1-Score                     : {f1_score:.3f}")

    if true_positives > 0:
        print(f"\nExample True Positive Pairs:")
        example_tp = pred_pairs_for_eval.filter(
            F.col("review_id_2").startswith("dup_") &
            (F.col("review_id_2") == F.concat(F.lit("dup_"), F.col("review_id_1")))
        ).limit(5)
        example_tp.show(truncate=False)

    return {
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'true_positives': true_positives,
        'total_predictions': total_predictions,
        'total_ground_truth': total_ground_truth
    }

# EXECUTE SIMILARITY DETECTION

In [11]:
print(f"\n{'='*60}")
print("OPTIMIZED SIMILARITY DETECTION WITH EVALUATION")
print(f"{'='*60}")

threshold = 0.5
print(f"\nTesting threshold: {threshold}")
print(f"{'-'*40}")

lsh_results, lsh_model = optimized_lsh_similarity_with_evaluation(
    df_with_features, df_aug_with_features, threshold
)

metrics = calculate_evaluation_metrics(lsh_results, num_duplicates)

if lsh_results.count() > 0:
    print(f"\nTop 10 Similar Pairs Found:")
    lsh_results.orderBy(F.col("lsh_score").desc()).show(10, truncate=False)


OPTIMIZED SIMILARITY DETECTION WITH EVALUATION

Testing threshold: 0.5
----------------------------------------
Running LSH evaluation (threshold: 0.5)...
Fitting LSH model on augmented data...
Finding similar pairs...
LSH completed in 2.02 seconds
Found 272 similar pairs

CALCULATING EVALUATION METRICS
Evaluation Results:
────────────────────────────────────────
Ground truth duplicate pairs  : 25
Total predicted pairs         : 272
True positives (TP)          : 25
Precision                    : 0.092
Recall                       : 1.000
F1-Score                     : 0.168

Example True Positive Pairs:
+-----------+-----------+
|review_id_1|review_id_2|
+-----------+-----------+
|9900       |dup_9900   |
|5444       |dup_5444   |
|2567       |dup_2567   |
|4425       |dup_4425   |
|7317       |dup_7317   |
+-----------+-----------+


Top 10 Similar Pairs Found:
+-----------+-----------+---------+
|review_id_1|review_id_2|lsh_score|
+-----------+-----------+---------+
|8998       |90

# THRESHOLD ANALYSIS

In [12]:
def analyze_different_thresholds(df_features, df_aug_features, thresholds=[0.3, 0.5, 0.7, 0.8]):
    print(f"\n{'='*60}")
    print("THRESHOLD ANALYSIS")
    print(f"{'='*60}")

    results = []

    for thresh in thresholds:
        print(f"\nAnalyzing threshold: {thresh}")
        pairs, _ = optimized_lsh_similarity_with_evaluation(df_features, df_aug_features, thresh)
        metrics = calculate_evaluation_metrics(pairs, num_duplicates)
        metrics['threshold'] = thresh
        results.append(metrics)
        print(f"Threshold {thresh}: P={metrics['precision']:.3f}, R={metrics['recall']:.3f}, F1={metrics['f1_score']:.3f}")

    return results

print(f"\nRunning threshold analysis...")
threshold_results = analyze_different_thresholds(
    df_with_features,
    df_aug_with_features,
    thresholds=[0.3, 0.5, 0.7]
)

print(f"\n{'='*60}")
print("THRESHOLD ANALYSIS SUMMARY")
print(f"{'='*60}")
print(f"{'Threshold':<10} {'Precision':<10} {'Recall':<10} {'F1-Score':<10} {'Pairs':<10}")
print(f"{'-'*50}")
for result in threshold_results:
    print(f"{result['threshold']:<10} {result['precision']:<10.3f} {result['recall']:<10.3f} "
          f"{result['f1_score']:<10.3f} {result['total_predictions']:<10}")

best_result = max(threshold_results, key=lambda x: x['f1_score'])
print(f"\nBest threshold: {best_result['threshold']} (F1-Score: {best_result['f1_score']:.3f})")


spark.stop()


Running threshold analysis...

THRESHOLD ANALYSIS

Analyzing threshold: 0.3
Running LSH evaluation (threshold: 0.3)...
Fitting LSH model on augmented data...
Finding similar pairs...
LSH completed in 0.59 seconds
Found 520 similar pairs

CALCULATING EVALUATION METRICS
Evaluation Results:
────────────────────────────────────────
Ground truth duplicate pairs  : 25
Total predicted pairs         : 520
True positives (TP)          : 25
Precision                    : 0.048
Recall                       : 1.000
F1-Score                     : 0.092

Example True Positive Pairs:
+-----------+-----------+
|review_id_1|review_id_2|
+-----------+-----------+
|9900       |dup_9900   |
|5444       |dup_5444   |
|2567       |dup_2567   |
|4425       |dup_4425   |
|7317       |dup_7317   |
+-----------+-----------+

Threshold 0.3: P=0.048, R=1.000, F1=0.092

Analyzing threshold: 0.5
Running LSH evaluation (threshold: 0.5)...
Fitting LSH model on augmented data...
Finding similar pairs...
LSH completed