In [None]:
import shlex
import itertools
from math import sqrt
from operator import add

from pyspark.mllib.recommendation import ALS


# Column order of climbs.csv; parse_climb zips these names onto each line.
CLIMB_FIELDS = ['id', 'name', 'rating', 'type', 'location', 'url', 'location_url']
# Path template for files in the climbing data directory ("{}" = file name).
PARENT_DIR = "../data/climb_data/{}"
MY_RATINGS, REVIEWS, CLIMBS = (
    PARENT_DIR.format(file_name)
    for file_name in ("my_ratings.csv", "reviews.csv", "climbs.csv")
)

def parse_my_ratings(line):
    """Parse one line of my_ratings.csv.

    The line's columns are ``climb_id, user_id, rating``; the first two are
    swapped so the result is ``(user_id, climb_id, rating)``, all ints. Any
    columns after the third are ignored.
    """
    cols = line.split(",")
    user_id = int(cols[1])
    climb_id = int(cols[0])
    score = int(cols[2])
    return user_id, climb_id, score

def parse_ratings(line):
    """Parse one line of reviews.csv into ``(key, (col2, col1, col3))`` ints.

    The inner triple is unioned with parse_my_ratings output for training, so
    it presumably is ``(user_id, climb_id, rating)``. The outer key (column 0)
    is what the validation/test filters select on.
    NOTE(review): that key looks like a 0-9 split bucket — confirm against the data.
    """
    cols = line.split(",")
    key = int(cols[0])
    entry = (int(cols[2]), int(cols[1]), int(cols[3]))
    return key, entry

def parse_climb(line):
    """Parse one line of climbs.csv into a dict keyed by CLIMB_FIELDS.

    Every value, including 'id', is kept as the raw string from the line.
    NOTE(review): the line is split on every comma, so a field that itself
    contains a comma shifts the later fields, and zip() silently drops any
    surplus values.
    """
    fields = line.split(",")
    head, rest = fields[0], fields[1:]
    record = dict(zip(CLIMB_FIELDS[1:], rest))
    record['id'] = head
    return record


In [None]:
# Load the three CSV inputs as RDDs of parsed records.
my_ratings = sc.textFile(MY_RATINGS).map(parse_my_ratings)
# `ratings` is reused by count() and by the training/validation/test cells, so
# keep the parsed records in memory instead of re-reading the CSV per action.
ratings = sc.textFile(REVIEWS).map(parse_ratings).cache()
climbs = sc.textFile(CLIMBS).map(parse_climb)
max_ratings = ratings.count()
# Lookup table: climb id (int) -> parsed climb dict, used to render recommendations.
climbs_dict = {int(climb['id']): climb for climb in climbs.collect()}
# Preview a few personal ratings; as the cell's last expression it is displayed.
my_ratings.take(5)

In [None]:
def computeRmse(model, data, n=None):
    """
    Compute RMSE (Root Mean Squared Error) of `model` on `data`.

    :param model: trained model exposing ``predictAll(rdd_of_(user, item))``.
    :param data: RDD of ``(user, item, rating)`` triples to evaluate against.
    :param n: ignored; kept so existing ``computeRmse(model, data, n)`` calls
        still work. The mean is taken over the pairs that actually received a
        prediction: the inner join below drops pairs for which ``predictAll``
        returned nothing, so dividing by the full data count would understate
        the error.
    :return: the RMSE as a float.
    :raises ValueError: if no pair in `data` received a prediction.
    """
    user_items = data.map(lambda r: (r[0], r[1]))
    predicted = model.predictAll(user_items).map(lambda p: ((p[0], p[1]), p[2]))
    actual = data.map(lambda r: ((r[0], r[1]), r[2]))
    # Inner join: only scored pairs contribute (prediction, rating) values.
    squared_errors = predicted.join(actual).values() \
        .map(lambda pr: (pr[0] - pr[1]) ** 2)
    # One pass yields both the error sum and the number of scored pairs.
    total, count = squared_errors.aggregate(
        (0.0, 0),
        lambda acc, err: (acc[0] + err, acc[1] + 1),
        lambda a, b: (a[0] + b[0], a[1] + b[1]))
    if count == 0:
        raise ValueError("model produced no predictions for the evaluation data")
    return sqrt(total / float(count))

In [None]:
# Number of partitions used when repartitioning the training, validation and
# test RDDs.
numPartitions = 6

In [None]:
# Train only on ratings that are not held out. The validation/test splits use
# keys >= 6, so training on every rating would leak those rows into the model
# and make the validation RMSE used for model selection optimistically biased.
# My own ratings are always included so the model can score climbs for me.
# NOTE(review): assumes the key from parse_ratings (column 0 of reviews.csv)
# is a 0-9 split bucket — confirm against the data.
training = ratings.filter(lambda x: x[0] < 6).values().union(my_ratings).repartition(numPartitions).cache()


In [None]:
# Hold-out splits keyed by the first value produced by parse_ratings:
# keys in [6, 8) form the validation set and keys >= 8 form the test set.
# NOTE(review): assumes that key is a 0-9 split bucket — confirm against reviews.csv.
validation = ratings.filter(lambda x: 6 <= x[0] < 8).values().repartition(numPartitions).cache()
print("VALIDATION: {}".format(validation.take(5)))
# `>= 8` (not `> 8`) so key 8 belongs to the test split instead of falling
# between validation and test.
test = ratings.filter(lambda x: x[0] >= 8).values().repartition(numPartitions).cache()
print("TEST: {}".format(test.take(5)))
numValidation = validation.count()

In [None]:
# Hyper-parameter grid explored for ALS.
ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]

bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

# Train one ALS model per (rank, lambda, iterations) combination and keep the
# one with the lowest validation RMSE.
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    # One formatted string: under Python 2, `print (a, b)` prints the tuple
    # repr instead of the sentence.
    print("RMSE (validation) = {} for the model trained with "
          "rank = {}, lambda = {}, and numIter = {}.".format(
              validationRmse, rank, lmbda, numIter))
    if validationRmse < bestValidationRmse:
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

print("Best model: rank = {}, lambda = {}, numIter = {}, "
      "validation RMSE = {}.".format(bestRank, bestLambda, bestNumIter,
                                     bestValidationRmse))


In [None]:
# Climb ids I have already rated. parse_my_ratings yields
# (user_id, climb_id, rating), so the climb id is field 1 (field 2 is the
# rating value). These climbs are excluded from the recommendation candidates.
my_rated = set(rating[1] for rating in my_ratings.collect())
candidate_climbs = sc.parallelize([climb for climb in climbs_dict if climb not in my_rated])
# Show how many candidates remain rather than dumping every id.
candidate_climbs.count()

In [None]:
# Score every candidate climb for user 0 with the best model, then keep the
# 50 highest predicted ratings (highest first).
predictions = (bestModel
               .predictAll(candidate_climbs.map(lambda climb_id: (0, climb_id)))
               .collect())
recommendations = sorted(predictions, key=lambda pred: pred[2], reverse=True)[:50]

In [None]:
# Peek at the five highest-scoring recommendations.
print(recommendations[:5])


In [None]:
# List the recommendations, best first: position, climb id, name and
# predicted rating.
# The template is a unicode literal because, under Python 2, str.format raises
# UnicodeEncodeError when an argument is a non-ASCII unicode string. Climb
# names come from sc.textFile, which yields unicode by default in PySpark.
for index, recommendation in enumerate(recommendations):
    climb = climbs_dict[recommendation[1]]
    print(u"{}: {} {} - {}".format(index, climb['id'], climb['name'], recommendation[2]))
    