# Amazon reviews K-NN

1.   Download the data from this URL: [amazon reviews](https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews?resource=download) or from this [amazon review dropbox](https://www.dropbox.com/scl/fi/ucfoh391qalha3lz0bzjx/amazon_review_polarity_csv.tgz.zip?rlkey=m3a0bbp2ep4sh2qisaz0xwo1w&dl=0)


2.   The dataset consists of two CSV files, train and test. Each file contains the following fields:
  *   polarity - 1 for negative and 2 for positive
  *   title - review heading
  *   text - review body

3. Generate sparse vectors by applying q-shingling with q = 3 to the training data.
4. Apply MinHash LSH to the sparse vectors of the training set.
5. Use the test file and run k-nearest neighbors on the hashed test data. Use k = 3 and assign each test-set element the most frequent polarity among its neighbors.


6. *Identify clusters of reviews. Each review cluster contains the pairs of reviews whose similarity is > 0.6. To be done after the introduction to networks.
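Step 6 can be read as a graph problem: reviews are nodes, an edge joins two reviews whose similarity exceeds 0.6, and a cluster is a connected component of that graph. Below is a minimal pure-Python sketch using a union-find structure. It assumes the similar pairs have already been found (for example with a MinHash LSH similarity join); the function `clusters_from_pairs` and the sample pairs are hypothetical.

```python
def clusters_from_pairs(pairs, threshold=0.6):
    """Group review ids into connected components of the graph whose edges are
    the (a, b, similarity) pairs with similarity > threshold.
    Reviews without any similar partner do not appear in the output."""
    parent = {}

    def find(x):
        # Walk up to the root, halving the path as we go
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[rb] = ra

    for a, b, sim in pairs:
        if sim > threshold:  # strict inequality, as in step 6
            union(a, b)

    groups = {}
    for node in parent:
        groups.setdefault(find(node), set()).add(node)
    return sorted((sorted(g) for g in groups.values()), key=lambda g: g[0])


# Hypothetical (review_a, review_b, Jaccard similarity) triples
pairs = [(0, 1, 0.8), (1, 2, 0.7), (3, 4, 0.9), (5, 6, 0.4)]
print(clusters_from_pairs(pairs))  # → [[0, 1, 2], [3, 4]]
```

Reviews 0 and 2 end up in the same cluster even though their pair is not listed, because both are similar to review 1. This transitivity is what distinguishes clusters from the raw list of similar pairs.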







In [2]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Explicit names used below; note that `round` shadows Python's built-in round()
from pyspark.sql.functions import avg, col, lit, round
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType
from pyspark.ml.feature import CountVectorizer, MinHashLSH

In [3]:
# Configure Spark (serve the web UI on port 4050)
conf = SparkConf().set("spark.ui.port", "4050")

# Create the SparkSession (and its underlying SparkContext) with this configuration
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

25/04/03 17:52:26 WARN Utils: Your hostname, spurspc resolves to a loopback address: 127.0.1.1; using 151.97.150.177 instead (on interface wlp0s20f3)
25/04/03 17:52:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/03 17:52:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Getting the datasets

In [4]:
base_path = 'datasets/'
# base_path = ''

# The CSV files have no header row: polarity, title, review body
schema = StructType([
    StructField('polarity', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('description', StringType(), True),
])
df_train = spark.read.format("csv") \
               .option("header", "false") \
               .schema(schema) \
               .load(base_path + "train.csv")

# Show the DataFrame schema
df_train.printSchema()

root
 |-- polarity: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)



In [5]:
df_train.count()

                                                                                

3600000

In [6]:
df_test = spark.read.format("csv") \
               .option("header", "false") \
               .schema(schema) \
               .load(base_path + "test.csv")

# Show the DataFrame schema
df_test.printSchema()

root
 |-- polarity: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)



In [7]:
df_test.count()

400000
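The reviews are free text, so titles and bodies can contain commas and quotes. Suppose the files follow the common CSV convention of quoting every field and doubling embedded quotes (the sample line below is made up, not taken from the dataset). Python's `csv` module then shows how such a row should split into the three columns. Spark's CSV reader uses a backslash as its default `escape` character, so on files with doubled quotes it may be worth trying `.option("escape", '"')`. The descriptions in the outputs below that begin with a stray `"` may hint at this, although we have not verified it.

```python
import csv
import io

# Hypothetical row in the assumed format: polarity, title, description
line = '"2","Amazing!","The sadness of ""Prisoners of Fate"", truly moving."\n'

# csv.reader handles quoted commas and doubled quotes ("") by default
polarity, title, description = next(csv.reader(io.StringIO(line)))
print(int(polarity), title, description)
# → 2 Amazing! The sadness of "Prisoners of Fate", truly moving.
```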

# Q-shingle
$q = 3$
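A q-shingle is a substring of q consecutive characters. Each review becomes the set of its distinct shingles, and two reviews are compared with the Jaccard similarity |A ∩ B| / |A ∪ B| of those sets, which is the quantity MinHash estimates. Here is a small worked example with q = 3 on two made-up strings (the helper names are illustrative):

```python
def shingles(text, q=3):
    """Set of distinct character q-shingles of text (empty set for None)."""
    if text is None:
        return set()
    return {text[i:i + q] for i in range(len(text) - q + 1)}


def jaccard(a, b):
    """Jaccard similarity of two sets; defined as 0.0 when both are empty."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0


a = shingles("great sound")   # 9 shingles
b = shingles("great songs")   # 9 shingles
print(sorted(a & b))          # → [' so', 'at ', 'eat', 'gre', 'rea', 't s']
print(jaccard(a, b))          # → 0.5  (6 shared out of 12 distinct)
```

Spaces count as characters, so shingles such as `'at '` and `' so'` capture word boundaries. A string shorter than q produces no shingles at all.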

In [8]:
def shingle(text: str, q: int) -> list:
    """Return the distinct character q-shingles of `text` (empty list for a null review)."""
    if text is None:
        return []
    return list({text[i:i + q] for i in range(len(text) - q + 1)})

shingle_udf = F.udf(shingle, ArrayType(StringType()))

q = 3
# NOTE: only the first 10 training reviews are used in this run
df_train = df_train.limit(10).withColumn("shingles", shingle_udf(F.col("description"), F.lit(q)))

# Show the shingles of each review
df_train.select('shingles').show(truncate=False)

                                                                                


In [9]:
shingles_df = df_train.select(F.explode(F.col('shingles')).alias('shingle')).distinct().orderBy("shingle")
shingles_df.show()

+-------+
|shingle|
+-------+
|     ! |
|     !!|
|     ""|
|     'g|
|     (D|
|     (I|
|     (R|
|     (T|
|     (w|
|     - |
|     --|
|     3 |
|     5 |
|     8t|
|     Am|
|     An|
|     Be|
|     Bu|
|     CD|
|     Ch|
+-------+
only showing top 20 rows



In [10]:
shingles_df.count()

1676

In [11]:
# CountVectorizer with binary=True yields a binary (multi-hot) vector of shingles
cv = CountVectorizer(inputCol="shingles", outputCol="one_hot_shingles", binary=True)

# Learn the shingle vocabulary on the training set and encode the training reviews
cv_model = cv.fit(df_train)
df_train = cv_model.transform(df_train)

# Show the result
df_train.select("one_hot_shingles").show(truncate=False)

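`CountVectorizer` with `binary=True` learns a vocabulary of all shingles seen in training. It then maps each review to a sparse vector with a 1 at the index of every shingle the review contains. The pure-Python sketch below follows the same idea and represents each sparse vector as `(size, indices)`. One difference: Spark orders its vocabulary by frequency across the corpus, whereas this sketch sorts shingles alphabetically, so the indices will not match Spark's.

```python
def fit_vocabulary(docs):
    """Map every distinct shingle in the training documents to a column index."""
    vocab = sorted(set().union(*docs))  # alphabetical order (Spark uses frequency order)
    return {term: i for i, term in enumerate(vocab)}


def to_multi_hot(doc, vocab):
    """Sparse binary vector as (size, sorted indices); unknown shingles are ignored."""
    indices = sorted({vocab[t] for t in doc if t in vocab})
    return len(vocab), indices


# Hypothetical shingle sets of two training reviews
train_docs = [{"gre", "rea", "eat"}, {"eat", "at ", "t s"}]
vocab = fit_vocabulary(train_docs)
print(vocab)  # → {'at ': 0, 'eat': 1, 'gre': 2, 'rea': 3, 't s': 4}
print([to_multi_hot(d, vocab) for d in train_docs])  # → [(5, [1, 2, 3]), (5, [0, 1, 4])]
```

The vector size equals the vocabulary size, which is why the training vectors above have size 1676, the number of distinct shingles counted earlier.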

# Min-hash
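MinHash compresses a shingle set into a short signature. For each of n random permutations of the vocabulary, it keeps the smallest permuted index among the set's non-zero entries. Under a random permutation, the first element of A ∪ B is equally likely to be any of them, so the two minima agree with probability equal to the Jaccard similarity. The fraction of matching signature positions therefore estimates it. The sketch below uses explicit random permutations (the textbook formulation). Spark's `MinHashLSH`, as far as we know, approximates them with random hash functions of the column index. The helper names and index sets are hypothetical.

```python
import random


def make_permutations(vocab_size, n, seed=0):
    """n random permutations of range(vocab_size); perm[i] is the rank of column i."""
    rng = random.Random(seed)
    perms = []
    for _ in range(n):
        ranks = list(range(vocab_size))
        rng.shuffle(ranks)
        perms.append(ranks)
    return perms


def minhash_signature(indices, perms):
    """For each permutation, the smallest rank among the set's column indices.
    Raises ValueError on an empty set: a set needs at least one non-zero entry."""
    return [min(p[i] for i in indices) for p in perms]


def estimate_jaccard(sig_a, sig_b):
    """Fraction of signature positions on which the two sets agree."""
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)


perms = make_permutations(vocab_size=150, n=400, seed=42)
a = set(range(0, 100))    # hypothetical column indices of review A
b = set(range(50, 150))   # hypothetical column indices of review B (true Jaccard = 50/150)
sig_a, sig_b = minhash_signature(a, perms), minhash_signature(b, perms)
print(round(estimate_jaccard(sig_a, sig_b), 3))  # close to 0.333
```

The estimate's standard error shrinks roughly as 1/sqrt(n). With `numHashTables=3`, as in the cell below, each review carries only 3 such values, which is enough for bucketing candidates but very coarse as a similarity estimate.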

In [12]:
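# MinHash LSH on the binary shingle vectors, with 3 hash tables
mh = MinHashLSH(inputCol="one_hot_shingles", outputCol="hashes", numHashTables=3)
model = mh.fit(df_train)

# Add the MinHash signatures ("hashes" column) to the training reviews
df_train = model.transform(df_train)
df_train.show()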

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|polarity|               title|         description|            shingles|    one_hot_shingles|              hashes|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|[ tr, ist, wel,  ...|(1676,[0,1,2,3,4,...|[[1784878.0], [11...|
|       2|The best soundtra...|I'm reading a lot...|[uy , g I, ney, o...|(1676,[0,1,2,3,4,...|[[1784878.0], [11...|
|       2|            Amazing!|"This soundtrack ...|[mor, nte, u'v, i...|(1676,[0,1,2,3,4,...|[[1.2129679E7], [...|
|       2|Excellent Soundtrack|I truly like this...|[ tr, Gal, pea, c...|(1676,[0,1,2,3,4,...|[[1784878.0], [11...|
|       2|Remember, Pull Yo...|If you've played ...|[da , t a, rot, i...|(1676,[0,1,2,3,4,...|[[1784878.0], [49...|
|       2|an absolute maste...|I am quite sure a...|[ tr, mor, tly, o...

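With `numHashTables=3`, each review gets 3 independent MinHash values. As we understand Spark's `MinHashLSH`, two reviews count as close in hash space when they agree in at least one table (an OR over tables). If a single table collides with probability equal to the Jaccard similarity s, the probability of becoming a candidate with L tables is 1 − (1 − s)^L. The short computation below tabulates it for L = 3:

```python
def candidate_probability(s, tables=3):
    """P(at least one of `tables` independent MinHash values collides), given Jaccard s."""
    return 1 - (1 - s) ** tables


for s in (0.2, 0.4, 0.6, 0.8):
    print(f"s = {s:.1f} -> P(candidate) = {candidate_probability(s):.3f}")
# s = 0.2 -> 0.488, s = 0.4 -> 0.784, s = 0.6 -> 0.936, s = 0.8 -> 0.992
```

Adding tables raises the chance of catching truly similar reviews, but it also lets more dissimilar ones through as candidates, which then have to be filtered by their exact distance.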


# K-NN

In [13]:
# Shingle the test reviews with the same q (again only the first 10 reviews)
df_test = df_test.limit(10).withColumn("shingles", shingle_udf(F.col("description"), F.lit(q)))
df_test.select('shingles').show(truncate=False)


In [14]:
# Encode the test shingles with the vocabulary learned on the training set
df_test = cv_model.transform(df_test)
df_test.select('one_hot_shingles').show(truncate=False)

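`cv_model.transform(df_test)` encodes the test reviews with the vocabulary learned on the training reviews, so any shingle that never occurred in training is silently dropped. With a vocabulary built from only 10 training reviews this can discard much of a test review. A review whose shingles are all unseen ends up as an all-zero vector, which, as far as we know, Spark's MinHash cannot hash because it requires at least one non-zero entry. The pure-Python sketch below encodes test reviews and skips such reviews before querying. The vocabulary and test sets are made up.

```python
def encode(doc, vocab):
    """Column indices of the shingles of `doc` that exist in the training vocabulary."""
    return sorted(vocab[t] for t in doc if t in vocab)


# Hypothetical vocabulary learned from the training shingles
vocab = {"at ": 0, "eat": 1, "gre": 2, "rea": 3, "t s": 4}
test_docs = {"t1": {"gre", "rea", "xyz"}, "t2": {"qqq", "zzz"}}

encoded = {rid: encode(doc, vocab) for rid, doc in test_docs.items()}
# Keep only reviews with at least one known shingle; the rest cannot be MinHashed
queryable = {rid: idx for rid, idx in encoded.items() if idx}
skipped = sorted(rid for rid, idx in encoded.items() if not idx)
print(queryable, skipped)  # → {'t1': [2, 3]} ['t2']
```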

In [15]:
df_test.printSchema()

root
 |-- polarity: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- shingles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- one_hot_shingles: vector (nullable = true)



In [22]:
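# NOTE: these query rows come from df_train, so each query is also in the search set
# and is its own nearest neighbour; step 5 asks for the test reviews (df_test.collect())
rows = df_train.limit(10).collect()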



In [23]:
k = 3

predictions = []
for row in rows:
    # Retrieve the (approximate) k nearest training reviews by Jaccard distance
    neighbors = model.approxNearestNeighbors(df_train, row["one_hot_shingles"], k)

    # With k = 3 and labels in {1, 2}, the rounded mean polarity is the majority label
    result_row = neighbors.select(
        F.round(F.avg(F.col("polarity"))).alias("pred_polarity")
    ).first()

    if result_row is None:
        print("Warning: no neighbors found for row; predicted polarity set to None.")
        pred_polarity = None
    elif result_row[0] is None:
        print("Warning: average polarity is None for row; predicted polarity set to None.")
        pred_polarity = None
    else:
        pred_polarity = int(result_row[0])

    predictions.append((row["title"], row["description"], row["polarity"], pred_polarity))
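The loop predicts by rounding the mean polarity of the k = 3 neighbours. With two labels (1 and 2) and an odd number of neighbours, this is the same as a majority vote, since the mean exceeds 1.5 exactly when most neighbours are positive. The sketch below writes the vote out explicitly in pure Python. It uses exact Jaccard distance over made-up shingle sets instead of the LSH approximation, and the function `knn_predict` and the toy data are hypothetical.

```python
from collections import Counter


def jaccard_distance(a, b):
    """1 - |A ∩ B| / |A ∪ B| for two shingle sets (0.0 when both are empty)."""
    union = a | b
    return 1 - len(a & b) / len(union) if union else 0.0


def knn_predict(query, train, k=3):
    """Majority polarity among the k training sets closest to `query`.

    `train` is a list of (shingle_set, polarity) pairs. Distance ties keep list
    order (sorted is stable); vote ties go to the label met first among the
    nearest neighbours (Counter.most_common keeps insertion order for equal counts).
    """
    nearest = sorted(train, key=lambda item: jaccard_distance(query, item[0]))[:k]
    votes = Counter(polarity for _, polarity in nearest)
    return votes.most_common(1)[0][0]


# Hypothetical training shingle sets with their polarity (1 = negative, 2 = positive)
train = [
    ({"gre", "rea", "eat", "at "}, 2),
    ({"gre", "rea", "eat"}, 2),
    ({"bad", "ad ", "d p"}, 1),
    ({"gre", "eat", "bad"}, 1),
]
print(knn_predict({"gre", "rea", "eat"}, train, k=3))  # → 2 (neighbour votes: 2, 2, 1)
```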

In [24]:
df_predictions = spark.createDataFrame(predictions, ["title", "description", "polarity", "pred_polarity"])

In [26]:
df_predictions.show()

+--------------------+--------------------+--------+-------------+
|               title|         description|polarity|pred_polarity|
+--------------------+--------------------+--------+-------------+
|Stuning even for ...|This sound track ...|       2|            2|
|The best soundtra...|I'm reading a lot...|       2|            2|
|            Amazing!|"This soundtrack ...|       2|            2|
|Excellent Soundtrack|I truly like this...|       2|            2|
|Remember, Pull Yo...|If you've played ...|       2|            2|
|an absolute maste...|I am quite sure a...|       2|            2|
|        Buyer beware|"This is a self-p...|       1|            2|
|      Glorious story|I loved Whisper o...|       2|            2|
|    A FIVE STAR BOOK|I just finished r...|       2|            2|
|Whispers of the W...|This was a easy t...|       2|            2|
+--------------------+--------------------+--------+-------------+



# Evaluation

In [27]:
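# A null prediction counts as an error (a plain != comparison with null would skip it)
errors = df_predictions.filter(~F.col("polarity").eqNullSafe(F.col("pred_polarity"))).count()
total = df_predictions.count()
error_rate = errors / total

print(f"Error Rate: {error_rate:.4f}")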

Error Rate: 0.1000
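The error rate is the fraction of evaluated reviews whose predicted polarity differs from the true one. Here 1 wrong prediction out of 10 gives 0.1. In Spark, comparing a value with a null prediction yields null, so a plain `!=` filter does not count that row. The pure-Python sketch below counts missing predictions (`None`) as errors explicitly and adds a per-class breakdown. The labels and predictions are copied from the table above; `evaluate` is a hypothetical helper.

```python
from collections import Counter


def evaluate(labels, preds):
    """Error rate (a missing prediction counts as an error) and confusion counts."""
    errors = sum(p is None or p != y for y, p in zip(labels, preds))
    confusion = Counter(zip(labels, preds))  # (true, predicted) -> count
    return errors / len(labels), confusion


# Labels and predictions from the table above: one negative review predicted positive
labels = [2, 2, 2, 2, 2, 2, 1, 2, 2, 2]
preds = [2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
error_rate, confusion = evaluate(labels, preds)
print(f"Error Rate: {error_rate:.4f}")  # → Error Rate: 0.1000
print(dict(confusion))                  # → {(2, 2): 9, (1, 2): 1}
```

The confusion counts show that the only error is a negative review predicted as positive. With 9 of the 10 evaluated reviews positive, always predicting 2 would score the same 0.1 error rate on this sample.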
