# PySpark构建推荐系统

Sandy Yu @zbyu81@hotmail.com 

以MovieLens数据集为例，学习如何使用协同过滤，借助于PySpark的Alternating Least Saqures算法 完成一个推荐系统。
下面的代码主要参考一下文档：
https://spark.apache.org/docs/2.3.0/mllib-collaborative-filtering.html
https://www.codementor.io/jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw

## 数据的获取和解析RDD


In [3]:
import os

datasets_path = os.path.join('/Users/Sandy/ML/MovieLens', 'Latest')
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')


In [4]:
from pyspark import SparkContext, SparkConf
#conf = SparkConf().setAppName("RecommendSystem")
#sc = SparkContext(conf)
sc =SparkContext.getOrCreate()
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [7]:
small_ratings_raw_data.take(3)


[u'userId,movieId,rating,timestamp',
 u'1,31,2.5,1260759144',
 u'1,1029,3.0,1260759179']

In [10]:
small_ratings_data = small_ratings_raw_data.filter(lambda line : line!=small_ratings_raw_data_header)\
.map(lambda line: line.split(',')).map(lambda tokens: (tokens[0], tokens[1], tokens[2])).cache()

In [11]:
small_ratings_data.take(3)

[(u'1', u'31', u'2.5'), (u'1', u'1029', u'3.0'), (u'1', u'1061', u'3.0')]

In [12]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
print(small_movies_raw_data_header)
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

movieId,title,genres


[(u'1', u'Toy Story (1995)'),
 (u'2', u'Jumanji (1995)'),
 (u'3', u'Grumpier Old Men (1995)')]

In [19]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
#print 'validation for predict rdd: '

test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
#print 'test for prediction rdd: '
#test_for_predict_RDD.take(3)

## 协同过滤

In [20]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank

For rank 4 the RMSE is 0.952401795348
For rank 8 the RMSE is 0.959968407203
For rank 12 the RMSE is 0.953274504895
The best model was trained with rank 4


In [21]:
predictions.take(3)

[((452, 1084), 3.0789427124972235),
 ((472, 1084), 3.469723768504703),
 ((529, 1084), 3.6833522903175013)]

In [22]:
rates_and_preds.take(3)

[((368, 2664), (4.0, 3.9869699179369316)),
 ((153, 3825), (3.0, 2.406334801407196)),
 ((148, 1208), (5.0, 4.332034819234105))]

In [24]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print "There are %s recommendations in the complete dataset" % (complete_ratings_data.count())

There are 24404096 recommendations in the complete dataset


In [26]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0L)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [27]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_predicts = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.943727800028


## 进行预测

In [28]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print "There are %s movies in the complete dataset" % (complete_movies_titles.count())

There are 40110 movies in the complete dataset


In [32]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

#movie_rating_counts_RDD.take(3)

In [33]:
new_user_ID = 0

# 按照(userID, movieID, rating)的格式来
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [34]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)
#complete_data_with_new_ratings_RDD.take(3)

[(1, 122, 2.0), (1, 172, 1.0), (1, 1221, 5.0)]

In [35]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)

New model trained in 118.726 seconds


In [36]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [37]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(52224, ((2.1131440766906975, u'Turn of Faith (2002)'), 2)),
 (8194, ((7.129448167987935, u'Baby Doll (1956)'), 93)),
 (130730, ((6.255394770805573, u'Lemon Popsicle (1978)'), 3))]

In [38]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [39]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
(u'Connections (1978)', 9.007338499758863, 47)
(u'Duck Amuck (1953)', 8.910165791740123, 120)
(u'"Lonely Wife', 8.887354943453646, 25)
(u'Planet Earth (2006)', 8.877455399163544, 193)
(u'Mei and the Kittenbus (2002)', 8.827787318675227, 25)
(u'Death on the Staircase (Soup\xe7ons) (2004)', 8.793820485024217, 63)
(u'The War (2007)', 8.775937177844039, 34)
(u'"Human Condition III', 8.752423508668906, 73)
(u'"I', 8.704252948764076, 45)
(u'Alone in the Wilderness (2004)', 8.703487273569356, 295)
(u'Harakiri (Seppuku) (1962)', 8.69651210651247, 542)
(u'"Unvanquished', 8.674580277251934, 368)
(u'"Civil War', 8.655541784137927, 380)
(u'Only Old Men Are Going to Battle (V boy idut odni stariki) (1973)', 8.654755420509932, 30)
(u'Ikiru (1952)', 8.636333623045992, 1340)
(u'Dylan Moran Live: What It Is (2009)', 8.632939945464802, 59)
(u'Powers of Ten (1977)', 8.630372032965507, 46)
(u'Song of the Little Road (Pather Panchali) (1955)', 8.625707420

### 计算某一部电影的打分

In [40]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=2.1416612140111138)]

### 模型的持久化

In [41]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('..', 'models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)