# 基于Spark的电影推荐

本例在pyspark中使用交替最小二乘法ALS来实现协同过滤推荐主要原因是它支持稀疏的输入数据（用户对物品评分是稀疏矩阵）。ALS算法常用于基于矩阵分解（或基于隐语义模型算法）的推荐系统中，例如：使用矩阵分解将用户矩阵分解为用户和隐类之间的关系矩阵，物品和隐类之间的关系矩阵的积，换句话说：首先对物品进行聚类分析，并计算出物品属于每个隐类的权重；然后，确定用户对哪一隐含类别特别感兴趣，在该类中，选择权重高的物品推荐给客户。ALS在最小化损失函数时，基本思想是：固定其中一类参数，使其变为单变量优化问题，利用解析方法进行优化（即求偏导），在反过来，固定先前优化的参数，再优化另一组参数；迭代进行，直到收敛。

In [1]:
import os
import math
import time
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.mllib.recommendation import ALS

sc = SparkContext()

#文件访问
small_raw_data = sc.textFile('ratings.csv')
small_data = small_raw_data.map(lambda line: line.split(",")).map(lambda col: (col[0], col[1], col[2])).cache()

In [2]:
#按照6:2:2分为训练集、验证集、测试集
training_RDD, validation_RDD, test_RDD = small_data.randomSplit([6, 2, 2], seed=10)
validation_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))


In [19]:
#ALS参数配置
seed = 5
#迭代次数
iterations = 10
#正则系数
regularization_param = 0.01
#矩阵分解的秩rank(即是矩阵的最高阶非零子阵)
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

#模型训练确认rank值（最小误差）
min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_param)
    predict = model.predictAll(validation_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_predictions = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predict)
    error = math.sqrt(rates_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
    errors[err] = error
    err += 1
    if error < min_error:
        min_error = error
        best_rank = rank

In [20]:
#以最佳rank值新重训练模型
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_param)
#模型测试
predictions = model.predictAll(test_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))

rates_and_predictions = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)

In [21]:
#计算RMSE指标
error = math.sqrt(rates_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
print('Model RMSE = %s' % error)


Model RMSE = 1.2785490567481572


In [17]:
#预测某一用户对某一电影的评分
user_id = 76
movie_id = 10
predictedRating = model.predict(user_id, movie_id)
print("用户编号:"+str(user_id)+" 对电影:"+str(movie_id)+" 的评分为:"+str(predictedRating))


用户编号:76 对电影:10 的评分为:2.801511003072946


In [18]:
#向某一用户推荐10部电影
topKRecs = model.recommendProducts(user_id, 10)
print("向用户编号:"+str(user_id)+"的用户推荐10部电影:")
for rec in topKRecs:
    print(rec)



向用户编号:76的用户推荐10部电影:
Rating(user=76, product=67504, rating=6.145038516863403)
Rating(user=76, product=7371, rating=5.799425615918739)
Rating(user=76, product=2351, rating=5.769024042796536)
Rating(user=76, product=2267, rating=5.626008433587019)
Rating(user=76, product=390, rating=5.558889185840102)
Rating(user=76, product=565, rating=5.558889185840102)
Rating(user=76, product=31547, rating=5.530534662291334)
Rating(user=76, product=8609, rating=5.530534662291334)
Rating(user=76, product=80599, rating=5.530534662291334)
Rating(user=76, product=5059, rating=5.530534662291334)
