In [1]:
sc

<pyspark.context.SparkContext at 0xa5587b8>

In [2]:
numPartitions = 2
import math
from pyspark.mllib.recommendation import ALS

def get_ratings_tuple(entry):
    items = entry.split('::')
    return int(items[0]), float(items[1]), float(items[2])
    
def get_movie_tuple(entry):
    items = entry.split('::')
    return int(items[0]),items[1]
    
    
def getCountsAndAverages(RatingsTuple):
    total = 0.0
    for rating in RatingsTuple[1]:
        total += rating
        
    return ( RatingsTuple[0], (len(RatingsTuple[1]), total/len(RatingsTuple[1])))
    
    
def sortFunction(tuple):
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)


#calculate RMSE

def computeRMSE(predictedRDD, actualRDD):
    predictedReformattedRDD = (predictedRDD.map(lambda (UserID, MovieId, Rating): ((UserID, MovieId),Rating)))
    
    actualReformattedRDD = (actualRDD.map(lambda (UserID, MovieId, Rating): ((UserID, MovieId),Rating)))
    
    squaredErrorsRDD = (predictedReformattedRDD.join(actualReformattedRDD).map(lambda(k,(a,b)): math.pow((a-b),2)))
    
    totalErrors = squaredErrorsRDD.reduce(lambda a,b: a+b)
    numRatings = squaredErrorsRDD.count()
    
    return math.sqrt(float(totalErrors)/numRatings)
    
    
def apk(actual, predicted, k=10):

    if len(predicted)>k:
        predicted = predicted[:k]

    score = 0.0
    num_hits = 0.0

    for i,p in enumerate(predicted):
        if p in actual and p not in predicted[:i]:
            num_hits += 1.0
            score += num_hits / (i+1.0)

    if not actual:
        return 0.0

    return score / min(len(actual), k) 


rawRatings = sc.textFile('d:/spark/data/mllib/als/sample_movielens_ratings.txt').repartition(numPartitions)
rawMovies = sc.textFile('d:/spark/data/mllib/als/sample_movielens_movies.txt')
    
ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()
ratingsRDD.first()

(0, 2.0, 3.0)

In [3]:

movieIDsWithRatingsRDD = (ratingsRDD.map(lambda (userId, movieId, rating): (movieId, [rating])).reduceByKey(lambda a,b: a+b))
movieIDsWithRatingsRDD.take(10)
movieNameWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)
movieNameWithAvgRatingsRDD.take(10)
movieNameWithAvgRatingsRDD = ( moviesRDD .join(movieNameWithAvgRatingsRDD) .map(lambda ( movieid,(name,(ratings, average)) ): (average, name, ratings)) )

movieNameWithAvgRatingsRDD.take(10)
#this is basic recommendations
movieLimitedAndSortedByRatingRDD = ( movieNameWithAvgRatingsRDD.filter( lambda(average,name,ratings): ratings > 2).sortBy(sortFunction, ascending=False))

movieLimitedAndSortedByRatingRDD.take(10)

[(2.9166666666666665, u'Movie 32', 12),
 (2.8125, u'Movie 90', 16),
 (2.5, u'Movie 30', 14),
 (2.473684210526316, u'Movie 94', 19),
 (2.466666666666667, u'Movie 23', 15),
 (2.4375, u'Movie 49', 16),
 (2.4, u'Movie 29', 20),
 (2.4, u'Movie 18', 15),
 (2.357142857142857, u'Movie 52', 14),
 (2.25, u'Movie 62', 16)]

In [4]:

#write these to a file

#this is for proper recommendations
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6,2,2], seed=0L)
   
validationForPredictionRDD = validationRDD.map(lambda(userId, movieId, rating): (userId,movieId))

ranks = [4,8,12]
errors = [0,0,0]
err = 0
minError = float('inf')
bestRank = -1
bestIteration = -1

for rank in ranks:
    
    model = ALS.train(trainingRDD, rank, seed=5L, iterations =5, lambda_= 0.1)
    predictedRatingsRDD = model.predictAll(validationForPredictionRDD);
    error = computeRMSE(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    if error < minError:
        minError = error
        bestRank = rank

bestModel = ALS.train(trainingRDD, bestRank, seed=5L, iterations=5, lambda_=0.1)

testForPredictingRDD = testRDD.map(lambda (userId, movieId, rating): (userId, movieId))
predictedTestRDD = bestModel.predictAll(testForPredictingRDD)

testRMSE = computeRMSE(testRDD, predictedTestRDD)
#generate new recommendations for all users

print testRMSE



1.20217585459


In [5]:
userRatings = ratingsRDD.map(lambda(userId, movieId, rating): (userId,movieId))
predictionsRDD = model.predictAll(userRatings).map(lambda(userId, movieId, rating): ((userId, movieId),rating))
ratingsAndPredictionsRDD =  ratingsRDD.map(lambda (userId, movieId, rating): ((userId, movieId),rating)).join(predictionsRDD)

ratingsAndPredictionsRDD.take(10)

[((7, 77.0), (1.0, 1.1726902436479714)),
 ((27, 83.0), (3.0, 2.493215180459293)),
 ((24, 4.0), (1.0, 0.5497256272989122)),
 ((22, 68.0), (4.0, 3.606855753929029)),
 ((25, 1.0), (3.0, 1.2840620428001523)),
 ((22, 6.0), (2.0, 1.943453814031724)),
 ((7, 55.0), (1.0, 1.1291704658016397)),
 ((20, 48.0), (1.0, 1.1715859679617528)),
 ((10, 8.0), (1.0, 0.9384447793190722)),
 ((6, 48.0), (1.0, 0.92150181587232))]

In [6]:

MeanSquaredErr = ratingsAndPredictionsRDD.map(lambda ((UserID,MovieId),(Actual,Predicted)):(Actual-Predicted) **2).sum() / ratingsAndPredictionsRDD.count()
RootMeanSquaredError = math.sqrt(MeanSquaredErr)
randomUserRDD =  userRatings.takeSample('true',1,1)
userID = randomUserRDD[0][0]
print userID
user = userRatings.keyBy(lambda rating: rating[0]).lookup(userID)
print user


pred = predictionsRDD.keyBy(lambda rating: rating[0]).lookup(userID)
print user


26
[(26, 1.0), (26, 3.0), (26, 5.0), (26, 7.0), (26, 14.0), (26, 18.0), (26, 21.0), (26, 23.0), (26, 27.0), (26, 35.0), (26, 40.0), (26, 45.0), (26, 48.0), (26, 50.0), (26, 54.0), (26, 57.0), (26, 61.0), (26, 66.0), (26, 71.0), (26, 76.0), (26, 85.0), (26, 88.0), (26, 94.0), (26, 96.0), (26, 0.0), (26, 2.0), (26, 4.0), (26, 6.0), (26, 13.0), (26, 16.0), (26, 20.0), (26, 22.0), (26, 24.0), (26, 31.0), (26, 36.0), (26, 44.0), (26, 47.0), (26, 49.0), (26, 52.0), (26, 55.0), (26, 58.0), (26, 62.0), (26, 68.0), (26, 73.0), (26, 81.0), (26, 86.0), (26, 91.0), (26, 95.0), (26, 97.0)]
[(26, 1.0), (26, 3.0), (26, 5.0), (26, 7.0), (26, 14.0), (26, 18.0), (26, 21.0), (26, 23.0), (26, 27.0), (26, 35.0), (26, 40.0), (26, 45.0), (26, 48.0), (26, 50.0), (26, 54.0), (26, 57.0), (26, 61.0), (26, 66.0), (26, 71.0), (26, 76.0), (26, 85.0), (26, 88.0), (26, 94.0), (26, 96.0), (26, 0.0), (26, 2.0), (26, 4.0), (26, 6.0), (26, 13.0), (26, 16.0), (26, 20.0), (26, 22.0), (26, 24.0), (26, 31.0), (26, 36.0), (26

In [7]:
actualMovies = [MU[1] for MU in user]

predictedMovies = [MU[1] for MU in pred]

apk10= apk(actualMovies,predictedMovies,10)
print apk10

#http://blog.5ibc.net/p/20398.html
#https://books.google.com.au/books?id=syPHBgAAQBAJ&pg=PA103&lpg=PA103&dq=recommendproducts+movielens&source=bl&ots=X9TEXS538v&sig=lCKSQTf7EIjsQVHlcE5pC2yNxLE&hl=en&sa=X&ved=0ahUKEwjmpfqRzLnJAhWCmZQKHYdtBBMQ6AEIIzAB#v=onepage&q=recommendproducts%20movielens&f=false    
#   I am here
#unratedMoviesByUserRDD = (moviesRDD.map(lambda (movieID, name): movieID).filter(lambda movieID: movieID not in [this[1] for this in myRatedMovies]).map(lambda movieID: (userID, movieID)))

#recommendationRDD = bestModel.predictAll(unratedMoviesByUserRDD)

# get only the most rated movies
#movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda (movie_id, (ratings, average)): (movie_id, ratings)) 
#predictedRDD = predictedRatingsRDD.map(lambda (uid, movie_id, rating): (movie_id, rating)) 
#predictedWithCountsRDD= (predictedRDD.join(movieCountsRDD)) 
#ratingsWithNamesRDD = (predictedWithCountsRDD.join(moviesRDD).map(lambda (movie_id, ((pred, ratings), name)): (pred, name, ratings)).filter(lambda (pred, name, ratings): ratings > 75))

#predictedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda X: -x[0])


0.0


In [8]:
import numpy as np
itemFactors = model.productFeatures().map(lambda (userId, factor): factor).collect()

itemMatrix = np.array(itemFactors)
imBroadcast = sc.broadcast(itemMatrix)

In [9]:
scoresForUserRDD = model.userFeatures().map(lambda (userId, array): (userId, np.dot(imBroadcast.value, array)))
allRecsRDD = scoresForUserRDD.map(lambda (userId, scores): 
                            (userId, sorted(zip(np.arange(1, scores.size), scores), key=lambda x: x[1], reverse=True))
                           ).map(lambda (userId, sortedScores): (userId, np.array(sortedScores, dtype=int)[:,0]))
print allRecsRDD.first()[0]
print allRecsRDD.first()[1]

0
[64 15 59 84 80 13 20 38 25 68 47 73 10 19  5 74 76 70 65 54 24 49 36 85 72
 75 55 69  8 46 43 62  7  6 32 71 61 37 27 66 53 77 60 91 50 88 94 99 41 67
 33 56 40 39 79 16 48  9 92 78 95 97  4 98 31 12 83 58 89 52 21 86 57 63  3
  2 28 44 96 93 81 29 30 11 14 35 18 51 90 42 26  1 23 82 87 22 34 45 17]


In [10]:
userMoviesRDD = ratingsRDD.map(lambda r: (r[0], r[1])).groupByKey()
print userMoviesRDD.first()[0]
print userMoviesRDD.first()[1].data

0
[2.0, 5.0, 11.0, 15.0, 19.0, 23.0, 27.0, 29.0, 31.0, 37.0, 44.0, 46.0, 48.0, 51.0, 55.0, 61.0, 67.0, 69.0, 72.0, 79.0, 87.0, 91.0, 94.0, 96.0, 99.0, 3.0, 9.0, 12.0, 17.0, 21.0, 26.0, 28.0, 30.0, 34.0, 41.0, 45.0, 47.0, 50.0, 54.0, 59.0, 64.0, 68.0, 71.0, 77.0, 83.0, 89.0, 92.0, 95.0, 98.0]


In [11]:
K = 10
MAPK = allRecsRDD.join(userMoviesRDD).map(lambda (userId, (predicted, actual)):
                                    apk(actual.data, predicted, K)
                                   ).sum() / allRecsRDD.count()
print "Mean Average Precision at K =", MAPK

Mean Average Precision at K = 0.310900793651


In [12]:
from pyspark.mllib.evaluation import RegressionMetrics
predictedAndTrueRDD = ratingsAndPredictionsRDD.map(lambda ((user, product), (predicted, actual)): (predicted, actual))
regressionMetricsRDD = RegressionMetrics(predictedAndTrueRDD)
print "Mean Squared Error =", regressionMetricsRDD.meanSquaredError
print "Root Mean Squared Error =", regressionMetricsRDD.rootMeanSquaredError

Mean Squared Error = 0.61808161731
Root Mean Squared Error = 0.786181669406


In [13]:
from pyspark.mllib.evaluation import RankingMetrics
predictedAndTrueForRankingRDD = allRecsRDD.join(userMoviesRDD).map(lambda (userId, (predicted, actual)):
                                                        (map(int, list(predicted)), actual.data))
rankingMetricsRDD = RankingMetrics(predictedAndTrueForRankingRDD)
print "Mean Average Precision =", rankingMetricsRDD.meanAveragePrecision

Mean Average Precision = 0.512155932392


In [14]:
K = 2000
MAPK2000 = allRecsRDD.join(userMoviesRDD).map(lambda (userId, (predicted, actual)):
                                    apk(actual.data, predicted, K)
                                   ).sum() / allRecsRDD.count()
print "Mean Average Precision at 2000 =", MAPK2000

Mean Average Precision at 2000 = 0.512155932392


In [15]:
allRecsRDD.take(100)

[(0, array([64, 15, 59, 84, 80, 13, 20, 38, 25, 68, 47, 73, 10, 19,  5, 74, 76,
         70, 65, 54, 24, 49, 36, 85, 72, 75, 55, 69,  8, 46, 43, 62,  7,  6,
         32, 71, 61, 37, 27, 66, 53, 77, 60, 91, 50, 88, 94, 99, 41, 67, 33,
         56, 40, 39, 79, 16, 48,  9, 92, 78, 95, 97,  4, 98, 31, 12, 83, 58,
         89, 52, 21, 86, 57, 63,  3,  2, 28, 44, 96, 93, 81, 29, 30, 11, 14,
         35, 18, 51, 90, 42, 26,  1, 23, 82, 87, 22, 34, 45, 17])),
 (8, array([68, 59, 62, 71, 84, 34, 76, 27, 85, 69, 48, 45, 78, 60, 92, 36, 17,
         94, 10,  5, 13, 96,  6, 40, 75, 31, 80, 20, 25, 64, 47, 19, 51, 95,
         32, 97, 28, 89,  3, 73, 99, 82, 61, 50, 56, 55, 38, 90, 79, 26, 41,
         53, 21, 54, 12, 66, 24, 88, 72,  9, 15, 58, 33, 46, 57, 44,  7, 49,
         70, 39, 52,  2, 23, 91, 22, 42, 63,  8, 93, 29,  1, 30, 16,  4, 98,
         86, 18, 74, 77, 67, 65, 81, 37, 35, 83, 43, 14, 87, 11])),
 (16,
  array([83, 68, 79, 88, 59, 12, 36, 73, 84, 13, 49, 74, 39, 75, 47, 80, 46,
     