# Book Recommender System using Apache Spark and Elasticsearch

### Barkat Sikder, Summer 2021

This is an attempt at building a scalable book recommender engine in the same vein as the following IBM tutorial:
https://github.com/IBM/elasticsearch-spark-recommender/tree/master#steps

However, the dataset I use contains Goodreads data and can be found on Kaggle:
https://www.kaggle.com/zygmunt/goodbooks-10k?select=books.csv


## STEP 1: Set up Spark and Elasticsearch

First, download the latest version of Elasticsearch from: https://www.elastic.co/downloads/elasticsearch
Unzip the package, then run the following command from the unzipped directory: ./bin/elasticsearch
Keep this running and do the remaining steps in a separate command-line window.

Second, download the latest version of the Elasticsearch Spark connector from https://www.elastic.co/downloads/hadoop.
As the name suggests, it lets Spark (among other Hadoop-compatible systems) read from and write to Elasticsearch.
Unzip the downloaded file and note the path of the Spark connector JAR; mine is elasticsearch-spark-20_2.11-7.14.0.jar.


Third, download the latest version of Spark from: https://spark.apache.org/downloads.html
As usual, unzip the downloaded file.
My version is spark-3.1.2-bin-hadoop3.2


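Finally, you can launch the notebook by typing the following (this is for my versions of the software; edit as necessary):

PYSPARK_DRIVER_PYTHON="jupyter-lab" spark-3.1.2-bin-hadoop3.2/bin/pyspark --driver-memory 4g --driver-class-path elasticsearch-hadoop-7.14.0/dist/elasticsearch-spark-20_2.11-7.14.0.jar

Note that the connector JAR has to match your Spark and Scala versions. elasticsearch-spark-20_2.11 is built for Spark 2.x and Scala 2.11, whereas Spark 3.1.2 is built with Scala 2.12; with this combination, writing a DataFrame to Elasticsearch fails with `java.lang.NoClassDefFoundError: scala/Product$class`, as seen in Step 3. For Spark 3, use the elasticsearch-spark-30_2.12 connector instead, for example by replacing the `--driver-class-path ...` option with `--packages org.elasticsearch:elasticsearch-spark-30_2.12:7.14.0`, which fetches it from Maven Central.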

Once you launch Jupyter, you can begin working with this notebook.

In [1]:
# check that Spark is running by evaluating the SparkSession object
spark

## STEP 2: Prepare data

I use Pandas to prepare the book data for import into Spark (the ratings file is loaded directly with Spark).

For our model, we need ratings per user per book, plus general metadata about the books.
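ALS (used in Step 4) treats these ratings as a sparse user-by-book matrix and only sees the cells that actually hold a rating. A minimal stdlib sketch of that view, using the two ratings for book 1 from the output below plus two made-up rows:

```python
from collections import defaultdict

# (book_id, user_id, rating) rows in the layout of ratings.csv;
# the last two rows are made up for illustration
rows = [(1, 314, 5), (1, 439, 3), (2, 314, 4), (3, 439, 2)]

# Sparse user -> {book_id: rating} mapping; unrated cells are simply absent,
# and those are what collaborative filtering tries to predict
matrix = defaultdict(dict)
for book_id, user_id, rating in rows:
    matrix[user_id][book_id] = rating

n_users = len(matrix)
n_books = len({book_id for book_id, _, _ in rows})
density = len(rows) / (n_users * n_books)  # fraction of cells with a rating

print(dict(matrix))
print(f"{n_users} users x {n_books} books, density {density:.2f}")
```

With the full goodbooks-10k ratings the density is far lower, which is why a factorization model is used rather than filling the matrix directly.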

In [42]:
# Initialize the directory path
PATH_TO_DATA = "/Users/barkatsikder/Downloads/Cool Notebooks/Projectsx3/"

In [None]:
# load ratings data into Spark
ratings = spark.read.csv(PATH_TO_DATA + "books10k/ratings.csv", header=True, inferSchema=True)
ratings.cache()

In [4]:
# Select the ratings variables that are necessary later for the model
ratings = ratings.select(
    ratings.book_id, ratings.user_id, ratings.rating)
ratings.show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



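Once we have the ratings data, we need a dataset about the books, which we prepare from the books data and the tags data. More on these two datasets can be found on the Kaggle page.
Note that the ids differ between files: book_id in ratings.csv refers to the contiguous id column of books.csv (1 to 10000), whereas book_id in books.csv is the Goodreads ID (e.g. 2767052 for The Hunger Games below), which is also the key used in book_tags.csv (goodreads_book_id). The ALS item factors in Step 4 are therefore keyed by books.csv's id, so writing them back onto the books index requires that index to use the same key.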
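In goodbooks-10k, ratings.csv's book_id matches the id column of books.csv, while books.csv's own book_id column holds the Goodreads ID. A minimal pandas sketch, on made-up toy frames, of one way to re-key the book data so it joins to the ratings (and hence to the ALS item ids); the column renames are my own choice, not part of the original notebook:

```python
import pandas as pd

# Toy stand-ins (made-up values) for books.csv and ratings.csv
books = pd.DataFrame({
    "id": [1, 2],               # contiguous id, matches ratings.book_id
    "book_id": [2767052, 123],  # Goodreads id (123 is a placeholder)
    "original_title": ["The Hunger Games", "Book B"],
})
ratings = pd.DataFrame({
    "book_id": [1, 1, 2],
    "user_id": [314, 439, 314],
    "rating": [5, 3, 4],
})

# Keep the Goodreads id under its book_tags.csv name and promote `id`
# to `book_id`, so the book data shares a key with the ratings
books_keyed = books.rename(columns={"book_id": "goodreads_book_id", "id": "book_id"})

joined = ratings.merge(books_keyed, on="book_id", how="left")
print(joined[["book_id", "goodreads_book_id", "original_title", "rating"]])
```

With the real files, the tag merge would then join on goodreads_book_id on both sides, and book_id could be used as the Elasticsearch document id for the books index.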

In [5]:
import pandas as pd

books = pd.read_csv(PATH_TO_DATA + "books10k/books.csv")

In [6]:
# inspect the books variables
books.info() # columns of interest: book_id, original_title, image_url

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 23 columns):
 #   Column                     Non-Null Count  Dtype  
---  ------                     --------------  -----  
 0   id                         10000 non-null  int64  
 1   book_id                    10000 non-null  int64  
 2   best_book_id               10000 non-null  int64  
 3   work_id                    10000 non-null  int64  
 4   books_count                10000 non-null  int64  
 5   isbn                       9300 non-null   object 
 6   isbn13                     9415 non-null   float64
 7   authors                    10000 non-null  object 
 8   original_publication_year  9979 non-null   float64
 9   original_title             9415 non-null   object 
 10  title                      10000 non-null  object 
 11  language_code              8916 non-null   object 
 12  average_rating             10000 non-null  float64
 13  ratings_count              10000 non-null  int6

In [7]:
# To get tag information about books (e.g. genres), we need to join the tag ids in book_tags.csv with the tag names in tags.csv

tags = pd.read_csv(PATH_TO_DATA + "books10k/tags.csv")
book_tags = pd.read_csv(PATH_TO_DATA + "books10k/book_tags.csv")

In [8]:
new_tags = book_tags.merge(tags, left_on='tag_id', right_on='tag_id', how='left')

In [9]:
new_tags = new_tags.drop(columns=["count"])

In [10]:
# roll the tag ids and tag names for each book up into lists, one row per book; adapted from: https://stackoverflow.com/questions/60427975/collapse-values-from-multiple-rows-of-a-column-into-an-array-when-all-other-colu

changes = ['tag_id','tag_name']
cols = new_tags.columns.difference(changes).tolist()

new_tags = new_tags.groupby(cols)[changes].agg(list).reset_index().reindex(new_tags.columns, axis=1)
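For readers unfamiliar with the `groupby(...)[cols].agg(list)` idiom, here is the same roll-up written with only the standard library, on a few made-up rows (the tag ids are placeholders):

```python
from collections import defaultdict

# Made-up rows after the merge: (goodreads_book_id, tag_id, tag_name)
rows = [(1, 30574, "to-read"), (1, 11305, "fantasy"), (2, 30574, "to-read")]

# Collect every tag id and tag name per book, preserving row order
tag_ids = defaultdict(list)
tag_names = defaultdict(list)
for book_id, tag_id, tag_name in rows:
    tag_ids[book_id].append(tag_id)
    tag_names[book_id].append(tag_name)

# One record per book, sorted by key as pandas groupby does by default
rolled = [
    {"goodreads_book_id": b, "tag_id": tag_ids[b], "tag_name": tag_names[b]}
    for b in sorted(tag_ids)
]
print(rolled)
```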

In [11]:
new_tags = new_tags.drop(columns=["tag_id"])

In [12]:
books2 = books[["book_id", "original_title", "authors", "original_publication_year", "language_code", "average_rating", "image_url"]]


In [13]:
books2 = books2.merge(new_tags, left_on='book_id', right_on='goodreads_book_id', how='left')
books2 = books2.drop(columns=["goodreads_book_id"])

In [14]:
books2.head(1) 

| | book_id | original_title | authors | original_publication_year | language_code | average_rating | image_url | tag_name |
|---|---|---|---|---|---|---|---|---|
| 0 | 2767052 | The Hunger Games | Suzanne Collins | 2008.0 | eng | 4.34 | https://images.gr-assets.com/books/1447303603m... | [favorites, currently-reading, young-adult, fi... |


In [15]:
books3 = books2[["book_id", "original_title", "original_publication_year", "tag_name"]]

In [16]:
books3.head(1)

| | book_id | original_title | original_publication_year | tag_name |
|---|---|---|---|---|
| 0 | 2767052 | The Hunger Games | 2008.0 | [favorites, currently-reading, young-adult, fi... |


In [17]:
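# after all that preprocessing, save a version that we can then load into Spark;
# write it to PATH_TO_DATA (where the next Spark cell reads it) and skip the pandas index
books3.to_csv(PATH_TO_DATA + "books3.csv", index=False)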

One could do all of the above with PySpark's SQL module instead of Pandas, but Pandas is more straightforward for this task.



In [18]:
book_data = spark.read.csv(PATH_TO_DATA + "books3.csv", header=True, inferSchema=True)
# book_data.show(5, truncate=False)

In [19]:
# Some modules we might need later

from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [20]:
# deliberately stop here when running the whole notebook at once;
# run the cells below one at a time instead

raise RuntimeError("Intentional stop: run the remaining cells manually")

## STEP 3: Prepare Elasticsearch

Next, load the preprocessed data into Elasticsearch. But first, test that Elasticsearch is instantiated and running.

In [44]:
from elasticsearch import Elasticsearch

# declare the ES client object (connects to http://localhost:9200 by default)
es = Elasticsearch()

# test that your ES instance is running
es.info(pretty=True)

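ConnectionError: ConnectionError(<urllib3.connection.HTTPConnection object at 0x7f888aff3b70>: Failed to establish a new connection: [Errno 61] Connection refused) caused by: NewConnectionError(<urllib3.connection.HTTPConnection object at 0x7f888aff3b70>: Failed to establish a new connection: [Errno 61] Connection refused)

This ConnectionError means the client could not reach Elasticsearch at its default address (localhost:9200): start ./bin/elasticsearch as described in Step 1 and rerun the cell.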

Create Elasticsearch indices, with mappings for users, books and rating events.

In Elasticsearch, an "index" is roughly similar to a "database" or "database table". The schema for an index is called an index mapping.
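An index mapping is just a JSON object of field names and field types nested under "mappings" and "properties". A sketch with a hypothetical helper, make_index_body (not part of elasticsearch-py), that produces the same shape as the create_users body in the next cell:

```python
import json


def make_index_body(fields):
    """Build a create-index request body from {field_name: field_spec}.

    A field spec is either a type name (str) or a full dict such as
    {"type": "dense_vector", "dims": 20}.
    """
    properties = {
        name: ({"type": spec} if isinstance(spec, str) else dict(spec))
        for name, spec in fields.items()
    }
    return {"mappings": {"properties": properties}}


VECTOR_DIM = 20
users_body = make_index_body({
    "user_id": "integer",
    "model_factor": {"type": "dense_vector", "dims": VECTOR_DIM},
    "model_version": "keyword",
    "model_timestamp": "date",
})
print(json.dumps(users_body, indent=2))
```

The resulting dict could be passed as `body=` to `es.indices.create(index="users", ...)`, exactly like the hand-written mappings below.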

In [24]:
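# delete the previously created indices if you already ran the following cell;
# ignore_unavailable=True avoids an error if some of them don't exist yet
es.indices.delete(index="ratings,users,books", ignore_unavailable=True)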

{'acknowledged': True}

In [25]:
# set the factor vector dimension for the recommendation model
VECTOR_DIM = 20

create_ratings = {
    "mappings": {
        "properties": {
            "user_id": {
                "type": "integer"
            },
            "book_id": {
                "type": "integer"
            },
            "rating": {
                "type": "double"
            }
        }  
    }
}

create_users = {
    "mappings": {
        "properties": {
            "user_id": {
                "type": "integer"
            },
            "model_factor": {
                "type": "dense_vector",
                "dims" : VECTOR_DIM
            },
            "model_version": {
                "type": "keyword"
            },
            "model_timestamp": {
                "type": "date"
            }
        }
    }
}

create_books= {
    "mappings": {
        "properties": {
            "book_id": {
                "type": "integer"
            },
            "tag_name": {
                "type": "keyword"
            },
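            # integer rather than a "year" date: the CSV stores values such as 2008.0,
            # which may not parse as a "year" date; integer fields truncate it to 2008 by default
            "original_publication_year": {
                "type": "integer"
            },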
            "model_factor": {
                "type": "dense_vector",
                "dims" : VECTOR_DIM
            },
            "model_version": {
                "type": "keyword"
            },
            "model_timestamp": {
                "type": "date"
            }          
        }
    }
}



# an index is loosely analogous to a table (or a DataFrame)

# create indices with the settings and mappings above
res_ratings = es.indices.create(index="ratings", body=create_ratings)
res_users = es.indices.create(index="users", body=create_users)
res_books = es.indices.create(index="books", body=create_books)

print("Created indices:")
print(res_ratings)
print(res_users)
print(res_books)

Created indices:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'ratings'}
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'users'}
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'books'}


Load the ratings and books DataFrames into Elasticsearch.
With the Spark Elasticsearch connector, you can write a DataFrame through Spark's native data source API by specifying format("es").
Start by writing the ratings data to Elasticsearch.

Fields I considered for the books mapping but left out: original_title, authors and language_code (all of type keyword).

In [26]:
# write ratings data (ratings is the Spark DataFrame loaded in Step 2)
ratings.write.format("es").save("ratings")


num_ratings_es = es.count(index="ratings")['count']
num_ratings_df = ratings.count()
# check write went ok
print("Dataframe count: {}".format(num_ratings_df))
print("ES index count:  {}".format(num_ratings_es))

Py4JJavaError: An error occurred while calling o63.save.
: java.lang.NoClassDefFoundError: scala/Product$class
	at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
	at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
	...
Caused by: java.lang.ClassNotFoundException: scala.Product$class


In [None]:
# write books data, specifying the DataFrame column to use as the id mapping
book_data.write.format("es").option("es.mapping.id", "book_id").save("books")


num_books_df = book_data.count()
num_books_es = es.count(index="books")['count']
# check load went ok
print("Books DF count: {}".format(num_books_df))
print("ES index count: {}".format(num_books_es))

To check that all of the above worked, search the book metadata:

In [None]:
# test things out by searching for books containing "love" in the title
es.search(index="books", q="original_title:love", size=3)
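The `q=` argument uses Lucene query-string syntax. A roughly equivalent request body in the query DSL, built by a hypothetical helper title_search_body, is easier to extend with filters later; it is only a sketch and is not executed against a cluster here:

```python
def title_search_body(text, size=3):
    """Query-DSL sketch roughly equivalent to q="original_title:<text>"."""
    return {"size": size, "query": {"match": {"original_title": text}}}


body = title_search_body("love")
print(body)
# With a running cluster: es.search(index="books", body=body)
```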

## STEP 4: Train the recommender on the Elasticsearch ratings data in Spark

In [None]:
# User-Item Collaborative Filtering

ratings_from_es = spark.read.format("es").load("ratings")
ratings_from_es.show(5)


In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# fit an ALS matrix-factorization model on the ratings read from Elasticsearch;
# rank=VECTOR_DIM so the factor vectors match the dense_vector "dims" in the ES mappings

als = ALS(userCol="user_id", itemCol="book_id", ratingCol="rating", regParam=0.02, rank=VECTOR_DIM, seed=42)
model = als.fit(ratings_from_es)
model.userFactors.show(5)
model.itemFactors.show(5)
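ALS learns one factor vector of length `rank` (here VECTOR_DIM) per user and per book, and its predicted rating for a (user, book) pair is the dot product of the two vectors; storing these vectors as model_factor is what later lets Elasticsearch score books for a user. A minimal pure-Python sketch with made-up rank-3 factors:

```python
def predict_rating(user_factor, item_factor):
    """ALS-style prediction: dot product of user and item factor vectors."""
    if len(user_factor) != len(item_factor):
        raise ValueError("factor vectors must have the same rank")
    return sum(u * i for u, i in zip(user_factor, item_factor))


# Made-up rank-3 factors (the notebook uses rank=VECTOR_DIM=20)
user = [1.0, 0.5, -0.2]
books = {1: [2.0, 1.0, 0.0], 2: [0.5, 0.5, 1.0]}

scores = {book_id: predict_rating(user, f) for book_id, f in books.items()}
ranked = sorted(scores, key=scores.get, reverse=True)
print(scores, ranked)
```

Here book 1 scores 2.5 and book 2 scores 0.55, so book 1 would be recommended first.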

In [None]:
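from pyspark.sql.functions import lit, current_timestamp, unix_timestamp

# attach a model version and timestamp to the ALS factor vectors
# before writing them back to Elasticsearch

ver = model.uid
# unix_timestamp returns seconds, but ES date fields read plain numbers as
# epoch milliseconds by default, so convert to milliseconds
ts = unix_timestamp(current_timestamp()) * 1000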
movie_vectors = model.itemFactors.select("id",\
                                         col("features").alias("model_factor"),\
                                         lit(ver).alias("model_version"),\
                                         ts.alias("model_timestamp"))
movie_vectors.show(5)
user_vectors = model.userFactors.select("id",\
                                        col("features").alias("model_factor"),\
                                        lit(ver).alias("model_version"),\
                                        ts.alias("model_timestamp"))
user_vectors.show(5)

In [None]:
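# write the book factor vectors back to the existing books index as partial
# updates, adding model_factor, model_version and model_timestamp to each book
# document; "update" expects every id to already exist in the index
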
# write data to ES, use:
# - "id" as the column to map to ES book id
# - "update" write mode for ES, since you want to update new fields only
# - "append" write mode for Spark
movie_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "update") \
    .save("books", mode="append")

In [None]:
# write data to ES, use:
# - "id" as the column to map to ES book id
# - "index" write mode for ES, since you have not written to the user index previously
# - "append" write mode for Spark
user_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "index") \
    .save("users", mode="append")
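The connector writes through Elasticsearch's bulk API, and es.write.operation chooses the action per row: "index" writes the whole document (creating or replacing it), while "update" merges the given fields into a document that must already exist. A sketch of the same distinction in the action format of elasticsearch-py's `helpers.bulk`; the generator to_bulk_actions is hypothetical, the rows are made up, and unlike the connector's default it leaves the id column out of the document body:

```python
def to_bulk_actions(rows, index, id_field, op="index"):
    """Turn row dicts into elasticsearch.helpers.bulk() actions.

    op="index" sends the row as the full document source;
    op="update" sends it as a partial doc merged into an existing document.
    """
    for row in rows:
        fields = {k: v for k, v in row.items() if k != id_field}
        action = {"_op_type": op, "_index": index, "_id": row[id_field]}
        if op == "update":
            action["doc"] = fields
        else:
            action["_source"] = fields
        yield action


rows = [{"id": 1, "model_factor": [0.1, 0.2], "model_version": "ALS_demo"}]
print(list(to_bulk_actions(rows, "books", "id", op="update")))
# With a running cluster (elasticsearch-py):
#   from elasticsearch import helpers
#   helpers.bulk(es, to_bulk_actions(rows, "books", "id", op="update"))
```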

In [None]:
# Test
    


In [None]:
# 5 Helper functions
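One helper the later cells would likely need is fetching a stored factor vector. A sketch, assuming the elasticsearch-py 7.x `es.get(index=..., id=...)` call (which raises NotFoundError for a missing document); get_vector and the FakeES stand-in are hypothetical and exist only so the helper can be tried without a cluster:

```python
def get_vector(es, index, doc_id, field="model_factor"):
    """Return the stored factor vector, or None if the doc has none yet."""
    doc = es.get(index=index, id=doc_id)
    return doc["_source"].get(field)


class FakeES:
    """Minimal stand-in for elasticsearch.Elasticsearch.get(), for offline testing."""

    def __init__(self, docs):
        self.docs = docs

    def get(self, index, id):
        return {"_index": index, "_id": id, "_source": self.docs[(index, id)]}


fake = FakeES({("books", 1): {"book_id": 1, "model_factor": [0.1, 0.2]}})
print(get_vector(fake, "books", 1))
# With a running cluster: get_vector(es, "books", 1)
```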

    

In [None]:
# Filter based on title
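One way to fill in this cell, sketched under assumptions: take the model_factor of a book found by title and rank the other books by cosine similarity using a script_score query in the Painless syntax of recent 7.x releases such as the 7.14 used here. similar_books_query is a hypothetical name; it only builds the request body, so it can be inspected without a cluster:

```python
def similar_books_query(query_vector, exclude_id=None, size=5):
    """script_score body ranking books by cosine similarity of model_factor.

    "+ 1.0" keeps scores non-negative, which script_score requires; the
    exists filter skips documents that have no vector yet.
    """
    base = {"exists": {"field": "model_factor"}}
    if exclude_id is not None:
        # don't recommend the query book itself
        base = {"bool": {"filter": [base],
                         "must_not": [{"ids": {"values": [str(exclude_id)]}}]}}
    return {
        "size": size,
        "query": {
            "script_score": {
                "query": base,
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'model_factor') + 1.0",
                    "params": {"query_vector": query_vector},
                },
            }
        },
    }


print(similar_books_query([0.1, 0.2], exclude_id=1, size=3))
```

With a running cluster, the vector could come from `es.get(index="books", id=1)["_source"]["model_factor"]` and the body be passed to `es.search(index="books", body=...)`.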



In [None]:
# Filter based on genres
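A genre filter can be added as a `terms` clause on tag_name inside the script_score query. This sketch assumes tag_name reaches Elasticsearch as an array of tags; after the CSV round-trip in Step 2, each book's tags arrive as one string such as "['favorites', 'currently-reading', ...]", so the tags would need to be written as a real array for this filter to match individual genres. genre_filtered_query is a hypothetical name:

```python
def genre_filtered_query(query_vector, genres, size=5):
    """Cosine-similarity script_score restricted to books carrying any of the given tags."""
    return {
        "size": size,
        "query": {
            "script_score": {
                "query": {
                    "bool": {
                        "filter": [
                            {"exists": {"field": "model_factor"}},
                            # assumes tag_name is indexed as an array of keyword tags
                            {"terms": {"tag_name": list(genres)}},
                        ]
                    }
                },
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'model_factor') + 1.0",
                    "params": {"query_vector": query_vector},
                },
            }
        },
    }


print(genre_filtered_query([0.1, 0.2], ["fantasy", "young-adult"]))
```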


In [None]:
# Recommend books liked by other users who liked the same books as the given user
    # Filter by release date
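In the ALS model, "books liked by users with similar taste" is captured by the dot product between the user's factor vector and each book's factor vector, so one sketch of this cell scores books with dotProduct on model_factor, adds a publication-year range filter, and excludes books the user has already rated. Because a dot product can be negative and script_score scores must not be, the script squashes it through a logistic function (this preserves the ranking). The helper name user_recs_query is hypothetical, the syntax assumes a recent 7.x release, and the range filter assumes original_publication_year is indexed as a number:

```python
def user_recs_query(user_vector, min_year=None, exclude_book_ids=(), size=10):
    """script_score body ranking books for a user by the ALS dot product."""
    filters = [{"exists": {"field": "model_factor"}}]
    if min_year is not None:
        # assumes original_publication_year is a numeric field
        filters.append({"range": {"original_publication_year": {"gte": min_year}}})
    bool_query = {"filter": filters}
    if exclude_book_ids:
        # skip books the user has already rated
        bool_query["must_not"] = [{"ids": {"values": [str(b) for b in exclude_book_ids]}}]
    return {
        "size": size,
        "query": {
            "script_score": {
                "query": {"bool": bool_query},
                "script": {
                    # logistic squash keeps scores in (0, 1) without changing the order
                    "source": "1.0 / (1.0 + Math.exp(-dotProduct(params.query_vector, 'model_factor')))",
                    "params": {"query_vector": user_vector},
                },
            }
        },
    }


print(user_recs_query([0.1, 0.2], min_year=2000, exclude_book_ids=[1, 2], size=5))
```

With a running cluster, the user vector would come from the users index (e.g. `es.get(index="users", id=314)["_source"]["model_factor"]`) and the body be sent with `es.search(index="books", body=...)`.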