# An Online Movie Recommendation System Built with Spark and Flask

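This notebook explains how to use the collaborative filtering algorithm implemented by Spark's Alternating Least Squares (ALS) to build a movie recommendation system on the MovieLens dataset. The tutorial has two parts.
The first part covers loading and parsing the movie and rating data into Spark RDDs. The second part covers building the recommender, using it, and keeping it available online.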

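This tutorial can be used on its own to build a movie recommendation model based on the MovieLens dataset. Most of the code in the first part, on how to use ALS with the MovieLens dataset, comes from my solution to one of the exercises in
CS100.1x Introduction to Big Data with Apache Spark by Anthony D. Joseph on edX, a solution that has been publicly available since 2014 at
Spark Summit. Since then I have made minor modifications to use a larger dataset, and added code showing how to store and reload the model for later use.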

## Getting and processing the data

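To build an online movie recommender with Spark, we need to preprocess our model data as much as possible. Parsing the dataset and building the model every time a new recommendation is requested is not the best strategy.
The tasks we can precompute include:
- Loading and parsing the dataset. Persist the resulting RDD for later use.
- Building the recommendation model on the complete dataset. Persist the dataset for later use.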

### File download

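GroupLens Research has collected rating datasets from the MovieLens website and made them available. The datasets were collected over different periods of time, depending on the size of the set. They can be downloaded from the GroupLens file server.
In our case, we will use the latest datasets:
- Small: 100,000 ratings and 2,488 tag applications applied to 8,570 movies by 706 users. Last updated 4/2015.
- Full: 21,000,000 ratings and 470,000 tag applications applied to 27,000 movies by 230,000 users. Last updated 4/2015.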

In [3]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

We also need to define the download locations.

In [4]:
import os

datasets_path = os.path.join('E:/miniAI/RecommendationSystem', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

Now we can proceed with both downloads.

In [5]:
import urllib.request

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)
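
The full archive is large, and re-running the cell above downloads both files again. The following is a minimal sketch of a helper that skips files that are already present; `download_if_missing` and its `fetch` parameter are hypothetical names introduced here for illustration, not part of the original notebook.

```python
import os
import urllib.request


def download_if_missing(url, path, fetch=urllib.request.urlretrieve):
    """Download url to path unless a non-empty file is already there.

    `fetch` is a hypothetical hook (defaulting to urlretrieve) that makes
    the helper easy to exercise without network access.
    Returns True if a download was performed, False if it was skipped.
    """
    if os.path.exists(path) and os.path.getsize(path) > 0:
        return False
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    # Download to a temporary name first so that an interrupted transfer
    # never leaves a truncated file under the final name.
    tmp_path = path + ".part"
    fetch(url, tmp_path)
    os.replace(tmp_path, path)
    return True
```

With the variables defined earlier, a call would look like `download_if_missing(small_dataset_url, small_dataset_path)`.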


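Both are zip files containing a folder with the ratings, movies, and other files. We need to extract them into their own folders so that we can use each file later.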

In [6]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

### Loading and parsing datasets

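Now we are ready to read each file and create an RDD consisting of parsed lines.
- Each line in the ratings dataset (ratings.csv) has the format:
userId,movieId,rating,timestamp
- Each line in the movies dataset (movies.csv) has the format:
movieId,title,genres
- Genres have the format:
Genre1|Genre2|Genre3...
- The tags file has the format:
userId,movieId,tag,timestamp
- Finally, the links.csv file has the format:
movieId,imdbId,tmdbId
The format of these files is uniform and simple, so we can parse each line with Python's split() and load the result into RDDs. Parsing the movies and ratings files yields two RDDs:
- For each line in the ratings dataset, we create a tuple (UserID, MovieID, Rating). We drop the timestamp because this recommender does not need it.
- For each line in the movies dataset, we create a tuple (MovieID, Title). We drop the genres because this recommender does not use them.
So let's load the raw ratings data.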

In [9]:
from pyspark import SparkContext, SparkConf

# getOrCreate() reuses a running context, so re-running this cell does not
# raise "Cannot run multiple SparkContexts at once".
conf = SparkConf().setMaster("local[*]").setAppName("Test")
sc = SparkContext.getOrCreate(conf)

In [10]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [11]:
small_ratings_raw_data_header

'userId,movieId,rating,timestamp'

In [13]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [14]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]
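
The tuples above keep every field as a string, which is why later cells call `int()` and `float()` before joining. As a sketch of an alternative, the conversion could happen once at parse time. `parse_rating` is a hypothetical helper and the sample lines (including their timestamps) are illustrative only.

```python
def parse_rating(line):
    """Parse 'userId,movieId,rating,timestamp' into (int, int, float).

    The timestamp is dropped, as in the cell above.
    """
    user_id, movie_id, rating, _timestamp = line.split(",")
    return int(user_id), int(movie_id), float(rating)


# Illustrative input lines in the ratings.csv format
sample_lines = ["1,1,4.0,964982703", "1,3,4.0,964981247"]
parsed = [parse_rating(line) for line in sample_lines]
print(parsed)
```

A function like this could be passed to `map()` on the raw RDD in place of the two chained lambdas.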

In [15]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]
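
One caveat with `split(",")`: titles that contain a comma are quoted in movies.csv, and a plain split cuts them in the middle, which is where truncated titles such as `'"Good German'` in later outputs come from. A minimal sketch of a quote-aware parser using Python's standard `csv` module follows; `parse_movie` is a hypothetical helper and the sample line is given for illustration.

```python
import csv


def parse_movie(line):
    """Parse one movies.csv line into (movieId, title), honouring CSV quoting."""
    movie_id, title, _genres = next(csv.reader([line]))
    return int(movie_id), title


# Illustrative line with a quoted title that contains a comma
line = '11,"American President, The (1995)",Comedy|Drama|Romance'
print(parse_movie(line))     # full title is preserved
print(line.split(",")[1])    # naive split keeps only the first fragment
```

The outputs shown in the rest of this notebook were produced with the plain `split(",")` version, so they still contain the truncated titles.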

## Collaborative Filtering

### Selecting ALS parameters using the small dataset

In [16]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([0.6, 0.2, 0.2], seed=1) 
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [26]:
test_RDD.take(3)

[('1', '3', '4.0'), ('1', '110', '4.0'), ('1', '235', '4.0')]

In [25]:
validation_for_predict_RDD.take(3)

[('1', '47'), ('1', '50'), ('1', '70')]

In [27]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.8964156630609819
For rank 8 the RMSE is 0.9098270422994309
For rank 12 the RMSE is 0.9075958846344253
The best model was trained with rank 4
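
The error reported for each rank is the root mean squared error (RMSE) over the (user, movie) pairs that appear both in the validation set and in the predictions. The `join` is an inner join, so pairs for which the model returns no prediction are left out. The same computation in plain Python, as a sketch with made-up numbers:

```python
import math


def rmse(actual, predicted):
    """RMSE over the (user, movie) keys present in both dicts (an inner join)."""
    common = actual.keys() & predicted.keys()
    if not common:
        raise ValueError("no overlapping (user, movie) pairs")
    squared = [(actual[k] - predicted[k]) ** 2 for k in common]
    return math.sqrt(sum(squared) / len(squared))


# Illustrative values: only (1, 10) and (1, 20) appear on both sides
actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
predicted = {(1, 10): 3.0, (1, 20): 3.0, (3, 30): 2.0}
print(rmse(actual, predicted))  # sqrt((1 + 0) / 2)
```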


In [28]:
predictions.take(3)

[((610, 81132), 2.72976532735795),
 ((232, 6156), 3.150492378777464),
 ((89, 6156), 2.3853224244052043)]

In [29]:
rates_and_preds.take(3)

[((1, 2115), (5.0, 4.173134484547468)),
 ((1, 2427), (5.0, 3.7931955075834956)),
 ((4, 52), (3.0, 2.217282304095961))]

In [31]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9000974048561499


### Using the complete dataset to build the final model

In [33]:
import os

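# Load the complete dataset file
# NOTE: this run points at 'ml-latest-small'; use the 'ml-latest' folder to train on the full dataset
complete_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')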
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print ("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 100836 recommendations in the complete dataset


In [34]:
complete_ratings_raw_data_header

'userId,movieId,rating,timestamp'

In [35]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [36]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8921362857972317


## How to make recommendations

In [37]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 9742 movies in the complete dataset


In [39]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [40]:
movie_ID_with_avg_ratings_RDD.take(4)

[(6, (102, 3.946078431372549)),
 (50, (204, 4.237745098039215)),
 (70, (55, 3.5090909090909093)),
 (110, (237, 4.031645569620253))]

In [41]:
movie_rating_counts_RDD.take(4)

[(6, 102), (50, 204), (70, 55), (110, 237)]
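
`get_counts_and_averages` receives every rating of a movie at once because of `groupByKey()`. The same result can be accumulated as a running (count, sum) pair per movie, which is also the shape one would use with `aggregateByKey` on an RDD. A plain-Python sketch of that accumulation, with illustrative input pairs:

```python
from collections import defaultdict


def counts_and_averages(pairs):
    """Map (movie_id, rating) pairs to {movie_id: (count, average)}."""
    totals = defaultdict(lambda: [0, 0.0])  # movie_id -> [count, sum]
    for movie_id, rating in pairs:
        totals[movie_id][0] += 1
        totals[movie_id][1] += rating
    return {m: (n, s / n) for m, (n, s) in totals.items()}


# Illustrative (movie_id, rating) pairs
pairs = [(6, 4.0), (50, 5.0), (6, 3.0), (50, 4.0), (50, 3.0)]
print(counts_and_averages(pairs))
```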

### Adding new user ratings

In [43]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]
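
Note that the new user's ratings are on a 1 to 10 scale, while MovieLens ratings range from 0.5 to 5 stars. This is why the predictions for user 0 further down exceed 5. If a common scale is preferred, the new ratings could be rescaled before the `union`. The helper below is a hypothetical sketch that assumes a simple linear mapping is appropriate.

```python
def rescale_to_movielens(ratings, source_max=10.0, target_max=5.0):
    """Linearly rescale (user, movie, rating) triples to the target scale."""
    factor = target_max / source_max
    return [(u, m, r * factor) for (u, m, r) in ratings]


# Three of the ratings entered above
sample = [(0, 260, 9), (0, 1, 8), (0, 858, 10)]
print(rescale_to_movielens(sample))
```

The outputs in the rest of this notebook were produced with the unscaled 1 to 10 ratings.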


In [44]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [45]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 12.11 seconds


### Getting top recommendations

In [46]:
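# Collect the rated movie IDs into a set. In Python 3, map() returns a one-shot
# iterator that membership tests consume, so later IDs would slip through the filter.
new_user_ratings_ids = set(x[1] for x in new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))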

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [50]:
new_user_unrated_movies_RDD.take(5)

[(0, 2), (0, 3), (0, 4), (0, 5), (0, 6)]

In [51]:
new_user_recommendations_RDD.take(20)

[Rating(user=0, product=81132, rating=6.311039272860391),
 Rating(user=0, product=69720, rating=4.133021050011033),
 Rating(user=0, product=155820, rating=2.3634374762933117),
 Rating(user=0, product=6156, rating=4.6498700932524875),
 Rating(user=0, product=4476, rating=2.9705558141689856),
 Rating(user=0, product=912, rating=8.380393837239826),
 Rating(user=0, product=60408, rating=6.163541182770622),
 Rating(user=0, product=90888, rating=3.780290154026556),
 Rating(user=0, product=204, rating=3.3134313960395465),
 Rating(user=0, product=100044, rating=6.562865946476473),
 Rating(user=0, product=5460, rating=7.320578604722689),
 Rating(user=0, product=132660, rating=7.057402238937204),
 Rating(user=0, product=4992, rating=5.533516991772537),
 Rating(user=0, product=1128, rating=4.570383373820928),
 Rating(user=0, product=2892, rating=1.65858799649514),
 Rating(user=0, product=118248, rating=3.020314027206256),
 Rating(user=0, product=4260, rating=2.7107380930531058),
 Rating(user=0, p

In [54]:
complete_movies_titles.take(4)

[(1, 'Toy Story (1995)'),
 (2, 'Jumanji (1995)'),
 (3, 'Grumpier Old Men (1995)'),
 (4, 'Waiting to Exhale (1995)')]

In [55]:
movie_rating_counts_RDD.take(4)

[(6, 102), (50, 204), (70, 55), (110, 237)]

In [57]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(143472, ((3.020314027206256, 'Into the Grizzly Maze (2015)'), 1)),
 (1344, ((7.910686493189808, 'Cape Fear (1962)'), 14)),
 (50064, ((6.040628054412512, '"Good German'), 1))]

In [58]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [59]:
new_user_recommendations_rating_title_and_count_RDD.take(5)

[('Into the Grizzly Maze (2015)', 3.020314027206256, 1),
 ('Cape Fear (1962)', 7.910686493189808, 14),
 ('"Good German', 6.040628054412512, 1),
 ("Hotel Chevalier (Part 1 of 'The Darjeeling Limited') (2007)",
  7.894407061871672,
  1),
 ('Night of the Creeps (1986)', 5.1913507170757835, 3)]

In [29]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('"Godfather', 8.867909122444766, 192)
('"Philadelphia Story', 8.860713893056019, 29)
('12 Angry Men (1957)', 8.806855256537386, 57)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.7249809220612, 97)
('"Godfather: Part II', 8.715258273624283, 129)
('Seven Samurai (Shichinin no samurai) (1954)', 8.683443357883535, 48)
('Apocalypse Now (1979)', 8.640010215066827, 107)
('Lawrence of Arabia (1962)', 8.625612014484725, 45)
('Wallace & Gromit: The Best of Aardman Animation (1996)', 8.605308826224224, 27)
('Old Boy (2003)', 8.591162750243841, 39)
('"Boot', 8.563609859977248, 40)
('"Manchurian Candidate', 8.553446300523822, 30)
('Chinatown (1974)', 8.553315336452606, 59)
('Brazil (1985)', 8.546835835047412, 59)
('"Grand Day Out with Wallace and Gromit', 8.543870269780554, 28)
('Hoop Dreams (1994)', 8.540304276920823, 29)
("Schindler's List (1993)", 8.503066215109873, 220)
('Cool Hand Luke (1967)', 8.497215780
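
The cell above combines a popularity filter with `takeOrdered`, which returns the smallest elements under the given key; negating the rating turns that into "largest first". A plain-Python sketch of the same selection with `heapq.nlargest` follows. The rows reuse titles from the outputs above with ratings rounded for readability.

```python
import heapq


def top_recommendations(rows, n, min_count):
    """Return the n highest-rated (title, rating, count) rows with count >= min_count."""
    popular = (row for row in rows if row[2] >= min_count)
    return heapq.nlargest(n, popular, key=lambda row: row[1])


rows = [
    ("Into the Grizzly Maze (2015)", 3.02, 1),
    ("Cape Fear (1962)", 7.91, 14),
    ("12 Angry Men (1957)", 8.81, 57),
    ("Chinatown (1974)", 8.55, 59),
]
print(top_recommendations(rows, n=2, min_count=25))
```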

### Getting individual ratings

In [61]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
# Predict for the single (user, movie) pair defined above, not for the whole unrated set
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(1)

### Persisting the model

In [31]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('E:/miniAI/RecommendationSystem', 'models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)
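
Saving writes a directory at `model_path`, and re-running the cell is likely to fail if that directory already exists. A minimal sketch of a cleanup step that could run before `save` is shown below; `clear_model_dir` is a hypothetical helper, and it assumes the model lives on the local file system rather than on HDFS.

```python
import os
import shutil


def clear_model_dir(model_path):
    """Remove a previously saved model directory, if any.

    Returns True if a directory was removed, False if nothing was there.
    """
    if os.path.isdir(model_path):
        shutil.rmtree(model_path)
        return True
    return False
```

It would be called as `clear_model_dir(model_path)` right before `model.save(sc, model_path)`.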

### Genre and other fields

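We have not used the genre and timestamp fields, in order to keep the transformations and the whole tutorial simple. Incorporating them should not be a problem. A good use could be filtering the recommendations by either of them (for example, recommendations by genre, or recent recommendations), as we did with the minimum number of ratings.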
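
As a sketch of the genre idea, the pipe-separated genres string from movies.csv can be split and used as a filter on (movie ID, predicted rating) pairs. `filter_by_genre` is a hypothetical helper, and the predicted ratings below are made up for illustration.

```python
def filter_by_genre(recommendations, genres_by_movie, wanted):
    """Keep (movie_id, rating) pairs whose movie lists the wanted genre."""
    return [
        (movie_id, rating)
        for movie_id, rating in recommendations
        if wanted in genres_by_movie.get(movie_id, "").split("|")
    ]


# Genres in the Genre1|Genre2|... format used by movies.csv
genres_by_movie = {
    1: "Adventure|Animation|Children|Comedy|Fantasy",
    2: "Adventure|Children|Fantasy",
}
recommendations = [(1, 7.5), (2, 6.0), (3, 8.0)]  # illustrative predictions
print(filter_by_genre(recommendations, genres_by_movie, "Comedy"))
```

Movies without a known genres entry (movie 3 here) are dropped, which mirrors how the minimum-ratings filter discards movies with too little information.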