💡 **Summary:**  
There are far too many books to read in a lifetime, so which ones should be recommended first? In this project, I build a recommendation engine based on explicit feedback: it predicts ratings for unread books from users' previous explicit ratings. It is a collaborative filtering engine, so its predictions are based on the preferences of similar users. 

🔢 **Dataset:**    
Book ratings from individual Goodreads users.


🧰🤖 **Tools and techniques:**  
The recommendation system runs on Spark (via PySpark), a popular framework for big data processing.  
I use the Alternating Least Squares (ALS) algorithm with a non-negativity constraint (non-negative matrix factorization) to factorize the user-book rating matrix. Multiplying the factors back together approximates the original matrix and fills in its blank cells, i.e. predicts ratings for books a user has not read yet.


🔧 **Possible improvements:**  
1. Interpret the latent factors as unobservable features that capture some kind of user preference, the book equivalent of "movies starring Tom Hanks" in a movie recommender.  
2. Add more information from the other .csv files in the dataset.

👍 I hope this notebook will be helpful to you.   
💬 I would appreciate any comments, thoughts, ideas and questions in the comment section.  
💡 Constructive criticism will be appreciated.  

🎉 Have fun! 😁

🌍 Notebook published: 19-10-09  
🔧 Last update: 19-10-09   
👨‍💻 By Artur Górlicki 

<a id='table_of_contents'></a>
# Table of Contents:

0. <a href='#section_s0'>SETTINGS</a>

1. <a href='#section_s1'>INTRODUCTION</a>
2. <a href='#section_s2'>DATA PREPARATION</a>  
3. <a href='#section_s3'>EXPLORATORY DATA ANALYSIS (EDA)</a>
4. <a href='#section_s4'>MACHINE LEARNING (ML) </a>  
4.1 <a href='#section_s41'>Cross Validation Pipeline</a>  
4.2 <a href='#section_s42'>Alternating Least Squares Model (ALS) </a>  

<a href='#table_of_contents'>Back to Table of Contents</a>  
#  
<a id='section_s0'></a>
# 0. SETTINGS

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
Collecting py4j==0.10.7 (from pyspark)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216131250 sha256=d2b1bb085cdb88

In [2]:
import os
import pandas as pd
import numpy as np

# PySpark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Exploratory Data Analysis (EDA)
from pyspark.sql.functions import col, min, max, avg, lit  # note: min/max shadow Python's built-in min()/max()

# Machine Learning (ML)
from pyspark.ml.recommendation import ALS # Alternating Least Squares model
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Cross-Validation
from pyspark.ml.evaluation import RegressionEvaluator # Performance metric

# Visualization
import seaborn as sns
import matplotlib.pyplot as plt

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

# Set NumPy's global random seed for reproducibility
# (the Spark calls below take their own seed arguments)
SEED = 23
np.random.seed(SEED)

In [3]:
sc = SparkContext(appName = "Book-Recommendation")
print(sc)

<SparkContext master=local[*] appName=Book-Recommendation>


In [4]:
spark = SparkSession.Builder().getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f16d84f7f60>


<a href='#table_of_contents'>Back to Table of Contents</a>  
#  
<a id='section_s1'></a>
# 1. INTRODUCTION

<a href='#table_of_contents'>Back to Table of Contents</a>  
#  
<a id='section_s2'></a>
# 2. DATA PREPARATION

In [5]:
# Read csv into Spark DataFrame
ratings = spark.read.csv('../input/goodbooks-10k/ratings.csv',
                         header = True,
                         inferSchema=True)
print(type(ratings))

<class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
# Read csv
to_read = spark.read.csv('../input/goodbooks-10k/to_read.csv',
                         header = True,
                         inferSchema=True)
print(type(to_read))

<class 'pyspark.sql.dataframe.DataFrame'>


The dataset is quite large, so to demonstrate the technique I will work with a random sample of about 1% of the ratings.

In [7]:
ratings = ratings.sample(withReplacement = False, 
                         fraction = 0.01, # 1% of observation
                         seed = 2019)
print(ratings.count())

9837
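`DataFrame.sample(withReplacement=False, fraction=0.01)` keeps each row independently with probability `fraction` instead of returning exactly 1% of the rows, which is why the count above is not a round number. The plain-Python sketch below mimics that behaviour on a hypothetical list of 100,000 row indices; it illustrates the idea and is not Spark's actual sampler.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (no replacement)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = range(100_000)  # hypothetical stand-in for the ratings rows
sample = bernoulli_sample(rows, fraction=0.01, seed=2019)
print(len(sample))  # close to, but usually not exactly, 1,000
```

Re-running with the same seed returns the same rows, which is what `seed = 2019` buys in the Spark cell.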


In [8]:
# Use .printSchema() to see the datatypes of the ratings dataset
ratings.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [9]:
# Convert the columns to the proper data types
ratings = ratings.select(ratings.user_id,
                         ratings.book_id,
                         ratings.rating.cast("double"))

In [10]:
# Call .printSchema() again to confirm the columns are now in the correct format
ratings.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- rating: double (nullable = true)



<a href='#table_of_contents'>Back to Table of Contents</a>  
#  
<a id='section_s3'></a>
# 3. EXPLORATORY DATA ANALYSIS (EDA)

In [11]:
# First few rows of data
ratings.show(n = 5)

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|  30681|      1|   5.0|
|  14546|      2|   5.0|
|   5885|      3|   4.0|
|  17228|      8|   4.0|
|  49288|      8|   5.0|
+-------+-------+------+
only showing top 5 rows


### Sparsity

Calculate the share of empty cells in the user-book matrix, i.e. 1 - (number of ratings) / (number of users * number of books).  

In [12]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()

# Count the number of distinct Id's
num_users = ratings.select("user_id").distinct().count()
num_items = ratings.select("book_id").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of items
denominator = num_users * num_items

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator * 1.0)/ denominator) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  99.98% empty.


The matrix is almost entirely empty. That is not a problem for ALS itself, because the model is trained only on the observed ratings.
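As a cross-check, the distinct counts can be recovered from the summary statistics further down: 9,837 ratings at about 1.2592 ratings per user implies 7,812 users, and at about 1.5702 ratings per book implies 6,265 books. Plugging those inferred counts (they are derived here, not printed by the notebook) into the same formula reproduces the 99.98% figure:

```python
def sparsity_pct(num_ratings, num_users, num_items):
    """Percentage of empty cells in a num_users x num_items rating matrix."""
    return (1.0 - num_ratings / (num_users * num_items)) * 100

num_ratings = 9837
num_users = round(num_ratings / 1.2592165898617511)  # avg ratings per user (EDA output)
num_items = round(num_ratings / 1.5701516360734238)  # avg ratings per book (EDA output)
print(num_users, num_items, "%.2f" % sparsity_pct(num_ratings, num_users, num_items))
# 7812 6265 99.98
```

Roughly 49 million cells, of which fewer than 10,000 are filled.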

### The GroupBy and Filter Methods

In [13]:
# Group data by user_id, count ratings
(ratings.groupBy("user_id")
    .count()
    .filter("`count` > 1")
    .sort(col("count").desc())
    .show(n = 10))

+-------+-----+
|user_id|count|
+-------+-----+
|  38082|    6|
|  17073|    6|
|  18857|    6|
|  27934|    5|
|  13991|    5|
|  12734|    5|
|  21820|    5|
|   8340|    5|
|   8959|    5|
|  26629|    5|
+-------+-----+
only showing top 10 rows



In [14]:
# Group data by book_id, count ratings
(ratings.groupBy("book_id")
    .count()
    .filter("`count` > 1")
    .sort(col("count").desc())
    .show(n = 10))

+-------+-----+
|book_id|count|
+-------+-----+
|   4245|    6|
|   9119|    6|
|   4230|    6|
|   3917|    6|
|   8822|    5|
|   1580|    5|
|   9088|    5|
|   4317|    5|
|   4515|    5|
|   1546|    5|
+-------+-----+
only showing top 10 rows



### Summary Statistics

In [15]:
# Min num ratings for items
print("Item with the fewest ratings: ")
ratings.groupBy("book_id").count().select(min("count")).show()

Item with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+



In [16]:
# Avg num ratings per item
print("Avg num ratings per item: ")
ratings.groupBy("book_id").count().select(avg("count")).show()

Avg num ratings per item: 
+------------------+
|        avg(count)|
+------------------+
|1.5701516360734238|
+------------------+



In [17]:
# Min num ratings for user
print("User with the fewest ratings: ")
ratings.groupBy("user_id").count().select(min("count")).show()

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+



In [18]:
# Avg num ratings per user
print("Avg num ratings per user: ")
ratings.groupBy("user_id").count().select(avg("count")).show()

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|1.2592165898617511|
+------------------+



<a href='#table_of_contents'>Back to Table of Contents</a>  
#  
<a id='section_s4'></a>
# 4. MACHINE LEARNING (ML)

I will use the ALS model to factorize the sparse user-book matrix into two smaller factor matrices: one holding a latent-factor vector for every user and one for every book. Multiplying the two factor matrices back together gives an approximation of every cell of the original matrix, including the cells that were blank. The ratings present in the original matrix act as labels during training: the model minimizes the difference between those actual ratings and the corresponding values of the product. The predictions for the empty cells come out as a by-product.

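The factors are constrained to be non-negative (non-negative matrix factorization, NMF or NNMF), because Goodreads ratings are never negative: users rate books from 1 to 5 stars. Note that non-negativity only bounds the predictions from below; nothing stops a prediction from exceeding 5, and some predictions further down do.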
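The "alternating" part works like this: with the book factors held fixed, each user's vector is the solution of a small regularized least-squares problem over that user's observed ratings; then the roles swap, and the two steps repeat. The plain-Python sketch below runs that loop on a hypothetical toy matrix of 4 users and 4 books. It is a simplified illustration, not Spark's implementation: it solves unconstrained least squares, whereas `nonnegative=True` makes Spark solve each sub-problem with non-negative least squares, and it uses a plain L2 penalty, whereas Spark scales `regParam` by each user's or book's number of ratings.

```python
import random

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def solve(A, b):
    """Solve the square linear system A x = b (Gaussian elimination, partial pivoting)."""
    n = len(A)
    M = [A[i][:] + [b[i]] for i in range(n)]
    for c in range(n):
        p = max(range(c, n), key=lambda r: abs(M[r][c]))
        M[c], M[p] = M[p], M[c]
        for r in range(c + 1, n):
            f = M[r][c] / M[c][c]
            for k in range(c, n + 1):
                M[r][k] -= f * M[c][k]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):
        x[r] = (M[r][n] - sum(M[r][k] * x[k] for k in range(r + 1, n))) / M[r][r]
    return x

def half_step(fixed, observed, rank, reg):
    """Re-solve every row's vector with the other side fixed:
    (F^T F + reg * I) x = F^T r, using only that row's observed ratings."""
    new = {}
    for key, entries in observed.items():
        A = [[reg if i == j else 0.0 for j in range(rank)] for i in range(rank)]
        b = [0.0] * rank
        for other, r in entries.items():
            f = fixed[other]
            for i in range(rank):
                b[i] += f[i] * r
                for j in range(rank):
                    A[i][j] += f[i] * f[j]
        new[key] = solve(A, b)
    return new

def loss(U, V, ratings, reg):
    """Squared error on observed cells plus an L2 penalty on all factors."""
    err = sum((r - dot(U[u], V[i])) ** 2 for (u, i), r in ratings.items())
    return err + reg * (sum(dot(x, x) for x in U.values()) + sum(dot(x, x) for x in V.values()))

# Hypothetical toy ratings (user, book) -> stars; ("bob", "b2") is a blank cell.
ratings = {("alice", "b1"): 5, ("alice", "b2"): 4, ("alice", "b4"): 1,
           ("bob", "b1"): 4, ("bob", "b3"): 1, ("carol", "b2"): 5,
           ("carol", "b3"): 2, ("dave", "b3"): 5, ("dave", "b4"): 4}
by_user, by_book = {}, {}
for (u, i), r in ratings.items():
    by_user.setdefault(u, {})[i] = r
    by_book.setdefault(i, {})[u] = r

rank, reg = 2, 0.1
rng = random.Random(23)
U = {u: [rng.random() for _ in range(rank)] for u in by_user}
V = {i: [rng.random() for _ in range(rank)] for i in by_book}
losses = [loss(U, V, ratings, reg)]
for _ in range(10):
    U = half_step(V, by_user, rank, reg)  # users' turn, books fixed
    V = half_step(U, by_book, rank, reg)  # books' turn, users fixed
    losses.append(loss(U, V, ratings, reg))

print("predicted rating for bob on b2:", round(dot(U["bob"], V["b2"]), 2))
```

Each half-step minimizes the regularized loss exactly over one side's factors, so the recorded `losses` can only go down, and the blank cell ("bob", "b2") still gets a prediction from the dot product of the learned vectors.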

<a href='#table_of_contents'>Back to Table of Contents</a>  
#  
<a id='section_s41'></a>
### 4.1. Cross Validation Pipeline

In [19]:
# Create a generic ALS model with default hyperparameters (rank, maxIter, regParam)
als = ALS(userCol="user_id", itemCol="book_id", ratingCol="rating", 
          nonnegative = True, # Non-negative matrix factorization
          coldStartStrategy = "drop", # Drop predictions for users/books not seen in training (avoids NaN RMSE)
          implicitPrefs = False) # Explicit feedback (star ratings), not implicit

In [20]:
# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS
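With `coldStartStrategy="drop"`, rows whose user or book never appeared in the training data are removed from the predictions instead of receiving a NaN prediction, which would otherwise turn the RMSE into NaN. Given how few ratings each user has in this sample, that filter may remove a sizeable part of the test set. A plain-Python sketch of the same rule, on hypothetical toy (user, book) pairs:

```python
def drop_cold_start(test_pairs, train_pairs):
    """Keep only (user, book) test pairs whose user AND book both occur in training."""
    train_users = {u for u, _ in train_pairs}
    train_books = {b for _, b in train_pairs}
    return [(u, b) for u, b in test_pairs if u in train_users and b in train_books]

train = [(1, 10), (1, 11), (2, 10)]
test = [(2, 11), (3, 10), (1, 12)]  # user 3 and book 12 are unseen in training
print(drop_cold_start(test, train))  # [(2, 11)]
```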

### Train test split

In [21]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], 
                                    seed = 1234)
print(type(train))

<class 'pyspark.sql.dataframe.DataFrame'>


### Hyperparameter Grid

In [22]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100]) \
            .addGrid(als.maxIter, [5, 50, 100]) \
            .addGrid(als.regParam, [.01, .05, .1]) \
            .build()

### Performance metric

We want to minimize the Root Mean Square Error (RMSE), which measures the typical size of the difference between the predicted and the actual rating.
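For reference, with $N$ the number of evaluated (user $u$, book $i$) pairs, $r_{ui}$ the actual rating and $\hat{r}_{ui}$ the predicted one, this is the quantity `RegressionEvaluator(metricName="rmse")` computes:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{N} \sum_{(u,i)} \left( r_{ui} - \hat{r}_{ui} \right)^2}
$$

Squaring before averaging means a few large misses raise the RMSE more than many small ones.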

In [23]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName = "rmse", 
                                labelCol = "rating", 
                                predictionCol = "prediction")
# Number of hyperparameter combinations in the grid
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  27


### Cross validation

In [24]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator = als, 
                    estimatorParamMaps = param_grid, 
                    evaluator = evaluator, 
                    numFolds = 5)

In [25]:
# Confirm cv was built
print(cv)

CrossValidator_822218f59a45
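The grid and `numFolds` together set the cost of cross-validation: 3 × 3 × 3 = 27 parameter combinations, each fit once per fold, plus one final refit of the best combination on the whole training set (Spark's `CrossValidator` does this to produce `bestModel`). A quick count in plain Python, with the grid values copied from the cell above:

```python
from itertools import product

ranks, max_iters, reg_params = [10, 50, 100], [5, 50, 100], [0.01, 0.05, 0.1]
num_folds = 5

grid = list(product(ranks, max_iters, reg_params))
fits = len(grid) * num_folds + 1  # +1: refit of the best combination on all training data
print(len(grid), fits)  # 27 136
```

The combinations with `maxIter=100` and `rank=100` dominate the run time, so trimming those values is the cheapest way to shrink the search.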


<a href='#table_of_contents'>Back to Table of Contents</a>  
#  
<a id='section_s42'></a>
### 4.2. Alternating Least Squares Model (ALS)

### Generic model

In [26]:
# Fit generic model to the 'train' dataset
als_mod = als.fit(train)

In [27]:
test_pred = als_mod.transform(test)

In [28]:
# View the predictions 
test_pred.show(n = 10)

+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|  33864|   1580|   3.0| 1.9183098|
|  14841|   1645|   5.0| 2.0209856|
|   2749|   6336|   4.0|  3.371828|
|  17328|   8638|   5.0|0.36690933|
|  41460|   4158|   3.0| 1.3361363|
|  26400|   1303|   5.0| 1.1609883|
|  15007|    808|   5.0| 1.6413031|
|  41616|    458|   3.0|  2.994131|
|  32204|   5578|   4.0|  1.913218|
|  11285|   3028|   3.0|  2.065584|
+-------+-------+------+----------+
only showing top 10 rows



In [29]:
# Calculate and print the RMSE of test_predictions
print(evaluator.evaluate(test_pred))

2.7316151044723673


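This is the baseline. An RMSE of about 2.73 means that the predictions typically miss the actual rating by more than two and a half stars, which is huge on a 1-5 rating scale. A likely contributor (not verified here) is the 1% sample: with about 1.26 ratings per user, ALS has very little data to learn each user's preferences from.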
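To see what the metric means in practice, the sketch below recomputes RMSE and the mean absolute error (MAE) on just the ten test rows printed above. It only uses those ten rows, not the full test set, so it will not reproduce 2.73 exactly.

```python
import math

# (actual, predicted) pairs copied from the first ten rows of test_pred above
pairs = [(3.0, 1.9183098), (5.0, 2.0209856), (4.0, 3.371828), (5.0, 0.36690933),
         (3.0, 1.3361363), (5.0, 1.1609883), (5.0, 1.6413031), (3.0, 2.994131),
         (4.0, 1.913218), (3.0, 2.065584)]

errors = [actual - pred for actual, pred in pairs]
rmse = math.sqrt(sum(e * e for e in errors) / len(errors))
mae = sum(abs(e) for e in errors) / len(errors)
print(f"RMSE={rmse:.2f}  MAE={mae:.2f}")  # RMSE is never smaller than MAE
```

All ten predictions are below the actual rating, and the single miss of more than 4.6 stars (user 17328) pushes the RMSE noticeably above the MAE.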

### Cross-validated model

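Unfortunately, fitting the cross-validator to the data fails with an error that I have not fixed yet, so for now the best model is the generic ALS model. One possible cause (unconfirmed): ALS with a large `maxIter` (50 or 100 in the grid) can fail with a `StackOverflowError` because of the long lineage it builds, which setting a checkpoint directory via `sc.setCheckpointDir(...)` usually avoids.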

In [30]:
# Fit cross validator to the 'train' dataset
# cv_mod = cv.fit(train)

In [31]:
#Extract best model from the cv model above
# best_model = cv_mod.bestModel

In [32]:
# Print best_model
# print(type(best_model))

In [33]:
# # Complete the code below to extract the ALS model parameters
# print("**Best Model**")

# # Print "Rank"
# print("  Rank:", best_model.getRank())

# # Print "MaxIter"
# print("  MaxIter:", best_model.getMaxIter())

# # Print "RegParam"
# print("  RegParam:", best_model.getRegParam())

### Predictions and Performance Evaluation

In [34]:
best_model = als_mod

In [35]:
test_predictions = best_model.transform(test)

In [36]:
# View the predictions 
test_predictions.show(n = 10)

# Calculate and print the RMSE of test_predictions
print ("RMSE: ", evaluator.evaluate(test_predictions))

+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|  33864|   1580|   3.0| 1.9183098|
|  14841|   1645|   5.0| 2.0209856|
|   2749|   6336|   4.0|  3.371828|
|  17328|   8638|   5.0|0.36690933|
|  41460|   4158|   3.0| 1.3361363|
|  26400|   1303|   5.0| 1.1609883|
|  15007|    808|   5.0| 1.6413031|
|  41616|    458|   3.0|  2.994131|
|  32204|   5578|   4.0|  1.913218|
|  11285|   3028|   3.0|  2.065584|
+-------+-------+------+----------+
only showing top 10 rows

RMSE:  2.7316151044723673


The RMSE is the same as before (about 2.73), since this is still the generic model, and it remains large.

### Model Performance Evaluation and Output Cleanup

In [37]:
# Generate the top-n recommendations for every user (here n = 10)
ALS_recommendations = best_model.recommendForAllUsers(numItems = 10)

This returns the top 10 recommendations for every user seen during training. `recommendForAllUsers` does not exclude books the user has already rated, so I will filter those out later. 

In [38]:
ALS_recommendations.show(n = 10)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   4900|[[2820, 4.9217324...|
|   6620|[[4911, 4.555248]...|
|   7240|[[9464, 4.9488525...|
|  21700|[[7350, 4.9506803...|
|  27760|[[7622, 2.6407099...|
|  32460|[[7404, 4.3317385...|
|  48510|[[4454, 5.725117]...|
|  18051|[[3970, 4.921773]...|
|  38311|[[8339, 4.9450903...|
|  40011|[[5456, 5.0649285...|
+-------+--------------------+
only showing top 10 rows



Let's flatten this output into one row per recommendation, with the columns user_id | book_id | prediction.
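Each `recommendations` entry is an array of (book_id, rating) structs, so flattening means emitting one row per array element, which is what `LATERAL VIEW explode` does in the SQL cell. The same reshaping in plain Python, on values copied from the outputs in this section (two books for user 4900, one for user 6620):

```python
# user_id -> list of (book_id, predicted rating), shaped like ALS_recommendations
nested = [(4900, [(2820, 4.9217324), (6486, 4.6845436)]),
          (6620, [(4911, 4.555248)])]

# One output row per (user, recommended book), like LATERAL VIEW explode
flat = [(user_id, book_id, prediction)
        for user_id, recs in nested
        for book_id, prediction in recs]
for row in flat:
    print(row)
```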

In [39]:
# Register a temporary view so the DataFrame can be queried with SQL
ALS_recommendations.createOrReplaceTempView("ALS_recs_temp")

In [40]:
clean_recs = spark.sql("""SELECT user_id,
                            book_and_rating.book_id AS book_id,
                            book_and_rating.rating AS prediction
                        FROM ALS_recs_temp
                        LATERAL VIEW explode(recommendations) exploded_table
                            AS book_and_rating""")
clean_recs.show()

+-------+-------+----------+
|user_id|book_id|prediction|
+-------+-------+----------+
|   4900|   2820| 4.9217324|
|   4900|   6486| 4.6845436|
|   4900|   1440| 4.5387926|
|   4900|   5492|  4.516373|
|   4900|    534|  4.511571|
|   4900|   6285|  4.491678|
|   4900|   8907|  4.403657|
|   4900|   2041|  4.391653|
|   4900|   4126| 4.3852353|
|   4900|   6175|    4.3761|
|   6620|   4911|  4.555248|
|   6620|   1744|  4.470581|
|   6620|   1025|  4.362337|
|   6620|   4508| 4.3102474|
|   6620|   6920| 4.2992077|
|   6620|   3542|  4.296758|
|   6620|   9501| 4.2871203|
|   6620|   8591| 4.2662153|
|   6620|   9768|  4.259452|
|   6620|   4012|  4.242892|
+-------+-------+----------+
only showing top 20 rows



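To keep only books the user has not read, I left-join the recommendations with the actual ratings and keep the rows without a rating. Since `ratings` holds only the 1% sample, this removes just the books the user rated within that sample.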
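
In plain Python this is a set-membership filter (an anti-join). In the output of the join, user 4900's top recommendation (book 2820) no longer appears, so that pair must have a rating in `ratings`; the sketch reproduces this on three rows. In Spark itself, `clean_recs.join(ratings, ["user_id", "book_id"], "left_anti")` should give the same rows in one step, without the extra all-null `rating` column.

```python
# (user_id, book_id, prediction) rows copied from clean_recs
recs = [(4900, 2820, 4.9217324), (4900, 6486, 4.6845436), (6620, 4911, 4.555248)]
# (user_id, book_id) pairs that already have a rating (only the one implied by the output)
rated = {(4900, 2820)}

unread = [row for row in recs if (row[0], row[1]) not in rated]
print(unread)
```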

In [41]:
# Recommendations for unread books
(clean_recs.join(ratings, ["user_id", "book_id"], "left")
    .filter(ratings.rating.isNull()).show())

+-------+-------+----------+------+
|user_id|book_id|prediction|rating|
+-------+-------+----------+------+
|   4900|   6486| 4.6845436|  null|
|   4900|   1440| 4.5387926|  null|
|   4900|   5492|  4.516373|  null|
|   4900|    534|  4.511571|  null|
|   4900|   6285|  4.491678|  null|
|   4900|   8907|  4.403657|  null|
|   4900|   2041|  4.391653|  null|
|   4900|   4126| 4.3852353|  null|
|   4900|   6175|    4.3761|  null|
|   6620|   4911|  4.555248|  null|
|   6620|   1744|  4.470581|  null|
|   6620|   1025|  4.362337|  null|
|   6620|   4508| 4.3102474|  null|
|   6620|   6920| 4.2992077|  null|
|   6620|   3542|  4.296758|  null|
|   6620|   9501| 4.2871203|  null|
|   6620|   8591| 4.2662153|  null|
|   6620|   9768|  4.259452|  null|
|   6620|   4012|  4.242892|  null|
|   7240|   8063|  4.673035|  null|
+-------+-------+----------+------+
only showing top 20 rows



In [42]:
new_books = (clean_recs.join(ratings, ["user_id", "book_id"], "left")
    .filter(ratings.rating.isNull()))

In [43]:
print(new_books.count())

63255


Next, I join the result with the `to_read` table, which stores the books each user marked as "I would like to read it". Those books should be good candidates for a high predicted rating, since the user picked them out in the first place.
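The left join plus `lit(1)` amounts to tagging each recommendation with 1 if its (user_id, book_id) pair appears in `to_read` and null otherwise, then counting by that tag. A plain-Python sketch using two rows from the outputs in this section (user 41946 with book 5810 is on the to-read list, user 513 with book 1078 is not):

```python
from collections import Counter

new_books = [(41946, 5810, 3.5906856), (513, 1078, 3.8150978)]  # (user_id, book_id, prediction)
to_read = {(41946, 5810)}                                         # (user_id, book_id) pairs

# 1 if the user marked the book "to read", None (Spark: null) otherwise
flagged = [(u, b, p, 1 if (u, b) in to_read else None) for u, b, p in new_books]
counts = Counter(flag for _, _, _, flag in flagged)
print(counts)
```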

In [44]:
recommendations = new_books.join(to_read.withColumn('to_read', lit(1)), 
                              on = ["user_id", "book_id"], 
                              how = "left")
recommendations.show()

+-------+-------+----------+------+-------+
|user_id|book_id|prediction|rating|to_read|
+-------+-------+----------+------+-------+
|    513|   1078| 3.8150978|  null|   null|
|    927|    739| 4.9066334|  null|   null|
|   1121|   9797| 5.3307767|  null|   null|
|   1175|   2021| 4.9970274|  null|   null|
|   1576|   8864| 3.2155378|  null|   null|
|   1799|   2268| 3.9579291|  null|   null|
|   1866|   6303|  4.394198|  null|   null|
|   1916|   3124| 4.2498875|  null|   null|
|   2163|   3668| 5.1476393|  null|   null|
|   2373|   4054| 4.7827616|  null|   null|
|   2382|   2638| 4.5336623|  null|   null|
|   2432|   6312|  4.377811|  null|   null|
|   2477|   5109| 2.1214325|  null|   null|
|   2485|   5237| 4.0008345|  null|   null|
|   2660|   1257| 4.0051045|  null|   null|
|   3050|   8505| 4.3424144|  null|   null|
|   3306|     95| 4.5352917|  null|   null|
|   3548|   1919| 4.4738526|  null|   null|
|   3962|   3234|  4.467758|  null|   null|
|   3971|   7402| 3.6912634|  nu

In [45]:
(recommendations
     .groupby('to_read')
     .count()
     .show())

+-------+-----+
|to_read|count|
+-------+-----+
|   null|63166|
|      1|   89|
+-------+-----+



Only 89 of the 63,255 unread recommendations are books that the user had marked as "I want to read".  
Let's look at the predicted ratings for those books. The hypothesis is that they will be high on average, because the users chose these books themselves. 

In [46]:
(recommendations
     .filter(recommendations.to_read.isNotNull())
     .select(['user_id', 'book_id', 'prediction','to_read'])
     .show())

+-------+-------+----------+-------+
|user_id|book_id|prediction|to_read|
+-------+-------+----------+-------+
|  41946|   5810| 3.5906856|      1|
|  49101|   3241|  2.087709|      1|
|  12139|   7683| 4.4270315|      1|
|  40670|   2702| 4.4551253|      1|
|  25812|   4695|  4.103054|      1|
|  32860|   2244|  4.399159|      1|
|  11468|   7402|  3.953835|      1|
|  20452|    329|  4.757228|      1|
|  40110|    276| 4.5199947|      1|
|  38214|    319|  4.705698|      1|
|   4293|   2954| 4.1656017|      1|
|  44401|   5072|  4.462099|      1|
|   2962|   1325| 2.4017355|      1|
|  22846|   2114| 3.2482762|      1|
|  28530|   8039| 4.5299163|      1|
|  40851|   9797| 4.6195726|      1|
|  36987|    476| 3.9769363|      1|
|  24962|    150|       0.0|      1|
|  29464|   9587| 4.2431493|      1|
|  43131|    883| 4.5905147|      1|
+-------+-------+----------+-------+
only showing top 20 rows



In [47]:
to_read_recs = (recommendations
     .filter(recommendations.to_read.isNotNull())
     .select(['user_id', 'book_id', 'prediction','to_read']))
to_read_recs.show(n = 5)

+-------+-------+----------+-------+
|user_id|book_id|prediction|to_read|
+-------+-------+----------+-------+
|  41946|   5810| 3.5906856|      1|
|  49101|   3241|  2.087709|      1|
|  12139|   7683| 4.4270315|      1|
|  40670|   2702| 4.4551253|      1|
|  25812|   4695|  4.103054|      1|
+-------+-------+----------+-------+
only showing top 5 rows



In [48]:
(to_read_recs
     .withColumn('pred_trunc', to_read_recs.prediction.cast('int'))  # integer part (predictions are non-negative)
     .groupby('pred_trunc')
     .count()
     .sort('pred_trunc')
    .show())

+----------+-----+
|pred_trunc|count|
+----------+-----+
|         0|    2|
|         1|    2|
|         2|    5|
|         3|   18|
|         4|   52|
|         5|    9|
|         7|    1|
+----------+-----+



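Most of the books that users want to read get a predicted rating of 4 or more (62 of the 89), so the engine does seem to score books that people picked themselves fairly highly. This is only a sanity check rather than a proper evaluation, and a few predictions fall outside the 1-5 rating scale (0.0, and one of 7 or more). 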
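The histogram can be summarised in one number. Using the bucket counts from the table above (they add up to the 89 to-read books), the share of to-read books predicted at 4 or higher is:

```python
# pred_trunc bucket -> number of to-read books, copied from the output above
buckets = {0: 2, 1: 2, 2: 5, 3: 18, 4: 52, 5: 9, 7: 1}

total = sum(buckets.values())
high = sum(n for bucket, n in buckets.items() if bucket >= 4)
print(total, high, f"{high / total:.0%}")  # 89 62 70%
```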

Thanks to ALS, we have a large pool of candidate books for each user: we can keep the unread books with the highest predicted ratings and recommend those.  

👍 I hope this notebook was helpful to you.  
💬 I would appreciate any comments, thoughts, ideas and questions in the comment section.  
💡 Constructive criticism will be appreciated.  

🎉 Thanks! 😁

📚 Learning resources:  
[Collaborative Filtering-Apache.org](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html)  
[Building Recommendation Engines with PySpark!-DataCamp](https://www.datacamp.com/courses/recommendation-engines-in-pyspark) 

🔝  <a href='#table_of_contents'>Back to Table of Contents</a> 