## Collaborative Filtering with Spark

```python
import pandas as pd
import numpy as np
from math import sqrt
```

The Book-Crossing data set is widely used for collaborative filtering. It consists of roughly a million ratings from around 100k users, with each book identified by its ISBN. Explicit ratings run from 1 to 10, while a rating of 0 records an implicit interaction rather than a score.

```python
ratings = pd.read_csv("data/ratings.csv")
ratings.head()
```

| Unnamed: 0 | User-ID | ISBN | Book-Rating |
|---|---|---|---|
| 0 | 276725 | 034545104X | 0 |
| 1 | 276726 | 0155061224 | 5 |
| 2 | 276727 | 0446520802 | 0 |
| 3 | 276729 | 052165615X | 3 |
| 4 | 276729 | 0521795028 | 6 |


We'll want to clean this up a little. MLlib's `Rating` expects integer user and product IDs, and ISBNs are strings (some end in `X`), so I map each categorical feature onto the integers 0 to N-1, where N is the number of distinct values of that feature.
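For comparison, `pandas.factorize` produces the same kind of mapping in a single call: it returns integer codes in order of first appearance, plus the array of distinct values. A minimal sketch on a few made-up rows (the `toy` frame is illustrative only, not taken from the data set):

```python
import pandas as pd

# a few made-up rows in the same shape as the Book-Crossing ratings
toy = pd.DataFrame({
    "User-ID": [276725, 276726, 276729, 276729],
    "ISBN": ["034545104X", "0155061224", "052165615X", "034545104X"],
    "Book-Rating": [0, 5, 3, 6],
})

# codes run from 0 to N-1 in order of first appearance
toy["User_idx"], user_values = pd.factorize(toy["User-ID"])
toy["ISBN_idx"], isbn_values = pd.factorize(toy["ISBN"])
print(toy[["User_idx", "ISBN_idx", "Book-Rating"]])
```

Because the codes follow order of first appearance, this yields the same indices as building a dictionary from `unique()`.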

```python
# map users and ISBNs onto contiguous integer indices 0..N-1
def map_to_N(feature, new_name):
    values = ratings[feature].unique()
    value_to_idx = dict(zip(values, range(len(values))))
    # Series.map accepts a dict directly, so no lambda is needed
    ratings[new_name] = ratings[feature].map(value_to_idx)

map_to_N("User-ID", "User_idx")
map_to_N("ISBN", "ISBN_idx")

# keep only the re-indexed IDs and the rating (drops the old ID columns)
ratings = ratings[['User_idx', 'ISBN_idx', 'Book-Rating']]

# a rating of 0 marks an implicit interaction, so treat it as unknown
unknown = ratings[ratings['Book-Rating'] == 0]
known = ratings[ratings['Book-Rating'] != 0]
```
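The Spark cells below read headerless `train_ratings` and `test_ratings` files, but the step that writes them from `known` isn't shown. Here is one way it might look, as a minimal sketch: the helper name `write_train_test`, the 80/20 split and the seed are my assumptions, not the author's.

```python
import pandas as pd

def write_train_test(known, train_path, test_path, test_frac=0.2, seed=42):
    """Hypothetical helper: randomly hold out test_frac of the explicit
    ratings and write both parts as headerless "user,item,rating" CSVs."""
    cols = ["User_idx", "ISBN_idx", "Book-Rating"]
    test = known.sample(frac=test_frac, random_state=seed)
    train = known.drop(test.index)  # every row not sampled into test
    train[cols].to_csv(train_path, header=False, index=False)
    test[cols].to_csv(test_path, header=False, index=False)
    return len(train), len(test)
```

Called as `write_train_test(known, "train_ratings", "test_ratings")`, it writes one `user,item,rating` line per rating, which is what the parsing code below splits on commas. A purely random split can leave some users or books only in the test set; the model has no factors for those, so they get no prediction and drop out of the evaluation join.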

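The Spark cells below assume a `SparkContext` named `sc`. The PySpark shell creates one for you; elsewhere, import `SparkContext` and `SparkConf` from `pyspark` and create it yourself before going any further.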

```python
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
```

```python
# each line of train_ratings is "user_idx,isbn_idx,rating"
traindata = sc.textFile("train_ratings")
train_ratings = (traindata
                 .map(lambda l: l.split(','))
                 .map(lambda m: Rating(int(m[0]), int(m[1]), float(m[2]))))
```

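Once the ratings are loaded, it's simple to train a model with MLlib's built-in ALS (alternating least squares). A few of the parameters you can configure are:
* `rank`, the number of latent factors;
* `iterations`, the number of ALS iterations to run;
* `lambda_`, the regularization strength; and
* `blocks` (`numBlocks` in the Scala API), the number of blocks used to parallelize the computation.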
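To make `rank`, `iterations` and `lambda_` concrete, here is a small dense NumPy sketch of alternating least squares. It is illustrative only and not MLlib's implementation, which is distributed and differs in details such as how λ is scaled. It factors a ratings matrix R as U Vᵀ, where each row of U and V has `rank` entries, and each iteration alternately solves a ridge regression for every user row and every item row, using only the observed entries flagged in a boolean `mask`. The function name `als_sketch` and its arguments are hypothetical.

```python
import numpy as np

def als_sketch(R, mask, rank=5, iterations=10, lam=0.01, seed=0):
    """Hypothetical, illustrative dense ALS (not MLlib's): find U (users x rank)
    and V (items x rank) so that U @ V.T approximates R where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    reg = lam * np.eye(rank)  # lam > 0 keeps every system solvable
    for _ in range(iterations):
        # hold V fixed: ridge regression for each user's latent factors
        for u in range(n_users):
            Vu = V[mask[u]]
            U[u] = np.linalg.solve(Vu.T @ Vu + reg, Vu.T @ R[u, mask[u]])
        # hold U fixed: ridge regression for each item's latent factors
        for i in range(n_items):
            Ui = U[mask[:, i]]
            V[i] = np.linalg.solve(Ui.T @ Ui + reg, Ui.T @ R[mask[:, i], i])
    return U, V

# usage: recover an exactly rank-2 matrix with every entry observed
rng = np.random.default_rng(1)
R = rng.normal(size=(6, 2)) @ rng.normal(size=(5, 2)).T
mask = np.ones_like(R, dtype=bool)
U, V = als_sketch(R, mask, rank=2, iterations=20, lam=1e-6)
```

Raising `rank` adds capacity, raising `lam` shrinks the factors toward zero to curb overfitting, and each extra iteration is one more pair of these alternating solves.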

```python
rank = 5
iterations = 10
model = ALS.train(train_ratings, rank=rank, iterations=iterations)
```

Finally, let's evaluate the model on the held-out test ratings to see how accurate its predictions are. If all goes well, we can then use the model to fill in predicted ratings for the pairs in our `unknown` frame.
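The accuracy measure computed below is the root-mean-square error over the set $T$ of test pairs $(u, i)$ that receive a prediction, where $r_{ui}$ is the true rating and $\hat{r}_{ui}$ is the model's prediction:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{|T|} \sum_{(u,i) \in T} \left(\hat{r}_{ui} - r_{ui}\right)^2}
$$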

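```python
# parse held-out ratings into ((user, item), rating) pairs
test_data = sc.textFile("test_ratings").map(lambda l: l.split(','))
all_fields = test_data.map(lambda p: ((int(p[0]), int(p[1])), float(p[2])))
# predictAll takes an RDD of (user, product) pairs
user_and_product = test_data.map(lambda p: (int(p[0]), int(p[1])))
# key each prediction by (user, product) so it can be joined with the true rating
predictions = model.predictAll(user_and_product).map(lambda r: ((r[0], r[1]), r[2]))
```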

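```python
# join on (user, product): each value becomes (actual_rating, predicted_rating)
ratesAndPreds = all_fields.join(predictions)
# squared error for each test pair
diffs = ratesAndPreds.map(lambda r: (r[1][1] - r[1][0])**2)
```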

```python
from operator import add

# mean squared error over the test pairs that received a prediction
mse = diffs.reduce(add) / diffs.count()
print("RMSE:", sqrt(mse))
```

RMSE:  0.797772454855
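For intuition about what the `join` and the `reduce` are doing, here is the same RMSE computation on plain Python dictionaries keyed by `(user, item)`. The helper `rmse_from_pairs` and the toy numbers are hypothetical; like the RDD join, it only scores pairs present in both inputs.

```python
from math import sqrt

def rmse_from_pairs(actual, predicted):
    """Hypothetical helper. actual, predicted: dicts mapping (user, item) -> rating.
    Mirrors an inner join: only keys present in both dicts are scored."""
    common = actual.keys() & predicted.keys()
    if not common:
        raise ValueError("no (user, item) pairs in common")
    sq_err = [(predicted[k] - actual[k]) ** 2 for k in common]
    return sqrt(sum(sq_err) / len(sq_err))

actual = {(0, 1): 5.0, (1, 2): 3.0, (2, 0): 8.0}
predicted = {(0, 1): 4.5, (1, 2): 3.5, (3, 3): 7.0}
print(rmse_from_pairs(actual, predicted))  # prints 0.5; only (0, 1) and (1, 2) are scored
```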
