In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import StringType, BooleanType, IntegerType, DoubleType, NumericType
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
import pandas as pd
import numpy as np

In [14]:
spark = SparkSession.builder \
   .master("local") \
   .appName("DataPreparation") \
   .config("spark.executor.memory", "8gb") \
   .master("local[*]") \
   .getOrCreate()

sc = spark.sparkContext

In [22]:
num_cores = spark.sparkContext.defaultParallelism
print(f"Spark is using {num_cores} cores.")

1

#Upload the Datasets

In [1]:
import kagglehub
import shutil
from pathlib import Path

# Kaggle dataset: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
DATASET = "mohamedbakhet/amazon-books-reviews"

def main():
    print(f"[INFO] Downloading dataset: {DATASET}")
    path = kagglehub.dataset_download(DATASET)
    print("Downloaded to:", path)

    # Copy data to local ./data directory for consistency
    target_dir = Path("data")
    target_dir.mkdir(exist_ok=True)

    print(f"[INFO] Copying files to {target_dir}/")
    for item in Path(path).iterdir():
        dest = target_dir / item.name
        if item.is_file():
            shutil.copy(item, dest)
        elif item.is_dir():
            shutil.copytree(item, target_dir / item.name, dirs_exist_ok=True)

    print("\n[✔] Done. Dataset files are now available in ./data")

if __name__ == "__main__":
    main()

[INFO] Downloading dataset: mohamedbakhet/amazon-books-reviews
Downloading from https://www.kaggle.com/api/v1/datasets/download/mohamedbakhet/amazon-books-reviews?dataset_version_number=1...


100%|██████████| 1.06G/1.06G [00:09<00:00, 117MB/s]

Extracting files...





Downloaded to: /root/.cache/kagglehub/datasets/mohamedbakhet/amazon-books-reviews/versions/1
[INFO] Copying files to data/

[✔] Done. Dataset files are now available in ./data


In [4]:
df_rating = spark.read.csv('/content/data/Books_rating.csv', header=True, inferSchema=True)
df_books = spark.read.csv('/content/data/books_data.csv', header=True, inferSchema=True)

#Important recall on the Understanding task

Rating

In [8]:
df_rating.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)



In [11]:
for col_name in df_rating.columns:
    print(f"The column '{col_name}' has {df_rating.select(col_name).distinct().count()} unique values.")

The column 'Id' has 221998 unique values.
The column 'Title' has 212400 unique values.
The column 'Price' has 6191 unique values.
The column 'User_id' has 1008435 unique values.
The column 'profileName' has 854499 unique values.
The column 'review/helpfulness' has 16533 unique values.
The column 'review/score' has 2038 unique values.
The column 'review/time' has 6831 unique values.
The column 'review/summary' has 1585313 unique values.
The column 'review/text' has 2059450 unique values.


In [36]:
df_rating.show(5)
# Id:                    string,
# Title:                 string,
# Price:                 string,
# User_id:               string,
# profileName:           string,
# review/helpfulness:    string,
# review/score:          string,
# review/time:           string,
# review/summary:        string,
# review/text:           string

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|1882931173|Its Only Art If I...| NULL| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|
|0826414346|Dr. Seuss: Americ...| NULL|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724800|   Really Enjoyed It|I don't care much...|
|0826414346|Dr. Seuss: Americ...| NULL|A3UH4UZ4RSVO82|        John Granger|             10/11|         5.0| 1078790400|Essential for eve...|"If people become...|
|0826414346|Dr. Seuss: Ameri

In [40]:
#Additional check on "price" to show that there i no money symobl
df_rating.filter(col("Price").isNotNull()).show(5)

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|0829814000|Wonderful Worship...|19.40| AZ0IOBU20TBOP|  Rev. Pamela Tinnin|              8/10|         5.0|  991440000|Outstanding Resou...|I just finished t...|
|0829814000|Wonderful Worship...|19.40|A373VVEU6Z9M0N|Dr. Terry W. Dorsett|               1/1|         5.0| 1291766400|Small Churches CA...|"Many small churc...|
|0829814000|Wonderful Worship...|19.40| AGKGOH65VTRR4|"Cynthia L. Lajoy...|               1/1|         5.0| 1248307200|Not Just for Past...|I just finished r...|
|0829814000|Wonderful Worshi

In [26]:
num_rows = df_rating.count()
num_cols = len(df_rating.columns)

print(f"Numero di righe: {num_rows}") #3.000.000
print(f"Numero di colonne: {num_cols}") #10


Numero di righe: 3000000
Numero di colonne: 10


Books

In [24]:
df_books.printSchema()

root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- previewLink: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- infoLink: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- ratingsCount: string (nullable = true)



In [25]:
for col_name in df_books.columns:
    print(f"The column '{col_name}' has {df_books.select(col_name).distinct().count()} unique values.")

The column 'Title' has 212400 unique values.
The column 'description' has 133257 unique values.
The column 'authors' has 133019 unique values.
The column 'image' has 149421 unique values.
The column 'previewLink' has 186014 unique values.
The column 'publisher' has 34265 unique values.
The column 'publishedDate' has 28948 unique values.
The column 'infoLink' has 180644 unique values.
The column 'categories' has 28362 unique values.
The column 'ratingsCount' has 16102 unique values.


In [27]:
df_books.show(3)
# Title:              string,
# description:        string,
# authors:            string,
# image:              string,
# previewLink:        string,
# publisher:          string,
# publishedDate:      string,
# infoLink:           string,
# categories:         string,
# ratingsCount:       string

DataFrame[Title: string, description: string, authors: string, image: string, previewLink: string, publisher: string, publishedDate: string, infoLink: string, categories: string, ratingsCount: string]

In [28]:
num_rows = df_books.count()
num_cols = len(df_books.columns)

print(f"Numero di righe: {num_rows}") #212.404
print(f"Numero di colonne: {num_cols}") #10


Numero di righe: 212404
Numero di colonne: 10


#Rating preprocessing

In [41]:
#Clean the column name to avoid problem raised from '/'
df_rating = df_rating.withColumnRenamed("review/helpfulness", "ReviewHelpfulness") \
       .withColumnRenamed("review/score", "ReviewScore") \
       .withColumnRenamed("review/time", "ReviewTime") \
       .withColumnRenamed("review/summary", "ReviewSummary") \
       .withColumnRenamed("review/text", "ReviewText")


In [54]:
from pyspark.sql.functions import col, when, split

#1. Split the values
df_rating = df_rating.withColumn("help_yes", split(col("ReviewHelpfulness"), "/")[0].cast("int"))
df_rating = df_rating.withColumn("help_total", split(col("ReviewHelpfulness"), "/")[1].cast("int"))

# 2. Calculate Helpfulness Rate
df_rating = df_rating.withColumn(
    "ReviewHelpRrate",
    when(col("help_total") > 0, col("help_yes") / col("help_total")).otherwise(0)
)

df_rating = df_rating.drop("ReviewHelpfulness", "help_yes", "help_total")

#dtype: ('ReviewHelpRrate', 'double')

In [57]:
from pyspark.sql.functions import regexp_replace

df_rating = df_rating.withColumn("Price", regexp_replace("Price", "[$€]", "")) \
       .withColumn("Price", col("Price").cast("float"))

df_rating = df_rating.withColumn("ReviewScore", col("ReviewScore").cast("float"))

#dtype: ('Price', 'float')
#dtype: ('ReviewScore', 'float')


In [64]:
from pyspark.sql import functions as F

# Group by Id, Title, Price to remove duplicate reviews
# For numeric columns 'ReviewScore' and 'ReviewHelpRate' we calculate the average
# For text columns 'ReviewSummary' and 'ReviewText', we concatenate all reviews into a single string
# We keep the latest review timestamp to track the most recent activity.

df_ratingAGG = df_rating.groupBy("Id", "Title", "Price").agg(
    F.collect_list("User_id").alias("User_ids"),
    F.collect_list("profileName").alias("ProfileNames"),
    F.avg("ReviewScore").alias("AvgScore"),
    F.collect_list("ReviewScore").alias("AllScores"),
    F.max("ReviewTime").alias("LatestReviewTime"),
    F.collect_list("ReviewSummary").alias("ReviewSummaries"),
    F.collect_list("ReviewText").alias("ReviewTexts"),
    F.avg("ReviewHelpRrate").alias("AvgHelpRate")
)
df_rating = df_ratingAGG.drop("AllScores")

In [45]:
#Preparation of the time column: from timestamp Unix to a readable date in the format YYYY-MM-DD
from pyspark.sql.functions import from_unixtime, col, to_date

df_rating = df_rating.withColumn("ReviewDate", to_date(from_unixtime(col("ReviewTime"))))
df_rating.select("ReviewTime", "ReviewDate").show(5, truncate=False)

#The dtype of this column now is ('ReviewDate', 'date')

+----------+----------+
|ReviewTime|ReviewDate|
+----------+----------+
|940636800 |1999-10-23|
|1095724800|2004-09-21|
|1078790400|2004-03-09|
|1090713600|2004-07-25|
|1107993600|2005-02-10|
+----------+----------+
only showing top 5 rows



In [56]:
#check for the studies, can we retrive using an API?????
null_price_count = df_rating.filter(col("Price").isNull()).count()
print("Number of rows where Price is null:", null_price_count)

Number of rows where Price is null: 2517579


In [58]:
df_rating.show(5)

+----------+--------------------+-----+--------------+--------------------+-----------+----------+--------------------+--------------------+----------+------------------+
|        Id|               Title|Price|       User_id|         profileName|ReviewScore|ReviewTime|       ReviewSummary|          ReviewText|ReviewDate|   ReviewHelpRrate|
+----------+--------------------+-----+--------------+--------------------+-----------+----------+--------------------+--------------------+----------+------------------+
|1882931173|Its Only Art If I...| NULL| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|        4.0| 940636800|Nice collection o...|This is only for ...|1999-10-23|               1.0|
|0826414346|Dr. Seuss: Americ...| NULL|A30TK6U7DNS82R|       Kevin Killian|        5.0|1095724800|   Really Enjoyed It|I don't care much...|2004-09-21|               1.0|
|0826414346|Dr. Seuss: Americ...| NULL|A3UH4UZ4RSVO82|        John Granger|        5.0|1078790400|Essential for eve...|"If people become...|2004-

In [34]:
duplicates = df_rating.join(
    df_rating.groupBy("Id").count().filter("count > 1"),
    on="Id",
    how="inner"
)

duplicates.show(truncate=False)

+----------+-----------------------------------------------------------------------------------+-----+--------------+---------------------------------+------------------+------------+-----------+------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#initial answer for BQ1, it's not in this phase

In [61]:
from pyspark.sql.functions import count, avg

product_stats = df_rating.groupBy("Title").agg(
    count("*").alias("review_count"),
    avg("ReviewScore").alias("avg_rating"),
    avg("Price").alias("avg_price")
)


In [62]:
corr_value = product_stats.corr("review_count", "avg_rating")
print("Correlation review_count ↔ avg_rating:", corr_value)

Correlation review_count ↔ avg_rating: -0.00028006229374682225


#Books preprocessing


In [70]:
df_books = df_books.drop("previewLink", "infoLink", "image", "ratingsCount")
#"previewLink", "infoLink", "image" --> not helpful for our project
# "ratingsCount" --> contain a lot of missing values

In [77]:
from pyspark.sql.functions import col, regexp_extract
from pyspark.sql.types import IntegerType

# Keep only the year andput null when the values are wrong
df_books = df_books.withColumn(
    "publishedYear",
    regexp_extract(col("publishedDate"), r"(\d{4})", 1).cast(IntegerType())
)
df_books = df_books.drop("publishedDate")

In [92]:
from pyspark.sql.functions import regexp_replace, when, col, trim

#clean the column
df_books = df_books.withColumn(
    "authors_clean",
    regexp_replace(col("authors"), r"[\[\]'\" ]", " ")
)

# keep only the capitalized words
df_books = df_books.withColumn(
    "authors_clean",
    regexp_replace(col("authors_clean"), r"\b[a-z]+\b", "")
)

# remove extra spaces
df_books = df_books.withColumn(
    "authors_clean",
    regexp_replace(col("authors_clean"), r"\s+", " ").alias("authors_clean")
)

# Keep only the part befor a stop(.)
df_books = df_books.withColumn(
    "authors_clean",
    trim(split(col("authors_clean"), "\\.")[0])
)

#Blanck spaces or value too short are memorized as "unknown"
df_books = df_books.withColumn(
    "authors_clean",
    when(
        (col("authors_clean").isNull()) | (col("authors_clean") == ""),
        "Unknown"
    ).otherwise(col("authors_clean"))
)

#final result
df_books = df_books.drop("authors")



In [None]:
# df_books.show(5)

In [73]:
df_books.filter(df_books.published_year.isNull()).show(truncate=False)


+----------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------+--------------------+-------------+------------+--------------+
|Title                                                                             |description                                                                                                                                                                                           |authors                    |publisher           |publishedDate|categories  |published_year|
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------

categories need an intelligent preprocessing because it's useful for the project

the others let your immagination fly

#final dataset

In [94]:
# df_merged = df_rating.join(df_books, on="Title", how="inner") --> it's wrong