# 预测电影评级
大数据最常见的用途之一是预测用户想要的内容。这样，Google就可以向您展示相关广告，亚马逊推荐相关产品，以及Netflix推荐您喜欢的电影。本实验将演示如何使用Apache Spark向用户推荐电影。我们将从一些基本技术开始，然后使用Spark ML库的Alternating Least Squares方法进行更复杂的预测。

对于本实验，我们将使用2000万个评级的子集数据集。此数据集预先安装在Databricks上，来自MovieLens稳定基准评级数据集。但是，您编写的相同代码也适用于完整数据集（尽管使用Community Edition上的完整数据集运行可能需要相当长的时间）。

在这个实验室中：
- 第0部分：预赛
- 第1部分：基本建议
- 第2部分：协作过滤
- 第3部分：对自己的预测

正如在第一个学习Spark实验室中提到的那样，在对任何数据集调用collect（）之前要仔细考虑。当您使用一个小数据集时，调用collect（）然后使用Python来了解本地数据（在驱动程序中）将正常工作，但是当您使用不具有大数据集的大数据集时，这将不起作用适合一台机器的内存。调用collect（）并进行可能用Spark完成的本地分析的解决方案可能会在自动编程器中失败并且不会获得完全信用。

# 代码
可以使用基本Python和pySpark DataFrame转换和操作完成此分配。 不需要数学以外的库。 除了我们在此作业中引入的ML函数之外，您应该只使用您在之前的实验练习中使用的Spark函数来完成此作业的所有部分（尽管如果您可以使用Spark的更多功能） 喜欢！）。

我们将使用电影数据，去年使用的CS100.1x数据相同。 但是，在本课程中，我们使用的是DataFrame，而不是RDD。

以下单元格定义数据文件的位置。 如果您想在自己的机器上运行本实验的导出版本（即在Databricks之外），您需要下载自己的2000万电影数据集副本，并且需要调整下面的路径。

**待办事项**：运行以下单元格。

In [1]:
import os
# from databricks_test_helper import Test

dbfs_dir = '../ml-latest-small/'
ratings_filename = os.path.join(dbfs_dir, 'ratings.csv')
movies_filename = os.path.join(dbfs_dir, 'movies.csv')

## 第0部分：预赛
我们读入每个文件并创建一个由解析行组成的DataFrame。

**2000万电影样本**
2000万电影样本由CSV文件（带标题）组成，因此无需手动解析文件，因为Spark CSV可以完成这项工作。

In [2]:
# import dbutils
# display(dbutils.fs.ls(dbfs_dir))

### CPU与I / O权衡
请注意，我们有压缩文件（以.gz结尾）和未压缩文件。 我们这里有CPU与I / O权衡。 如果I / O是瓶颈，那么我们想要处理压缩文件并支付额外的CPU开销。 如果CPU是瓶颈，则处理未压缩文件更有意义。

我们已经做了一些实验，我们已经确定在Community Edition上，CPU比I / O更容易成为瓶颈。 因此，我们将处理未压缩的数据。 此外，我们将通过显式指定DataFrame架构来进一步加快速度。 （当Spark CSV适配器从CSV文件中推断出架构时，它必须对文件进行额外的传递。这会减慢这里的速度，而且实际上并不是必需的。）

**待办事项**：运行以下单元格，该单元格将定义模式。

In [3]:
from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
)


In [4]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder \
     .master('local[*]') \
     .appName('Daddy') \
     .getOrCreate()

### 加载和缓存
Databricks文件系统（DBFS）位于S3之上。 我们将要访问这些数据。 我们不是一遍又一遍地从S3中读取它，而是将电影DataFrame和评级DataFrame缓存在内存中。

**待办事项**：运行以下单元格以加载和缓存数据。 请耐心等待：代码大约需要30秒才能运行。

In [5]:
ratings_df = spark.read.format('csv') \
                .options(header=True, inferSchema=False) \
                .schema(ratings_df_schema).load(ratings_filename)

movies_df = spark.read.format('csv') \
                .options(header=True, inferSchema=False) \
                .schema(movies_df_schema).load(movies_filename)
ratings_df.cache()
movies_df.cache()

DataFrame[ID: int, title: string]

## 第1部分：基本建议
推荐电影的一种方法是始终推荐平均评分最高的电影。在这部分中，我们将使用Spark查找平均评分最高且至少500条评论的20部电影的名称，评分数和平均评分。我们希望过滤我们的电影评分较高但大于或等于500条评论，因为评论较少的电影可能不会对每个人产生广泛的吸引力。

### （1a）平均评分最高的电影
让我们确定平均评分最高的电影。

您应该执行的步骤是：

1. 回想一下ratings_df包含三列：
- 评价电影的用户的ID
- 被评级的电影的ID
- 和评级。

首先，将ratings_df转换为第二个DataFrame，movie_ids_with_avg_ratings，其中包含以下列：

- 电影ID
- 电影的评分数量
- 所有电影评级的平均值

2. 将movie_ids_with_avg_ratings转换为另一个DataFrame，movie_names_with_avg_ratings_df，将电影名称添加到每一行。 movie_names_with_avg_ratings_df将包含以下列：

- 电影ID
- 电影名称
- 电影的评分数量
- 所有电影评级的平均值

**提示**：你需要加入。

您最终应该得到以下内容：

In [6]:
from pyspark.sql import functions as F
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId') \
        .agg(
                F.count(ratings_df.rating).alias("count"),
                F.avg(ratings_df.rating).alias("average"))

print("movie_ids_with_avg_ratings_df:")
movie_ids_with_avg_ratings_df.show(3)

movie_names_df = \
    movie_ids_with_avg_ratings_df \
        .join(movies_df, 
              movie_ids_with_avg_ratings_df['movieId']==movies_df['Id'])
movie_names_with_avg_ratings_df = movie_names_df.drop('ID')
print("movie_names_with_avg_ratings_df:")
movie_names_with_avg_ratings_df.show(3)

movie_ids_with_avg_ratings_df:
+-------+-----+-----------------+
|movieId|count|          average|
+-------+-----+-----------------+
|   1580|  165|3.487878787878788|
|   2366|   25|             3.64|
|   3175|   75|             3.58|
+-------+-----+-----------------+
only showing top 3 rows

movie_names_with_avg_ratings_df:
+-------+-----+-----------------+--------------------+
|movieId|count|          average|               title|
+-------+-----+-----------------+--------------------+
|   1580|  165|3.487878787878788|Men in Black (a.k...|
|   2366|   25|             3.64|    King Kong (1933)|
|   3175|   75|             3.58| Galaxy Quest (1999)|
+-------+-----+-----------------+--------------------+
only showing top 3 rows



### （1b）具有最高平均评分和至少250评论的电影
现在我们有一个平均收视率最高的电影的数据框，我们可以使用Spark来确定平均收视率最高的20部电影和至少250个评论。

添加单个DataFrame转换（代替下面的 \\<FILL_IN\\>），将结果限制为评分至少为250人的电影。

In [12]:
movies_with_250_ratings_or_more = \
    movie_names_with_avg_ratings_df.where(movie_names_with_avg_ratings_df["count"]>=250)
print('Movies with highest ratings:')
movies_with_250_ratings_or_more.show(20, truncate=False)

Movies with highest ratings:
+-------+-----+-----------------+-----------------------------------------+
|movieId|count|average          |title                                    |
+-------+-----+-----------------+-----------------------------------------+
|296    |307  |4.197068403908795|Pulp Fiction (1994)                      |
|593    |279  |4.161290322580645|Silence of the Lambs, The (1991)         |
|2571   |278  |4.192446043165468|Matrix, The (1999)                       |
|318    |317  |4.429022082018927|Shawshank Redemption, The (1994)         |
|356    |329  |4.164133738601824|Forrest Gump (1994)                      |
|260    |251  |4.231075697211155|Star Wars: Episode IV - A New Hope (1977)|
+-------+-----+-----------------+-----------------------------------------+



使用评论数量的阈值是改进建议的一种方法，但还有许多其他改善质量的好方法。 例如，您可以通过评级数量来评分。

## 第2部分：协作过滤
在本课程中，您了解了Spark允许我们应用于分布式数据集的许多基本转换和操作。 Spark还暴露了一些更高级别的功能; 特别是，机器学习使用Spark的一个组件称为MLlib。 在本部分中，您将学习如何使用MLlib使用我们一直在分析的电影数据制作个性化的电影推荐。
我们将使用一种称为协同过滤的技术。协同过滤是一种通过从许多用户收集偏好或品味信息（协作）来自动预测（过滤）关于用户兴趣的方法。协同过滤方法的基本假设是，如果一个人A在一个问题上与一个人B具有相同的意见，那么A更有可能对另一个问题x有一个B的意见，而不是对一个人选择x的意见随机。您可以在此处详细了解协作过滤。

右侧的图像（来自维基百科）显示了使用协同过滤预测用户评级的示例。首先，人们评价不同的项目（如视频，图像，游戏）。之后，系统正在预测用户对项目的评级，用户尚未评级。这些预测建立在其他用户的现有评级之上，这些用户与活跃用户具有相似的评级。例如，在下面的图像中，系统已做出预测，即活动用户不喜欢该视频。

---

对于电影推荐，我们从一个矩阵开始，其条目是用户的电影评级（在下图中以红色显示）。每列代表一个用户（以绿色显示），每行代表一个特定的电影（以蓝色显示）。

由于并非所有用户都对所有电影进行了评分，因此我们不知道此矩阵中的所有条目，这正是我们需要协作过滤的原因。对于每个用户，我们只对部分电影进行评分。通过协同过滤，我们的想法是通过将其分解为两个矩阵的乘积来近似评级矩阵：一个描述每个用户的属性（以绿色显示），另一个描述每个电影的属性（以蓝色显示）。
![图1](./pic/matrix_factorization.png)
我们想要选择这两个矩阵，以便最小化我们知道正确评级的用户/电影对的错误。 [交替最小二乘算法(ASL)](https://en.wikiversity.org/wiki/Least-Squares_Method)通过首先用值随机填充用户矩阵然后优化电影的值使得误差最小化来实现这一点。 然后，它保持电影矩阵不变并优化用户矩阵的值。 这种在优化矩阵之间的交替是名称中“交替”的原因。

此优化正如上图中右侧所示。 给定一组固定的用户因素（即用户矩阵中的值），我们使用已知的评级来使用在图的底部公式的优化来找到电影因子的最佳值。 然后我们“交替”并选择给定固定电影因素的最佳用户因素。

有关用户和电影矩阵的简单示例，请查看[第2讲](https://courses.edx.org/courses/course-v1:BerkeleyX+CS110x+2T2016/courseware/9d251397874d4f0b947b606c81ccf83c/3cf61a8718fe4ad5afcd8fb35ceabb6e/)的[视频或第8讲](https://d37djvu3ytnwxt.cloudfront.net/assets/courseware/v1/fb269ff9a53b669a46d59e154b876d78/asset-v1:BerkeleyX+CS110x+2T2016+type@asset+block/Lecture2s.pdf)的幻灯片。

### （2a）创建训练集
在我们开始使用机器学习之前，我们需要将ratings_df数据集分解为三个部分：

- 训练集（DataFrame），我们将用它来训练模型
- 验证集（DataFrame），我们将使用它来选择最佳模型
- 测试集（DataFrame），我们将用于我们的实验

要将数据集随机拆分为多个组，我们可以使用pySpark randomSplit（）转换。 randomSplit（）接受一组拆分和种子并返回多个DataFrame。

In [8]:
split_60_df, split_a_20_df, split_b_20_df = ratings_df.randomSplit([0.6,0.2,0.2],5)

training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

Training: 60343, validation: 20222, test: 20271

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|     47|   5.0|
+------+-------+------+
only showing top 3 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      6|   4.0|
|     1|    235|   4.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 3 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    157|   5.0|
|     1|    216|   5.0|
|     1|    231|   5.0|
+------+-------+------+
only showing top 3 rows



拆分数据集后，您的训练集大约有1200万个条目，验证和测试集各有大约400万个条目。 （由于randomSplit（）转换的随机性，每个数据集中的条目的确切数量会略有不同。）

### （2b）交替最小二乘
在这一部分中，我们将使用Apache Spark ML Pipeline实现交替最小二乘法，ALS。 ALS采用训练数据集（DataFrame）和几个控制模型创建过程的参数。要确定参数的最佳值，我们将使用ALS训练多个模型，然后我们将选择最佳模型，并在本练习的其余部分中使用该模型中的参数。

我们将用于确定最佳模型的过程如下：

1. 选择一组模型参数。模型最重要的参数是秩，即用户矩阵中的列数（上图中的绿色）或电影矩阵中的行数（上图中的蓝色）。通常，较低等级将意味着训练数据集上的较高误差，但是较高等级可能导致过度拟合。我们将使用training_df数据集训练排名为4,8和12的模型。

2. 在ALS对象上设置适当的参数：

- “用户”列将设置为userId DataFrame列中的值。
- “Item”列将设置为movieId DataFrame列中的值。
- “评级”列将设置为我们的评级DataFrame列中的值。
- 我们将使用0.1的正则化参数。

**注意**：请仔细阅读ALS类的文档。它将帮助您完成此步骤。

3. 让ALS输出转换（即ALS.fit（）的结果）产生一个名为“预测”的新列，其中包含预测值。

4. 使用ALS.fit（）创建多个模型，每个模型对应一个等级值。我们将适应训练数据集（training_df）。

5. 对于每个模型，我们将针对我们的验证数据集（validation_df）运行预测并检查错误。

6. 我们将保持模型的最佳错误率。

### 我们为什么要进行自己的交叉验证？
协作过滤的一个挑战是如何为新用户（根本没有提供任何评级的用户）提供评级。一些推荐系统选择向新用户提供一组默认评级（例如，所有评级的平均值），而其他推荐系统选择不为新用户提供评级。当要求为新用户提供评级时，Spark的ALS算法会生成NaN（非数字）值。

因此，使用具有ALS的ML管道的CrossValidator是有问题的，因为交叉验证涉及将训练数据划分为一组折叠（例如，三组），然后使用这些折叠来在参数网格搜索过程期间测试和评估参数。某些折叠可能包含不在其他折叠中的用户，因此，ALS会为这些新用户生成NaN值。当CrossValidator使用Evaluator（RMSE）计算错误度量时，RMSE算法将返回NaN。这将使参数网格中的所有参数看起来同样好（或坏）。

您可以阅读Spark JIRA 14489关于此问题的讨论。建议使用ALS提供默认值或使RMSE降低NaN值的变通方法。两者都引入潜在问题。我们选择让RMSE降低NaN值。虽然这不能解决ALS不能预测新用户价值的根本问题，但它确实提供了一些评估价值。我们使用for循环（下面）手动实现参数网格搜索过程，并在使用RMSE之前删除NaN值。

对于生产应用程序，您可能需要考虑如何处理新用户的权衡。

**注意**：此单元格可能需要几分钟才能运行。

In [26]:
from pyspark.ml.recommendation import ALS

als = ALS()

seed = 5

als.setMaxIter(5) \
   .setSeed(seed) \
   .setRegParam(0.1) \
   .setUserCol("userId").setItemCol("movieId").setRatingCol("rating")

from pyspark.ml.evaluation import RegressionEvaluator

reg_eval = RegressionEvaluator(
                predictionCol="prediction", 
                labelCol='rating', 
                metricName='rmse')

tolerance = 0.03
ranks = [4, 8, 12]
# ranks = [12, 8, 4]
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:
    als.setRank(rank)
    model = als.fit(training_df)
    predict_df = model.transform(validation_df)
    
    predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))
    
    error = reg_eval.evaluate(predicted_ratings_df)
    errors[err] = error
    models[err] = model
    print("For rank %s the RMSE is %s" % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = err
    err += 1
    print(models)
als.setRank(ranks[best_rank])
print("The best mode was trained with rank %s" % ranks[best_rank])
my_model = models[best_rank]

For rank 4 the RMSE is 0.9192889621485235
[ALS_44bbbfa2439c495e80a1, 0, 0]
For rank 8 the RMSE is 0.9227985582444523
[ALS_44bbbfa2439c495e80a1, ALS_44bbbfa2439c495e80a1, 0]
For rank 12 the RMSE is 0.9148624839481305
[ALS_44bbbfa2439c495e80a1, ALS_44bbbfa2439c495e80a1, ALS_44bbbfa2439c495e80a1]
The best mode was trained with rank 12


### （2c）测试您的模型
到目前为止，我们使用training_df和validation_df数据集来选择最佳模型。 由于我们使用这两个数据集来确定哪种模型最好，我们不能用它们来测试模型的好坏; 否则，我们很容易过度拟合。 为了确定我们的模型有多好，我们需要使用test_df数据集。 我们将使用您在（2b）部分中确定的best_rank来创建用于预测测试数据集的评级的模型，然后我们将计算RMSE。

您应该执行的步骤是：

- 使用上面创建的my_model在测试数据集（test_df）上运行预测，生成新的predict_df DataFrame。
- 过滤掉不需要的NaN值（由于Spark中的错误，这是必需的）。 我们已经为您提供了这段代码。
- 使用先前创建的RMSE评估程序reg_eval来评估过滤的DataFrame。

In [46]:
predict_df = my_model.transform(test_df)

predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

error = reg_eval.evaluate(predicted_ratings_df)

print("测试集上的误差为: %s" % error)
print("测试集上的数据预测示例")
predicted_ratings_df.show(10)

测试集上的误差为: 0.9167305919316049
测试集上的数据预测示例
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   409|    471|   3.0| 3.9891672|
|   372|    471|   3.0| 3.0194802|
|   182|    471|   4.5| 3.3084137|
|   474|    471|   3.0| 2.9631383|
|   273|    471|   5.0| 4.6632466|
|   411|    471|   4.0| 3.1806765|
|   260|    471|   4.5| 3.5837674|
|   373|    471|   5.0| 3.5970507|
|   606|   1088|   3.0|  3.119496|
|   132|   1088|   4.0|  3.202322|
+------+-------+------+----------+
only showing top 10 rows



### （2d）比较你的模型
查看模型预测的结果与测试集中的值的RMSE是评估模型质量的一种方法。 评估模型的另一种方法是评估测试集中的误差，其中每个评级是训练集的平均评级。

您应该执行的步骤是：

- 使用training_df计算该训练数据集中所有电影的平均评分。
- 使用您刚刚确定的平均评分和test_df创建一个DataFrame（test_for_avg_df），其中包含一个包含平均评分的预测列。 提示：您将要使用来自`pyspark.sql.functions`的`lit()`函数，此处可用作`F.lit()`。
- 使用我们先前创建的`reg_eval`对象来评估`test_for_avg_df`并计算RMSE。

   

```
Help on function lit in module pyspark.sql.functions:

lit(col)
    Creates a :class:`Column` of literal value.

    >>> df.select(lit(5).alias('height')).withColumn('spark_user', lit(True)).take(1)
    [Row(height=5, spark_user=True)]

    .. versionadded:: 1.3
```

## 第3部分：对自己的预测
本练习的最终目标是预测要向自己推荐的电影。 为此，您首先需要为ratings_df数据集添加自己的评级。

### （3a）你的电影评级

为了帮助您为自己提供评分，我们提供了以下代码，列出了我们在实验室第1部分中创建的movies_with_500_ratings_or_more中评价最高的50部电影的名称和电影ID。

movieId	count	average	title

用户ID 0未分配，因此我们将其用于您的评级。 我们将变量my_user_ID设置为0。 接下来，创建一个名为my_ratings_df的新DataFrame，其评分至少为10个电影评级。 每个条目的格式应为（my_user_id，movieID，rating）。 与原始数据集一样，评级应介于1和5之间（含）。 如果你还没有看过这些电影中的至少10部，你可以在上面的单元格中增加传递给take（）的参数，直到你看过10部电影（或者你也可以猜出你对电影的评分是多少） 没有见过）。

In [51]:
from pyspark.sql import Row
my_user_id = 0

# Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID.
my_rated_movies = [
      (my_user_id, 1193, 3.5),
      (my_user_id, 914, 2.5),
      (my_user_id, 2355, 4.2),
      (my_user_id, 1287, 3.7),
      (my_user_id, 594, 3.1),
      (my_user_id, 595, 2.6),
      (my_user_id, 2398, 1.7),
      (my_user_id, 1035, 4.0),
      (my_user_id, 2687, 5.0),
      (my_user_id, 3105, 4.7),
      (my_user_id, 1270, 2.5),
     # The format of each line is (my_user_id, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
     #   (my_user_id, 260, 5),
]

my_ratings_df = spark.createDataFrame(my_rated_movies, ['userId','movieId','rating'])
print('My movie ratings:')
display(my_ratings_df.show(10))

My movie ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|   1193|   3.5|
|     0|    914|   2.5|
|     0|   2355|   4.2|
|     0|   1287|   3.7|
|     0|    594|   3.1|
|     0|    595|   2.6|
|     0|   2398|   1.7|
|     0|   1035|   4.0|
|     0|   2687|   5.0|
|     0|   3105|   4.7|
+------+-------+------+
only showing top 10 rows



None

### （3b）将您的电影添加到训练数据集
现在您已经为自己获得了评分，您需要将您的评分添加到训练数据集中，以便您训练的模型将包含您的偏好。 Spark的unionAll（）转换结合了两个DataFrame; 使用unionAll（）创建一个新的训练数据集，其中包括您的评级和原始训练数据集中的数据。

In [52]:
training_with_my_ratings_df = training_df.unionAll(my_ratings_df)

print ('The training dataset now has %s more entries than the original training dataset' %
       (training_with_my_ratings_df.count() - training_df.count()))

The training dataset now has 11 more entries than the original training dataset


### （3c）用您的评级训练模型
现在，训练一个模型，其中添加了您的评级以及您在第（2b）和（2c）部分中使用的参数。 确保包含所有参数。

注意：此单元格大约需要30秒才能运行。

In [87]:
als.setPredictionCol('prediction') \
   .setMaxIter(5) \
   .setSeed(seed) \
   .setRegParam(0.1) \
   .setUserCol('userId').setItemCol('movieId').setRatingCol('rating') \
   .setRank(ranks[best_rank])

my_ratings_model = als.fit(training_with_my_ratings_df)

### （3d）使用您的评级检查RMSE的新模型
在测试集上计算此新模型的RMSE。

根据test_df中设置的测试数据运行模型（刚训练的模型）。
然后，使用我们先前计算的reg_eval对象来计算您的评级的RMSE。

In [64]:
my_predict_df = my_ratings_model.transform(test_df).cache()

predicted_test_my_ratings_df = \
    my_predict_df.filter(my_predict_df.prediction != float('nan'))
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

The model had a RMSE on the test set of 0.9219973035931176


In [69]:
# 有一些值预测不出来？？
# TODO
nan_perc = my_predict_df.filter(my_predict_df.prediction == float('nan')).count() / \
    my_predict_df.count()
print("测试集中共有 %s 的数据无法预测" % nan_perc)

测试集中共有 0.047506289773568154 的数据无法预测


In [97]:
my_predict_df.filter(my_predict_df.prediction == float('nan'))['userId']

Column<b'userId'>

### （3e）预测你的评分
到目前为止，我们只计算了模型的误差。接下来，让我们预测您将给予您尚未提供评级的电影的评分。

您应该执行的步骤是：

- 过滤掉您已手动评分的电影。 （使用my_rated_movie_ids变量。）将结果放入新的not_rated_df中。

**提示**：`Column.isin（）`方法以及`〜`（“not”）DataFrame逻辑运算符可能会派上用场。以下是使用`isin（）`的示例：
```
    > df1 = sqlContext.createDataFrame（[（"Jim",10），（“Julie”，9），（“Abdul”，20），（“Mireille”，19）]，[“name”，“age”] ）
    > df1.show（）
    + -------- + --- +
    |名称     |年龄|
    + -------- + --- +
    |吉姆     | 10 |
    |朱莉     | 9  |
    |阿卜杜勒   | 20 |
    |米雷耶    | 19 |
    + -------- + --- +

    > names_to_delete = [“Julie”，“Abdul”]＃这只是一个Python列表
    > df2 = df1.filter（~df1 [“name”].isin（names_to_delete））＃“NOT IN”
    > df2.show（）
    + -------- + --- +
    |名称     |年龄|
    + -------- + --- +
    |吉姆     | 10 |
    |米雷耶    | 19 |
    + -------- + --- +
```
- 通过以下方式将 `not_rated_df`转换为`my_unrated_movies_df`：
    - 将“ID”列重命名为“movieId”
    - 添加“userId”列，其中包含上面定义的my_user_id变量中包含的值。
- 通过将`my_ratings_model`应用于`my_unrated_movies_df`来创建`predict_ratings_df DataFrame`。

In [88]:
# TODO: Replace <FILL_IN> with the appropriate code

# Create a list of my rated movie IDs
my_rated_movie_ids = [x[1] for x in my_rated_movies]

# Filter out the movies I already rated.
not_rated_df = movies_df.filter(~ movies_df["ID"].isin(my_rated_movie_ids))

# Rename the "ID" column to be "movieId", and add a column with my_user_id as "userId".
my_unrated_movies_df = not_rated_df.selectExpr("ID as movieId").withColumn('userId', F.lit(my_user_id))

# Use my_rating_model to predict ratings for the movies that I did not manually rate.
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

predicted_ratings_df = raw_predicted_ratings_df.filter(raw_predicted_ratings_df['prediction'] != float('nan'))

In [89]:
raw_predicted_ratings_df.show()

AnalysisException: 'Detected implicit cartesian product for LEFT OUTER join between logical plans\nProject [ID#6 AS movieId#3794]\n+- Filter NOT ID#6 INSET (1287,3105,1270,2355,1035,2398,594,595,914,1193,2687)\n   +- InMemoryRelation [ID#6, title#7], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)\n         +- *(1) FileScan csv [ID#6,title#7] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/Dk/Spark/tutorial/ml-latest-small/movies.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ID:int,title:string>\nand\nProject [_2#3777 AS features#3780]\n+- Filter (UDF(0) = _1#3776)\n   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3776, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#3777]\n      +- ExternalRDD [obj#3775]\nJoin condition is missing or trivial.\nEither: use the CROSS JOIN syntax to allow cartesian products between these\nrelations, or: enable implicit cartesian products by setting the configuration\nvariable spark.sql.crossJoin.enabled=true;'

In [80]:
print('Detected implicit cartesian product for LEFT OUTER join between logical plans\nProject [ID#6 AS movieId#3574]\n+- Filter NOT ID#6 INSET (1287,3105,1270,2355,1035,2398,594,595,914,1193,2687)\n   +- InMemoryRelation [ID#6, title#7], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)\n         +- *(1) FileScan csv [ID#6,title#7] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/Dk/Spark/tutorial/ml-latest-small/movies.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ID:int,title:string>\nand\nProject [_2#2883 AS features#2886]\n+- Filter (UDF(0) = _1#2882)\n   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#2882, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#2883]\n      +- ExternalRDD [obj#2881]\nJoin condition is missing or trivial.\nEither: use the CROSS JOIN syntax to allow cartesian products between these\nrelations, or: enable implicit cartesian products by setting the configuration\nvariable spark.sql.crossJoin.enabled=true;'
)

Detected implicit cartesian product for LEFT OUTER join between logical plans
Project [ID#6 AS movieId#3574]
+- Filter NOT ID#6 INSET (1287,3105,1270,2355,1035,2398,594,595,914,1193,2687)
   +- InMemoryRelation [ID#6, title#7], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) FileScan csv [ID#6,title#7] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/Dk/Spark/tutorial/ml-latest-small/movies.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ID:int,title:string>
and
Project [_2#2883 AS features#2886]
+- Filter (UDF(0) = _1#2882)
   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#2882, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#2883]
      +- ExternalRDD [obj#2881]
Join condition is missing or trivial.
Either: use the CROSS JOIN syntax to allow c

### （3f）预测您的评分
我们有预测的评级。 现在我们可以打印出预测评分最高的25部电影。

您应该执行的步骤是：

使用`movie_names_with_avg_ratings_df DataFrame`加入您的`predict_ratings_df DataFrame`，以获取每部电影的评分计数。
按预测评级（最高评级优先）对结果`DataFrame(predict_with_counts_df)`进行排序，并删除计数为75或更少的任何评级。
打印剩下的前25部电影。

In [90]:
predicted_with_counts_df = predicted_ratings_df \
            .join(movie_names_with_avg_ratings_df, \
                  movie_names_with_avg_ratings_df["movieId"]==predicted_ratings_df["movieId"])
predicted_highest_rated_movies_df = predicted_with_counts_df \
            .filter(predicted_with_counts_df["count"]>75) \
            .sort("prediction",ascending=False)

print ('My 25 highest rated movies as predicted (for movies with more than 75 reviews):')
predicted_highest_rated_movies_df.show(25)

My 25 highest rated movies as predicted (for movies with more than 75 reviews):


AnalysisException: 'Detected implicit cartesian product for LEFT OUTER join between logical plans\nProject [ID#6 AS movieId#3794]\n+- Filter (NOT ID#6 INSET (1287,3105,1270,2355,1035,2398,594,595,914,1193,2687) && isnotnull(ID#6))\n   +- InMemoryRelation [ID#6, title#7], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)\n         +- *(1) FileScan csv [ID#6,title#7] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/Dk/Spark/tutorial/ml-latest-small/movies.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ID:int,title:string>\nand\nProject [_2#3777 AS features#3780]\n+- Filter (UDF(0) = _1#3776)\n   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3776, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#3777]\n      +- ExternalRDD [obj#3775]\nJoin condition is missing or trivial.\nEither: use the CROSS JOIN syntax to allow cartesian products between these\nrelations, or: enable implicit cartesian products by setting the configuration\nvariable spark.sql.crossJoin.enabled=true;'