In [1]:
from __future__ import division
import pyspark
from pyspark.sql.functions import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext
import numpy as np
import math
import matplotlib.pyplot as plt
import itertools
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
# Build a SQLContext on the SparkContext `sc` (`sc` is not defined in this notebook; presumably the host environment provides it).
# NOTE(review): later cells read the parquet files through `sqlContext`, not through this `sql` handle — confirm which one is intended.
sql = pyspark.SQLContext(sc)

In [2]:
# Read the three parquet tables (ratings, users, books) and preview each of them.
RATINGS_PARQUET = "/FileStore/tables/se7o9yrc1496965975603/ratings_dat.parq"
USERS_PARQUET = "/FileStore/tables/3202ecjc1498244879905/users_dat.parq"
BOOKS_PARQUET = "/FileStore/tables/idxrkakn1498252895102/books_dat.parq"

# Ratings: previewed first, then marked for caching because every later cell derives from it.
data = sqlContext.read.parquet(RATINGS_PARQUET)
display(data)
data.cache()

# Users
user_data = sqlContext.read.parquet(USERS_PARQUET)
display(user_data)

# Books
book_data = sqlContext.read.parquet(BOOKS_PARQUET)
display(book_data)

In [3]:
# Give the raw rating columns identifier-friendly names (no dashes, lower case).
data.schema.names
column_renames = [("User-ID", "userid"), ("ISBN", "isbn"), ("Book-Rating", "bookrating")]
df = data.select(*[col(src_name).alias(dst_name) for src_name, dst_name in column_renames])
df.take(10)
df.printSchema()

In [4]:
# mllib ALS needs integer user and product ids. ISBNs contain letters, so
# StringIndexer assigns every distinct isbn a numeric "bookid".
stringIndexer = StringIndexer(inputCol="isbn", outputCol="bookid")
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show()

# (An IndexToString transformer can map the numeric ids back to the original strings if needed.)

# Cast all three columns to integer; column order stays (userid, bookid, bookrating).
integer_columns = [indexed[name].cast("integer") for name in ("userid", "bookid", "bookrating")]
df2 = indexed.select(*integer_columns)
df2.printSchema()
df2.show()

In [5]:
# Dataset size summary: total ratings, distinct users and distinct books.
print(type(data))

numRatings = df2.count()
# Count distinct ids with the DataFrame API: DataFrame.map no longer exists in
# Spark 2.x, while select(...).distinct() works on every version.
numUsers = df2.select("userid").distinct().count()
numBooks = df2.select("bookid").distinct().count()

print("Got %d ratings from %d users on %d books." % (numRatings, numUsers, numBooks))

In [6]:
# 80/20 random split of the ratings with a fixed seed (18) so the split is repeatable.
splits = df2.randomSplit([0.8, 0.2], 18)
train_set, vali_set = splits
train_count = train_set.count()
vali_count = vali_set.count()
print("Got %d in train_set, %d in vali_set." % (train_count, vali_count))

In [7]:
# Show the Python type of each of the three datasets.
tuple(type(frame) for frame in (train_set, df2, data))

In [8]:
# Check sparsity in train_set: the share of (user, book) cells that hold an
# explicit (non-zero) rating.
rating_ct = train_set.filter('bookrating != 0').count()
# Distinct counts via the DataFrame API (DataFrame.map no longer exists in Spark 2.x).
train_users_ct = train_set.select('userid').distinct().count()
train_books_ct = train_set.select('bookid').distinct().count()
# float() keeps this a true division even without `from __future__ import division`.
sparsity = rating_ct / float(train_users_ct * train_books_ct)
print('Sparsity: {:4.4f}%'.format(sparsity * 100))

In [9]:
# Summary statistics of the bookrating column in the training split.
bookrating_stats = train_set.describe('bookrating')
bookrating_stats.show()

In [10]:
## In this section, we use ALS.train to train a set of models, then select and evaluate the best one. Among the training parameters of ALS, the most important are rank, lambda (the regularization constant), and the number of iterations.

In [11]:
# Hyperparameter grid for the ALS search.
ranks = [5, 10, 40]
regularizations = sorted([0.1, 1., 10., 100.])
iter_array = [1, 5, 10, 15]

# Running record of the best configuration; errors start at +inf so the first
# evaluated model always replaces the placeholder.
best_params = {
    'n_factors': ranks[0],
    'reg': regularizations[0],
    'n_iter': 0,
    'train_mse': np.inf,
    'test_mse': np.inf,
    'model': None,
}


In [12]:
# (user, book) key pairs of each split — the input passed to predictAll.
# Go through .rdd explicitly: DataFrame.map no longer exists in Spark 2.x.
vali_set_x = vali_set.rdd.map(lambda p: (p[0], p[1]))
train_set_x = train_set.rdd.map(lambda p: (p[0], p[1]))

In [13]:
def _rmse(als_model, truth_by_pair, user_book_pairs):
  """Root-mean-square error of `als_model` against observed ratings.

  truth_by_pair   -- RDD of ((user, book), rating) with the observed ratings
  user_book_pairs -- RDD of (user, book) pairs to score with predictAll

  Pairs for which predictAll returns no prediction drop out of the inner join
  and therefore do not contribute to the error.
  """
  predicted = als_model.predictAll(user_book_pairs).map(lambda r: ((r[0], r[1]), r[2]))
  squared_errors = truth_by_pair.join(predicted).map(lambda r: (r[1][0] - r[1][1]) ** 2)
  return math.sqrt(squared_errors.mean())


# (user, book) keys to score; .rdd is explicit because DataFrame.map no longer exists in Spark 2.x.
vali_set_x = vali_set.rdd.map(lambda p: (p[0], p[1]))
train_set_x = train_set.rdd.map(lambda p: (p[0], p[1]))

# Observed ratings keyed by (user, book); independent of the model, so built once.
vali_truth = vali_set.rdd.map(lambda r: ((r[0], r[1]), r[2]))
train_truth = train_set.rdd.map(lambda r: ((r[0], r[1]), r[2]))

# Exhaustive search over rank x regularization x iterations; the model with the
# lowest validation RMSE is kept in best_params.
for rank in ranks:
  print('rank: {}'.format(rank))
  for regularization in regularizations:
    print('regparam: {}'.format(regularization))
    for iter_round in iter_array:
      print('iter_round: {}'.format(iter_round))
      model = ALS.train(train_set,
                        rank=rank, iterations=iter_round, lambda_=regularization)

      RMSE = _rmse(model, vali_truth, vali_set_x)
      print("Test RMSE = " + str(RMSE))

      train_RMSE = _rmse(model, train_truth, train_set_x)
      print("Train RMSE = " + str(train_RMSE))

      if RMSE < best_params['test_mse']:
        print('New optimal hyperparameters updated. New RMSE is {}'.format(RMSE))
        best_params['n_factors'] = rank
        best_params['reg'] = regularization
        best_params['n_iter'] = iter_round
        # The *_mse keys hold RMSE values; the key names are kept for the cells that read them.
        best_params['train_mse'] = train_RMSE
        best_params['test_mse'] = RMSE
        best_params['model'] = model

In [14]:
# Best hyperparameter combination found by the grid search above.
# NOTE: the 'train_mse' / 'test_mse' entries are assigned RMSE values by the search loop, not MSE.
best_params
# Presumably the output recorded from an earlier run:
# {'model': <pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7f736b71ed50>,
# 'n_factors': 40,
# 'n_iter': 15,
# 'reg': 1.0,
# 'test_mse': 3.8086599896164373,
# 'train_mse': 2.1490012046727935}

In [15]:
# RMSE vs. number of ALS iterations, at a fixed rank and regularization.
rank = 40
regularization = 1.0
# NOTE: this rebinds the `iter_array` defined for the grid search.
iter_array = [1, 2, 5, 10, 15, 20]
RMSE_series = []
train_RMSE_series = []

# Observed ratings keyed by (user, book); independent of the model, so built once.
# .rdd is explicit because DataFrame.map no longer exists in Spark 2.x.
vali_truth = vali_set.rdd.map(lambda r: ((r[0], r[1]), r[2]))
train_truth = train_set.rdd.map(lambda r: ((r[0], r[1]), r[2]))

for iter_round in iter_array:
  print('iter_round: {}'.format(iter_round))
  model = ALS.train(train_set,
                    rank=rank, iterations=iter_round, lambda_=regularization)

  # Validation error
  predictions = model.predictAll(vali_set_x).map(lambda r: ((r[0], r[1]), r[2]))
  ratesAndPreds = vali_truth.join(predictions)
  RMSE = math.sqrt(ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
  RMSE_series.append(RMSE)
  print("Test RMSE = " + str(RMSE))

  # Training error
  train_predictions = model.predictAll(train_set_x).map(lambda r: ((r[0], r[1]), r[2]))
  train_ratesAndPreds = train_truth.join(train_predictions)
  train_RMSE = math.sqrt(train_ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
  train_RMSE_series.append(train_RMSE)
  print("Train RMSE = " + str(train_RMSE))

In [16]:
# Re-display the grid-search result; the iteration sweep above does not write to best_params.
best_params

In [17]:
import matplotlib.pyplot as plt

# Training and test RMSE against the number of ALS iterations.
fig, ax = plt.subplots()

# Thick lines first (these carry the legend labels) ...
for rmse_values, curve_label in ((train_RMSE_series, 'Training'), (RMSE_series, 'Test')):
  ax.plot(iter_array, rmse_values, label=curve_label, linewidth=3)

# ... then red dots marking the measured points.
for rmse_values in (train_RMSE_series, RMSE_series):
  ax.plot(iter_array, rmse_values, 'ro')

ax.legend()
display(fig)

In [18]:
# Persist the best model from the grid search under model_path.
model = best_params['model']
model_path = './book_reco'
model.save(sc, model_path)

In [19]:
# Reload the factorization model from the same path the save cell writes to.
model_path = './book_reco'
model = MatrixFactorizationModel.load(sc, model_path)

#### Besides splitting into train and test sets, we can also mask some ratings in the dataset and use the masked ratings as test data

In [21]:
# How many ratings are there in total, and how many of them are non-zero?
dat = df2
type(dat)
dat.describe()
has_rating = dat.bookrating != 0
rating_ttl = dat.count()
non_zero_ct = dat.where(has_rating).count()

summary_line = '%s ratings, %s non-zero ratings' % (rating_ttl, non_zero_ct)
print(summary_line)

In [22]:
# Separate zero ratings from non-zero ratings.
zero_ratings = dat.where(dat.bookrating == 0)
non_zero_ratings = dat.where(dat.bookrating != 0)

# Hold out 20% of the non-zero ratings (seed 18) as the masked test cells.
splits = non_zero_ratings.randomSplit([0.8, 0.2], 18)
non_zero_train, non_zero_test = splits

# Test data = held-out non-zero ratings; training data = every zero rating plus the remaining non-zero ones.
new_test = non_zero_test
new_train = zero_ratings.unionAll(non_zero_train)

In [23]:
# Peek at the first rows of the combined training set.
new_train.take(4)

In [24]:
# Train on the non-masked ratings and measure RMSE on the masked (held-out) ones.
n_factors = 40
n_iter = 20
reg = 1.0
# NOTE(review): `new_train` (zero ratings + non_zero_train) is built in an earlier cell,
# but the model is fit on `non_zero_train` only — confirm that leaving out the 0 ratings is intended.
model = ALS.train(non_zero_train,
                  rank=n_factors, iterations=n_iter, lambda_=reg)

# .rdd is explicit because DataFrame.map no longer exists in Spark 2.x.
test_x = new_test.rdd.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(test_x).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = new_test.rdd.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
RMSE = math.sqrt(ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print('masked cells has test RMSE of %s' % RMSE)

#### Summarize rating counts by book so that we only recommend books whose number of ratings exceeds a threshold

In [26]:
# print train_set.take(2)
# train_set.map(lambda r: r[2]).distinct().collect()
# rating 0 to 10

In [27]:
# Number of non-zero ratings per book in the training split, most-rated books first.
rated_only = train_set.filter(train_set.bookrating != 0)
per_book_counts = rated_only.groupBy('bookid').count()
rating_ct_by_book = per_book_counts.sort('count', ascending=False)
rating_ct_by_book.take(3)

#### Randomly select one user from the validation set and treat them as an unseen user. Generate recommendations for this user

In [29]:
# Draw one user (seeded, so the draw is repeatable) among validation rows that carry a non-zero rating.
candidate_users = vali_set.filter(vali_set.bookrating != 0).select('userid')
new_user = candidate_users.rdd.takeSample(False, 1, seed=0)
new_user
# An earlier run recorded userid 1848 here.
# takeSample returns a list of Rows; [0][0] is the userid of the single sampled Row.
new_user_ID = new_user[0][0]

In [30]:
# Find the books this user has not rated.

# Books the user rated (non-zero) inside the validation split.
rated_books = vali_set.filter(vali_set.userid == new_user_ID).filter(vali_set.bookrating != 0)
rated_books.show()
# .rdd is explicit because DataFrame.map no longer exists in Spark 2.x.
rated_books_ids = rated_books.rdd.map(lambda x: x[1]).collect()
type(rated_books_ids)

# Prediction input: (new_user_ID, bookid) for every training book that is NOT among the rated ones.
# NOTE(review): only ratings found in vali_set are excluded; ratings this user may have in train_set are not.
unrated_x = train_set.where(~col("bookid").isin(rated_books_ids)).rdd.map(lambda x: (new_user_ID, x[1])).distinct()
unrated_x.take(5)

In [32]:
from datetime import datetime

# Difference between two "HH:MM:SS" clock strings, computed inside an RDD.
FMT = "%H:%M:%S"
# parallelize is a method and must be *called* with the list; subscripting it
# (sc.parallelize[...]) raises TypeError.
aa = sc.parallelize([('10:40:31', '10:39:31')])
duration = aa.map(lambda p: datetime.strptime(p[0], FMT) - datetime.strptime(p[1], FMT))

In [33]:
# Create a new training set that also contains the new user's rated books, then
# retrain with the chosen hyperparameters. The resulting new_user_model is used
# to score the books this user has not rated.
from time import time

new_train_set = train_set.unionAll(rated_books)

t0 = time()
# Fit on new_train_set (not train_set); otherwise the new user's ratings never reach the model.
new_user_model = ALS.train(new_train_set,
                           rank=40, iterations=20, lambda_=1.0)
time_diff = time() - t0

print("New model trained in %s seconds" % time_diff)

In [34]:
# Score every unrated book for the new user and keep (bookid, predicted rating) pairs.
raw_predictions = new_user_model.predictAll(unrated_x)
new_user_pred = raw_predictions.map(lambda rating: (rating.product, rating.rating))
(new_user_pred.take(4), type(new_user_pred))  # pyspark.rdd.PipelinedRDD

In [35]:
# Preview the books table before deriving the bookid -> title lookup from it.
book_data.show()

In [36]:
print(type(book_data))

# Attach to each title the SAME integer bookid that the ratings data uses.
# The indexer must therefore be fitted on the ratings ISBNs (`df`), not on the
# books table: an indexer fitted on book_data would hand out unrelated ids and
# the later join on bookid would pair recommendations with the wrong titles.
# NOTE(review): this refits on `df` on the assumption that it reproduces the mapping
# of the first indexing cell, whose fitted model is later overwritten under the name
# `model`. Keeping that first model under its own name and reusing it would be safer.
book_data2 = book_data.select(col("ISBN").alias("isbn"), col("Book-Title").alias("book_title"))
# handleInvalid="skip" drops books whose ISBN never occurs in the ratings (they have no bookid).
stringIndexer = StringIndexer(inputCol="isbn", outputCol="bookid", handleInvalid="skip")
model = stringIndexer.fit(df)
indexed = model.transform(book_data2)
indexed.show()

In [37]:
print(type(indexed))
# Reduce the indexed books to (integer bookid, book_title) and expose them as an RDD for joining.
bookid_as_int = indexed.bookid.cast("integer")
title_by_bookid = indexed.select(bookid_as_int, indexed.book_title)
book_data2 = title_by_bookid.rdd
book_data2.take(3)

In [38]:
# rating_ct_by_book_rdd is otherwise only assigned in a later cell; derive it here
# so a top-to-bottom run does not raise NameError.
rating_ct_by_book_rdd = rating_ct_by_book.rdd
rating_ct_by_book_rdd.take(3)

In [39]:
# Peek at the (bookid, predicted rating) pairs for the new user.
new_user_pred.take(3)

In [40]:
# Join rating counts, predicted ratings and titles on bookid.
# RDD joins use the first element of each record as the key; with more fields,
# pack the rest into one value first, e.g. (x0, (x1, x2, x3)).
rating_ct_by_book_rdd = rating_ct_by_book.rdd
type(rating_ct_by_book_rdd)
counts_with_pred = rating_ct_by_book_rdd.join(new_user_pred)
# Resulting records: (bookid, ((rating count, predicted rating), book_title))
rdd_join = counts_with_pred.join(book_data2)

In [41]:
# Peek at the joined records to check their nesting before flattening them.
rdd_join.take(3)

In [42]:
def _flatten_joined(record):
  # (bookid, ((count, prediction), title)) -> (bookid, title, count, prediction)
  return (record[0], record[1][1], record[1][0][0], record[1][0][1])

final_dat = rdd_join.map(_flatten_joined)
final_dat.take(2)

# Keep books with more than 30 ratings, then take the 5 highest predicted ratings.
popular_books = final_dat.filter(lambda rec: rec[2] > 30)
top10_reco = popular_books.takeOrdered(5, key=lambda rec: -rec[3])
top10_reco

In [43]:
# Draw another user among validation rows with a non-zero rating (no seed, so the draw varies per run).
rated_users = vali_set.filter(vali_set.bookrating != 0).select('userid')
new_user = rated_users.rdd.takeSample(False, 1)
print(new_user)
# An earlier run recorded userid 1848 here.
new_user_ID = new_user[0][0]

In [44]:
# Peek at a few validation rows and check the object's type.
vali_set.take(3)
type(vali_set)

In [46]:
from time import time
# write a function that takes new_user_ID and gives recommendation
def give_recomendation(new_user_ID, top_n=15, min_rating_count=30,
                       rank=40, iterations=20, reg=1.0):
  """Recommend books to a user taken from the validation set.

  The user's non-zero validation ratings are appended to the training data, an
  ALS model is retrained on that, and every training book outside those ratings
  is scored for the user.

  Parameters
  ----------
  new_user_ID : user id to look up in `vali_set`
  top_n : number of recommendations to return
  min_rating_count : a book is kept only if its rating count is greater than this
  rank, iterations, reg : hyperparameters passed to ALS.train

  Returns
  -------
  (top10_reco, rated_books)
    top10_reco  -- list of (bookid, book_title, rating count, predicted rating),
                   highest predicted rating first
    rated_books -- DataFrame with the user's non-zero ratings from `vali_set`

  Reads the notebook-level `vali_set`, `train_set`, `rating_ct_by_book_rdd` and `book_data2`.
  """
  rated_books = vali_set.filter(vali_set.userid == new_user_ID).filter(vali_set.bookrating != 0)
  # .rdd is explicit because DataFrame.map no longer exists in Spark 2.x.
  rated_books_ids = rated_books.rdd.map(lambda x: x[1]).collect()
  new_train_set = train_set.unionAll(rated_books)

  # Prediction input: only the books that are NOT rated by this user.
  unrated_x = (train_set.where(~col("bookid").isin(rated_books_ids))
               .rdd.map(lambda x: (new_user_ID, x[1]))
               .distinct())

  t0 = time()
  # Fit on new_train_set (not train_set); otherwise the user's ratings never reach the model.
  new_user_model = ALS.train(new_train_set,
                             rank=rank, iterations=iterations, lambda_=reg)
  time_diff = time() - t0
  print("New model trained in %s seconds" % time_diff)

  # Predicted rating for every book the user has not rated: (bookid, prediction).
  new_user_pred = new_user_model.predictAll(unrated_x).map(lambda row: (row.product, row.rating))

  # (bookid, ((rating count, prediction), title))
  rdd_join = rating_ct_by_book_rdd.join(new_user_pred).join(book_data2)

  # Flatten to (bookid, title, rating count, prediction).
  final_dat = rdd_join.map(lambda r: (r[0], r[1][1], r[1][0][0], r[1][0][1]))
  top10_reco = (final_dat.filter(lambda x: x[2] > min_rating_count)
                .takeOrdered(top_n, key=lambda x: -x[3]))

  return (top10_reco, rated_books)

# Run the recommender for user 37644: `x` receives the recommendation list, `rated` the user's rated_books DataFrame.
x, rated = give_recomendation(37644)

In [47]:
# Show rated books: key the user's ratings by bookid and join the titles on.
# .rdd is explicit because DataFrame.map no longer exists in Spark 2.x.
rated.select(rated.bookid, rated.userid, rated.bookrating).rdd.map(lambda x: (x[0], (x[1], x[2]))).join(book_data2).collect()

In [48]:
# Recommended books: the first element returned by give_recomendation(37644), i.e. its top10_reco list.
x

In [49]:
# Display the first two rows of the validation split.
vali_set.show(2)