<a href="https://colab.research.google.com/github/AndreaMoschetto/minhashingHSL/blob/main/AmazonMinHashingLSH.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Exercise: MinHashing and LSH (Locality-Sensitive Hashing) on the Amazon reviews dataset
Student: **Andrea Moschetto**

## Libraries


In [None]:
# spark general
import pyspark
from pyspark.mllib import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, count
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


# google drive api
from google.colab import drive

# .tar file extraction
import tarfile
import os

# data preparation
from pyspark.sql.functions import concat_ws, lower, col
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import trim

# models
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import MinHashLSH

## Utility functions

In [None]:
def load_dataset(data_path: str, name: str, limit_num: int):
  """Read an Amazon review polarity CSV into a DataFrame with named columns.

  The file has no header row. Its three positional columns are renamed to
  polarity, title and text (all read as strings, because no schema is inferred).

  Args:
    data_path: path of the CSV file to read.
    name: label used only in the printed log line.
    limit_num: if greater than 0, keep only limit_num rows; 0, a negative
      value or None keeps the whole dataset.

  Returns:
    The (optionally limited) DataFrame. As a side effect, its first 5 rows are shown.

  Note: relies on a global SparkSession named spark.
  """
  # Fields are quoted with '"' and embedded quotes are escaped by doubling them
  # (RFC 4180 style). Spark's default escape character is '\', so without
  # escape='"' the doubled quotes are not unescaped: stray quote characters end
  # up in the parsed values and fields can be split incorrectly.
  df = spark.read.csv(data_path, quote='"', escape='"')
  df = (df.withColumnRenamed('_c0', 'polarity')
          .withColumnRenamed('_c1', 'title')
          .withColumnRenamed('_c2', 'text'))

  df.show(5)
  if limit_num and limit_num > 0:
    print(f'{name} limited to {limit_num} units')
    # NOTE: without an explicit ordering Spark does not guarantee which rows limit() keeps.
    return df.limit(limit_num)
  print('Using the whole dataset')
  return df


In [None]:
def data_preparation(df, name: str):
  """Add a normalized full_text column built from the title and text columns.

  Steps:
    1. concatenate title and text separated by a space (concat_ws skips nulls)
       and lower-case the result;
    2. drop every character that is not an ASCII letter or whitespace;
    3. collapse every whitespace run into a single space, then trim.

  Args:
    df: DataFrame with title and text columns (e.g. the output of load_dataset).
    name: label used only in the printed log line.

  Returns:
    The DataFrame with the extra full_text column. As a side effect, prints a
    header line and shows its first 5 rows.

  NOTE(review): a row whose title and text contain no letters ends up with an
  empty full_text; downstream MinHash cannot hash an empty shingle set.
  """
  # concatenate title and text, lower-cased
  df = df.withColumn('full_text',
    lower(concat_ws(" ", col("title"), col("text"))))

  # remove special characters and digits: keep only letters and whitespace
  # (raw strings, because "\s" in a normal string literal is an invalid escape)
  df = df.withColumn(
      "full_text",
      regexp_replace(col("full_text"), r"[^a-zA-Z\s]", "")
  )

  # Collapse whitespace runs (spaces, tabs, newlines) to one space, THEN trim.
  # Spark's trim() strips only the space character, so trimming first could leave
  # a leading/trailing space produced from a tab or newline.
  df = df.withColumn(
      "full_text",
      trim(regexp_replace(col("full_text"), r"\s+", " "))
  )
  print(f'PREPARED {name}')
  df.show(5)
  return df

In [None]:

def q_shingle(text, q=3):
    """Split ``text`` into overlapping character q-grams (shingles).

    Args:
        text: input string; ``None`` or ``""`` yields no shingles.
        q: shingle length in characters; must be >= 1.

    Returns:
        The ``len(text) - q + 1`` consecutive substrings of length ``q``, in
        order and with duplicates kept. A non-empty text shorter than ``q`` is
        returned as a single shingle, so it still gets a non-empty shingle set.

    Raises:
        ValueError: if ``q`` is smaller than 1.
    """
    if q < 1:
        # q == 0 would otherwise yield len(text) + 1 empty-string "shingles"
        raise ValueError(f"q must be >= 1, got {q}")
    if not text:
        return []
    if len(text) < q:
        # Without this, short texts get no shingles at all; an empty set becomes an
        # all-zero feature vector, which MinHashLSH refuses to hash.
        return [text]
    return [text[i:i + q] for i in range(len(text) - q + 1)]
# Spark UDF wrapping q_shingle and returning array<string>; callers pass only the
# text column, so q keeps its default value.
q_shingle_udf = udf(f=q_shingle, returnType=ArrayType(StringType()))

## Extract dataset

In [None]:
# Mount Google Drive so the dataset archive stored there can be read
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Download it from: https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews
tar_path = '/content/drive/MyDrive/Bigdata/datasets/amazon_review_polarity_csv.tar'


if tarfile.is_tarfile(tar_path):
    with tarfile.open(tar_path) as tar:
        # The 'data' extraction filter rejects absolute paths, '..' components,
        # links pointing outside the destination and special files, guarding
        # against path traversal from a downloaded archive. It exists on Python
        # 3.12+ and recent security releases of older versions; fall back to a
        # plain extraction where it is unavailable.
        if hasattr(tarfile, 'data_filter'):
            tar.extractall(filter='data')
        else:
            tar.extractall()
        print("Estrazione completata.")
        # List the extracted members
        print("Contenuto estratto:")
        print(tar.getnames())
else:
    print("Il file non è un archivio .tar valido.")

Estrazione completata.
Contenuto estratto:
['amazon_review_polarity_csv', 'amazon_review_polarity_csv/train.csv', 'amazon_review_polarity_csv/readme.txt', 'amazon_review_polarity_csv/test.csv']


In [None]:
# List the files extracted from the archive (IPython shell escape)
!ls amazon_review_polarity_csv/

readme.txt  test.csv  train.csv


## Spark session

In [None]:

conf = SparkConf().set("spark.ui.port", "4050")

# getOrCreate() makes this cell safe to re-run: calling the SparkContext
# constructor while a context is already active raises
# "Cannot run multiple SparkContexts at once". If a context already exists it is
# returned unchanged and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)
# The session is built on top of the active SparkContext obtained above.
spark = SparkSession.builder.getOrCreate()

## CONFIGS


In [None]:
# Rows kept from each split by load_dataset (a value <= 0 uses the whole file)
TRAIN_UNITS = 100_000
TEST_UNITS = 10_000

## TRAIN

### Data Preparation

Create a new column, `full_text`, containing the title and text concatenated and lower-cased, with non-letter characters removed and whitespace normalized.

In [None]:
# Load (at most TRAIN_UNITS rows of) the training split, then build `full_text`
data_path = 'amazon_review_polarity_csv/train.csv'
df_train = data_preparation(
    load_dataset(data_path=data_path, name='TRAIN_SET', limit_num=TRAIN_UNITS),
    'TRAIN_SET',
)

+--------+--------------------+--------------------+
|polarity|               title|                text|
+--------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|
|       2|The best soundtra...|I'm reading a lot...|
|       2|            Amazing!|"This soundtrack ...|
|       2|Excellent Soundtrack|I truly like this...|
|       2|Remember, Pull Yo...|If you've played ...|
+--------+--------------------+--------------------+
only showing top 5 rows

TRAIN_SET limited to 100000 units
PREPARED TRAIN_SET
+--------+--------------------+--------------------+--------------------+
|polarity|               title|                text|           full_text|
+--------+--------------------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|stuning even for ...|
|       2|The best soundtra...|I'm reading a lot...|the best soundtra...|
|       2|            Amazing!|"This soundtrack ...|amazing this soun...|
|

### Shingles

In [None]:
# Character shingles of every training review (q_shingle's default length)
df_shingled = df_train.withColumn("shingles", q_shingle_udf(col("full_text")))
df_shingled.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+
|polarity|               title|                text|           full_text|            shingles|
+--------+--------------------+--------------------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|stuning even for ...|[stu, tun, uni, n...|
|       2|The best soundtra...|I'm reading a lot...|the best soundtra...|[the, he , e b,  ...|
|       2|            Amazing!|"This soundtrack ...|amazing this soun...|[ama, maz, azi, z...|
|       2|Excellent Soundtrack|I truly like this...|excellent soundtr...|[exc, xce, cel, e...|
|       2|Remember, Pull Yo...|If you've played ...|remember pull you...|[rem, eme, mem, e...|
+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



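### One-hot (binary) encoding
Each review becomes a sparse 0/1 vector over the shingle vocabulary learnt on the training set: entry *i* is 1 if the review contains the *i*-th shingle, 0 otherwise.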


In [None]:

# binary=True: each review becomes a sparse 0/1 vector over the shingle vocabulary
# learnt from the training set (presence/absence rather than counts).
cv = CountVectorizer(binary=True, inputCol="shingles", outputCol="raw_features")
onehot_model = cv.fit(df_shingled)

# Apply the fitted vocabulary to the training reviews
df_vectorized = onehot_model.transform(df_shingled)
df_vectorized.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|polarity|               title|                text|           full_text|            shingles|        raw_features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|stuning even for ...|[stu, tun, uni, n...|(13188,[0,1,2,3,4...|
|       2|The best soundtra...|I'm reading a lot...|the best soundtra...|[the, he , e b,  ...|(13188,[0,1,2,3,4...|
|       2|            Amazing!|"This soundtrack ...|amazing this soun...|[ama, maz, azi, z...|(13188,[0,1,2,3,4...|
|       2|Excellent Soundtrack|I truly like this...|excellent soundtr...|[exc, xce, cel, e...|(13188,[0,1,2,3,4...|
|       2|Remember, Pull Yo...|If you've played ...|remember pull you...|[rem, eme, mem, e...|(13188,[0,1,2,3,4...|
+--------+--------------------+--------------------+--------------------

### Hashes


In [None]:


# MinHash LSH with 5 hash tables, fitted on the training vectors; transform()
# appends a "hashes" column holding one MinHash value per table.
minhash = MinHashLSH(numHashTables=5, inputCol="raw_features", outputCol="hashes")
minhash_model = minhash.fit(df_vectorized)

df_hashed = minhash_model.transform(df_vectorized)
print('TRAINING_SET HASHES')
df_hashed.show(5)

TRAINING_SET HASHES
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|polarity|               title|                text|           full_text|            shingles|        raw_features|              hashes|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|stuning even for ...|[stu, tun, uni, n...|(13188,[0,1,2,3,4...|[[717723.0], [155...|
|       2|The best soundtra...|I'm reading a lot...|the best soundtra...|[the, he , e b,  ...|(13188,[0,1,2,3,4...|[[5527257.0], [79...|
|       2|            Amazing!|"This soundtrack ...|amazing this soun...|[ama, maz, azi, z...|(13188,[0,1,2,3,4...|[[717723.0], [155...|
|       2|Excellent Soundtrack|I truly like this...|excellent soundtr...|[exc, xce, cel, e...|(13188,[0,1,2,3,4...|[[717723.0], [155...|
|       2|Remember, P

## TEST

### Data preparation

In [None]:
# Load (at most TEST_UNITS rows of) the test split, then build `full_text`
data_path = 'amazon_review_polarity_csv/test.csv'
df_test = data_preparation(
    load_dataset(data_path=data_path, name='TEST_SET', limit_num=TEST_UNITS),
    'TEST_SET',
)

+--------+--------------------+--------------------+
|polarity|               title|                text|
+--------+--------------------+--------------------+
|       2|            Great CD|"My lovely Pat ha...|
|       2|One of the best g...|Despite the fact ...|
|       1|Batteries died wi...|I bought this cha...|
|       2|works fine, but M...|Check out Maha En...|
|       2|Great for the non...|Reviewed quite a ...|
+--------+--------------------+--------------------+
only showing top 5 rows

TEST_SET limited to 10000 units
PREPARED TEST_SET
+--------+--------------------+--------------------+--------------------+
|polarity|               title|                text|           full_text|
+--------+--------------------+--------------------+--------------------+
|       2|            Great CD|"My lovely Pat ha...|great cd my lovel...|
|       2|One of the best g...|Despite the fact ...|one of the best g...|
|       1|Batteries died wi...|I bought this cha...|batteries died wi...|
|   

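### Inference with the fitted models
The test set is transformed with the CountVectorizer and MinHashLSH models fitted on the training set; shingles that never occur in the training set are not in the vocabulary and are therefore ignored.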


In [None]:
# Reuse the models fitted on the training set (same vocabulary, same hash functions)
df_test_shingled = df_test.withColumn("shingles", q_shingle_udf(col("full_text")))
df_test_vectorized = onehot_model.transform(df_test_shingled)
df_test_hashed = minhash_model.transform(df_test_vectorized)

print('TEST_SET HASHES')
df_test_hashed.show(5)

TEST_SET HASHES
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|polarity|               title|                text|           full_text|            shingles|        raw_features|              hashes|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       2|            Great CD|"My lovely Pat ha...|great cd my lovel...|[gre, rea, eat, a...|(13188,[0,1,2,3,4...|[[717723.0], [196...|
|       2|One of the best g...|Despite the fact ...|one of the best g...|[one, ne , e o,  ...|(13188,[0,1,2,3,4...|[[1847422.0], [15...|
|       1|Batteries died wi...|I bought this cha...|batteries died wi...|[bat, att, tte, t...|(13188,[0,1,2,3,4...|[[1847422.0], [91...|
|       2|works fine, but M...|Check out Maha En...|works fine but ma...|[wor, ork, rks, k...|(13188,[0,1,3,4,5...|[[717723.0], [196...|
|       2|Great for the n

In [None]:
# Find most frequent polarity
majority_polarity = df_test.groupBy("polarity").count().orderBy(col("count").desc()).first()["polarity"]

# Take one row that has majority polarity
key_row = df_test_hashed.filter(col("polarity") == majority_polarity).limit(1)
key_text = key_row.select('text').collect()
print(f'polarità più frequente: {majority_polarity}')
print(f'key_text = {key_text}')


polarità più frequente: 2
key_text = [Row(text='"My lovely Pat has one of the GREAT voices of her generation. I have listened to this CD for YEARS and I still LOVE IT. When I\'m in a good mood it makes me feel better. A bad mood just evaporates like sugar in the rain. This CD just oozes LIFE. Vocals are jusat STUUNNING and lyrics just kill. One of life\'s hidden gems. This is a desert isle CD in my book. Why she never made it big is just beyond me. Everytime I play this, no matter black, white, young, old, male, female EVERYBODY says one thing ""Who was that singing ?"""')]


In [None]:
# Feature vector of the key review
key_features = key_row.select("raw_features").take(1)[0]["raw_features"]

# Query the 3 approximate nearest neighbours by Jaccard distance on shingle sets.
# NOTE: key_row is taken from df_test_hashed itself, so the key review is normally
# one of the returned rows, at distance 0.
neighbors = minhash_model.approxNearestNeighbors(
    dataset=df_test_hashed,
    key=key_features,
    numNearestNeighbors=3,
)

neighbors.select("polarity", "distCol", "text").show(truncate=False)


+--------+------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|polarity|distCol           |text                                                                                                                                                                                                                                         

In [None]:
# Uncomment to stop the SparkContext and release its resources when finished.
# sc.stop()