<a href="https://colab.research.google.com/github/EdoZano/AMD/blob/main/Main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
!pip install -q pyspark==3.5.1
!pip install -q spark-nlp==3.4.3


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/53.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.0/53.0 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/144.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m144.6/144.6 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [1]:
import os
import pandas as pd
import zipfile

from pyspark.sql import SparkSession
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, size, expr, avg, abs #expr allows you to write SQL inside pyspark
from pyspark.ml.feature import HashingTF, MinHashLSH


In [4]:
os.environ['KAGGLE_USERNAME'] = "XXXX"  # <-- your username
os.environ['KAGGLE_KEY'] = "XXXX"       # <-- your key API
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews

!unzip amazon-books-reviews.zip

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
 95% 1.01G/1.06G [00:06<00:00, 94.2MB/s]
100% 1.06G/1.06G [00:07<00:00, 161MB/s] 
Archive:  amazon-books-reviews.zip
  inflating: Books_rating.csv        
  inflating: books_data.csv          


In [2]:
ratings_df = pd.read_csv("Books_rating.csv")
print(len(ratings_df))

print(f'{ratings_df.columns}')

3000000
Index(['Id', 'Title', 'Price', 'User_id', 'profileName', 'review/helpfulness',
       'review/score', 'review/time', 'review/summary', 'review/text'],
      dtype='object')


In [3]:
# Check if the same 'Id' (book identifier) has multiple different 'Title' entries
inconsistent_id_title = ratings_df.groupby("Id")["Title"].nunique()
print(inconsistent_id_title[inconsistent_id_title > 1])  # Show only inconsistent cases

# Remove rows with missing review text, keep only these columns
ratings_df = ratings_df[["Id", "Title", "User_id", "review/text"]].dropna(subset=["review/text"])

# Drop completely identical rows (same user, book, text, etc.)
ratings_df = ratings_df.drop_duplicates()
print(f'After first cleanins we have : {len(ratings_df)} datapoints')
# Remove reviews with the same exact text
ratings_df = ratings_df.drop_duplicates(subset=["review/text"])
print(f'At the end we have : {len(ratings_df)} datapoints')



Series([], Name: Title, dtype: int64)
2970140
2062648


In [4]:
print(f'Percentage of the total dataset kept: {len(ratings_df)/30000}%')

68.75493333333333

In [5]:
USE_SAMPLING = True
if USE_SAMPLING:
    sample_df = ratings_df.sample(frac=0.05, random_state=42)
else:
    sample_df = ratings_df

In [6]:
# 🔹 Start Spark NLP session
spark = sparknlp.start()

# 🔹 Create Spark DataFrame
spark_df = spark.createDataFrame(sample_df)


In [7]:
spark_df.show(5)
spark_df.printSchema()

+----------+--------------------+--------------+--------------------+
|        Id|               Title|       User_id|         review/text|
+----------+--------------------+--------------+--------------------+
|0201379619|Dynamic HTML: The...|           NaN|I like it. It's v...|
|B0000A9AYK|       Dry: A Memoir|A2KEH545A8283S|This book has it ...|
|1590481208|Hidalgo and Other...| AOL4M7FJMQC6Q|After watching th...|
|B000J58B8Y|FIFTH ELEPHANT (D...|A2EMBVAUBHA5WI|I admit that this...|
|0761125493|What to Expect Wh...|           NaN|I found that even...|
+----------+--------------------+--------------+--------------------+
only showing top 5 rows

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- review/text: string (nullable = true)



In [9]:
# 🔹 Document assembler
document_assembler = DocumentAssembler() \
    .setInputCol("review/text") \
    .setOutputCol("document")

# 🔹 Tokenizer
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# 🔹 Normalizer: lowercase + punctuation removed
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized") \
    .setLowercase(True)

# 🔹 Stopword remover
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("clean_tokens") \
    .setCaseSensitive(False)

# 🔹 Finisher
finisher = Finisher() \
    .setInputCols(["clean_tokens"]) \
    .setOutputCols(["final_tokens"]) \
    .setCleanAnnotations(True)

# 🔹 Pipeline NLP
nlp_pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    finisher
])

# 🔹 Fit & Transform
model = nlp_pipeline.fit(spark_df)
processed_df = model.transform(spark_df)

# 🔹 Keep rows with at least 3 token (for shingles)
filtered_df = processed_df.withColumn("num_tokens", size(col("final_tokens"))) \
    .filter(col("num_tokens") >= 3)

# 🔹 Create shingles (k=3)
shingled_df = filtered_df.withColumn(
    "shingles",
    expr("transform(sequence(0, size(final_tokens)-3), i -> concat_ws(' ', final_tokens[i], final_tokens[i+1], final_tokens[i+2]))")
)
#concat_ws(' ', ...) Unisce le tre parole in una stringa unica con spazio:


# 🔹 Show final dataframe
shingled_df.select("Id", "shingles").show(5, truncate=False)


+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:

# 🔹 Apply HashingTF to transform shingles into vectors
hashingTF = HashingTF(inputCol="shingles", outputCol="features", numFeatures=512)
featurized_df = hashingTF.transform(shingled_df)

# 🔹 Build MinHashLSH model
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
lsh_model = mh.fit(featurized_df)
mh_df = lsh_model.transform(featurized_df).cache()

# 🔹 Find reviews pair close to each others (Jaccard approximated)
similar_pairs = lsh_model.approxSimilarityJoin(
    mh_df,
    mh_df,
    threshold=0.2,  # puoi regolare
    distCol="JaccardDistance"
)

# 🔹 Don't count same IDs
similar_pairs_filtered = similar_pairs.filter(col("datasetA.Id") < col("datasetB.Id"))

filtered_output = similar_pairs_filtered \
    .select(
        col("datasetA.Id").alias("ID_A"),
        expr("substring(datasetA.`review/text`, 1, 200)").alias("ReviewA_Preview"),
        col("datasetA.num_tokens").alias("n_tokens_A"),

        col("datasetB.Id").alias("ID_B"),
        expr("substring(datasetB.`review/text`, 1, 200)").alias("ReviewB_Preview"),
        col("datasetB.num_tokens").alias("n_tokens_B"),

        col("JaccardDistance")
    )


# 🔹 Show similar pairs
filtered_output = filtered_output.cache()
filtered_output.show(5, truncate=False)
filtered_output.count()



+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------+
|ID_A      |ReviewA_Preview                                                                                                                                                                                         |n_tokens_A|ID_B      |ReviewB_Preview                                                                                                                                                                                         |n_tokens_B|JaccardDistance    |
+----------+----------------------------------------------------

61

In [11]:
#Calculate difference in token numbers
filtered_output = filtered_output.withColumn(
    "token_diff",
    abs(col("n_tokens_A") - col("n_tokens_B"))
)
#Show analysis for each column
filtered_output.select("JaccardDistance","n_tokens_A", "n_tokens_B", "token_diff").describe().show()
#Correlation between variables
corr_diff = filtered_output.stat.corr("JaccardDistance", "token_diff")

print(f"Correlation JaccardDistance vs token_diff: {corr_diff:.4f}")
#Mean of token difference
filtered_output.select(avg(col("token_diff"))).show()
#Mean of Jaccard Distance
filtered_output.select(avg(col("JaccardDistance"))).show()
#Show in ascending order of Jaccard similarity all the pairs
filtered_output.orderBy(col("JaccardDistance").asc()).select(
    "ID_A", "ID_B", "JaccardDistance", "token_diff", "n_tokens_A", "n_tokens_B"
).show(10, truncate=False)

#Show pairs with small Jaccard and big token difference
filtered_output \
    .filter((col("JaccardDistance") < 0.1) & (col("token_diff") > 30)) \
    .select("ID_A", "ID_B", "JaccardDistance", "token_diff", "n_tokens_A", "n_tokens_B") \
    .orderBy(col("token_diff").desc()) \
    .show(10, truncate=False)

+-------+-------------------+------------------+------------------+-----------------+
|summary|    JaccardDistance|        n_tokens_A|        n_tokens_B|       token_diff|
+-------+-------------------+------------------+------------------+-----------------+
|  count|                 61|                61|                61|               61|
|   mean| 0.1371365473505715|            1478.0|1598.5409836065573|660.4426229508197|
| stddev|0.04285744364225551|488.35871378867944| 554.7056749235127|506.5517586120943|
|    min|        0.029296875|               249|               261|               12|
|    max|0.19842829076620827|              2602|              2602|             1786|
+-------+-------------------+------------------+------------------+-----------------+

Correlation JaccardDistance vs token_diff: 0.0068
+-----------------+
|  avg(token_diff)|
+-----------------+
|660.4426229508197|
+-----------------+

+--------------------+
|avg(JaccardDistance)|
+--------------------+
|  0.