# Lab 3 — recommender systems

In [None]:
import json
import matplotlib.pyplot as plt
import numpy as np
from operator import add
from operator import itemgetter
import pyspark.mllib.recommendation as rec

%matplotlib inline
plt.style.use("ggplot")

In [None]:
def makeItTuple(dataset, index, value):
    """Map every record of ``dataset`` to a ``(record[index], v)`` pair.

    The meaning of ``value`` depends on its exact type:

    * ``int`` -- ``value`` itself is the constant second element of each pair
      (e.g. ``1``, so that a later ``reduceByKey(add)`` counts occurrences).
    * ``str`` -- ``value`` is a field name and the second element is
      ``record[value]``.

    For any other type no mapping is built and ``None`` is returned.
    """
    kind = type(value)
    if kind == str:
        def pair_with_field(record):
            return (record[index], record[value])
        return dataset.map(pair_with_field)
    if kind == int:
        def pair_with_constant(record):
            return (record[index], value)
        return dataset.map(pair_with_constant)
    return None

What does the data look like?

In [None]:
# Print the last two lines of the ratings file stored on HDFS.
!hdfs dfs -cat /ix/ml-20m/ratings.txt | tail -n 2

#### Exercise 3.4

In [None]:
# Each line of the ratings file is parsed as JSON into one record.
ratings = sc.textFile("/ix/ml-20m/ratings.txt").map(json.loads)
uidOne = makeItTuple(ratings, 'userId', 1)  # (userId, 1) for every rating
# Key by movieId here: keying by userId a second time would only reproduce
# the per-user counts and the movie plot would show user data.
movidOne = makeItTuple(ratings, 'movieId', 1)  # (movieId, 1) for every rating

# Sum the ones per key -> (id, number of ratings); the result is collected
# to the driver and sorted by id.
sortUser = sorted(uidOne.reduceByKey(add).collect())
sortMovie = sorted(movidOne.reduceByKey(add).collect())

In [None]:
# Keep only the counts of the (id, count) pairs, as float arrays (ordered by id).
tableUsers = np.array([pair[1] for pair in sortUser], dtype=float)
tableMovies = np.array([pair[1] for pair in sortMovie], dtype=float)

# One figure per table; the counts are sorted in ascending order before plotting.
for counts, x_label, y_label in (
    (tableUsers, 'User ID sorted by number of ratings', 'Number of films rated'),
    (tableMovies, 'Movie ID sorted by number of ratings', 'Number of ratings received'),
):
    plt.plot(np.sort(counts))
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.show()

We immediately see that the distribution is not uniform: some users rated barely a few films (the minimum is around 20), while others rated more than 9'000 films.

The same applies to the films. Most films have almost no ratings: the first 20'000 films have fewer than 200 ratings each, and the first 4'000 have a single rating. On the other hand, a handful of them have tens of thousands.

#### Exercise 3.5

In [None]:
# Read the personal ratings, one JSON object per line. Parsing line by line
# (and skipping blank lines) keeps the last entry even when the file does not
# end with a newline, and tolerates empty lines.
with open("my-ratings.txt", "r") as F:
    myRatings = [json.loads(line) for line in F if line.strip()]
data = sc.parallelize(myRatings)  # personal ratings as an RDD of records
joined = ratings.union(data)  # full dataset: file ratings plus the personal ones

# Split on the last digit of the timestamp: digits 0-1 go to validation and
# digits 2-9 to training (roughly 20% / 80%; the actual split is printed below).
validation = joined.filter(lambda entry: entry["timestamp"] % 10 <= 1)
training = joined.filter(lambda entry: entry["timestamp"] % 10 > 1)

# Check that the split has the expected proportions.
totalsize = joined.count()
validationsize = validation.count()
trainingsize = training.count()
print("Training set represents", 100*trainingsize/totalsize,"% and validation set represents", 100*validationsize/totalsize, "%")

#### Exercise 3.6

In [None]:
## Global average: sum of all training ratings divided by the training size
rating_values = training.map(lambda entry: entry['rating'])
rating_total = rating_values.fold(0, add)
global_avg = rating_total / trainingsize
print("Global average is", global_avg)

In [None]:
## User bias: alpha_u = (mean rating given by user u) - global_avg
uid_sumRatings = makeItTuple(training, 'userId', 'rating').reduceByKey(add)  # (uid, sum of ratings)
uid_numRatings = makeItTuple(training, 'userId', 1).reduceByKey(add)  # (uid, number of ratings)

uid_sum_and_count = uid_sumRatings.join(uid_numRatings)  # (uid, (sumRatings, numRatings))
user_bias = uid_sum_and_count.mapValues(
    lambda sum_count: sum_count[0] / sum_count[1] - global_avg
)  # (uid, alpha)

# Bring the biases to the driver: once as a list, once as a uid -> alpha lookup.
listUserBias = user_bias.collect()
dicUserBias = dict(listUserBias)

In [None]:
##Computing remaining item bias
def computeBeta(x):
    """Compute the item bias (beta) of one movie.

    ``x`` has the shape ``((movId, ratings), numRatings)`` where ``ratings``
    is an iterable of ``(uid, rating)`` pairs. The bias of each rating's
    author is looked up in ``dicUserBias``; beta is
    ``(sum of ratings - sum of user biases) / numRatings - global_avg``.

    Returns the tuple ``(movId, beta)``.
    """
    (movie_id, user_ratings), num_ratings = x
    bias_total = 0
    rating_total = 0
    for uid, rating in user_ratings:
        bias_total += dicUserBias[uid]
        rating_total += rating
    beta = (rating_total - bias_total) / num_ratings - global_avg
    return (movie_id, beta)

def _keyByMovie(entry):
    """Turn a rating record into ``(movieId, (userId, rating))``."""
    return (entry['movieId'], (entry['userId'], entry['rating']))


movId_ratings = training.map(_keyByMovie)
ratings_per_movie = movId_ratings.groupByKey()  # (movId, iterable of (uid, rating))
# Attach the number of ratings: ((movId, iterable of (uid, rating)), numRatings)
movie_groups_with_size = ratings_per_movie.map(lambda group: (group, len(group[1])))
item_bias = movie_groups_with_size.map(computeBeta)  # (movId, beta)

# Bring the biases to the driver: once as a list, once as a movId -> beta lookup.
list_item_bias = item_bias.collect()
dic_item_bias = dict(list_item_bias)

In [None]:
# Mean bias over all movies, then over all users, of the collected bias lists.
item_bias_values = tuple(zip(*list_item_bias))[1]  # second column: the betas
avg_item_bias = sum(item_bias_values) / len(list_item_bias)
user_bias_values = tuple(zip(*listUserBias))[1]  # second column: the alphas
avg_user_bias = sum(user_bias_values) / len(listUserBias)
def predict_rating(user, movie):
    """Baseline prediction: ``global_avg + user bias + item bias``.

    A user missing from ``dicUserBias`` gets ``avg_user_bias``; a movie
    missing from ``dic_item_bias`` gets ``avg_item_bias``.
    """
    alpha = dicUserBias.get(user, avg_user_bias)
    beta = dic_item_bias.get(movie, avg_item_bias)
    return global_avg + alpha + beta

#### Exercise 3.7

In [None]:
# def error_helper(t):
#     sumSquared = 0
#     for i in t[1]:
#         sumSquared += (i[0]-i[1])**2
#     return 0

# def error(dataset):
#     withPredict = dataset.map(lambda triplet : ( triplet[0], #shape : (uid, list (rating, predicted))
#                                                 (triplet[2],predict_rating(triplet[0],triplet[1])))).groupByKey()
# #     print(withPredict.take(5))
#     squaredDiff = withPredict.map(error_helper)
#     return 0

# to_evaluate = training.map(lambda entry : (entry['userId'], 
#                                            entry['movieId'], 
#                                            entry['rating'])
#                                           )
# # to_evaluate.take(2)
# error(to_evaluate)
def toDeltaRating(predict):
    """Build a mapper turning a rating record into ``(userId, squared error)``.

    ``predict`` is called as ``predict(userId, movieId)``. The returned
    function reads the ``"userId"``, ``"movieId"`` and ``"rating"`` fields of
    its argument and squares the difference between the prediction and the
    actual rating.
    """
    def squared_error(t):
        uid, mov_id, actual = t["userId"], t["movieId"], t["rating"]
        residual = predict(uid, mov_id) - actual
        return (uid, residual ** 2)
    return squared_error
    

def error(dataset, predict):
    """Average per-user RMSE of ``predict`` over ``dataset``.

    Parameters
    ----------
    dataset : RDD of rating records exposing the ``"userId"``, ``"movieId"``
        and ``"rating"`` fields (the fields read by ``toDeltaRating``).
    predict : callable ``(userId, movieId) -> predicted rating``. It is
        applied inside ``dataset.map``.

    Returns
    -------
    The mean, over the users present in ``dataset``, of
    ``sqrt(mean squared error of that user's ratings)``.

    The mean is taken over the users of ``dataset`` itself. Dividing by the
    number of users of another set (e.g. the training set) would bias the
    result whenever the two sets do not contain exactly the same users.
    """
    squared_errors = dataset.map(toDeltaRating(predict))  # (uid, squared error)
    # One pass per user: accumulate (sum of squared errors, number of ratings).
    sum_and_count = squared_errors.mapValues(lambda sq: (sq, 1)).reduceByKey(
        lambda a, b: (a[0] + b[0], a[1] + b[1])
    )
    per_user_rmse = sum_and_count.values().map(
        lambda total_n: (total_n[0] / total_n[1]) ** 0.5
    )
    return per_user_rmse.mean()

In [None]:
# Evaluate the baseline predictor on the validation set.
validation_error = error(validation, predict_rating)
print("The error of the set is", validation_error)

#### Exercise 3.8

In [None]:
# Build model using ALS
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating
rank = 10
numIterations = 10
alsSeed = 0  # fixed seed so that re-running the cell trains the same model


def toAlsRating(entry):
    """Convert a rating record into an MLlib ``Rating(user, product, rating)``."""
    return Rating(int(entry["userId"]), int(entry["movieId"]), float(entry["rating"]))


def predictAls(user, movie):
    """Predict a single rating with the ALS model.

    Only call this on the driver: the model cannot be used from inside an
    RDD transformation, so it must not be passed to a function that applies
    it in ``map``. Use ``errorAls`` to evaluate the model on a whole RDD.
    """
    return model.predict(user, movie)


def errorAls(alsModel, ratingsAls):
    """Average per-user RMSE of ``alsModel`` over an RDD of ``Rating``.

    Predictions are computed in bulk with ``predictAll`` and joined to the
    actual ratings on ``(user, product)``. Pairs for which ``predictAll``
    returns no prediction are dropped by the join and therefore ignored.
    """
    # distinct() so that a repeated pair is predicted once and the join
    # below does not multiply rows.
    pairs = ratingsAls.map(lambda r: (r[0], r[1])).distinct()
    predicted = alsModel.predictAll(pairs).map(lambda p: ((p[0], p[1]), p[2]))
    actual = ratingsAls.map(lambda r: ((r[0], r[1]), r[2]))
    # ((user, product), (actual, predicted)) -> (user, (squared error, 1))
    per_user = actual.join(predicted).map(
        lambda kv: (kv[0][0], ((kv[1][1] - kv[1][0]) ** 2, 1))
    )
    sum_and_count = per_user.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    per_user_rmse = sum_and_count.values().map(
        lambda total_n: (total_n[0] / total_n[1]) ** 0.5
    )
    return per_user_rmse.mean()


trainAls = training.map(toAlsRating)
valdationAls = validation.map(toAlsRating)

trainAls.cache()
valdationAls.cache()
model = ALS.train(trainAls, rank, iterations=numIterations, lambda_=0.01, seed=alsSeed)

# predictAll needs an RDD of (user, product) pairs; show a few predictions
# instead of dumping the whole RDD.
validationPairs = valdationAls.map(lambda r: (r[0], r[1])).distinct()
print(model.predictAll(validationPairs).take(5))
print(errorAls(model, valdationAls))