In [1]:
import numpy as np
import pandas as pd

In [2]:
import sklearn
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer

In [3]:
# Importing book data

books = pd.read_csv('/Users/snehalchavan/Desktop/MS /531 Adv Database Management/Project Prep/Database files/Kaggle dataset/Goodread_books.csv')
# Books = 100000

books.shape

(10000, 23)

In [4]:
books.head(2)

Unnamed: 0,id,book_id,best_book_id,work_id,books_count,isbn,isbn13,Book-Author,original_publication_year,original_title,...,ratings_count,work_ratings_count,work_text_reviews_count,ratings_1,ratings_2,ratings_3,ratings_4,ratings_5,image_url,small_image_url
0,1,2767052,2767052,2792775,272,439023483,9780000000000.0,Suzanne Collins,2008.0,The Hunger Games,...,4780653,4942365,155254,66715,127936,560092,1481305,2706317,https://images.gr-assets.com/books/1447303603m...,https://images.gr-assets.com/books/1447303603s...
1,2,3,3,4640799,491,439554934,9780000000000.0,"J.K. Rowling, Mary GrandPré",1997.0,Harry Potter and the Philosopher's Stone,...,4602479,4800065,75867,75504,101676,455024,1156318,3011543,https://images.gr-assets.com/books/1474154022m...,https://images.gr-assets.com/books/1474154022s...


In [5]:
books.columns

Index(['id', 'book_id', 'best_book_id', 'work_id', 'books_count', 'isbn',
       'isbn13', 'Book-Author', 'original_publication_year', 'original_title',
       'Book-Title', 'language_code', 'average_rating', 'ratings_count',
       'work_ratings_count', 'work_text_reviews_count', 'ratings_1',
       'ratings_2', 'ratings_3', 'ratings_4', 'ratings_5', 'image_url',
       'small_image_url'],
      dtype='object')

In [6]:
# Creating books dataframe and keeping only the required fields in dataset
books = books[['book_id', 'isbn', 'Book-Title', 'Book-Author', 'original_publication_year', 'ratings_count' ]]

# renaming columns as per requirements
books.rename(columns={'book_id':'Book_Id', 'isbn': 'ISBN', 'original_publication_year': 'Year_of_publication'}, inplace=True)

books.head(2)

Unnamed: 0,Book_Id,ISBN,Book-Title,Book-Author,Year_of_publication,ratings_count
0,2767052,439023483,"The Hunger Games (The Hunger Games, #1)",Suzanne Collins,2008.0,4780653
1,3,439554934,Harry Potter and the Sorcerer's Stone (Harry P...,"J.K. Rowling, Mary GrandPré",1997.0,4602479


In [7]:
# Keyword extraction

from sklearn.feature_extraction.text import CountVectorizer

# initiate vectorizer to remove stop words
Vect = CountVectorizer(analyzer= 'word', ngram_range=(1,2), stop_words= 'english', min_df=0.002)


# Processing it into the book title

Vect.fit(books['Book-Title'])
title_matrix = Vect.transform(books['Book-Title'])
title_matrix.shape

# we can extract 261 features

(10000, 261)

In [8]:
# seeing the 261 words which we got from previous evaluation
features = Vect.get_feature_names_out()
features

array(['01', '10', '11', '12', '13', '14', '15', '16', '39', 'adventures',
       'alex', 'alex cross', 'america', 'american', 'angel', 'angels',
       'anita', 'anita blake', 'apprentice', 'art', 'bad', 'batman',
       'beautiful', 'beauty', 'best', 'big', 'black', 'blake',
       'blake vampire', 'blood', 'blue', 'body', 'bone', 'bones', 'book',
       'books', 'born', 'bosch', 'bosch universe', 'boy', 'boys',
       'broken', 'brothers', 'case', 'cat', 'child', 'children',
       'chronicles', 'circle', 'city', 'club', 'complete', 'confessions',
       'cross', 'cycle', 'dance', 'dark', 'dark hunter', 'darkest',
       'darkness', 'daughter', 'davenport', 'dawn', 'day', 'days', 'dead',
       'death', 'death death', 'detective', 'devil', 'diaries', 'diary',
       'die', 'discworld', 'dog', 'don', 'dragon', 'dream', 'dreams',
       'earth', 'empire', 'end', 'fall', 'fallen', 'family', 'fear',
       'files', 'food', 'forever', 'forgotten', 'forgotten realms',
       'game', 'gard

In [9]:
# Calculating cosine similarity

from sklearn.metrics.pairwise import cosine_similarity
Similar_titles = cosine_similarity(title_matrix, title_matrix)
Similar_titles.shape

(10000, 10000)

In [10]:
Book_id = 100
books['Book-Title'].iloc[Book_id]

'Me Talk Pretty One Day'

In [11]:
# Checking what features are considered for the provided title

feat_array = np.squeeze(title_matrix[Book_id].toarray())        # Squeeze converts matrix to array

index = np.where(feat_array > 0)
index[0]
[features[x] for x in index[0]]



['day', 'pretty']

In [12]:
index[0]                        # Index of the features 

array([ 63, 179])

In [13]:
# Checking cosine similarity with other similar titles

n = 10                          # No. of recommendation needed

top_index = np.flip(np.argsort(Similar_titles[Book_id]), axis = 0)[0:n]
top_sim_index = Similar_titles[Book_id, top_index]
top_sim_index

array([1.        , 0.70710678, 0.70710678, 0.70710678, 0.70710678,
       0.70710678, 0.70710678, 0.70710678, 0.70710678, 0.70710678])

In [14]:
# finding values > 0
top_index = top_index[top_sim_index > 0]

# Book matching
books['Book-Title'].iloc[top_index]

100                                Me Talk Pretty One Day
3729                                            Labor Day
988                                 The Day of the Jackal
836                             Every Day (Every Day, #1)
2348    No Easy Day: The Firsthand Account of the Miss...
3311                                          Pretty Baby
6804                     Graduation Day (The Testing, #3)
6886                                 Day Watch (Watch #2)
5765                          The Given Day (Coughlin #1)
783                                      For One More Day
Name: Book-Title, dtype: object

In [15]:
# Creating function for the above code

def Similar_books(Book_id, title_matrix, Vector, top_n):

    # Simliar matrix
    Similar_matrix = cosine_similarity(title_matrix, title_matrix)
    #features = Vect.get_feature_names_out()
    features = Vector.get_feature_names_out()
    
    top_index = np.flip(np.argsort(Similar_matrix[Book_id,]), axis= 0 )[0: top_n]
    top_sim_index = Similar_matrix[Book_id, top_index]

    # Getting top values >0

    top_index = top_index[top_sim_index > 0]
    scores = top_sim_index[top_sim_index > 0]

    # Evaluating features from vectorized matrix
    Similar_books = books['Book-Title'].iloc[top_index].index
    words = []

    for book_index in Similar_books:
        try:
            feat_array = np.squeeze(title_matrix[book_index,].toarray())
        except:
            feat_array = np.squeeze(title_matrix[book_index,])

        index = np.where(feat_array > 0)
        words.append([" , ".join([features[i] for i in index[0]])])

    # Merging the results

    result = pd.DataFrame({"Book-Title": books['Book-Title'].iloc[Book_id], 
                           "Similar_books": books["Book-Title"].iloc[top_index].values, "Words": words, 
                            "Scores": scores}, columns=["Book-Title", "Similar_books", "scores", "words" ]                          
                          )
    return result

In [16]:
Vect = CountVectorizer(analyzer='word', ngram_range= (1,2), stop_words= 'english', min_df= 0.01)
Vect.fit(books["Book-Title"])
title_matrix = Vect.transform(books["Book-Title"])
print(books["Book-Title"][100])

Similar_books(10,title_matrix, Vect, 6)

Me Talk Pretty One Day


Unnamed: 0,Book-Title,Similar_books,scores,words


In [17]:
# Using Tf-IDF method for recommendation

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

tf = TfidfVectorizer(analyzer = 'word', ngram_range= (1,2), min_df= 0.0, stop_words= 'english' )
tfid_matrix = tf.fit_transform(books["Book-Title"])
cosine_similar = cosine_similarity(tfid_matrix, tfid_matrix)
cosine_similar


array([[1., 0., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 0., 1., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 1., 0., 0.],
       [0., 0., 0., ..., 0., 1., 0.],
       [0., 0., 0., ..., 0., 0., 1.]])

In [18]:
Books = books['Book-Title']

# Convert titles to series
indices = pd.Series(books.index, index= books["Book-Title"])

# Function to get book suggestion based on cosine similarity
def Book_recomender(title, n):
    index = indices[title]
    Similar_scores = list(enumerate(cosine_similar[index]))
    Similar_scores = sorted(Similar_scores, key = lambda X:X[1], reverse = True) 

    Similar_scores = Similar_scores[1:n+1]
    book_index = [i[0] for i in Similar_scores]
    
    return Books.iloc[book_index]

In [19]:
# Recommending n books with index 10

book_index = 10
n = 20

print(books["Book-Title"][book_index])
#Book_recomender(books.Books[book_index], n)

The Kite Runner


In [20]:
Book_recomender("Once a Runner", 10)



90                      The Maze Runner (Maze Runner, #1)
10                                        The Kite Runner
375                      The Death Cure (Maze Runner, #3)
945                    The Kill Order (Maze Runner, #0.5)
258                   The Scorch Trials (Maze Runner, #2)
6711    Ultramarathon Man: Confessions of an All-Night...
0                 The Hunger Games (The Hunger Games, #1)
1       Harry Potter and the Sorcerer's Stone (Harry P...
2                                 Twilight (Twilight, #1)
3                                   To Kill a Mockingbird
Name: Book-Title, dtype: object

In [21]:
Book_recomender("1984", 10)
b = Book_recomender("1984", 10)
# b.shape

In [22]:
# Collaborative filtering

In [23]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [24]:
pip install numpy

Note: you may need to restart the kernel to use updated packages.


In [25]:
# Import all the required libraries

import pyspark as ps

from pyspark.sql import SQLContext                          # to create a pySpark session
from pyspark.ml.evaluation import RegressionEvaluator       # To predict continuous values
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder      # To find the stop words
from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS                       # inbuilt libray provided to use in recommendation system
from pyspark.sql.functions import udf,col,when
import numpy as np                                              # used for performing mathematical operations faster

In [26]:
# Show predictions of book images

from IPython.display import Image
from IPython.display import display


In [27]:
# Creating Spark session on Jupyter notebook

spark = ps.sql.SparkSession.builder\
        .master("local")\
        .appName("Recommender System")\
        .getOrCreate()                      # function to obtain Spark sesssion, will create new swssion if don't exist


sc = spark.sparkContext

SQLContext = SQLContext(sc)


24/05/14 18:31:44 WARN Utils: Your hostname, Snehals-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
24/05/14 18:31:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/14 18:31:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/14 18:31:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [28]:
# Import ratings datafile 

ratings = spark.read.csv("/Users/snehalchavan/Desktop/MS /531 Adv Database Management/Project Prep/Database files/kaggle dataset/ratings.csv", header = True, inferSchema= True)    # "header" to include headings of dataset, inferschema converts string data to int
ratings.printSchema()       # outputs the columns of database file

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



                                                                                

In [29]:
# Reading books data

books = spark.read.csv('/Users/snehalchavan/Desktop/MS /531 Adv Database Management/Project Prep/Database files/Kaggle dataset/Goodread_books.csv', header= True, inferSchema= True)
books.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- best_book_id: integer (nullable = true)
 |-- work_id: integer (nullable = true)
 |-- books_count: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: double (nullable = true)
 |-- Book-Author: string (nullable = true)
 |-- original_publication_year: integer (nullable = true)
 |-- original_title: string (nullable = true)
 |-- Book-Title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- work_ratings_count: string (nullable = true)
 |-- work_text_reviews_count: string (nullable = true)
 |-- ratings_1: double (nullable = true)
 |-- ratings_2: integer (nullable = true)
 |-- ratings_3: integer (nullable = true)
 |-- ratings_4: integer (nullable = true)
 |-- ratings_5: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- small_image_url: string (nullabl

                                                                                

In [30]:
# Spliting ratings data into training and testing

training, validation = ratings.randomSplit([.8,.2])


In [31]:
# Parameters used for implementing ALS algo


iterations = 10                     # Maximum number of iterations to run
regularization_parameter = 0.1      # Specifies regularization parameter
rank = 4                            # Number of latent factors in the model
errors = []                         
err = 0

In [32]:
# Processing the data

als = ALS(maxIter=iterations, regParam=regularization_parameter, rank = 5, userCol= "user_id", itemCol="book_id", ratingCol="rating" )
model = als.fit(training)                           # model creation
predictions = model.transform(validation)           # Validation of predictions
new_predict = predictions.filter(col('prediction')!= np.nan)            # Remove the NA predictions if any
evaluator = RegressionEvaluator(metricName= "rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(new_predict)
print(" Root Mean Square Error Value = " + str(rmse))



24/05/14 18:32:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/14 18:32:19 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
[Stage 90:>                                                         (0 + 1) / 1]

 Root Mean Square Error Value = 0.8967211298963494


                                                                                

In [33]:
# Crossvalidator to validate the model and will give the best fit for the model

ls = ALS(maxIter=iterations, regParam=regularization_parameter, rank =5, userCol= "user_id", itemCol="book_id", ratingCol="rating" )
paramGrid = ParamGridBuilder()\
    .addGrid(als.regParam, [0.1,0.01,0.10])\
    .addGrid(als.rank, range(4,10))\
    .build()

evaluator = RegressionEvaluator(metricName= "rmse", labelCol= "rating", predictionCol="prediction")
crossval = CrossValidator(estimator=als, 
                          estimatorParamMaps=paramGrid,
                          evaluator = evaluator,
                          numFolds= 5)

CvModel = crossval.fit(training)


24/05/14 19:00:00 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 219566 ms exceeds timeout 120000 ms
24/05/14 19:00:01 WARN SparkContext: Killing executors is not supported by current scheduler.
24/05/14 19:00:01 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [34]:
predictions = CvModel.transform(validation)           # Validation of predictions
new_predict = predictions.filter(col('prediction')!= np.nan)  

rmse = evaluator.evaluate(new_predict)
print(" Root Mean Square Error Value = " + str(rmse))

24/05/14 19:00:43 ERROR Inbox: Ignoring error>                     (6 + 1) / 10]
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apach

 Root Mean Square Error Value = 0.8946035932586425


                                                                                

In [36]:
# Validate the data rating prediction using testing data

predictions = model.transform(validation)
predictions.show(n=10)

24/05/14 19:19:22 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|      1|    588|     5| 4.0968413|
|      1|   2077|     4| 3.5162528|
|      1|   5461|     3|  4.336852|
|      1|   6630|     5| 4.0857944|
|      1|   7563|     3| 3.7425392|
|      1|   9246|     1| 3.3241005|
|      1|  10610|     5|   4.41921|
|      1|  13282|     5| 4.9166317|
|      1|  21713|     5| 4.6534348|
|      1|  23612|     4| 4.0550675|
+-------+-------+------+----------+
only showing top 10 rows



24/05/14 19:19:32 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [39]:
# Join prediction data to book to get book name
spark.conf.set("spark.debug.maxToStringFields", 1000)
#predictions.join(books, "book_id").select("user_id","Book-Title","rating","prediction").show(10)

#predict = predictions.join(books, "book_id").select("Book-Title").show(5)
predict = predictions.join(books, "book_id").select("Book-Title", "prediction")



24/05/14 19:21:32 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [38]:
predict.show(5)

24/05/14 19:21:02 ERROR Inbox: Ignoring error) / 10][Stage 8162:>(0 + 0) / 10]
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.

+--------------------+------+----------+
|          Book-Title|rating|prediction|
+--------------------+------+----------+
|Harry Potter and ...|     2| 4.7583313|
|The Lost Continen...|     4| 3.6137068|
|J.R.R. Tolkien 4-...|     4| 3.5912304|
|Dune Messiah (Dun...|     4| 3.1464858|
|The Lord of the R...|     3| 3.9147897|
+--------------------+------+----------+
only showing top 5 rows



24/05/14 19:21:12 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [40]:
predict = predict.toPandas()

24/05/14 19:21:42 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

In [41]:
type(predict)

pandas.core.frame.DataFrame

24/05/14 19:21:52 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [45]:
predict = predict.head(150)

In [46]:
# TO use this predictions we need to export the results using pickle

import pickle
pickle.dump(predict,(open('predict.pkl', 'wb')))

24/05/14 19:24:22 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [39]:
# Prediction of a user_id = 35982 and joining with books

single_user = predictions.filter(col("user_id")== 32592).join(
                                books, "book_id").select("user_id", "Book-Title", "rating", "prediction")
single_user.count()


                                                                                

5

In [40]:
single_user.show()

                                                                                

+-------+--------------------+------+----------+
|user_id|          Book-Title|rating|prediction|
+-------+--------------------+------+----------+
|  32592|The Lost Continen...|     4| 3.6479323|
|  32592|    Tropic of Cancer|     4| 3.8501437|
|  32592|What to Expect th...|     4| 4.0864534|
|  32592|              Zodiac|     5|  3.861659|
|  32592|     The Known World|     4| 3.9507499|
+-------+--------------------+------+----------+



In [41]:
for book in single_user.take(5):
    print(book) 
    #display(Image(url=book.image_url))


                                                                                

Row(user_id=32592, Book-Title='The Lost Continent: Travels in Small Town America', rating=4, prediction=3.647932291030884)
Row(user_id=32592, Book-Title='Tropic of Cancer', rating=4, prediction=3.8501436710357666)
Row(user_id=32592, Book-Title='What to Expect the First Year (What to Expect)', rating=4, prediction=4.086453437805176)
Row(user_id=32592, Book-Title='Zodiac', rating=5, prediction=3.861659049987793)
Row(user_id=32592, Book-Title='The Known World', rating=4, prediction=3.9507498741149902)


In [42]:
# Generate top 5 book recommendations for each user

userRecommen = model.recommendForAllUsers(5)
print(userRecommen)

DataFrame[user_id: int, recommendations: array<struct<book_id:int,rating:float>>]


In [43]:
# Generating top 5 user recommendations

bookRecommen = model.recommendForAllItems(5)
print(bookRecommen)

DataFrame[book_id: int, recommendations: array<struct<user_id:int,rating:float>>]


In [45]:
# Generating 5 recommendations to 10 users

userRecommen.select("user_id", "recommendations.book_id").show(10,False)
userRecommen.printSchema()



+-------+------------------------------+
|user_id|book_id                       |
+-------+------------------------------+
|1      |[4868, 5731, 8926, 9842, 4344]|
|2      |[5059, 2503, 7537, 4940, 6419]|
|3      |[1788, 1183, 8606, 3248, 6590]|
|4      |[9516, 7283, 5731, 4403, 9392]|
|5      |[1338, 6613, 6457, 7844, 6902]|
|6      |[4868, 8013, 3753, 9076, 5735]|
|7      |[3628, 4483, 5207, 7593, 6590]|
|8      |[6543, 8547, 7942, 9086, 9292]|
|9      |[3628, 6590, 8187, 5207, 7254]|
|10     |[4344, 9876, 9516, 4868, 5580]|
+-------+------------------------------+
only showing top 10 rows

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- book_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



                                                                                

In [46]:
# Recommending same books to 5 users

bookRecommen.select("book_id", "recommendations.user_id").show(10,False)



+-------+-----------------------------------+
|book_id|user_id                            |
+-------+-----------------------------------+
|1      |[17623, 12353, 14226, 49484, 52102]|
|2      |[45652, 33252, 7791, 3054, 13439]  |
|3      |[50062, 30699, 2137, 51241, 6205]  |
|4      |[29156, 17623, 12353, 44225, 29438]|
|5      |[29156, 23462, 46127, 35351, 48473]|
|6      |[17623, 47757, 29156, 29438, 35945]|
|7      |[23462, 17623, 45652, 49484, 6656] |
|8      |[29156, 23462, 46127, 49484, 17623]|
|9      |[13439, 31624, 7791, 13145, 18971] |
|10     |[52721, 17623, 40926, 12353, 23462]|
+-------+-----------------------------------+
only showing top 10 rows



                                                                                

In [47]:
bookRecommen.printSchema()

root
 |-- book_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- user_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [48]:
# Checking recommendations for random 5 users

users = ratings.select("user_id").distinct().limit(5)
users.show()

[Stage 8499:>                                                       (0 + 1) / 1]

+-------+
|user_id|
+-------+
|  32592|
|  19984|
|  35982|
|   1088|
|   3918|
+-------+



                                                                                

In [49]:
# Checking recommendations for above users

userSubsetRec = model.recommendForUserSubset(users,5)
userSubsetRec.select("user_id","recommendations.book_id").show(10,False)



+-------+------------------------------+
|user_id|book_id                       |
+-------+------------------------------+
|32592  |[4868, 7844, 7440, 6902, 3753]|
|35982  |[5084, 7081, 8109, 2937, 4291]|
|19984  |[3628, 5207, 862, 8415, 7844] |
|1088   |[3628, 9842, 4706, 4778, 7639]|
|3918   |[3628, 9842, 4706, 5207, 3248]|
+-------+------------------------------+



                                                                                

In [50]:
# generating users to whom the particular books can be recommended

Book = ratings.select("Book_id").distinct().limit(5)
Book.show()

[Stage 8557:>                                                       (0 + 1) / 1]

+-------+
|Book_id|
+-------+
|    148|
|    463|
|    471|
|    496|
|    833|
+-------+



                                                                                

In [51]:
# Recommending 5 books for above 5 users

BookSubSetRec = model.recommendForItemSubset(Book,5)

BookSubSetRec.select("book_id","recommendations.user_id").show(5, False)



+-------+-----------------------------------+
|book_id|user_id                            |
+-------+-----------------------------------+
|471    |[17623, 12353, 47757, 14226, 41609]|
|463    |[31624, 38263, 7791, 17623, 27329] |
|833    |[17623, 29156, 39736, 40706, 22551]|
|496    |[29156, 13439, 17623, 39736, 40706]|
|148    |[17623, 12353, 14226, 47757, 52102]|
+-------+-----------------------------------+



                                                                                

In [52]:
# validating the data from default recom file

book = [860, 1524, 2885, 2914,5297, 7397, 8802, 9506]
user = [4917]

new_pred = SQLContext.createDataFrame(zip(book, user), schema=['book_id', 'user_id'])


In [53]:
new_pred.show(5)



+-------+-------+
|book_id|user_id|
+-------+-------+
|    860|   4917|
+-------+-------+



                                                                                

In [54]:
new_preds = model.transform(new_pred)
new_preds.show()

                                                                                

+-------+-------+----------+
|book_id|user_id|prediction|
+-------+-------+----------+
|    860|   4917| 3.5662172|
+-------+-------+----------+



In [55]:
dict = []
df = userRecommen.toPandas()

                                                                                

In [56]:
df

Unnamed: 0,user_id,recommendations
0,1,"[(4868, 4.189826011657715), (5731, 4.173383712..."
1,2,"[(5059, 5.47130012512207), (2503, 5.4491872787..."
2,3,"[(1788, 1.0502387285232544), (1183, 1.04326701..."
3,4,"[(9516, 5.52226448059082), (7283, 5.3289680480..."
4,5,"[(1338, 5.6025710105896), (6613, 5.12774229049..."
...,...,...
53025,53420,"[(4868, 4.748240947723389), (9076, 4.604145050..."
53026,53421,"[(9076, 5.553898334503174), (6590, 5.540632247..."
53027,53422,"[(8249, 5.175082206726074), (619, 5.0732164382..."
53028,53423,"[(7401, 5.680562973022461), (9076, 5.635863304..."


In [57]:
# Hybrid Recommendations

recommendations = [*b, *df]

recommendations

['Animal Farm / 1984',
 'A Kiss for Little Bear (An I Can Read Book) by Minarik, Else Holmelund [1984]',
 'The Hunger Games (The Hunger Games, #1)',
 "Harry Potter and the Sorcerer's Stone (Harry Potter, #1)",
 'Twilight (Twilight, #1)',
 'To Kill a Mockingbird',
 'The Great Gatsby',
 'The Fault in Our Stars',
 'The Hobbit',
 'The Catcher in the Rye',
 'user_id',
 'recommendations']

In [58]:
Books.to_dict()
#Books['Book-Title'].values


{0: 'The Hunger Games (The Hunger Games, #1)',
 1: "Harry Potter and the Sorcerer's Stone (Harry Potter, #1)",
 2: 'Twilight (Twilight, #1)',
 3: 'To Kill a Mockingbird',
 4: 'The Great Gatsby',
 5: 'The Fault in Our Stars',
 6: 'The Hobbit',
 7: 'The Catcher in the Rye',
 8: 'Angels & Demons  (Robert Langdon, #1)',
 9: 'Pride and Prejudice',
 10: 'The Kite Runner',
 11: 'Divergent (Divergent, #1)',
 12: '1984',
 13: 'Animal Farm',
 14: 'The Diary of a Young Girl',
 15: 'The Girl with the Dragon Tattoo (Millennium, #1)',
 16: 'Catching Fire (The Hunger Games, #2)',
 17: 'Harry Potter and the Prisoner of Azkaban (Harry Potter, #3)',
 18: 'The Fellowship of the Ring (The Lord of the Rings, #1)',
 19: 'Mockingjay (The Hunger Games, #3)',
 20: 'Harry Potter and the Order of the Phoenix (Harry Potter, #5)',
 21: 'The Lovely Bones',
 22: 'Harry Potter and the Chamber of Secrets (Harry Potter, #2)',
 23: 'Harry Potter and the Goblet of Fire (Harry Potter, #4)',
 24: 'Harry Potter and the Deat

In [60]:
# importing the files to display on webpage

import pickle

#pickle.dump(recommendations, open('recommendations.pkl', 'wb'))
#pickle.dump(b, open('b.pkl', 'wb'))
#pickle.dump()

In [54]:
pickle.dump(Books.to_dict(), open('books_dict_1.pkl', 'wb'))

In [56]:
pickle.dump(recommendations, open('recommendations.pkl', 'wb'))

TypeError: cannot pickle '_thread.RLock' object

In [62]:
pickle.dump(df,open('collab_recom.pkl', 'wb'))