In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from datetime import datetime

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
import numpy as np


import os
from time import time

In [2]:
# Read the small ratings sample with pandas and display its (rows, columns) shape.
# Point `file` at 'ratings.csv' instead to inspect the complete dataset.
file = 'ratings_small.csv'
ratings_df = pd.read_csv(file)

ratings_df.shape

(100004, 4)

In [3]:
# Reuse the active SparkContext if one exists, otherwise create one in local mode
# ("local[*]": run on this machine with as many worker threads as logical cores).
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
# Load the small ratings file as an RDD of raw text lines.
small_ratings_raw_data = sc.textFile('ratings_small.csv')
# Keep the first line (taken to be the CSV header) so it can be filtered out before parsing.
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [4]:
# Drop every line equal to the header, split the remaining lines on commas and keep
# the first three fields as a tuple of strings (presumably user id, movie id, rating,
# which is the order ALS consumes them in). Any further fields are ignored.
# The result is cached because it is split and reused by the following cells.
small_ratings_data = (
    small_ratings_raw_data
    .filter(lambda row: row != small_ratings_raw_data_header)
    .map(lambda row: row.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .cache()
)

In [5]:
# Peek at the first three parsed records; the fields are still strings at this point.
small_ratings_data.take(3)

[('1', '31', '2.5'), ('1', '1029', '3.0'), ('1', '1061', '3.0')]

In [6]:
# Randomly split the small dataset with weights 6:2:2 into training, validation and
# test sets; the fixed seed keeps the split repeatable.
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit(
    [6, 2, 2], seed=0
)

# predictAll() takes (user, product) pairs, so strip the rating from the held-out sets.
validation_for_predict_RDD = validation_RDD.map(lambda rec: (rec[0], rec[1]))
test_for_predict_RDD = test_RDD.map(lambda rec: (rec[0], rec[1]))

In [7]:
from pyspark.mllib.recommendation import ALS
import math

In [8]:
# Hyper-parameter grid explored by the ALS grid search: every combination of
# rank x iterations x regularization parameter is trained with the same seed.
seed = 5
ranks = [5, 10, 15]
iterations = [5, 10, 15, 25]
regularization_parameter = [0.05, 0.1, 0.2]

# Bookkeeping for the search: `errors` collects one
# (rank, iterations, reg_par, rmse) tuple per fit, `err` counts the fits.
errors, err = [], 0

In [9]:
# Best-so-far trackers for the grid search. The error starts at +infinity so the
# first evaluated model always wins; -1 marks "nothing selected yet".
min_error = float('inf')
best_rank = best_iteration = best_reg_par = -1

In [10]:
# Exhaustive grid search: train one ALS model per (rank, iterations, reg_par)
# combination on the training split, score it by RMSE on the validation split,
# and remember the combination with the lowest error.
for rank in ranks:
    for itert in iterations:
        for reg_par in regularization_parameter:
            t0 = time()
            print(rank, itert, reg_par)
            model = ALS.train(
                training_RDD, rank, seed=seed, iterations=itert, lambda_=reg_par
            )
            # Key the predictions by (user, product) so they can be joined with
            # the held-out ratings below.
            predictions = model.predictAll(validation_for_predict_RDD).map(
                lambda pred: ((pred[0], pred[1]), pred[2])
            )
            # Cast the string fields to numbers so the keys match the prediction keys.
            rates_and_preds = validation_RDD.map(
                lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2]))
            ).join(predictions)
            # Each joined value is (actual, predicted); RMSE = sqrt(mean squared diff).
            error = math.sqrt(
                rates_and_preds.map(lambda kv: (kv[1][0] - kv[1][1])**2).mean()
            )
            t1 = time() - t0
            errors.append((rank, itert, reg_par, error))
            err += 1
            print('For rank %s, iterations %s the RMSE is %s in %s sec' % (rank, itert, error, t1))
            # Strict '<' keeps the earliest combination when two errors tie.
            if error < min_error:
                min_error, best_rank, best_iteration, best_reg_par = (
                    error, rank, itert, reg_par
                )
            

5 5 0.05
For rank 5, iterations 5 the RMSE is 0.9909944177563392 in 6.752425670623779 sec
5 5 0.1
For rank 5, iterations 5 the RMSE is 0.9431141589594201 in 4.5080602169036865 sec
5 5 0.2
For rank 5, iterations 5 the RMSE is 0.9225513302402039 in 4.220797061920166 sec
5 10 0.05
For rank 5, iterations 10 the RMSE is 0.9933565693338079 in 3.916440963745117 sec
5 10 0.1
For rank 5, iterations 10 the RMSE is 0.9366908878153927 in 3.585076093673706 sec
5 10 0.2
For rank 5, iterations 10 the RMSE is 0.9167557864526742 in 3.908825159072876 sec
5 15 0.05
For rank 5, iterations 15 the RMSE is 0.9969180104204706 in 3.708817720413208 sec
5 15 0.1
For rank 5, iterations 15 the RMSE is 0.9387085472995429 in 3.952730894088745 sec
5 15 0.2
For rank 5, iterations 15 the RMSE is 0.9169477037160668 in 3.632023334503174 sec
5 25 0.05
For rank 5, iterations 25 the RMSE is 0.9989888606647397 in 4.134925842285156 sec
5 25 0.1
For rank 5, iterations 25 the RMSE is 0.9396901243067682 in 4.328730821609497 sec


In [11]:
# Report the hyper-parameter combination that achieved the lowest validation RMSE.
print ('The best model was trained with rank %s interations %s, reg param %s ' % (best_rank, best_iteration, best_reg_par))

The best model was trained with rank 15 interations 10, reg param 0.2 


In [13]:
# Inspect a few ((user, product), predicted_rating) pairs.
# NOTE(review): in the cells shown, `predictions` is last assigned inside the
# grid-search loop, so this presumably reflects the final parameter combination
# tried rather than the best model.
predictions.take(3)

[((390, 667), 3.587789609945542),
 ((48, 44828), 0.4064959666892259),
 ((428, 5618), 4.091352887329464)]

In [14]:
# Inspect a few joined records of the form ((user, product), (actual, predicted)).
# NOTE(review): in the cells shown, `rates_and_preds` is last assigned inside the
# grid-search loop, so this presumably reflects the final parameter combination tried.
rates_and_preds.take(3)

[((1, 1061), (3.0, 2.638034823999758)),
 ((1, 1129), (2.0, 2.515218704015662)),
 ((1, 1371), (2.5, 2.1242737804306535))]

In [16]:
# Retrain on the training split with the selected hyper-parameters and report
# the RMSE on the held-out test split.
model = ALS.train(
    training_RDD,
    best_rank,
    seed=seed,
    iterations=best_iteration,
    lambda_=best_reg_par,
)
# Key predictions by (user, product) so they can be joined with the actual ratings.
predictions = model.predictAll(test_for_predict_RDD).map(
    lambda pred: ((pred[0], pred[1]), pred[2])
)
rates_and_preds = test_RDD.map(
    lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2]))
).join(predictions)
# Each joined value is (actual, predicted); RMSE = sqrt(mean squared diff).
error = math.sqrt(
    rates_and_preds.map(lambda kv: (kv[1][0] - kv[1][1])**2).mean()
)

print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9226471804373351


In [17]:
# Read the complete ratings file as an RDD of raw text lines.
complete_ratings_raw_data = sc.textFile('ratings.csv')
# Keep the first line (taken to be the CSV header) so it can be filtered out before parsing.
complete_ratings_raw_data_header  = complete_ratings_raw_data.take(1)[0]

In [18]:
# Drop every line equal to the header, split the remaining lines on commas and
# convert the first three fields to (int, int, float). Any further fields are
# ignored. The parsed RDD is cached for reuse.
complete_ratings_data = (
    complete_ratings_raw_data
    .filter(lambda row: row != complete_ratings_raw_data_header)
    .map(lambda row: row.split(","))
    .map(lambda fields: (int(fields[0]), int(fields[1]), float(fields[2])))
    .cache()
)

# count() is an action, so this line triggers the read and parse of the whole file.
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 26024289 recommendations in the complete dataset


In [19]:
# Randomly split the complete dataset with weights 7:3 into training and test sets
# (fixed seed for repeatability).
# NOTE(review): this rebinds training_RDD and test_RDD, which earlier referred to the
# small-sample splits; any cell run after this one sees the full-data splits instead.
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

In [21]:
# Train ALS on the current training split using the selected hyper-parameters,
# and record the wall-clock training time in `tt` (seconds).
t0 = time()
complete_model = ALS.train(
    training_RDD,
    best_rank,
    seed=seed,
    iterations=best_iteration,
    lambda_=best_reg_par,
)
tt = time() - t0

In [22]:
# Report how long training took (tt is elapsed wall-clock seconds from time()).
print ("Time taken to train the full data for %s rank, %s iterations %s" % (best_rank, best_iteration, tt))

Time taken to train the full data for 15 rank, 10 iterations 213.7662742137909


In [23]:
# predictAll() takes (user, product) pairs, so keep only the first two fields
# of each test record.
test_for_predict_RDD = test_RDD.map(lambda rec: (rec[0], rec[1]))

In [24]:
# Score the complete-data model on the test split and time the whole evaluation
# (prediction, join and RMSE) in `t1` (seconds).
t0 = time()
# Key predictions by (user, product) so they can be joined with the actual ratings.
predictions = complete_model.predictAll(test_for_predict_RDD).map(
    lambda pred: ((pred[0], pred[1]), pred[2])
)
rates_and_preds = test_RDD.map(
    lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2]))
).join(predictions)
# Each joined value is (actual, predicted); RMSE = sqrt(mean squared diff).
error = math.sqrt(
    rates_and_preds.map(lambda kv: (kv[1][0] - kv[1][1])**2).mean()
)
t1 = time() - t0

In [25]:
# Report the test RMSE together with the evaluation time measured in the previous cell.
print ('For testing data the RMSE is %s in %s secs' % (error, t1))

For testing data the RMSE is 0.8649513155281516 in 498.9620997905731 secs


In [None]:
# Predict ratings for every (user, product) pair in the test set and key each
# prediction by (user, product).
# Fix: the original line never closed the `.map(` call and raised a SyntaxError.
# NOTE(review): despite the name, no rounding is applied to the predicted rating;
# wrap pred[2] in round(...) here if rounded values are actually wanted. TODO confirm
# the intended precision.
predictions_rounded = complete_model.predictAll(test_for_predict_RDD).map(
    lambda pred: ((pred[0], pred[1]), pred[2])
)

In [None]:
# Show the first five ((user, product), predicted_rating) records.
predictions_rounded.take(5)