## 数据采集：
- 以下这个单元格的功能是将您上传的csv文件读入到pyspark程序里，并将其的格式转换为pyspark dataframe。
- 您可以通过改变spark.read.csv()中的路径来读入您本人上传的数据集。
- data.show()是将dataframe的内容展示出来的方程。展示出来的数据集默认是前20行，您可以通过在show（）中填写数字，来改变展示的行数。例如show（5），会展示数据集中的前5行。

In [2]:
data=spark.sql("select * from user_erin.ratingsdemo1052")
data=data.filter(data.text!="Text")
data.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| id| productid|        userid|         profilename|helpfulnessnumerator|helpfulnessdenominator|score|      time|             summary|                text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  5|B006K2ZZ7K|A1UQRSCLF8GW1T|"Michael D. Bigha...|                   0|                     0|    5|1350777600|         Great taffy|Great taffy at a ...|
|  8|B006K2ZZ7K|A3JRGQVEQN31IQ|  Pamela G. Williams|            

## 数据处理：1.处理重复数据
- 整行去重：data.distinct()这个方程可以对于完全相同的行进行去重。
- 对于某一列或者多列相同情况，进行去重： 您可以在dropDuplicates（）这个方程中，在队列中，填入您需要筛选的相同情况。例如以下单元格筛选了三个条件的情况，在产品，用户，评论时间都一样的情况下，删除掉重复行（["product", "user","Time"]）。
- dataframe.count()方程会打印出这个列表的行数，以便于您了解去重之后的数据集行数。

In [3]:
data=data.distinct()
data = data.dropDuplicates(subset=[c for c in data.columns if c in ["productid", "userid","time"]])
data.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

170405

## 数据处理：2.处理空缺数据
- 运用agg方程算出每列空缺的数据比
- 若某列的空缺比重过大，可以将该列从数据中删除。因为data的空缺比均很小，所以使用df_miss来展示如何进行某列的删除，但后续处理中并不使用df_miss。这里删除的列为“Time”，您可以将“Time”换成您需要删除的列名。
- 删除掉结果项空缺的数据（data.na.drop(subset=['Score'])），这个数据分析主要是对于“Score”项，当Score缺失，该行对于后续分析则没有帮助。
- 删除掉空缺项过多的行。您可以通过利用thresh参数，为每一行缺失数据的数量指定一个阈值，从而限定要删除的行。以下我们选择的阈值为3，则删除掉空缺项等于或大约3的行。您可以改变thresh对应的值来改变阈值。

In [4]:
import pyspark.sql.functions as f
data.agg(*[(1-(f.count(c) /f.count('*'))).alias(c+'_missing') for c in data.columns]).show()
data=data.na.drop(subset=['score'])
data=data.dropna(thresh=3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------------+--------------+-------------------+----------------------------+------------------------------+-------------+------------+---------------+------------+
|id_missing|productid_missing|userid_missing|profilename_missing|helpfulnessnumerator_missing|helpfulnessdenominator_missing|score_missing|time_missing|summary_missing|text_missing|
+----------+-----------------+--------------+-------------------+----------------------------+------------------------------+-------------+------------+---------------+------------+
|       0.0|              0.0|           0.0|                0.0|                         0.0|                           0.0|          0.0|         0.0|            0.0|         0.0|
+----------+-----------------+--------------+-------------------+----------------------------+------------------------------+-------------+------------+---------------+------------+

## 数据处理：3.将某列数据的类别进行转化
- 当您直接用spark的read_csv导入数据时，生成的dataframe往往会默认每列的类别为string，然而string类别会让您接下来的数据分析以及训练的步骤报错。
- 以下这个单元格将四列的数据种类进行了改变。您可以通过在withColumn这个方程来进行改变，在括号中填入您想要改变的列名，以及想改变成的种类形式。形式：（“列名”,dataframe["列名"].cast('type')）
- 以下为一些您可能需要使用的类别：binary;boolean;int;string;float;timestamp;date;float。您可以根据数据种类将以上类别替换到'type'里。

In [5]:
data = data.withColumn("helpfulnessnumerator",data['helpfulnessnumerator'].cast('int'))
data = data.withColumn("helpfulnessdenominator",data['helpfulnessdenominator'].cast('int'))
data = data.withColumn("score",data['score'].cast('int'))
data = data.withColumn("time",data['time'].cast('int'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 数据处理：4.把类别型数据转化为数字型数据 

- 将数据库中的种类值转化为数字，例如“south”，“north”，“east”，“west”，转换为“1”，“2”，“3”，“4”，以方便后续训练模型。
- 以下这个单元格的主要功能是将‘productId’和‘UserId’的值由一串字符转换为数字。
- StringIndexer这个方程的使用方法为，在inputCol后填入您需要转化的列名。然后用生成的模型（product_indexer）来transform原数据。

In [6]:
from pyspark.ml.feature import StringIndexer
product_indexer = StringIndexer(inputCol='productid', outputCol='product').fit(data)
data= product_indexer.transform(data)
user_indexer = StringIndexer(inputCol='userid', outputCol='user').fit(data)
data= user_indexer.transform(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 数据处理：5.将包含不符合打分要求的结果项的行删除
- 首先将数据集根据Score项的不同值进行分类，并把每类的数量列出，再筛选出合理的Score值，并删掉Score值在合理范围外的行。
- 您可以通过修改groupBy（)中的列名来选择依照分类的结果项。
- 对于筛选结果项的范围，您可以将合理的结果值填入filter（）方程中，例如（data.Score==0），并且将这些等式用‘|’隔开，表示并列。

In [7]:
from pyspark.sql.functions import col
data.groupBy("score").count().orderBy(col("count").desc()).show(truncate=False)
data=data.filter((data.score == 0)|(data.score == 1)|(data.score == 2) | (data.score == 3) | (data.score == 4)|(data.score == 5))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------+
|score|count |
+-----+------+
|5    |108269|
|4    |24044 |
|1    |15822 |
|3    |12748 |
|2    |9037  |
|0    |327   |
|null |78    |
|6    |14    |
|10   |6     |
|9    |5     |
|8    |5     |
|7    |5     |
|14   |4     |
|47   |4     |
|15   |3     |
|21   |3     |
|16   |3     |
|19   |3     |
|69   |2     |
|24   |2     |
+-----+------+
only showing top 20 rows

## 数据处理：6.将文字型数据中的数字删除转化为纯文字数据
- 您可以在withColumn（）方程中填入经过处理后的新列名（“only_str”），在regexp_replace中填入您需要处理的文字项数据列名（'Summary'）。

In [8]:
from pyspark.sql.functions import regexp_replace
data = data.withColumn("only_str",regexp_replace(col('summary'), '\d+', ''))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 数据处理： 7.将纯文字数据用转化成主要词队列
- 运用RegexTokenizer将文字列转化成小写单词地队列，再运用StopWordsRemover将单词队列里的停用字（介词，代词等没有意义的词汇）删除

In [9]:
from pyspark.ml.feature import  RegexTokenizer,StopWordsRemover
regex_tokenizer = RegexTokenizer(inputCol="only_str", outputCol="nwords", pattern="\\W")
data = regex_tokenizer.transform(data)
remover= StopWordsRemover(inputCol="nwords", outputCol="filtered")
data = remover.transform(data)
data.select("nwords","filtered").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+
|              nwords|            filtered|
+--------------------+--------------------+
|[love, the, book,...|[love, book, miss...|
|     [canine, crack]|     [canine, crack]|
|[lots, of, crispy...|[lots, crispy, sw...|
|[i, love, this, m...|       [love, movie]|
|[flea, s, are, st...|       [flea, stuck]|
|   [needs, improved]|   [needs, improved]|
|[iexcl, oye, trem...|[iexcl, oye, trem...|
|[read, the, fine,...| [read, fine, print]|
|[as, good, as, we...|   [good, barcelona]|
|[doesn, t, taste,...|[doesn, taste, li...|
|[poor, product, q...|[poor, product, q...|
|[pleasantly, surp...|[pleasantly, surp...|
|[a, disappointed,...|[disappointed, bo...|
|[excellent, produ...|[excellent, produ...|
|[really, hot, but...|[really, hot, rea...|
|[absolutely, superb]|[absolutely, superb]|
|[not, the, right,...|[right, bottle, s...|
|[omaha, cheesecak...|[omaha, cheesecak...|
|[slightly, remini...|[slightly, remini...|
|    [mexican, mocha]|    [mexic

In [10]:
data.write.format("hive").mode("overwrite").saveAsTable("user_erin.review")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…