<a href="https://colab.research.google.com/github/Gianluca119712/project_AMD_finding_similar_items/blob/main/GianlucaMorena_project_AMD_finding_similar_items.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

author: **Gianluca Morena (42508A)**

course: *Algorithms for massive data, cloud and distributed computing*

degree: *MSc Data science for economics*

# Project 1: **Finding similar items**


TASK: implement a detector of pairs of similar book reviews

## PySpark, Libraries, set up environment

In [3]:
!pip install -q pyspark
!pip install -q kaggle
!pip install -q stop-words

# Import necessary libraries
import os
import re

from pyspark.sql.functions import size
from pyspark.sql.functions import col, sum
from pyspark.sql.window import Window
from pyspark.sql import SparkSession


from itertools import combinations
from collections import defaultdict
from stop_words import get_stop_words
import time

In [5]:
# Get the active Spark session if one exists, otherwise create it under the
# application name given below.
spark = (
    SparkSession.builder
    .appName("finding_similar_items")
    .getOrCreate()
)


## Functions

In [6]:
def sample_size(df, percent, output_name="dataset", seed=42):
  """Return a random sample, drawn without replacement, of ``df``.

  Args:
    df: object exposing ``sample(withReplacement, fraction, seed)``
      (a Spark DataFrame in this notebook).
    percent: fraction of rows to keep, as a number in the interval (0, 1].
    output_name: label used only in the progress message.
    seed: seed forwarded to ``df.sample`` so the draw is repeatable.

  Returns:
    Whatever ``df.sample`` returns for the requested fraction and seed.

  Raises:
    ValueError: if ``percent`` is not an int/float or lies outside (0, 1].
  """

  if not isinstance(percent, (int, float)):
    raise ValueError("The percent value must be an INT or a FLOAT datatype")
  if not (0 < percent <= 1):
    raise ValueError("The percent value for the sample size must fall within the interval (0, 1]")

  print(f"{output_name} --> sampling {percent * 100:.3f}% of the dataset")
  # Forward the caller's seed; it used to be ignored in favour of a literal 42.
  df_sample = df.sample(withReplacement=False, fraction=float(percent), seed=seed)

  return df_sample

In [7]:
stop_word=get_stop_words('en')
split_regex = r'\W+'

# tokenize() tests every token for membership; a set gives a hash lookup
# instead of a scan over the whole stop-word list.
_stop_word_set = frozenset(stop_word)

def tokenize(string):
  """Split a text into lower-case tokens.

  The text is lower-cased and split on runs of non-word characters
  (``split_regex``). Empty strings, entries of the ``get_stop_words('en')``
  list and tokens made only of digits are discarded.

  Args:
    string: the text to tokenize (must be a ``str``).

  Returns:
    list[str]: the remaining tokens, in their original order.
  """
  return [
    s for s in re.split(split_regex, string.lower())
    if s and s not in _stop_word_set and not s.isdigit()
  ]

## Downloading and sampling the dataset

In [None]:
# Download the Kaggle datasets

# Create folder for storing imported Kaggle dataset
os.makedirs('dataset', exist_ok=True)

# Authenticate Kaggle API
kaggle_username = 'xxxxx' # Insert your Kaggle credentials
kaggle_key = 'xxxxx'

os.environ['KAGGLE_USERNAME'] = kaggle_username
os.environ['KAGGLE_KEY'] = kaggle_key

# Download the datasets and unzip
!kaggle datasets download mohamedbakhet/amazon-books-reviews -p dataset --unzip

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to dataset
 99% 1.05G/1.06G [00:16<00:00, 208MB/s]
100% 1.06G/1.06G [00:16<00:00, 68.4MB/s]


In [None]:
# Load the raw reviews: the first line is used as the header and the column
# types are inferred by Spark.
# NOTE(review): the default CSV quote/escape options are used here; if the
# free-text columns contain embedded quotes or newlines, confirm that rows are
# parsed into the expected columns.
books_rating = spark.read.csv(
    path="dataset/Books_rating.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Fraction of the dataset to keep, expressed as a number in (0, 1]
sample_rating = sample_size(df=books_rating, percent=0.10, output_name='books_rating')

# Size of the sample: rows are counted by Spark, columns are read from the schema
n_rows_sample_rating = sample_rating.count()
n_cols_sample_rating = len(sample_rating.columns)

print(f'sample rating --> has {n_rows_sample_rating} row and {n_cols_sample_rating} columns')

books_rating --> sampling 10.000% of the dataset
sample rating --> has 301036 row and 10 columns


## Data transformation and tokenization

In [None]:
# Keep only the columns used below and give the review body a shorter name.
# Title, review/score and review/summary are not selected.
sample_rating = sample_rating.select(
    'Id',
    'User_id',
    col('review/text').alias('text'),
)

In [None]:
# Drop rows with a null review text or user id: tokenize() cannot handle a
# missing text, so such rows are removed before tokenization.
required_columns = ['text', 'User_id']
sample_rating = sample_rating.dropna(subset=required_columns)

In [None]:
# RDD view of the cleaned sample, so that the plain-Python tokenize() can be
# applied row by row with map().
rdd_sample_rating=sample_rating.rdd

In [None]:
def _attach_tokens(row):
  """Pair the first three fields of a row with the tokens of the third one."""
  first, second, text = row[0], row[1], row[2]
  return (first, second, text), tokenize(text)


def _flatten_record(record):
  """Turn ((a, b, text), tokens) into the flat tuple (a, b, text, tokens)."""
  fields, tokens = record[0], record[1]
  return (fields[0], fields[1], fields[2], tokens)


# Step 1: tokenize each review; step 2: flatten into one 4-field record per row
token_sample_rating = rdd_sample_rating.map(_attach_tokens)
rating_flat = token_sample_rating.map(_flatten_record)

In [None]:
# Convert the flat records back into a DataFrame and keep only the rows whose
# token list is not empty.
rating_columns = ["Id", "User_id", 'text', "tokens"]
has_tokens = size(col("tokens")) > 0

rating = rating_flat.toDF(rating_columns).where(has_tokens)

## Algorithm implementation

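Implementation with PySpark ML classes: feature hashing of the tokens (`HashingTF`), MinHash LSH (`MinHashLSH`), and an approximate similarity join that computes the Jaccard distance between candidate pairs.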

In [None]:

from pyspark.ml.feature import HashingTF, MinHashLSH

# Hash each token list into a sparse vector with 8192 buckets
hashing_model = HashingTF(inputCol="tokens", outputCol="features", numFeatures= 8192)
hashing_rating = hashing_model.transform(rating)

# The seed is set explicitly so that the MinHash functions, and therefore the
# candidate pairs returned by the join, do not depend on the library default.
minhashing_model = MinHashLSH(inputCol="features", outputCol="minhashes", numHashTables=2, seed=42)
model = minhashing_model.fit(hashing_rating)

# NOTE: approxSimilarityJoin takes an upper bound on the *distance*, so 0.6
# keeps pairs with Jaccard distance <= 0.6 (Jaccard similarity >= 0.4).
s_th= 0.6

# Self-join of the dataset. The strict "<" keeps a single ordering of every
# pair and removes each row paired with itself.
# NOTE(review): it also removes pairs whose two rows share the same Id; if Id
# is not unique per review, those pairs are never reported -- confirm intended.
similarity = model.approxSimilarityJoin(
                                            hashing_rating,
                                            hashing_rating,
                                            threshold=s_th,
                                            distCol="JaccardDistance"
                                          ).filter("datasetA.Id < datasetB.Id")

# Closest pairs first
similarity = similarity.orderBy("JaccardDistance", ascending=True)

In [None]:
# To evaluate the model, look only at pairs that are not (almost) identical:
# pairs with a Jaccard distance of 0.1 or less are set aside.
min_distance = 0.1
similarity = similarity.where(col("JaccardDistance") > min_distance)


visualization of the algorithm results and their execution time

In [None]:

# Bring the first ten pairs (ids, texts and distance) back to the driver
preview_columns = [
    "datasetA.Id",
    "datasetB.Id",
    "datasetA.text",
    "datasetB.text",
    "JaccardDistance",
]
similarity.select(*preview_columns).take(10)



In [None]:
first_pair = similarity.first()

# first() returns None when no pair survived the filters; subscripting it
# would raise a TypeError, so that case is reported instead.
if first_pair is None:
  review_a = None
  review_b = None
  print("No similar pair found")
else:
  review_a = first_pair['datasetA']['text']
  review_b = first_pair['datasetB']['text']

  # Printing the reviews
  print("Review 1 (datasetA):")
  print(review_a)
  print("\nReview 2 (datasetB):")
  print(review_b)