## Introduction
A book recommender system based on collaborative filtering, built with PySpark.
- Create a Spark session and load the data into Spark DataFrames
- Feature engineering
    - Convert the string ISBN column to a numeric index
- Model
    - Alternating Least Squares (ALS) model for collaborative filtering from Spark MLlib
    - Fit model to train set
    - Predict on test set and evaluate root mean squared error (RMSE)
- Generate recommendations
    - Predict ratings on unrated books for each user, using fitted model
    - Recommend top-n books

### Imports

In [18]:
# core
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import sklearn
import random, os
# spark & ML
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Create the Spark session for this notebook, or reuse one that is already
# running. Driver and executor memory are both set to 15g, and the
# "spark.python.profile.memory" option is switched on.
spark = (
    SparkSession.builder
    .appName('rec-sys')
    .config("spark.python.profile.memory", "true")
    .config("spark.driver.memory", "15g")
    .config("spark.executor.memory", "15g")
    .getOrCreate()
)

In [3]:
#spark.stop()

## Data and preprocessing

In [5]:
# Read the semicolon-separated ratings file into a Spark DataFrame.
# The first row is treated as the header and column types are inferred.
ratings_df = spark.read.csv(
    'BX-Book-Ratings.csv',
    sep=';',
    header=True,
    inferSchema=True,
)
ratings_df.show()

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
| 276733|2080674722|          0|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
| 276744|038550120X|          7|
| 276745| 342310538|         10|
| 276746|0425115801|          0|
| 276746|0449006522|          0|
| 276746|0553561618|          0|
| 276746|055356451X|          0|
| 276746|0786013990|          0|
| 276746|0786014512|          0|
| 276747|0060517794|          9|
| 276747|0451192001|          0|
| 276747|0609801279|          0|
| 276747|0671537458|          9|
+-------+----------+-----------+
only showing top 20 rows



In [6]:
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
from py4j.java_collections import JavaArray, JavaList

from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

In [7]:
def _to_java_object_rdd(rdd):
    """Convert a Python RDD into a JVM-side RDD of Java objects.

    The RDD is first re-serialized with an auto-batched pickle serializer,
    then its underlying JVM RDD is handed to MLlib's
    ``SerDe.pythonToJava`` (with the batched flag set to ``True``), which
    unpickles the Python objects into Java objects.

    Note: this relies on private PySpark attributes (``_reserialize``,
    ``_jvm``, ``_jrdd``).
    """
    pickled_rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    serde = pickled_rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe
    return serde.pythonToJava(pickled_rdd._jrdd, True)

# JVM-side view of the ratings rows.
JavaObj = _to_java_object_rdd(ratings_df.rdd)

# Ask Spark's SizeEstimator for an in-memory size estimate.
# NOTE(review): the estimate is taken on the DataFrame's JVM handle
# (ratings_df._jdf); JavaObj above is never passed to it -- confirm which
# object was meant to be measured.
size_estimator = spark._jvm.org.apache.spark.util.SizeEstimator
size_estimator.estimate(ratings_df._jdf)

4414360

In [8]:
# Collect the whole ratings DataFrame to the driver as a pandas frame just to
# print its summary (row count, dtypes, memory usage).
ratings_df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1149780 entries, 0 to 1149779
Data columns (total 3 columns):
 #   Column       Non-Null Count    Dtype 
---  ------       --------------    ----- 
 0   User-ID      1149780 non-null  int32 
 1   ISBN         1149780 non-null  object
 2   Book-Rating  1149780 non-null  int32 
dtypes: int32(2), object(1)
memory usage: 17.5+ MB


In [9]:
# show schema (column names, inferred types and nullability)
ratings_df.printSchema()

root
 |-- User-ID: integer (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: integer (nullable = true)



In [10]:
# Read the semicolon-separated books catalogue and discard the three
# cover-image URL columns, which are not used in this notebook.
books_df = (
    spark.read
    .csv('BX-Books.csv', sep=';', inferSchema=True, header=True)
    .drop('Image-URL-S', 'Image-URL-M', 'Image-URL-L')
)
books_df.show()

+----------+--------------------+--------------------+-------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|
+----------+--------------------+--------------------+-------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|               1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|               1999|W. W. Norton &amp...|
|0399135782|The Kitchen God's...|             Amy Tan|               1991|    Putnam Pub Group|
|0425176428|What If?: The Wor...|       Robert Cowley|               2000|Berkley Publishin...|
|0671870432|     PLEADING GUILTY|       

In [11]:
# ALS needs numeric item ids, so map each ISBN string to a numeric index.
isbn_indexer = StringIndexer(inputCol='ISBN', outputCol='ISBN_int')
stringToInt = isbn_indexer.fit(ratings_df)

# ratings_dfs keeps every row (including the zero ratings) with the new index.
ratings_dfs = stringToInt.transform(ratings_df)

# From here on ratings_df only holds rows with a non-zero rating.
# NOTE: ratings_df is rebound, so re-running this cell would fit the indexer
# on the already-filtered frame; reload the data first if you re-run.
nonzero_rating = ratings_dfs['Book-Rating'] != 0
ratings_df = ratings_dfs.filter(nonzero_rating)
ratings_df.show()

+-------+----------+-----------+--------+
|User-ID|      ISBN|Book-Rating|ISBN_int|
+-------+----------+-----------+--------+
| 276726|0155061224|          5| 89067.0|
| 276729|052165615X|          3|205984.0|
| 276729|0521795028|          6|206014.0|
| 276736|3257224281|          8| 43132.0|
| 276737|0600570967|          6|216574.0|
| 276744|038550120X|          7|   232.0|
| 276745| 342310538|         10|135627.0|
| 276747|0060517794|          9|  1413.0|
| 276747|0671537458|          9|   914.0|
| 276747|0679776818|          8|  2367.0|
| 276747|0943066433|          7|273158.0|
| 276747|1885408226|          7|296143.0|
| 276748|0747558167|          6| 53332.0|
| 276751|3596218098|          8| 28525.0|
| 276754|0684867621|          8|   562.0|
| 276755|0451166892|          5|   148.0|
| 276760|8440682697|         10|325376.0|
| 276762|0380711524|          5|  2387.0|
| 276762|3453092007|          8| 43258.0|
| 276762|3453213025|          3|310147.0|
+-------+----------+-----------+--

In [16]:
# Print the profiler results collected by the SparkContext so far.
# NOTE(review): presumably tied to the "spark.python.profile.memory" option
# set on the session -- confirm; no output was captured for this cell.
spark.sparkContext.show_profiles()

In [12]:
# Peek at the indexed ratings before filtering: ratings_dfs still contains
# the rows whose Book-Rating is 0.
ratings_dfs.show(20)

+-------+----------+-----------+--------+
|User-ID|      ISBN|Book-Rating|ISBN_int|
+-------+----------+-----------+--------+
| 276725|034545104X|          0|  1637.0|
| 276726|0155061224|          5| 89067.0|
| 276727|0446520802|          0|   568.0|
| 276729|052165615X|          3|205984.0|
| 276729|0521795028|          6|206014.0|
| 276733|2080674722|          0| 80774.0|
| 276736|3257224281|          8| 43132.0|
| 276737|0600570967|          6|216574.0|
| 276744|038550120X|          7|   232.0|
| 276745| 342310538|         10|135627.0|
| 276746|0425115801|          0|   445.0|
| 276746|0449006522|          0|   606.0|
| 276746|0553561618|          0|   424.0|
| 276746|055356451X|          0|   286.0|
| 276746|0786013990|          0| 27579.0|
| 276746|0786014512|          0| 15790.0|
| 276747|0060517794|          9|  1413.0|
| 276747|0451192001|          0|   937.0|
| 276747|0609801279|          0|  6511.0|
| 276747|0671537458|          9|   914.0|
+-------+----------+-----------+--

In [13]:
# split data into training (80%) and test (20%) datasets;
# a fixed seed makes the split repeatable, so the RMSE reported below does
# not change between runs on the same input
train_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=42)

## Model

In [9]:
# ALS model
# The estimator gets its own name so that `rec_model` always refers to the
# fitted model (previously the same name was bound to the estimator first and
# then overwritten by the fitted model).
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy="drop" drops rows whose prediction would be NaN
#   (users/items not seen during training) so that RMSE can be computed.
# - seed fixes the random factor initialisation so training is repeatable.
als = ALS(maxIter=20, regParam=0.01,
          userCol='User-ID', itemCol='ISBN_int', ratingCol='Book-Rating',
          nonnegative=True, coldStartStrategy="drop", seed=42)

rec_model = als.fit(train_df)

10

In [10]:
# making predictions on test set: the fitted model adds a `prediction`
# column to the rows of test_df
predicted_ratings=rec_model.transform(test_df)

In [11]:
# Inspect the first 20 predictions next to the true ratings.
# Predictions are raw model scores and are not clipped to the rating scale
# (values above 10 appear in the output).
predicted_ratings.show(20)

+-------+----------+-----------+--------+----------+
|User-ID|      ISBN|Book-Rating|ISBN_int|prediction|
+-------+----------+-----------+--------+----------+
|  17950|0446605239|          6|    26.0| 5.6643753|
|  11676|0446605239|          6|    26.0|  5.598119|
|   6347|0446605239|          7|    26.0|  2.919608|
| 278543|0446605239|          5|    26.0|  7.455864|
|  36606|0446605239|         10|    26.0|  5.722691|
|  18082|0446605239|          9|    26.0|  8.941052|
|  26374|0446605239|          5|    26.0| 7.9713297|
|  53174|0446605239|         10|    26.0| 10.003544|
|  66323|0446605239|          4|    26.0|   6.82341|
|  56554|0446605239|         10|    26.0|  6.007414|
|  49277|0446605239|          7|    26.0| 11.355914|
|  66473|0446605239|          8|    26.0|  6.300741|
|  41460|0446605239|          4|    26.0| 5.7223916|
|  43246|0446605239|          9|    26.0|  7.123988|
|  82164|0446605239|          7|    26.0| 2.5638971|
|  93631|0446605239|          2|    26.0| 3.20

## Evaluation

In [12]:
# Root mean squared error between the predicted and the actual ratings
# on the test-set predictions.
evaluator = RegressionEvaluator(
    labelCol='Book-Rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predicted_ratings)
rmse

3.2397048668237725

## Recommendation

In [13]:
# function to recommend top-n books for a user using trained model
def recommend_for_user(user_id, n):
    """Recommend up to ``n`` books for one user with the fitted ALS model.

    Relies on the notebook globals ``ratings_dfs`` (indexed ratings, zero
    ratings included), ``rec_model`` (fitted ALS model) and ``books_df``
    (book metadata).

    Candidate books are the rows of ``ratings_dfs`` for this user whose
    ``Book-Rating`` is 0; the model predicts a rating for each of them.

    Parameters
    ----------
    user_id : int
        Value of the ``User-ID`` column to recommend for.
    n : int
        Maximum number of recommendations to return.

    Returns
    -------
    recs_user : DataFrame
        Book metadata for the ``n`` candidates with the highest predicted
        rating, best first. Candidates with no matching ISBN in ``books_df``
        are left out by the inner join.
    pred_ratings_user : DataFrame
        All candidate rows for the user with the added ``prediction`` column
        (unsorted).
    """
    # Use the column's real name ('User-ID'); the previous 'User-Id' spelling
    # only resolved because Spark column lookup is case-insensitive by default.
    ratings_user = ratings_dfs.filter(col('User-ID') == user_id)
    candidates = ratings_user.filter(col('Book-Rating') == 0)
    pred_ratings_user = rec_model.transform(candidates)

    # Attach book metadata, rank by predicted rating, then hide the score.
    recs_user = books_df.join(pred_ratings_user.select(['ISBN', 'prediction']), on='ISBN')
    recs_user = recs_user.sort('prediction', ascending=False).drop('prediction').limit(n)
    return recs_user, pred_ratings_user

In [14]:
# Top-20 recommendations for one example user.
recs_user, pred_ratings_user = recommend_for_user(user_id=240567, n=20)
recs_user.show()

+----------+--------------------+--------------------+-------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|
+----------+--------------------+--------------------+-------------------+--------------------+
|0743206053|Kitchen Privilege...|  Mary Higgins Clark|               2002|Simon &amp; Schuster|
|0345463927|          Dead Wrong|      MARIAH STEWART|               2004|    Ballantine Books|
|0066214440|Enemy Women: A Novel|      Paulette Jiles|               2002|William Morrow &a...|
|0140113827|Murder at the Gar...|        Jane Langton|               1989|       Penguin Books|
|0553571656|The Beekeeper's A...|      LAURIE R. KING|               1996|              Bantam|
|0060740450|One Hundred Years...|Gabriel Garcia Ma...|               2004|           Perennial|
|0440215730|Recalled to Life ...|       REGINALD HILL|               1993|                Dell|
|0312966091|Three To Get Dead...|     Ja

In [15]:
# Show the user's candidate books with the highest predicted ratings first.
ranked_predictions = pred_ratings_user.sort('prediction', ascending=False)
ranked_predictions.show(20)

+-------+----------+-----------+--------+----------+
|User-ID|      ISBN|Book-Rating|ISBN_int|prediction|
+-------+----------+-----------+--------+----------+
| 240567|0743206053|          0|  3536.0|  8.962223|
| 240567|0345463927|          0| 10103.0|  8.607035|
| 240567|0066214440|          0|  1916.0|  8.446151|
| 240567|0140113827|          0| 21128.0|   7.85118|
| 240567|0553571656|          0|  1189.0|   7.75862|
| 240567|0060740450|          0|   792.0| 7.7272353|
| 240567|0440215730|          0| 10285.0|  7.612417|
| 240567|0312966091|          0|   156.0|  7.599072|
| 240567|0060928336|          0|     3.0|  7.399512|
| 240567|1575667673|          0|130512.0| 7.3839936|
| 240567|0399142649|          0| 13487.0|   7.37114|
| 240567|0786890169|          0|  5860.0|  7.255721|
| 240567|0061097314|          0|   404.0|  7.240314|
| 240567|0380733285|          0| 25593.0|   7.21983|
| 240567|0553271636|          0|   262.0| 7.1672974|
| 240567|0804108692|          0|  5862.0| 7.12