<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#1.-pandas加载数据" data-toc-modified-id="1.-pandas加载数据-1">1. pandas加载数据</a></span><ul class="toc-item"><li><span><a href="#1.1-熟悉数据集" data-toc-modified-id="1.1-熟悉数据集-1.1">1.1 熟悉数据集</a></span></li><li><span><a href="#1.2-sklearn拆分数据集" data-toc-modified-id="1.2-sklearn拆分数据集-1.2">1.2 sklearn拆分数据集</a></span></li><li><span><a href="#1.3-pandas-df转为pyspark-df" data-toc-modified-id="1.3-pandas-df转为pyspark-df-1.3">1.3 pandas df转为pyspark df</a></span><ul class="toc-item"><li><span><a href="#1.3.1-熟悉sql.df的属性和方法" data-toc-modified-id="1.3.1-熟悉sql.df的属性和方法-1.3.1">1.3.1 熟悉sql.df的属性和方法</a></span></li></ul></li></ul></li><li><span><a href="#2-构建ALS模型" data-toc-modified-id="2-构建ALS模型-2">2 构建ALS模型</a></span></li><li><span><a href="#3-为用户推荐评分Top-5的电影" data-toc-modified-id="3-为用户推荐评分Top-5的电影-3">3 为用户推荐评分Top-5的电影</a></span><ul class="toc-item"><li><span><a href="#3.1-生成为所有训练集用户推荐的结果" data-toc-modified-id="3.1-生成为所有训练集用户推荐的结果-3.1">3.1 生成为所有训练集用户推荐的结果</a></span></li><li><span><a href="#3.2-为指定的用户和电影预测评分" data-toc-modified-id="3.2-为指定的用户和电影预测评分-3.2">3.2 为指定的用户和电影预测评分</a></span></li></ul></li><li><span><a href="#4评估模型" data-toc-modified-id="4评估模型-4">4评估模型</a></span></li><li><span><a href="#5-ALS参数不同取值，得到的均方根误差值汇总表如下：" data-toc-modified-id="5-ALS参数不同取值，得到的均方根误差值汇总表如下：-5">5 ALS参数不同取值，得到的均方根误差值汇总表如下：</a></span></li><li><span><a href="#6-总结：" data-toc-modified-id="6-总结：-6">6 总结：</a></span></li></ul></div>

- Action01项目要求：
    - 对MovieLens数据集进行评分预测
    - 工具：可以使用Surprise或者其他
    - 说明使用的模型，及简要原理
    - 我们需要补全评分矩阵，然后对指定用户，比如userID为1-5进行预测


jupyter中使用pyspark，需要先通过findspark运行。

In [1]:
import findspark
findspark.__version__

'1.4.2'

In [2]:
# jupyter连接pyspark
findspark.init()

In [3]:
import pyspark

In [4]:
pyspark.__version__

'3.0.1'

In [5]:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator # 回归器评估

In [6]:
from pyspark.sql.types import * # 导入它的目的不清楚

In [7]:
from sklearn.model_selection import train_test_split # 拆分数据集

# 1. pandas加载数据

In [8]:
pd.set_option('display.float_format',  lambda x: '%.4f' % x)

In [9]:
# 用pandas读取
df = pd.read_csv('ratings.csv', 
                 usecols=['userId', 'movieId','rating'])
df.head(2)

Unnamed: 0,userId,movieId,rating
0,1,2,3.5
1,1,29,3.5


## 1.1 熟悉数据集

In [10]:
df.describe()

Unnamed: 0,userId,movieId,rating
count,1048575.0,1048575.0,1048575.0
mean,3527.0861,8648.9883,3.5293
std,2018.4244,19100.1439,1.0519
min,1.0,1.0,0.5
25%,1813.0,903.0,3.0
50%,3540.0,2143.0,4.0
75%,5233.0,4641.0,4.0
max,7120.0,130642.0,5.0


有1048575条记录，评分范围为`[0.5, 5.0]`。

In [11]:
userid = df['userId'].unique()
movieid = df['movieId'].unique()

In [12]:
userid.shape, movieid.shape

((7120,), (14026,))

可知，用户id数为7120个，电影id数为14026。

In [13]:
# 统计每部电影共被打分过多少次，默认按value降序
temp = df['movieId'].value_counts()

In [14]:
# 只被打过一次分数的电影，共计2558部电影
temp[temp == 1]

5342      1
100226    1
42176     1
118985    1
128510    1
         ..
67702     1
86330     1
51894     1
116159    1
6553      1
Name: movieId, Length: 2558, dtype: int64

In [15]:
new_df = df.set_index(keys='movieId', drop=False)

In [16]:
new_df.head(2)

Unnamed: 0_level_0,userId,movieId,rating
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2,1,2,3.5
29,1,29,3.5


In [17]:
# 按被打分次数由高到低重新提取数据集
new_df = new_df.loc[temp.index]

In [19]:
# 只被评分过一次的电影
singles = new_df.iloc[-2558:]
singles.head(2)

Unnamed: 0_level_0,userId,movieId,rating
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
5342,5843,5342,2.0
100226,6738,100226,4.5


In [20]:
# 被评分过2次及以上的电影
need_spilted = new_df.iloc[:-2558]

## 1.2 sklearn拆分数据集 
pyspark的`sqldf.randomSplit()`方法拆分数据集时，无法实现分层抽样。为了保证每个userid都可以被拆分到2个数据集中，所用sklearn的train_test_split进行拆分；

In [21]:
x_train, x_test, y_train, y_test = train_test_split(need_spilted[['userId', 'movieId']], 
                                                    need_spilted['rating'],
                                                    test_size=0.2, 
                                                    random_state=30, 
                                                    stratify=need_spilted['movieId'])

In [22]:
type(x_train), type(y_train)

(pandas.core.frame.DataFrame, pandas.core.series.Series)

In [23]:
# 拼接被拆分的label和特征
x_train['rating'] = y_train
x_test['rating'] = y_test
x_train.shape, x_test.shape

((836813, 3), (209204, 3))

In [24]:
x_train['userId'].unique().shape, x_test['userId'].unique().shape

((7120,), (7115,))

In [25]:
x_train['movieId'].unique().shape, x_test['movieId'].unique().shape

((11468,), (10222,))

In [26]:
# 把被打分过一次的电影拼接进训练集中
x_train_ = pd.concat((x_train, singles), axis=0)

In [27]:
# 原数据集中用户id数和电影id数
userid.shape, movieid.shape

((7120,), (14026,))

In [28]:
x_train_['userId'].unique().shape, x_train_['movieId'].unique().shape

((7120,), (14026,))

拆分后训练集用户id数与总数据集用户id数相同，训练集的电影id与总数据集用户id数相同，说明训练集中已包含所有不同用户id和所有不同电影id。测试集用户id数、电影id数均少于训练集。但训练集中已包含所有用户id和电影id，因此可以确保测试集评估时不再出现空值。

## 1.3 pandas df转为pyspark df

In [29]:
sc = SparkContext()

In [30]:
sql_sc = SQLContext(sc)

In [31]:
%%time
# 把pandas的DataFrame转为pyspark下的DataFrame
spk_df_train = sql_sc.createDataFrame(data=x_train_)
spk_df_test = sql_sc.createDataFrame(data=x_test)

CPU times: user 24.2 s, sys: 234 ms, total: 24.4 s
Wall time: 28.3 s


### 1.3.1 熟悉sql.df的属性和方法

In [32]:
# 也是DataFrame类型，继承了pandas中部分df的方法
type(spk_df_train)

pyspark.sql.dataframe.DataFrame

In [33]:
# 查看数据列信息
spk_df_train.printSchema()

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)



In [34]:
# 查看总行数 和 字段列表
spk_df_train.count(), spk_df_train.columns

(839371, ['userId', 'movieId', 'rating'])

In [None]:
# # 将每条记录转成Row对象，以列表形式返回
# temp = spark_df_ratings.collect()

In [None]:
# type(temp), type(temp[0]), temp[:5]

In [35]:
# 查看训练集前5条记录, 输出结果为sql数据表风格
spk_df_train.show(n=5, truncate=True, vertical=False)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|  6921|   6890|   4.5|
|  7032|   1645|   2.0|
|  6000|    353|   3.0|
|  6553|   7926|   4.0|
|   208|     29|   5.0|
+------+-------+------+
only showing top 5 rows



In [36]:
# 查看测试集前5条，take方法以列表形式返回Row对象
spk_df_test.take(5)

[Row(userId=741, movieId=7617, rating=5.0),
 Row(userId=3688, movieId=5072, rating=4.5),
 Row(userId=5198, movieId=1262, rating=4.0),
 Row(userId=1789, movieId=6537, rating=3.0),
 Row(userId=6175, movieId=53121, rating=4.0)]

In [37]:
%%time
# 支持where条件语句查询
spk_df_train.where(spk_df_train.userId==1).take(2)

CPU times: user 3.95 ms, sys: 2.47 ms, total: 6.42 ms
Wall time: 382 ms


[Row(userId=1, movieId=2804, rating=3.5),
 Row(userId=1, movieId=7454, rating=4.0)]

# 2 构建ALS模型

ALS显示反馈目标函数：
$$\underset{x,y}{min}\sum _{(u, i)\varepsilon K}(r_{ui} - x_{u}^{T}y_{i})^2 + \lambda (\sum _{u}||x_{u}||_{2}^{2} + \sum _{i}||y_{i}||_{2}^{2})$$

- $r_{ui}$实际评分
- $x_{u}^{T}y_{i}$用户向量与物品向量的内积，表示用户u 对物品i 的预测评分
- $\lambda (\sum _{u}||x_{u}||_{2}^{2} + \sum _{i}||y_{i}||_{2}^{2})$  L2正则项，保证数值计算稳定性，防止过拟合


pyspark的API:
```
pyspark.ml.recommendation.ALS()
```
- **参数：**
    - rank=10, 模型中潜在因子的数量（默认10，一般取10~1000，太小误差大；太大泛化能力差）
    - maxIter=10, 要运行的最大迭代次数（ 默认为 10）
    - regParam=0.1, 指定 ALS 中的正则化参数（ 默认为 0.1）
    - numUserBlocks=10, 用户矩阵将被分区为块以便并行计算的块数（默认为10）
    - numItemBlocks=10,  项目矩阵将被分区为块以便并行计算的块数（默认为10)
    - implicitPrefs=False, 指定是使用显式反馈ALS变体，还是使用适用于隐式反馈数据的变体；默认值 False，表示使用显式反馈
    - alpha=1.0, 适用于 ALS 的隐式反馈变量的参数，其控制偏好观察中的基线置信度（ 默认为 1.0）
    - userCol='user', 用户列的字段名称，默认'user'
    - itemCol='item', 项目列的字段名称，默认'item'
    - seed=None,
    - ratingCol='rating', 评分列的字段名称，默认'rating'
    - nonnegative=False, 指定是否对最小二乘使用非负约束（ 默认False）
    - checkpointInterval=10, 每10次迭代检查一次 
    - intermediateStorageLevel='MEMORY_AND_DISK',
    - finalStorageLevel='MEMORY_AND_DISK',
    - coldStartStrategy='nan',
    - blockSize=4096    
    

In [38]:
# 实例化模型
als = ALS(
    rank=3,  
    maxIter=10,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating'
)

In [39]:
%%time
# 训练模型
model = als.fit(spk_df_train)

CPU times: user 30.4 ms, sys: 9.62 ms, total: 40 ms
Wall time: 12.2 s


# 3 为用户推荐评分Top-5的电影

## 3.1 生成为所有训练集用户推荐的结果

In [40]:
%%time
# 为所有训练集用户推荐Top-5
recommends = model.recommendForAllUsers(numItems=5)

CPU times: user 1.12 ms, sys: 1.26 ms, total: 2.38 ms
Wall time: 444 ms


In [41]:
type(recommends)

pyspark.sql.dataframe.DataFrame

In [42]:
%%time
# 查找模型生成的为userID=100推荐的top5电影明细
user100 = recommends.where(recommends['userId'] == 100)
user100.collect()

CPU times: user 22.3 ms, sys: 9.53 ms, total: 31.8 ms
Wall time: 6.03 s


[Row(userId=100, recommendations=[Row(movieId=95218, rating=5.648101806640625), Row(movieId=40697, rating=5.163963794708252), Row(movieId=4261, rating=5.124624252319336), Row(movieId=82931, rating=5.112427711486816), Row(movieId=95776, rating=5.040829658508301)])]

In [48]:
type(user100)

pyspark.sql.dataframe.DataFrame

In [49]:
user100.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [43]:
df[df['userId'] == 100][(df['movieId'] == 95218) | \
   (df['movieId'] == 40697) | (df['movieId'] == 4261) | (df['movieId'] ==82931)]

  


Unnamed: 0,userId,movieId,rating


查询没结果，说明推荐的top-5电影都是用户以前未评分过的，ALS模型填充了评分矩阵中缺失的评分，使得这些评分能反应用户的喜好程度，从而让我们能利用这个预测评分排名后，选取了top-5部电影给指定用户。

---

## 3.2 为指定的用户和电影预测评分

In [44]:
test = sql_sc.createDataFrame([[100, 95218], 
                               [100, 40697],
                               [100, 4261],
                               [100, 82931],
                               [100, 95776]], 
                               ['userId', 'movieId'])

In [45]:
# 为指定用户和电影预测评分，按评分排序
predictions = sorted(model.transform(test).collect(), 
                     key=lambda r: r[2], reverse=True)

In [46]:
predictions

[Row(userId=100, movieId=95218, prediction=5.648101806640625),
 Row(userId=100, movieId=40697, prediction=5.163963794708252),
 Row(userId=100, movieId=4261, prediction=5.124624252319336),
 Row(userId=100, movieId=82931, prediction=5.112427711486816),
 Row(userId=100, movieId=95776, prediction=5.040829658508301)]

可以看到预测的评分值，与3.1中`recommends=model.recommendForAllUsers()`中的对应用户和电影的评分一致。不同的是，recommends中只保存了每个用户预测评分top5的电影；而`model.transform()`可以为任何指定的用户id和电影id预测其评分。

# 4评估模型

In [53]:
df.head()

Unnamed: 0,userId,movieId,rating
0,1,2,3.5
1,1,29,3.5
2,1,32,3.5
3,1,47,3.5
4,1,50,3.5


In [54]:
# 取df中前5条数据进行预测, 按movieId升序排序
test = sql_sc.createDataFrame(df.iloc[:5])

sorted(model.transform(test).collect(), 
       key=lambda r: r[1], reverse=False)

[Row(userId=1, movieId=2, rating=3.5, prediction=3.3998942375183105),
 Row(userId=1, movieId=29, rating=3.5, prediction=3.707843780517578),
 Row(userId=1, movieId=32, rating=3.5, prediction=3.7725272178649902),
 Row(userId=1, movieId=47, rating=3.5, prediction=4.022514820098877),
 Row(userId=1, movieId=50, rating=3.5, prediction=4.220706462860107)]

可以看出预测值和实际值之间有出入，引入回归评估器评估预测效果：

In [55]:
preds = model.transform(spk_df_test)

In [56]:
preds.columns

['userId', 'movieId', 'rating', 'prediction']

In [57]:
preds.take(3)

[Row(userId=3439, movieId=148, rating=1.0, prediction=2.7326953411102295),
 Row(userId=4948, movieId=148, rating=3.0, prediction=2.7196924686431885),
 Row(userId=1716, movieId=148, rating=2.0, prediction=3.1051182746887207)]

In [58]:
# 查看数据列信息
preds.printSchema()

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- prediction: float (nullable = false)



In [59]:
%%time
# 测试集记录数209204，非空值209204，说明出现0条空值记录
preds.dropna().count()

CPU times: user 32.7 ms, sys: 12.3 ms, total: 45.1 ms
Wall time: 14.9 s


209204

In [60]:
# 构建回归评估器，评估误差
evaluator = RegressionEvaluator(metricName='rmse', 
                                predictionCol='prediction',
                                labelCol='rating')

In [61]:
%%time
rms = evaluator.evaluate(preds.dropna())

CPU times: user 36.1 ms, sys: 12.5 ms, total: 48.7 ms
Wall time: 14.4 s


In [62]:
#  # 0.8395
"均方根误差值: {:.4f}".format(rms)

'均方根误差值: 0.8366'

---
# 5 ALS参数不同取值，得到的均方根误差值汇总表如下：

| 序号 | rank | maxIter | regParam |  rmse  |
|:----:|:----:|:-------:|:--------:|:------:|
|   1  |   3  |    10   |   0.01   | 0.8528 |
|   2  |   3  |    10   |    0.1   | 0.8395 |
|   3  |   3  |    15   |    0.1   | 0.8370 |
|   4  |   3  |    20   |    0.1   | 0.8376 |
|   5  |   3  |    10   |    0.5   | 0.9983 |
|   6  |  10  |    10   |    0.5   | 0.9982 |
|   7  |  10  |    10   |    0.1   | 0.8239 |
|   8  |  20  |    10   |    0.1   | 0.8219 |
|   9  |  20  |    20   |    0.1   | 0.8162 |
|  10  |  50  |    20   |    0.1   | 0.8153 |
|  11  |  3   |    10   |    2     | 2.1745 |
|      |      |         |          |        |

In [63]:
0.8153 / 5.0

0.16306

# 6 总结：

- 关于ALS算法：    

它通过`userCol`指定的用户列, 和`itemCol`指定的项目列，由`implicitPrefs`指定处理的是显示反馈还是隐式行为的场景，根据`rank`指定的隐特征个数拟合评分矩阵；拟合后再通过`recommendForAllUsers(N)`为所有用户生成评分最高的Top-N的推荐结果，根据指定的用户id即可在生成的结果中查找推荐结果。

pyspark进行拟合时，ALS的`rank, maxIter, regParam`三个参数的取值会影响rmse。    

当rank太小时，单纯提高迭代次数maxIter很快会过拟合。    

汇总表序号2和序号10的结果可以看出，rank、maxIter同时提高取值，rmse会更好的降低；但增大rank和maxIter耗时会增加，rank取值1000的时候，跑了1个多小时仍没跑完。

- 其他注意事项：
    1. 必须保证训练集中包含所有的用户id和电影id，即：要保证训练集样本覆盖全部用户和电影，如：该数据集中共有7120个不同用户、14026部不同电影，则训练集中必须包含7120个用户和14026部电影下的部分数据；
    2. 如果训练集中未覆盖全部用户和电影，则训练集评估时预测值将会出现空值，导致rmse也是空值，影响拟合；
    3. 虽然真实数据集中rating的值在`[0.5, 5.0]`范围，而预测评分会出现大于5.0的情况。经过多次尝试，发现当rmse误差值降低后，大于5.0的评分情况会减少，所以猜测是因为误差过大导致（最低rmse占最高真实评分5.0的16.31%）；
    4. 预测结果DataFrame的字段prediction数据列信息是: `float (nullable = false)`，当`nullable = false`不代表一定原空值;
    5. SQLContext是Spark SQL进行结构化数据处理的入口，它兼具了pandas的DataFrame的大部分方法和SQL语句，实际应用中可以多尝试。