<a href="https://colab.research.google.com/github/ettoreotery/AMD-project-DSE-25/blob/main/AMD_Ettore_Oteri_v3_0_1_SPARK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
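import os
# Kaggle API credentials. The key is redacted here; supply your own,
# ideally from a secret store rather than hard-coding it in the notebook.
os.environ['KAGGLE_USERNAME'] = "ettoreoteri"
os.environ['KAGGLE_KEY'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
# Download the Amazon Books Reviews dataset and unpack it into /content/.
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews
!unzip -q "*.zip" -d /content/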

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0


In [None]:
import pandas as pd
df1 = pd.read_csv("Books_rating.csv")
df1.head()

Unnamed: 0,Id,Title,Price,User_id,profileName,review/helpfulness,review/score,review/time,review/summary,review/text
0,1882931173,Its Only Art If Its Well Hung!,,AVCGYZL8FQQTD,"Jim of Oz ""jim-of-oz""",7/7,4.0,940636800,Nice collection of Julie Strain images,This is only for Julie Strain fans. It's a col...
1,826414346,Dr. Seuss: American Icon,,A30TK6U7DNS82R,Kevin Killian,10/10,5.0,1095724800,Really Enjoyed It,I don't care much for Dr. Seuss but after read...
2,826414346,Dr. Seuss: American Icon,,A3UH4UZ4RSVO82,John Granger,10/11,5.0,1078790400,Essential for every personal and Public Library,"If people become the books they read and if ""t..."
3,826414346,Dr. Seuss: American Icon,,A2MVUWT453QH61,"Roy E. Perry ""amateur philosopher""",7/7,4.0,1090713600,Phlip Nel gives silly Seuss a serious treatment,"Theodore Seuss Geisel (1904-1991), aka &quot;D..."
4,826414346,Dr. Seuss: American Icon,,A22X4XUPKF66MR,"D. H. Richards ""ninthwavestore""",3/3,4.0,1107993600,Good academic overview,Philip Nel - Dr. Seuss: American IconThis is b...
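
Several fields in this preview contain commas and doubled quotes, which matters for the line-based extraction of `review/text` used further down (splitting each line on `"`). The sketch below contrasts that heuristic with Python's standard `csv` module. The function names and the sample rows are made up for illustration and are not taken from the dataset.

```python
import csv
import io

def extract_by_quote_split(line):
    # Mirrors the notebook's heuristic: text between the last two double quotes,
    # or the last comma-separated field when the line has no quotes.
    return line.split('"')[-2] if '"' in line else line.split(',')[-1]

def extract_with_csv(csv_text):
    # Parse with the standard csv module and return the last column of each data row.
    reader = csv.reader(io.StringIO(csv_text))
    next(reader, None)  # skip the header row
    return [row[-1] for row in reader if row]

# Hypothetical rows with doubled quotes and an embedded comma.
sample = (
    'Id,profileName,review/text\n'
    '1,"He said ""hi""",Great book\n'
    '2,Plain,"Good, but ""long"" story"\n'
)

csv_texts = extract_with_csv(sample)
naive_texts = [extract_by_quote_split(line) for line in sample.splitlines()[1:]]

print("csv module :", csv_texts)
print("quote split:", naive_texts)
```

On rows like these the two approaches disagree, so part of the gap between the requested sample size and the number of reviews that survive filtering may come from the parsing step rather than from the reviews themselves.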


In [None]:
print(f"This dataset contains {df1.shape[0]} rows and {df1.shape[1]} columns.")

This dataset contains 3000000 rows and 10 columns.


In [None]:
# Install PySpark and NLTK, plus a headless JDK for Spark's JVM.
!pip install pyspark nltk
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

import string
from itertools import combinations

import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from pyspark.sql import SparkSession

# Downloads every NLTK resource; only the tokenizer, stopword and WordNet data are used below.
nltk.download('all')



[nltk_data] Downloading collection 'all'
[nltk_data]    | 
[nltk_data]    | Downloading package abc to /root/nltk_data...
[nltk_data]    |   Unzipping corpora/abc.zip.
[nltk_data]    | Downloading package alpino to /root/nltk_data...
[nltk_data]    |   Unzipping corpora/alpino.zip.
[nltk_data]    | Downloading package averaged_perceptron_tagger to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data]    | Downloading package averaged_perceptron_tagger_eng to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Unzipping
[nltk_data]    |       taggers/averaged_perceptron_tagger_eng.zip.
[nltk_data]    | Downloading package averaged_perceptron_tagger_ru to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Unzipping
[nltk_data]    |       taggers/averaged_perceptron_tagger_ru.zip.
[nltk_data]    | Downloading package averaged_perceptron_tagger_rus to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |  

True

In [None]:
# Initialize Spark configuration
spark = SparkSession.builder \
    .appName("BookReviewsJaccard") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
# Subsampling
SAMPLE_SIZE = 1000  # Sample size when subsampling
SEED = 42
USE_FULL_DATA = False  # Set to True to disable subsampling

In [None]:
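# Preprocessing with error handling
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
punctuation_table = str.maketrans('', '', string.punctuation)  # built once, reused per review

def preprocess_text(text):
    """Lowercase, strip punctuation, tokenize, drop stopwords and short tokens, lemmatize."""
    try:
        if not isinstance(text, str):
            return []
        text = text.lower().translate(punctuation_table)
        tokens = word_tokenize(text)
        return [lemmatizer.lemmatize(w) for w in tokens
                if w not in stop_words and len(w) > 2]
    except Exception:
        # Any tokenizer or lemmatizer failure yields an empty token list.
        return []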

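# Load the reviews and keep only the review text of each row
def load_review_texts():
    # Read the CSV as raw lines and identify the header row
    lines = sc.textFile("Books_rating.csv")
    header = lines.first()

    # Heuristic extraction of the last column (review/text): take the text
    # between the last two double quotes if the line contains quotes, otherwise
    # the last comma-separated field. This is not a full CSV parser, so rows
    # with escaped quotes or embedded newlines may be truncated.
    review_lines = lines.filter(lambda line: line != header) \
                        .map(lambda line: line.split('"')[-2] if '"' in line else line.split(',')[-1])

    # Discard very short texts, then either collect everything or draw a
    # fixed-size random sample without replacement
    candidates = review_lines.filter(lambda x: len(x) > 10)
    if USE_FULL_DATA:
        sampled_reviews = candidates.collect()
    else:
        sampled_reviews = candidates.takeSample(False, SAMPLE_SIZE, SEED)

    # Preprocess on the driver and keep only reviews with at least 5 tokens
    processed_reviews = []
    for text in sampled_reviews:
        processed = preprocess_text(text)
        if len(processed) >= 5:
            processed_reviews.append((text, processed))

    return sc.parallelize(processed_reviews, numSlices=8)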

# Process data
reviews_rdd = load_review_texts().cache()

print(f"Sample size loaded: {reviews_rdd.count()}")

Sample size loaded: 969
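
With 969 reviews loaded, comparing every pair means 969 × 968 / 2 = 468,996 Jaccard computations. As a minimal, Spark-free sketch of that scoring, the block below computes the Jaccard index (size of the intersection divided by size of the union) for every pair in a tiny collection. The token lists and the names `jaccard`, `docs` and `scores` are hypothetical stand-ins for the output of `preprocess_text`, not data from the notebook.

```python
from itertools import combinations

def jaccard(tokens_a, tokens_b):
    # Jaccard index of two token lists, treating each as a set.
    set_a, set_b = set(tokens_a), set(tokens_b)
    union = set_a | set_b
    return len(set_a & set_b) / len(union) if union else 0.0

# Hypothetical token lists standing in for preprocessed reviews.
docs = {
    "r1": ["pretty", "good", "book", "read", "lot"],
    "r2": ["good", "book", "great", "read"],
    "r3": ["shipping", "arrived", "condition"],
}

# Score every unordered pair exactly once.
scores = {(a, b): jaccard(docs[a], docs[b]) for a, b in combinations(docs, 2)}

# Number of unordered pairs for the 969 reviews loaded above.
n = 969
pair_count = n * (n - 1) // 2

for pair, score in sorted(scores.items(), key=lambda item: -item[1]):
    print(pair, round(score, 4))
print("pairs to compare:", pair_count)
```

Because the pair count grows quadratically, this all-pairs approach is practical for a sample of this size but not for the full 3,000,000-row dataset.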


In [None]:
# Similarity calculation
def jaccard_similarity(pair):
    set1 = set(pair[0][1])  # processed tokens from first review
    set2 = set(pair[1][1])  # processed tokens from second review
    common = set1 & set2
    union = len(set1 | set2)
    return (pair[0][0], pair[1][0], len(common) / union if union else 0.0, common)

# Generate and process pairs
def process_pairs(rdd):
    # Reviews are grouped into blocks of 1000 by index and compared only within
    # a block; with SAMPLE_SIZE = 1000 every review falls into a single block.
    # Pairs of exact duplicate reviews and pairs with no shared tokens are dropped.
    return rdd.zipWithIndex() \
            .map(lambda x: (x[1]//1000, x[0])) \
            .groupByKey() \
            .flatMap(lambda x: combinations(list(x[1]), 2)) \
            .filter(lambda x: x[0] != x[1]) \
            .map(jaccard_similarity) \
            .filter(lambda x: x[2] > 0)

# Get the 20 pairs with the highest similarity
top_pairs = process_pairs(reviews_rdd).takeOrdered(20, key=lambda x: -x[2])

# Print results
print("\nTOP 20 MOST SIMILAR PAIRS:")
for idx, (text1, text2, sim, common_tokens) in enumerate(top_pairs, 1):
    print(f"\n#{idx}: Similarity = {sim:.4f}")
    print("\nReview 1:")
    print(text1)
    print("\nReview 2:")
    print(text2)
    print(f"\nCommon tokens ({len(common_tokens)}): {common_tokens}")
    print("="*100)

spark.stop()


TOP 20 MOST SIMILAR PAIRS:

#1: Similarity = 0.2609

Review 1:
This was pretty good overall. Nothing particulary ground-shaking and I've read a lot of these things in other books too. Still, it was well presented and helpful.Recommended.

Review 2:
It's a lot more fun to read to your kid if the book has humour and imagination like most of these beginner books do.Not the best but still pretty good.

Common tokens (6): {'pretty', 'good', 'lot', 'book', 'still', 'read'}

#2: Similarity = 0.2500

Review 1:
I read Blackout and All Clear b/f this novel. Blackout and All Clear were great. Doomsday Book is good but not nearly as interesting as the other two.

Review 2:
I just read it to my brother and he was happy and thought it was a great book. I finished the book and thought that it was a pretty good book.

Common tokens (4): {'great', 'read', 'book', 'good'}

#3: Similarity = 0.2500

Review 1:
The book arrived within the time range allotted for shipping, and in great condition. Would not 