### Problem

One of the biggest challenges for readers is finding the right book to read. That is why we made BookForYou, a recommender system that suggests books based on the user's stated preferences for author, title, and book category. It uses book reviews from Amazon’s book database to find the ideal candidate.

### Identification of required data

The dataset used is [Amazon Book Reviews](https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews?select=books_data.csv).

The dataset consists of two entities, one with book details and the second containing book reviews. Each entity has 10 features, for a combined dataset size of 3.04 GB. As shown below, one book can have many reviews, but a review can only belong to a single book. Books are identified by their titles. From the book details, the title, author, year and category will be used. From the reviews entity, the content of the reviews, book rating, and the helpfulness rating of a given review will be used.

![Entities picture](images/entities.png)

For Book_details, the title, author, year, and category will be used.

For reviews, the following features will be used. Their relevance could be checked with a correlation metric (such as Pearson correlation) or with a decision tree.

* Id (the ID of the book)
* Title (the book title)
* User_id (the ID of the user who rated the book)
* review/score (the rating of the book, from 0 to 5)
* review/summary (the summary of the text review)

### Data Preprocessing

The following imports will be used for data preprocessing.

In [1]:
import csv
import os
import sys
from csv import reader

# Spark imports
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.recommendation import ALS

# Dask imports (aliased as dd so the name df stays free for DataFrames)
import dask.bag as db
import dask.dataframe as dd  # you can use Dask bags or dataframes

# NumPy, Matplotlib, and scikit-learn imports
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import sklearn
import sklearn.tree        # For DecisionTreeClassifier class
import sklearn.ensemble    # For RandomForestClassifier class
import sklearn.datasets    # For make_circles
import sklearn.metrics     # For accuracy_score

Creating SparkSession

In [2]:
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("BookForYou Recommender System") \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "8g") \
        .getOrCreate()
    return spark

Decision trees for feature importance (unused; the code is kept commented out for reference)

In [15]:

# from functools import reduce
#
# # Define the categorical features to one-hot encode
# categorical_cols = ['Id', 'Title', 'Price', 'User_id', 'profileName', 'review/helpfulness', 'review/score', 'review/time', 'review/summary', 'review/text']
#
# # Create a string indexer for each categorical feature
# indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in categorical_cols]
#
# encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
#                         outputCols=[c + "_encoded" for c in categorical_cols])
#
# # Fit the indexers, apply them in sequence, then fit and apply the encoder
# indexer_models = [indexer.fit(df) for indexer in indexers]
# indexed_df = reduce(lambda data, model: model.transform(data), indexer_models, df)
# encoded_df = encoder.fit(indexed_df).transform(indexed_df)
# print(encoded_df.columns)
#
# # Fit a shallow decision tree and print the feature importances
# dt = DecisionTreeClassifier(labelCol="review/score_index", featuresCol='review/summary_encoded', maxDepth=2, maxBins=1000)
# model = dt.fit(encoded_df)
# print(model.featureImportances)
# print(df.count())

### Reviews Table
We will read the data from the CSV file.

In [3]:
spark = init_spark()
df_ratings = spark.read.csv("data/preprocessed/reviews.csv", header=True)

Missing Data
Data sources frequently contain gaps, which leaves three main options for handling them (this notebook uses the second):

1. Keep the missing data points.
2. Drop the missing data points (including the entire row).
3. Fill them in with some other value.
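
The three options can be illustrated on a few hand-made rows. This is a minimal pure-Python sketch rather than the notebook's Spark code; the sample rows, the `REQUIRED` list, and the placeholder values in `DEFAULTS` are invented for illustration.

```python
# Hypothetical sample rows; None marks a missing value.
rows = [
    {"Id": "B001", "User_id": "U1", "review/score": 5.0},
    {"Id": "B002", "User_id": None, "review/score": 4.0},
    {"Id": "B003", "User_id": "U3", "review/score": None},
]

REQUIRED = ["Id", "User_id", "review/score"]

# Option 1: keep the missing data points (rows are left untouched).
kept = list(rows)

# Option 2: drop every row with a missing value in a required column.
dropped = [r for r in rows if all(r[c] is not None for c in REQUIRED)]

# Option 3: fill missing values with a placeholder (assumed defaults).
DEFAULTS = {"User_id": "unknown", "review/score": 0.0}
filled = [
    {c: (DEFAULTS.get(c) if v is None else v) for c, v in r.items()}
    for r in rows
]

print(len(kept), len(dropped), len(filled))
```

Dropping is the simplest choice but shrinks the dataset, while filling keeps every row at the cost of introducing values that were never observed.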

In [4]:
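# Keep only the columns needed for the recommender
df_ratings = df_ratings.select("Id", "Title", "User_id", "review/score", "review/summary")
# Drop every row that has a missing value in any of these columns
df_ratings = df_ratings.na.drop(subset=["Id", "Title", "User_id", "review/score", "review/summary"])
# Rename the raw string ID columns; their indexed versions are created in the next cell
df_ratings = df_ratings.withColumnRenamed("Id", "book_string")
df_ratings = df_ratings.withColumnRenamed("User_id", "User_string")
# Keep only rows whose score is between 1 and 5
df_ratings = df_ratings.filter((df_ratings["review/score"] >= 1) & (df_ratings["review/score"] <= 5))
#df_ratings.describe().show()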

In [5]:
from pyspark.ml.feature import StringIndexer

# Create a StringIndexer that maps the book ID, user ID, score, and title columns to numeric indices
indexer = StringIndexer(
    inputCols=["book_string", "User_string", "review/score", "Title"],
    outputCols=["book_id", "User_id", "rating", "Book_title"],
)

# Fit the StringIndexer, apply it, and drop the original string columns
df_ratings = indexer.fit(df_ratings).transform(df_ratings).drop("book_string", "User_string", "review/score", "Title", "review/summary")

# Show the result
#df_ratings.show()
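
By default, `StringIndexer` assigns indices in order of descending label frequency, so the most common value receives index 0.0. The `rating` column produced by this cell is therefore a frequency rank of the score, not the score itself. The following pure-Python sketch mimics that ordering; it is not Spark's implementation, and `fit_string_index` and the sample scores are hypothetical.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical review scores, held as strings like the columns read from CSV.
scores = ["5.0", "4.0", "5.0", "1.0", "5.0", "4.0"]
mapping = fit_string_index(scores)
indexed = [mapping[s] for s in scores]

print(mapping)
print(indexed)
```

If the original 1 to 5 scale is needed downstream, one option would be to cast `review/score` to a numeric type instead of indexing it.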

In [6]:
# Assemble all columns (book_id, User_id, rating, Book_title) into a single feature vector
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df_ratings.columns, outputCol='features')
df_ratings = assembler.transform(df_ratings)

In [7]:
# Feature scaling
# StandardScaler operates on a single vector column, hence the assembled 'features' column
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
df_ratings = scaler.fit(df_ratings).transform(df_ratings)
df_ratings.show()

+--------+--------+------+----------+--------------------+--------------------+
| book_id| User_id|rating|Book_title|            features|     scaled_features|
+--------+--------+------+----------+--------------------+--------------------+
|180201.0|167334.0|   1.0|  164569.0|[180201.0,167334....|[4.48501061902150...|
| 40116.0|    64.0|   0.0|   37769.0|[40116.0,64.0,0.0...|[0.99844443700460...|
| 40116.0|105599.0|   0.0|   37769.0|[40116.0,105599.0...|[0.99844443700460...|
| 40116.0|  4472.0|   1.0|   37769.0|[40116.0,4472.0,1...|[0.99844443700460...|
| 40116.0| 31627.0|   1.0|   37769.0|[40116.0,31627.0,...|[0.99844443700460...|
| 40116.0|  3581.0|   1.0|   37769.0|[40116.0,3581.0,1...|[0.99844443700460...|
| 40116.0|     0.0|   0.0|   37769.0|[40116.0,0.0,0.0,...|[0.99844443700460...|
| 40116.0|637113.0|   0.0|   37769.0|[40116.0,637113.0...|[0.99844443700460...|
| 40116.0|130558.0|   0.0|   37769.0|[40116.0,130558.0...|[0.99844443700460...|
| 40116.0|837115.0|   1.0|   37769.0|[40
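
With its default settings, `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. This is consistent with the output above: the first scaled component keeps the same ratio as the raw `book_id` values (about 4.49 for 180201 versus 40116). A minimal pure-Python sketch of that scaling follows; it is not Spark's implementation, and `scale_to_unit_std` and the sample vectors are hypothetical.

```python
from statistics import stdev

def scale_to_unit_std(rows):
    """Divide each column by its sample standard deviation (no centering)."""
    columns = list(zip(*rows))
    stds = [stdev(column) for column in columns]
    # Assumption: a constant column (zero deviation) is mapped to 0.0
    # to avoid dividing by zero.
    return [
        [value / s if s > 0 else 0.0 for value, s in zip(row, stds)]
        for row in rows
    ]

# Hypothetical feature vectors: [book_id, User_id, rating].
features = [
    [10.0, 1.0, 0.0],
    [20.0, 3.0, 1.0],
    [30.0, 5.0, 2.0],
]
scaled = scale_to_unit_std(features)
print(scaled)
```

Because nothing is subtracted, a raw value of 0.0 stays 0.0 after scaling, and large index values such as book or user IDs remain large relative to the rating.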

In [14]:
# Creating the model
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol='scaled_features', k=3)
kmeans_model = kmeans.fit(df_ratings)

In [15]:
# Getting the predictions
predictions = kmeans_model.transform(df_ratings)

In [16]:
predictions.show(20)

+--------+--------+------+----------+--------------------+--------------------+----------+
| book_id| User_id|rating|Book_title|            features|     scaled_features|prediction|
+--------+--------+------+----------+--------------------+--------------------+----------+
|180201.0|167334.0|   1.0|  164569.0|[180201.0,167334....|[4.48501061902150...|         0|
| 40116.0|    64.0|   0.0|   37769.0|[40116.0,64.0,0.0...|[0.99844443700460...|         1|
| 40116.0|105599.0|   0.0|   37769.0|[40116.0,105599.0...|[0.99844443700460...|         1|
| 40116.0|  4472.0|   1.0|   37769.0|[40116.0,4472.0,1...|[0.99844443700460...|         1|
| 40116.0| 31627.0|   1.0|   37769.0|[40116.0,31627.0,...|[0.99844443700460...|         1|
| 40116.0|  3581.0|   1.0|   37769.0|[40116.0,3581.0,1...|[0.99844443700460...|         1|
| 40116.0|     0.0|   0.0|   37769.0|[40116.0,0.0,0.0,...|[0.99844443700460...|         1|
| 40116.0|637113.0|   0.0|   37769.0|[40116.0,637113.0...|[0.99844443700460...|         1|
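
K-means groups rows by repeatedly assigning each feature vector to its nearest centroid and then moving each centroid to the mean of its members. The sketch below is a small pure-Python version of that loop (Lloyd's algorithm), not Spark's distributed implementation; the function names, the sample points, and the choice to seed with the first `k` points are assumptions made for illustration.

```python
import math

def assign(points, centroids):
    """Return, for each point, the index of its nearest centroid."""
    return [
        min(range(len(centroids)), key=lambda c: math.dist(p, centroids[c]))
        for p in points
    ]

def kmeans(points, k, max_iter=20):
    """Lloyd's algorithm; returns (centroids, labels, cost)."""
    # Assumption: seed with the first k points (Spark defaults to k-means||).
    centroids = [list(p) for p in points[:k]]
    labels = assign(points, centroids)
    for _ in range(max_iter):
        # Update step: move each centroid to the mean of its members.
        for c in range(k):
            members = [p for p, label in zip(points, labels) if label == c]
            if members:  # an empty cluster keeps its previous centroid
                centroids[c] = [sum(dim) / len(members) for dim in zip(*members)]
        # Assignment step: stop once no point changes cluster.
        new_labels = assign(points, centroids)
        if new_labels == labels:
            break
        labels = new_labels
    # Cost: sum of squared distances from each point to its centroid.
    cost = sum(math.dist(p, centroids[l]) ** 2 for p, l in zip(points, labels))
    return centroids, labels, cost

# Hypothetical 2-D points forming two obvious groups.
points = [[1.0, 1.0], [1.0, 2.0], [9.0, 9.0], [9.0, 10.0]]
centroids2, labels2, cost2 = kmeans(points, 2)
centroids3, labels3, cost3 = kmeans(points, 3)
print(labels2, cost2)
print(labels3, cost3)
```

The cost shrinks as `k` grows, so comparing two values of `k` usually means looking for the point where the improvement levels off rather than simply picking the lowest cost.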

In [17]:
# Creating the model again, this time with k=2
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol='scaled_features', k=2)
kmeans_model = kmeans.fit(df_ratings)

In [19]:
# Getting the predictions
predictions = kmeans_model.transform(df_ratings)

In [20]:
predictions.show(20)

+--------+--------+------+----------+--------------------+--------------------+----------+
| book_id| User_id|rating|Book_title|            features|     scaled_features|prediction|
+--------+--------+------+----------+--------------------+--------------------+----------+
|180201.0|167334.0|   1.0|  164569.0|[180201.0,167334....|[4.48501061902150...|         0|
| 40116.0|    64.0|   0.0|   37769.0|[40116.0,64.0,0.0...|[0.99844443700460...|         1|
| 40116.0|105599.0|   0.0|   37769.0|[40116.0,105599.0...|[0.99844443700460...|         1|
| 40116.0|  4472.0|   1.0|   37769.0|[40116.0,4472.0,1...|[0.99844443700460...|         1|
| 40116.0| 31627.0|   1.0|   37769.0|[40116.0,31627.0,...|[0.99844443700460...|         1|
| 40116.0|  3581.0|   1.0|   37769.0|[40116.0,3581.0,1...|[0.99844443700460...|         1|
| 40116.0|     0.0|   0.0|   37769.0|[40116.0,0.0,0.0,...|[0.99844443700460...|         1|
| 40116.0|637113.0|   0.0|   37769.0|[40116.0,637113.0...|[0.99844443700460...|         1|