## 数据采集：
- 以下这个单元格的功能是将您上传的csv文件读入到pyspark程序里，并将其的格式转换为pyspark dataframe。
- 您可以通过改变spark.read.csv()中的路径来读入您本人上传的数据集。
- data.show()是将dataframe的内容展示出来的方程。展示出来的数据集默认是前20行，您可以通过在show（）中填写数字，来改变展示的行数。例如show（1），会展示数据集中的第一行。

In [1]:
data=spark.read.csv('hdfs://default/user/erin/ratings_demo.csv',header=True)
name=spark.read.csv('hdfs://default/user/erin/movies_metadata.csv', header=True)
name=name.select("id","title")
data.show(1)
name.show(1)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5,,pyspark,idle,Link,,erin,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
+------+-------+------+----------+
only showing top 1 row

+---+---------+
| id|    title|
+---+---------+
|862|Toy Story|
+---+---------+
only showing top 1 row

In [2]:
for col in data.columns:
    data = data.withColumnRenamed(col, col.lower())
for col in name.columns:
    name = name.withColumnRenamed(col, col.lower())
print('success')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

success

In [3]:
data.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+----------+
|userid|movieid|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|   2959|   4.0|1425941601|
|     1|  81834|   5.0|1425942133|
|     1|  98809|   0.5|1425942640|
|     1| 112552|   5.0|1425941336|
|     2|     58|   3.0| 867039325|
|     4|   1221|   5.0|1042667903|
|     4|   1732|   3.0|1042674761|
|     4|   2541|   3.0|1042668576|
|     4|   3160|   4.0|1042672335|
+------+-------+------+----------+
only showing top 10 rows

In [4]:
data = data.join(name, data.movieid == name.id)
data=data.select("userid","movieid","rating","timestamp","title")
data.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+----------+-----------------+
|userid|movieid|rating| timestamp|            title|
+------+-------+------+----------+-----------------+
|     1|    110|   1.0|1425941529|Three Colors: Red|
+------+-------+------+----------+-----------------+
only showing top 1 row

## 数据处理：1.处理重复数据
- 整行去重：data.distinct()这个方程可以对于完全相同的行进行去重。
- 对于某一列或者多列相同情况，进行去重： 您可以在dropDuplicates（）这个方程中，在队列中，填入您需要筛选的相同情况。例如以下单元格筛选了两个条件的情况，在产品，用户都一样的情况下，删除掉重复行（["User", "Movie"]）。
- data.count()方程会打印出这个列表的行数，以便于您了解去重之后的数据集行数。

In [5]:
data=data.distinct()
data = data.dropDuplicates(subset=[c for c in data.columns if c in ["userid","movieid"]])
data.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1134791

## 数据处理：2.处理空缺数据
- 运用agg方程算出每列空缺的数据比
- 若某列的空缺比重过大，可以将该列从数据中删除。因为data的空缺比均很小(为0.0，表示没有空缺)，所以使用df_miss来展示如何进行某列的删除，但后续处理中并不使用df_miss。这里删除的列为“Time”，您可以将“Time”换成您需要删除的列名。
- 删除掉结果项空缺的数据（data.na.drop(subset=['Score'])），这个数据分析主要是对于“Score”项，当Score缺失，该行对于后续分析则没有帮助。
- 删除掉空缺项过多的行。您可以通过利用thresh参数，为每一行缺失数据的数量指定一个阈值，从而限定要删除的行。以下我们选择的阈值为2，则删除掉空缺项等于或大约2的行。您可以改变thresh对应的值来改变阈值。

In [6]:
import pyspark.sql.functions as f
data.agg(*[(1-(f.count(c) /f.count('*'))).alias(c+'_missing') for c in data.columns]).show()
data=data.na.drop(subset=['rating'])
data=data.dropna(thresh=2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+---------------+--------------+-----------------+--------------------+
|userid_missing|movieid_missing|rating_missing|timestamp_missing|       title_missing|
+--------------+---------------+--------------+-----------------+--------------------+
|           0.0|            0.0|           0.0|              0.0|0.009246636605330871|
+--------------+---------------+--------------+-----------------+--------------------+

## 数据处理：3.对列进行重命名
- 有些数据集自带的列名代表的含义不太清晰，或者拼写起来比较复杂，您可以对列名进行自定义
- 您可以使用withColumnRenamed（）这个方程，在括号中填入原列名（比如‘User’），在下一位填入新列名（比如‘user’），然后原列名就会被新列名替换。

In [7]:
data=data.withColumnRenamed('userid', 'user')
data=data.withColumnRenamed('movieid', 'movie')
data

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[user: string, movie: string, rating: string, timestamp: string, title: string]

## 数据处理：4.将某列数据的类别进行转化
- 当您直接用spark的read_csv导入数据时，生成的dataframe往往会默认每列的类别为string，然而string类别会让您接下来的数据分析以及训练的步骤报错。
- 以下这个单元格将四列的数据种类进行了改变。您可以通过在withColumn这个方程来进行改变，在括号中填入您想要改变的列名，以及想改变成的种类形式。形式：（“列名”,dataframe["列名"].cast('type')）
- 以下为一些您可能需要使用的类别：</br>binary;boolean;int;string;float;timestamp;date;float</br>您可以根据数据种类将以上类别替换到'type'里。

In [8]:
data= data.withColumn("user",data['user'].cast('int'))
data= data.withColumn("rating",data['rating'].cast('int'))
data= data.withColumn("movie",data["movie"].cast('int'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 数据处理：5.将包含不符合打分要求的结果项的行删除
- 首先将数据集根据rating项的不同值进行分类，并把每类的数量列出，再筛选出合理的rating值，并删掉rating值在合理范围外的行。
- 您可以通过修改groupBy（)中的列名来选择依照分类的结果项。
- 对于筛选结果项的范围，您可以将合理的结果值填入filter（）方程中，例如（data.rating==0），并且将这些等式用‘|’隔开，表示并列。

In [9]:
from pyspark.sql.functions import col
data.groupBy("rating").count().orderBy(col("count").desc()).show(truncate=False)
data=data.filter(( data.rating== 0)|( data.rating== 1)|(data.rating == 2) | (data.rating == 3) | (data.rating == 4)|(data.rating == 5))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+------+
|rating|count |
+------+------+
|4     |391970|
|3     |366406|
|5     |178675|
|2     |128131|
|1     |54642 |
|0     |14967 |
+------+------+

In [10]:
data.write.format("hive").mode("overwrite").saveAsTable("user_erin.rating")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…