# Authors
 - Nwamaka Nzeocha
 - Fabian Okeke

# Recommendation System Datasets

This notebook uses the following datasets:

- [MovieLens 10M data set](http://grouplens.org/datasets/movielens/10m/)
- [MovieLens 22M data set](http://grouplens.org/datasets/movielens/latest/)
- [Million song data set](http://labrosa.ee.columbia.edu/millionsong/tasteprofile)

## Split dataset into 60-20-20 train-validate-test partitions

In [None]:
import os

def exists(filepath):
    """Report whether `filepath` refers to an existing path on disk."""
    found = os.path.exists(filepath)
    return found

In [None]:
if (exists('ml-10M100K/train60.dat') and exists('ml-10M100K/validation20.dat') and exists('ml-10M100K/test20.dat')):
    print "Already created files: train60.dat, validation20.dat, test20.dat"    

else:
    # sort by timestamp (4th column)
    print 'sorting file...'
    !sort -t ':' -k4 ml-10M100K/ratings.dat > ml-10M100K/new_ratings.dat 
    print "sorting complete."
    
    # split into 5 parts of 2 million each: train(3 parts), validation (1 part), test (1 part)
    print "splitting file..."
    !split -l 2000000 ml-10M100K/new_ratings.dat ff
    !cat ffaa ffab ffac > ml-10M100K/train60.dat
    !mv ffad ml-10M100K/validation20.dat
    !mv ffae ml-10M100K/test20.dat
    
    # remove tmp files used to create partitions
    !rm new_ratings.dat ff*
    print "splitting complete."    
    print "Newly created files: train60.dat, validation20.dat, test20.dat"

In [None]:
import contextlib
from math import sqrt
from operator import add
import sys
from pyspark.mllib.recommendation import ALS

# Print the signature and docstring of ALS.train for reference.
help(ALS.train)

### Meaning of parameters

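- numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure); PySpark's `ALS.train` calls this argument `blocks`.
- ***rank*** is the number of latent factors in the model.
- iterations is the number of iterations to run.
- ***lambda*** specifies the regularization parameter in ALS; in PySpark the argument is named `lambda_` because `lambda` is a Python keyword.
- implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data; PySpark exposes this choice as two functions, `ALS.train` (explicit) and `ALS.trainImplicit` (implicit), rather than as a flag.
- alpha is a parameter applicable to the implicit feedback variant of ALS (`ALS.trainImplicit`) that governs the baseline confidence in preference observations.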


# Using train data, learn ALS model

In [None]:
def parse_rating(line):
    """
    Turn one MovieLens rating line into a (userId, movieId, rating) tuple.

    :param str line: userId::movieId::rating::timestamp
    :return tuple: (int userId, int movieId, float rating); the timestamp is dropped.
    """
    parts = line.strip().split("::")
    user_id = int(parts[0])
    movie_id = int(parts[1])
    rating = float(parts[2])
    return (user_id, movie_id, rating)


def compute_rmse(model, data, dataCount, bias=None, biasKey=1):
    """
    Compute the RMSE (Root Mean Squared Error) of `model` on `data`.

    :param object model: trained model exposing ``predictAll(pairs)``, which
        yields (user, movie, rating) records for (user, movie) pairs.
    :param data: RDD of (userId, movieId, rating) records to score against.
    :param int dataCount: number of records in `data`; used as the divisor
        of the squared-error sum.
    :param bias: offset that was removed from the training ratings and must
        be added back to every prediction. A number is added as-is; a dict
        is looked up per prediction using the field chosen by `biasKey`.
        None (the default) leaves predictions untouched.
    :param int biasKey: only used when `bias` is a dict. 1 (default) looks the
        bias up by movie ID, 0 looks it up by user ID.
    :return float: the root mean squared error.
    """
    # Fallback for IDs missing from a bias dict.
    # NOTE(review): the origin of 0.409 is not shown in this notebook - confirm.
    fallbackBias = 0.409

    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))  # (userId, movieId) pairs

    if isinstance(bias, dict):
        # Look the offset up with the prediction's own user/movie ID.
        predictions = predictions.map(
            lambda p: (p[0], p[1], p[2] + bias.get(p[biasKey], fallbackBias)))
    elif isinstance(bias, (int, float)):
        predictions = predictions.map(lambda p: (p[0], p[1], p[2] + bias))

    # Pair every prediction with the true rating for the same (user, movie).
    # NOTE(review): the join keeps only pairs present on both sides, while the
    # divisor below is dataCount; if the model returns no prediction for some
    # pairs they are still counted in the divisor - confirm this is intended.
    predictionsAndRatings = \
        predictions.map(lambda x: ((x[0], x[1]), x[2])) \
                   .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
                   .values()

    squaredErrorSum = predictionsAndRatings.map(
        lambda x: (x[0] - x[1]) ** 2
    ).reduce(add)
    return sqrt(squaredErrorSum / float(dataCount))

In [None]:
# Training partition: keep only non-empty lines with exactly four '::'-separated fields, then parse.
training = (
    sc.textFile('ml-10M100K/train60.dat')
      .filter(lambda line: line and len(line.split('::')) == 4)
      .map(parse_rating)
)

In [None]:
# Validation partition: keep only non-empty lines with exactly four '::'-separated fields, then parse.
validation = (
    sc.textFile('ml-10M100K/validation20.dat')
      .filter(lambda line: line and len(line.split('::')) == 4)
      .map(parse_rating)
)

In [None]:
# Test partition: keep only non-empty lines with exactly four '::'-separated fields, then parse.
test = (
    sc.textFile('ml-10M100K/test20.dat')
      .filter(lambda line: line and len(line.split('::')) == 4)
      .map(parse_rating)
)

In [None]:
# Count the records in each partition; validationCount and testCount are
# later passed to compute_rmse.
trainCount = training.count()
trainCount

In [None]:
validationCount = validation.count()
validationCount

In [None]:
testCount = test.count()
testCount

In [None]:
# Peek at a few parsed (userId, movieId, rating) records from each partition.
training.take(3)

In [None]:
validation.take(3)

In [None]:
test.take(3)

### Train ALS model using different regularization parameter and latent factors

In [None]:
# Grid-search ALS over every (rank, lambda) pair and keep the model with the
# lowest RMSE on the validation partition.
rank_list = [10, 20, 30, 40, 50]     # numbers of latent factors to try
lamda_list = [0.01, 0.1, 1.0, 10.0]  # regularization parameters to try
iterations = 10
chosenModel = None
smallestRMSE = 9999999  # sentinel; replaced by the first RMSE below it

for rank, lamda in ((r, l) for r in rank_list for l in lamda_list):
    model = ALS.train(training, rank, iterations, lamda)
    rmse = compute_rmse(model, validation, validationCount)

    print('Rank={}, Lambda={}, RMSE={}'.format(rank, lamda, rmse))

    if rmse < smallestRMSE:
        smallestRMSE, chosenModel = rmse, model

In [None]:
# Report the best validation RMSE found by the grid search, to two decimals.
print('The smallest RMSE is:{0: .2f}'.format(smallestRMSE))

### Use chosen model with test set

In [None]:
# Score the model selected on the validation partition against the test partition.
testRMSE = compute_rmse(chosenModel, test, testCount)
print('Final error metric using test set ={0: .2f}'.format(testRMSE))

### Create ratings file that contains movie ratings for one user

In [None]:
# Keep only the rating lines whose first field (userId) is '1' and save them
# as a one-user ratings file, unless that file was already written.
user01Ratings = (
    sc.textFile('ml-10M100K/ratings.dat')
      .filter(lambda x: x.split('::')[0] == '1')  # userId == 1
)
if not exists('ml-10M100K/user01Ratings.dat'):
    user01Ratings.saveAsTextFile('ml-10M100K/user01Ratings.dat')

In [None]:
def generate_recommendations(model, ratingsFile, numRecommended=5, userId=None):
    """
    Recommend the movies with the highest predicted rating that a user has
    not rated yet.

    :param object model: trained model exposing ``predictAll``.
    :param str ratingsFile: path to the user's ratings in MovieLens format
        (userId::movieId::rating::timestamp).
    :param int numRecommended: number of top predictions to consider.
    :param int userId: user to predict for. When None, the ID is read from
        `ratingsFile`, which must then hold ratings of exactly one user.
    :return list: movie titles, highest predicted rating first. Predictions
        whose movie ID is missing from movies.dat are skipped, so the list
        can be shorter than `numRecommended`.
    :raises ValueError: if `userId` is None and `ratingsFile` does not
        contain ratings of exactly one user.
    """
    userRatings = sc.textFile(ratingsFile) \
        .filter(lambda x: x and len(x.split('::')) == 4) \
        .map(parse_rating) \
        .collect()

    if userId is None:
        userIds = set(r[0] for r in userRatings)
        if len(userIds) != 1:
            raise ValueError(
                "expected ratings of exactly one user in {!r}, found {} user(s); "
                "pass userId explicitly".format(ratingsFile, len(userIds)))
        userId = userIds.pop()

    # A set gives constant-time membership tests inside the Spark filter below.
    seenMovies = set(r[1] for r in userRatings)

    # Every rated movie the user has not rated, paired as (user, movie) -
    # the same pair order compute_rmse passes to predictAll.
    candidates = sc.textFile('ml-10M100K/ratings.dat') \
        .filter(lambda x: x and len(x.split('::')) == 4) \
        .map(parse_rating) \
        .map(lambda r: r[1]) \
        .distinct() \
        .filter(lambda m: m not in seenMovies) \
        .map(lambda m: (userId, m))

    predictions = model.predictAll(candidates).collect()
    predictions = sorted(predictions, key=lambda p: p[2], reverse=True)[:numRecommended]

    # movieId -> title, taken from lines with exactly three '::'-separated fields.
    movies = {}
    with open('ml-10M100K/movies.dat', 'r') as open_file:
        for line in open_file:
            fields = line.split('::')
            if len(fields) == 3:
                movies[int(fields[0])] = fields[1]

    # Predictions are (user, movie, rating) records; the movie ID is field 1.
    return [movies[p[1]] for p in predictions if p[1] in movies]

In [None]:
# Recommend movies from the ratings saved to ml-10M100K/user01Ratings.dat,
# using the model chosen by the grid search.
ratingsFile = 'ml-10M100K/user01Ratings.dat'
generate_recommendations(chosenModel, ratingsFile)

## Remove Global Bias/User Bias/Item Bias

In [None]:
def getBestTrainingParameters(training, validation, validationCount, biasDict=None, isImplicit=False,
                              rank_list=(10,), lamda_list=(0.01,), iterations=5):
    """
    Grid-search ALS hyper-parameters and keep the model with the lowest
    validation RMSE.

    :param training: ratings passed to ALS as the training set.
    :param validation: ratings passed to compute_rmse to score each model.
    :param int validationCount: number of validation records (forwarded to compute_rmse).
    :param biasDict: bias forwarded unchanged to compute_rmse (None for no bias).
    :param bool isImplicit: train with ALS.trainImplicit instead of ALS.train.
    :param rank_list: latent-factor counts to try, e.g. [10, 20, 30].
    :param lamda_list: regularization parameters to try, e.g. [0.01, 0.1, 1.0].
    :param int iterations: ALS iterations per trained model.
    :return tuple: (bestModel, bestRMSE, bestRank, bestLamda). bestRMSE starts
        at infinity, and the other three stay None if no model scores below it.
    """
    # Both trainers are called with the same positional arguments, so pick once.
    train = ALS.trainImplicit if isImplicit else ALS.train
    bestModel, bestRMSE, bestRank, bestLamda = None, float("inf"), None, None

    for rank in rank_list:
        for lamda in lamda_list:
            model = train(training, rank, iterations, lamda)
            rmse = compute_rmse(model, validation, validationCount, biasDict)

            if rmse < bestRMSE:
                bestModel, bestRMSE, bestRank, bestLamda = model, rmse, rank, lamda

            print('RMSE={}: Rank={}, Lambda={}'.format(rmse, rank, lamda))

    return (bestModel, bestRMSE, bestRank, bestLamda)

### Global average bias

In [None]:
# Global average: the mean of every rating in the training partition.
# float() keeps the value a plain Python float so that compute_rmse treats it
# as a scalar bias when it is passed along later.
globalAvg = float(training.map(lambda r: r[2]).mean())
print("globalAvg: {}".format(globalAvg))

### Remove global average bias

In [None]:
# Subtract the global average from every training rating.
trainingWithoutGlobalAvg = training.map(lambda rec: (rec[0], rec[1], rec[2] - globalAvg))
trainingWithoutGlobalAvg.take(3)

In [None]:
# Train on the ratings with the global average removed; globalAvg is passed as
# the bias so compute_rmse can add it back to the predictions before scoring.
globalAvgResults = getBestTrainingParameters(trainingWithoutGlobalAvg, validation, validationCount, globalAvg)

In [None]:
# Index 1 of the result tuple is the best RMSE.
print("rmse after treating avg ratings bias: {}".format(globalAvgResults[1]))

### Item/Movie bias

In [None]:
# Per-movie mean rating over the training partition: accumulate a
# (rating sum, rating count) pair per movie ID, then divide.
sumCount = training.map(lambda r: (r[1], r[2])).combineByKey(
    lambda value: (value, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))

itemBias = sumCount.map(lambda kv: (kv[0], round(kv[1][0] / kv[1][1], 3)))  # 3 dp
itemBias = itemBias.collectAsMap()  # dict: movieId -> mean rating

# Show N entries. Iterate over the dict's keys and look each value up;
# enumerate() on a dict would yield positions and keys, not keys and values.
N = 5
for movieId in sorted(itemBias)[:N]:
    print("{} : {}".format(movieId, itemBias[movieId]))

### Remove item bias

In [None]:
# Subtract each movie's mean rating (itemBias is built from the training
# partition, so every training movie has an entry).
trainingWithoutItemBias = training.map(lambda r: (r[0], r[1], r[2] - itemBias[r[1]]))

# Remove the same per-movie offset from the validation ratings and score the
# residuals directly: (prediction + bias) - rating == prediction - (rating - bias),
# so the RMSE is the same as adding the bias back to every prediction.
# NOTE(review): movies absent from training fall back to 0.0; presumably the
# model returns no prediction for them, so the fallback should not affect the
# RMSE - confirm.
validationWithoutItemBias = validation.map(lambda r: (r[0], r[1], r[2] - itemBias.get(r[1], 0.0)))
itemBiasResults = getBestTrainingParameters(trainingWithoutItemBias, validationWithoutItemBias, validationCount)

In [None]:
# Index 1 of the result tuple is the best RMSE.
print("best rmse (item bias): {}".format(itemBiasResults[1]))

### User bias

In [None]:
# Per-user mean rating over the training partition: accumulate a
# (rating sum, rating count) pair per user ID, then divide.
sumCount = training.map(lambda r: (r[0], r[2])).combineByKey(
    lambda value: (value, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))

userBias = sumCount.map(lambda kv: (kv[0], round(kv[1][0] / kv[1][1], 3)))  # 3 dp
userBias = userBias.collectAsMap()  # dict: userId -> mean rating

# Show N entries. Iterate over the dict's keys and look each value up;
# enumerate() on a dict would yield positions and keys, not keys and values.
N = 5
for userId in sorted(userBias)[:N]:
    print("{} : {}".format(userId, userBias[userId]))

### Remove user bias

In [None]:
# Subtract each user's mean rating from their ratings (userBias is built from
# the training partition, so every training user has an entry).
trainingWithoutUserBias = training.map(lambda r: (r[0], r[1], r[2] - userBias[r[0]]))

# Remove the same per-user offset from the validation ratings and score the
# residuals directly: (prediction + bias) - rating == prediction - (rating - bias),
# so the RMSE is the same as adding the bias back to every prediction.
# NOTE(review): users absent from training fall back to 0.0; presumably the
# model returns no prediction for them, so the fallback should not affect the
# RMSE - confirm.
validationWithoutUserBias = validation.map(lambda r: (r[0], r[1], r[2] - userBias.get(r[0], 0.0)))
userBiasResults = getBestTrainingParameters(trainingWithoutUserBias, validationWithoutUserBias, validationCount)

In [None]:
# Index 1 of the result tuple is the best RMSE.
print("best rmse (user bias): {}".format(userBiasResults[1]))

## Million Song Dataset

### Split dataset into 60-20-20

In [None]:
# Build the song train/validation/test files once; skip the work when all three exist.
if (exists('songTrain60.txt') and exists('songValidation20.txt') and exists('songTest20.txt')):
    print "Already created files: songTrain60.txt, songValidation20.txt, songTest20.txt"    

else:
    # split into chunks of 3.2 million each (total dataset: 48373586 lines);
    # at that line count split produces 16 chunks, ffaa..ffap, the last one partial
    print "splitting file..."
    !split -l 3200000 train_triplets.txt ff
    
    # train: 9 chunks, deleted right after they are concatenated
    !cat ffae ffaj ffab ffai ffaf ffad ffam ffac ffah > songTrain60.txt
    !rm ffae ffaj ffab ffai ffaf ffad ffam ffac ffah
    
    # validation: 3 chunks, deleted right after they are concatenated
    !cat ffal ffag ffaa > songValidation20.txt
    !rm ffal ffag ffaa
    
    # test: every chunk still on disk. This relies on the two rm steps above
    # having removed the train and validation chunks first.
    !cat ff* > songTest20.txt
    !rm ff*

    print "splitting complete."    
    print "Newly created files: songTrain60.txt, songValidation20.txt, songTest20.txt"

In [None]:
# Load each song partition as raw text lines (no parsing is applied here) and
# preview the first three lines of each.
songTraining = sc.textFile('songTrain60.txt', use_unicode=False)
songTraining.take(3)

In [None]:
songValidation = sc.textFile('songValidation20.txt', use_unicode=False)
songValidation.take(3)

In [None]:
songTest = sc.textFile('songTest20.txt', use_unicode=False)
songTest.take(3)

In [None]:
# Count the lines in each partition.
songTrainingCount = songTraining.count()
songTrainingCount

In [None]:
songValidationCount = songValidation.count()
songValidationCount

In [None]:
songTestCount = songTest.count()
songTestCount

In [None]:
# Binarize the training triplets: split each tab-separated line into
# (user, song, play count) and label it 1 when the play count exceeds 5, else 0.
# NOTE(review): user and song IDs stay as strings here; ALS presumably
# requires integer IDs - confirm before training on this RDD.
binarySongs = songTraining.map(lambda line: line.split("\t")) \
    .map(lambda f: (f[0], f[1], 1 if int(f[2]) > 5 else 0))
binarySongs.take(3)

### Create single user ratings file

In [None]:
# Load the full, unsplit triplets file as raw text lines.
user01Songs = sc.textFile('train_triplets.txt')  #(user, song, play count) 

In [None]:
# Keep only the lines whose first tab-separated field is this user's ID and
# save them, unless the output already exists.
user01Songs = user01Songs.filter(
    lambda line: line.split('\t')[0] == 'b80344d063b5ccb3212f76538f3d9e43d87dca9e')  # userId
if not exists('user01Songs.txt'):
    user01Songs.saveAsTextFile('user01Songs.txt')

### Create song implicit ALS model

In [None]:
# compute_rmse indexes validation records as (user, item, rating) tuples, but
# songValidation holds raw text lines. Give it the same split-and-binarize
# treatment as binarySongs (label 1 when the play count exceeds 5, else 0).
songValidationBinary = songValidation.map(lambda line: line.split("\t")) \
    .map(lambda f: (f[0], f[1], 1 if int(f[2]) > 5 else 0))
songResults = getBestTrainingParameters(binarySongs, songValidationBinary, songValidationCount, isImplicit=True)

In [None]:
# Unpack the result tuple: index 0 is the best model, index 1 its RMSE.
chosenSongModel = songResults[0]
print("songResults best rmse: {}".format(songResults[1]))

In [None]:
# NOTE(review): generate_recommendations keeps only lines with four
# '::'-separated fields and reads ml-10M100K/ratings.dat and
# ml-10M100K/movies.dat, while user01Songs.txt is filtered from tab-separated
# triplets - so this call looks unlikely to produce song recommendations; confirm.
songRatingsFile = 'user01Songs.txt'
generate_recommendations(chosenSongModel, songRatingsFile)

In [None]:
# Preview four of the binarized (user, song, label) records.
binarySongs.take(4)