# Collaborative Filtering - RDD-based API 

### Loading and parsing datasets

In [1]:
from pyspark.mllib.recommendation import ALS
import math

In [2]:
from pyspark import SparkContext
# Reuse the already-running context when this cell is executed again:
# constructing a second SparkContext in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

On charge le dataset.

In [10]:
# Raw text RDD (one element per line of the CSV) used as the "small" dataset.
# Alternative small source, currently disabled:
#small_ratings_raw_data = sc.textFile('/home/moscaale/Téléchargements/ml-latest-small/ratings.csv') # small standard dataset
small_ratings_raw_data = sc.textFile(
    name='/media/moscaale/NBD20/samples10M/sample2.csv',
)
# Candidate sources for the complete dataset, currently disabled:
#complete_ratings_raw_data = sc.textFile('/home/moscaale/Téléchargements/ml-latest/ratings.csv') # full standard dataset
#complete_ratings_raw_data = sc.textFile('/media/moscaale/NBD20/samples20M/sample0.csv') # random sample dataset from Netflix

On récupère le header inclus dans le fichier afin de pouvoir le filtrer à l'étape suivante.

In [11]:
# Keep the first line of the file (treated as the CSV header) so the parsing
# step can drop it. first() is the direct RDD call for this and fails with an
# explicit "RDD is empty" error instead of an IndexError on an empty list.
small_ratings_raw_data_header = small_ratings_raw_data.first()
#complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

On parse les données brutes dans un nouvel RDD.

In [12]:
# Drop the header line, split every remaining row on commas and keep its first
# three fields as a tuple of strings. The result is cached because several
# later cells read it.
small_ratings_data = (
    small_ratings_raw_data
    .filter(lambda row: row != small_ratings_raw_data_header)
    .map(lambda row: row.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .cache()
)

In [14]:
# Preview the first three parsed rows.
small_ratings_data.take(num=3)

[('13462', '94598', '3'), ('13555', '2315130', '3'), ('2580', '1346432', '3')]

### How to use Spark MLlib with collaborative-filtering

Spark MLlib fournit une implémentation du collaborative filtering qui utilise ALS (Alternating Least Squares).


In [None]:
# Hyper-parameters passed to every ALS.train call in this notebook.
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]  # candidate values for the ALS `rank` parameter
# One RMSE slot per candidate rank; sized from `ranks` so the two lists cannot
# drift apart when a rank is added or removed.
errors = [0] * len(ranks)
err = 0  # index of the next slot of `errors` to fill
tolerance = 0.02  # not referenced by the cells visible in this notebook

# Best-so-far trackers for model selection; the sentinel values mean
# "nothing evaluated yet".
min_error = float('inf')
best_rank = -1
best_iteration = -1  # not referenced by the cells visible in this notebook

In [None]:
# small dataset

# Fixed seed so the train/validation/test split and the ALS initialisation are
# the same on every run; without it the reported RMSE values change each time.
split_seed = 0

training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=split_seed)
# predictAll only needs the first two fields of each row, not the rating.
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

# Reset the selection state inside this cell so it can be re-executed safely.
# Relying on the configuration cell alone left `err` past the end of `errors`
# (IndexError on the second run) and compared against a stale `min_error`.
errors = [0] * len(ranks)
err = 0
min_error = float('inf')
best_rank = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank=rank, iterations=iterations,
                      lambda_=regularization_parameter, seed=split_seed)
    # Key predictions and actual ratings by their first two fields so they can be joined.
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # The parsed rows hold strings, hence the explicit int/float conversion here.
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # Root-mean-square error between actual and predicted ratings.
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank {}'.format(best_rank))


In [None]:
# Retrain with the selected rank, then measure the RMSE on the held-out test split.
model = ALS.train(
    training_RDD,
    best_rank,
    iterations=iterations,
    lambda_=regularization_parameter,
)
# Key both the predictions and the actual ratings by their first two fields so
# that the join lines up one (actual, predicted) pair per key.
predictions = model.predictAll(test_for_predict_RDD).map(lambda p: ((p[0], p[1]), p[2]))
rates_and_preds = test_RDD.map(lambda t: ((int(t[0]), int(t[1])), float(t[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda pair: (pair[1][0] - pair[1][1]) ** 2).mean())

print('For testing data the RMSE is {}'.format(error))


In [None]:
# Full dataset

# Load the complete dataset in this cell: the only other definitions of
# `complete_ratings_raw_data` and its header are commented out earlier in the
# notebook, so the cell raised NameError on a fresh kernel.
# NOTE(review): the path is copied from one of the commented-out load lines in
# the dataset-loading cell; confirm it is the intended source.
complete_ratings_raw_data = sc.textFile('/media/moscaale/NBD20/samples20M/sample0.csv')
complete_ratings_raw_data_header = complete_ratings_raw_data.first()

# Drop the header, split each row on commas and convert the first three fields
# to (int, int, float); cached because it is counted and then split below.
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
print('There are {} recommendations in the complete dataset'.format(complete_ratings_data.count()))

training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3])

# Train with the rank selected on the small dataset.
complete_model = ALS.train(training_RDD, best_rank, iterations=iterations, lambda_=regularization_parameter)
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

# Key predictions and actual ratings by their first two fields, then join them.
predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# The rows were already converted to (int, int, float) when parsed above, so no
# further casting is needed before the join.
rates_and_preds = test_RDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print('For testing data the RMSE is {}'.format(error))