#### <font color=red>这份代码是在我的windows笔记本上运行的结果，云平台上直接运行会出错，同学们可以参考如下链接中的环境搭建方法，在自己电脑上运行</font>

http://47.93.208.249:9925/notebooks/0.Teacher/Online/frist_read_me.ipynb

## Spark推荐系统

基于spark中ALS的推荐系统，针对movielens中电影打分数据做推荐

In [1]:
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

In [2]:
def parseRating(line):
    """
        MovieLens的打分格式是userId::movieId::rating::timestamp
        我们对格式做一个解析
    """
    fields = line.strip().split("::")
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovie(line):
    """
        对应的电影文件的格式为movieId::movieTitle
        解析成int id, 文本
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

def loadRatings(ratingsFile):
    """
        载入得分
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])
    f.close()
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    else:
        return ratings

def computeRmse(model, data, n):
    """
        评估的时候要用的，计算均方根误差
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

注： 有SparkContext的这个cell只能运行一次，否则会报错：ValueError: Cannot run multiple SparkContexts at once（再次运行需要重启kernel）

In [3]:
# 设定环境
conf = SparkConf() \
  .setAppName("MovieLensALS") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

# 载入打分数据
myRatings = loadRatings('./movielens/ml-1m/ratings.dat')
myRatingsRDD = sc.parallelize(myRatings, 1)

movieLensHomeDir = './movielens/ml-1m'

# 得到的ratings为(时间戳最后一位整数, (userId, movieId, rating))格式的RDD
ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)

# 得到的movies为(movieId, movieTitle)格式的RDD
movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()

print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))

# 根据时间戳最后一位把整个数据集分成训练集(60%), 交叉验证集(20%), 和评估集(20%)

# 训练, 交叉验证, 测试 集都是(userId, movieId, rating)格式的RDD

numPartitions = 4
training = ratings.filter(lambda x: x[0] < 6) \
  .values() \
  .union(myRatingsRDD) \
  .repartition(numPartitions) \
  .cache()

validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
  .values() \
  .repartition(numPartitions) \
  .cache()

test = ratings.filter(lambda x: x[0] >= 8).values().cache()

numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

print("Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest))

# 训练模型，在交叉验证集上看效果

ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    print("RMSE (validation) = %f for the model trained with " % validationRmse + \
          "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter))
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

testRmse = computeRmse(bestModel, test, numTest)

# 在测试集上评估 交叉验证集上最好的模型
print("The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
  + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse))

# 我们把基线模型设定为每次都返回平均得分的模型
meanRating = training.union(validation).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print("The best model improves the baseline by %.2f" % (improvement) + "%.")

# 个性化的推荐(针对某个用户)

myRatedMovieIds = set([x[1] for x in myRatings])
candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]

print("Movies recommended for you:")
for i in range(len(recommendations)):
    print("%2d: %s" % (i + 1, movies[recommendations[i][1]])).encode('ascii', 'ignore')

# clean up
sc.stop()

Got 1000209 ratings from 6040 users on 3706 movies.
Training: 1602450, validation: 198919, test: 199049
RMSE (validation) = 0.845176 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.838930 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.756192 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.756192 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.837425 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.830261 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.756192 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.756192 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 12 and lambda = 0.1, and numIter = 20, and its RMSE on the test s