In [9]:
import pyspark
from pyspark import SparkContext

In [10]:
sc=SparkContext("local","Recommender System")

In [2]:
import os
datasets_path = os.path.join('/Users/ayushkumar/Documents/SelfStudy/Python', 'datasets')
complete_dataset_path = os.path.join(datasets_path, 'ml-latest')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small')

In [20]:
# download the data
import urllib 
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

In [6]:
datasets_path

'/Users/ayushkumar/Documents/SelfStudy/Python/datasets'

In [7]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

FileNotFoundError: [Errno 2] No such file or directory: '/Users/ayushkumar/Documents/SelfStudy/Python/datasets/ml-latest-small'

In [None]:
# loading ,parsing the dataset

In [11]:
small_ratings_raw_data=sc.textFile('/Users/ayushkumar/Documents/SelfStudy/Python/datasets/ml-latest-small/ratings.csv')

In [12]:
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [15]:
small_ratings_raw_data_header

'userId,movieId,rating,timestamp'

In [16]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [21]:
small_ratings_data.take(2)

[('1', '31', '2.5'), ('1', '1029', '3.0')]

In [22]:
#movies.csv
small_movies_raw_data = sc.textFile('/Users/ayushkumar/Documents/SelfStudy/Python/datasets/ml-latest-small/movies.csv')
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [None]:
'''
ALS MLIB Parameters:
    numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
    rank is the number of latent factors in the model.
    iterations is the number of iterations to run.
    lambda specifies the regularization parameter in ALS.
    implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
    alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

'''

In [24]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)

In [25]:
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [31]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s'%best_rank)

For rank 4 the RMSE is 0.9434358506197258
For rank 8 the RMSE is 0.9462567143051793
For rank 12 the RMSE is 0.9425023000110846
The best model was trained with rank 12


In [32]:
predictions.take(3)

[((390, 667), 3.5792800508449423),
 ((48, 44828), 0.3350496551427177),
 ((428, 5618), 4.345656412438634)]

In [33]:
rates_and_preds.take(3)

[((1, 1061), (3.0, 2.6052835007639903)),
 ((1, 1129), (2.0, 1.9477235071360743)),
 ((1, 1371), (2.5, 1.857560319630406))]

In [34]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)

In [35]:
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

In [39]:
print("Testing RMSE %s"%error)

Testing RMSE 0.9467756493586419


In [40]:
complete_ratings_raw_data = sc.textFile('/Users/ayushkumar/Documents/SelfStudy/Python/datasets/ml-latest/ratings.csv')
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]


In [41]:
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

In [42]:
print ("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 24404096 recommendations in the complete dataset


In [43]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [45]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.816306996838434
