<a href="https://colab.research.google.com/github/GHMelany/AMD_project/blob/main/AMD%20.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

PROJECT: Finding similar items. Implement a detector of pairs of similar book reviews


LIBRARIES

In [1]:
!pip install langid




In [2]:
# Disinstalla pyspark attuale
!pip uninstall -y pyspark

# Installa PySpark stabile
!pip install pyspark==3.4.1

# Imposta JAVA_HOME (Java 11 va bene per 3.4.1)
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

# Avvia Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()
print("Spark versione:", spark.version)


Found existing installation: pyspark 3.4.1
Uninstalling pyspark-3.4.1:
  Successfully uninstalled pyspark-3.4.1
Collecting pyspark==3.4.1
  Using cached pyspark-3.4.1-py2.py3-none-any.whl
Installing collected packages: pyspark
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 0.8.3 requires pyspark[connect]~=3.5.1, but you have pyspark 3.4.1 which is incompatible.[0m[31m
[0mSuccessfully installed pyspark-3.4.1
Spark versione: 3.4.1


In [3]:
import re, html, langid
import os
import zipfile
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, NGram, HashingTF, MinHashLSH
from pyspark.ml import Pipeline


In [4]:
os.environ['KAGGLE_USERNAME'] = "melanygomez"
os.environ['KAGGLE_KEY'] = "38db1cce93622035560027022e9cafc"

!pip install -q kaggle

from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()

!kaggle datasets download -d mohamedbakhet/amazon-books-reviews

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
amazon-books-reviews.zip: Skipping, found more recently modified local copy (use --force to force download)


In [5]:
zip_path = "amazon-books-reviews.zip"
extract_dir = "amazon_books_reviews"
os.makedirs(extract_dir, exist_ok=True)

with zipfile.ZipFile(zip_path, "r") as z:
    z.extractall(extract_dir)

for root, dirs, files in os.walk(extract_dir):
    for file in files:
        print(os.path.join(root, file))

folder = "amazon_books_reviews"
csv_path = os.path.join(folder, "Books_rating.csv")



amazon_books_reviews/Books_rating.csv
amazon_books_reviews/books_data.csv


In [23]:
data = spark.read.csv(csv_path, header=True, inferSchema=True, quote='"', escape='"')
data = data.select("Id", F.col("review/score").alias("score"), F.col("review/text").alias("text")).dropna(subset=["text"])

In [24]:
data = data.limit(5000)

In [25]:
def preprocess(raw):
    if not raw: return ""
    txt = html.unescape(raw).lower()
    txt = re.sub(r"[^a-z0-9 ]+", " ", txt)
    return re.sub(r"\s+", " ", txt).strip()

def detect_lang(text):
    snippet = " ".join(text.split()[:50])
    lang, _ = langid.classify(snippet)
    return lang

def classify(score):
    if score > 3:
        return "positive"
    else:
        return "negative"


In [26]:
clean_udf = F.udf(preprocess, StringType())
lang_udf = F.udf(detect_lang, StringType())
sentiment_udf = F.udf(classify, StringType())

In [27]:
data = data.withColumn("clean_text", clean_udf("text")).dropDuplicates(["clean_text"])
data = data.withColumn("lang", lang_udf("clean_text")).filter(F.col("lang") == "en").drop("lang")
data = data.withColumn("sentiment", sentiment_udf("score"))


In [46]:
tokenize = RegexTokenizer(inputCol="clean_text", outputCol="tokens", pattern="\\W+")
remove_sw = StopWordsRemover(inputCol="tokens", outputCol="content_words")
prep_pipeline = Pipeline(stages=[tokenize, remove_sw])
data = prep_pipeline.fit(data).transform(data)

IllegalArgumentException: Output column tokens already exists.

In [12]:
#data = data.withColumn("length", F.size("content_words"))
#data = data.filter((F.col("length") >= 20) & (F.col("length") <= 200))

In [35]:
reviews = data.select("Id", "score", "sentiment", "content_words")


In [36]:
vocab_size = reviews.select(F.explode("content_words")).distinct().count()
hash_space = 2 ** vocab_size.bit_length()


In [37]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import BooleanType

def has_content(v):
    return v.numNonzeros() > 0

non_empty_vector_udf = F.udf(has_content, BooleanType())

In [47]:
def find_similar_pairs(input_df, shingle_len=3, threshold=0.8, hash_tables=40):
    ngram_gen = NGram(n=shingle_len, inputCol="content_words", outputCol="shingles")
    df_shingled = ngram_gen.transform(input_df)

    tf = HashingTF(inputCol="shingles", outputCol="vector", numFeatures=hash_space, binary=True)
    vectorized = tf.transform(df_shingled).filter(non_empty_vector_udf("vector"))

    minhash = MinHashLSH(inputCol="vector", outputCol="signature", numHashTables=hash_tables)
    model = minhash.fit(vectorized)
    transformed = model.transform(vectorized)

    candidates = model.approxSimilarityJoin(
        transformed, transformed, distCol="jaccard_dist", threshold=1 - threshold
    )

    similar = (candidates
               .filter(F.col("datasetA.Id") < F.col("datasetB.Id"))
               .withColumn("similarity", 1 - F.col("jaccard_dist"))
               .select(
                   F.col("datasetA.Id").alias("doc1"),
                   F.col("datasetB.Id").alias("doc2"),
                   "similarity"
               ))

    return similar

In [48]:
pos_reviews = reviews.filter(F.col("sentiment") == "positive")
neg_reviews = reviews.filter(F.col("sentiment") == "negative")

pairs_pos = find_similar_pairs(pos_reviews, shingle_len=3, threshold=0.5)
pairs_neg = find_similar_pairs(neg_reviews, shingle_len=3, threshold=0.5)

In [49]:
mod_pos = pairs_pos.filter((F.col("similarity") >= 0.5) & (F.col("similarity") <= 0.95))
mod_neg = pairs_neg.filter((F.col("similarity") >= 0.5) & (F.col("similarity") <= 0.95))

print(f"[✓] Coppie moderatamente simili tra recensioni positive: {mod_pos.count()}")
print(f"[✓] Coppie moderatamente simili tra recensioni negative: {mod_neg.count()}")

print("\n📋 Esempi recensioni simili (positive):")
mod_pos.orderBy(F.desc("similarity")).show(5, truncate=False)

print("\n📋 Esempi recensioni simili (negative):")
mod_neg.orderBy(F.desc("similarity")).show(5, truncate=False)

Py4JJavaError: An error occurred while calling o1719.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85.0 (TID 292) (519724fcff89 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (LSHModel$$Lambda$3911/0x00000008415e5040: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
	at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (LSHModel$$Lambda$3911/0x00000008415e5040: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
	at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
	... 20 more
