# 数据预处理

数据分析/数据挖掘的好与坏，数据预处理起到了非常关键的作用。
数据预处理一共有如下几个部分：

1. 检查重复值、缺失值、检查离群值（异常数据）

## 1. 检查重复值、缺失值、检查离群值（异常数据）

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("dataProprecessing").getOrCreate()

In [4]:
df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
        ], ['id', 'weight', 'height', 'age', 'gender'])

### 1.1 检查重复值

先去完全相同记录，再去除id之外的重复记录，最后检查是否有id重复，如有则对数据集重新搞个id

第一步，去除完全相同的数据记录

方法：查看完整数据集数量和运行.distinct()方法后的数据集数量。

In [5]:
print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))

Count of rows: 7
Count of distinct rows: 6


这两个数字不同，说明有完全相同的数据，需要使用下面的方法去重。

In [6]:
df = df.dropDuplicates()
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  4| 144.5|   5.9| 33|     M|
|  1| 144.5|   5.9| 33|     M|
|  5| 129.2|   5.3| 42|     M|
|  5| 133.2|   5.7| 54|     F|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+



第二步，去除除了ID之外完全相同的数据记录

方法：只对id之外的数据使用.distinct(...)方法, 重复上面的方法。

In [7]:
print('Count of ids: {0}'.format(df.count()))
print('Count of distinct ids: {0}'.format(df.select(
    [c for c in df.columns if c != 'id']
).distinct().count()))

Count of ids: 6
Count of distinct ids: 5


In [10]:
# 两个数字不同，说明除了id之外，有重复数据
# .dropDuplicates(), 使用subset参数来指定要处理的列
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



第三步，检查是否有重复的id

方法：使用.agg(...)对列执行指定的函数方法。

In [11]:
import pyspark.sql.functions as fun

df.agg(
    # .alias('name'), 允许给返回列取一个别名
    fun.count('id').alias('count'),
    fun.countDistinct('id').alias('distinct')
).show()

+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+



这两个数字不同，说明有重复的id，但是这个时候已经没有除了id列之外的数据重复，所以，我们认为id的重复是系统造成的。

方法：重新给每行记录一个唯一的id

In [12]:
df.withColumn('new_id', fun.monotonically_increasing_id()).show()

+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  4| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+



### 1.2检查缺失值

缺失值的处理就两种方法：移除和填充

(1)如果数据足够多的话，处理缺失值最简单的方法就是移除。

最粗略的指标，如果移除的数据超过了原始数据集的50%的话，就需要慎重。

(2)另一种方法：填充。
   如果是布尔变量，可以添加第三类别--Missing，将其转换为一个分类变量；

   如果数据是分类变量的，可以简单地扩展界别的数量，同时添加Missing类别；

   如果数据是数值变量，可以通过填充平均数、中位数或者四分位数等，可以根据数据分布的形状而定。


In [14]:
df_miss = spark.createDataFrame([
        (1, 143.5, 5.6, 28,   'M',  100000),
        (2, 167.2, 5.4, 45,   'M',  None),
        (3, None , 5.2, None, None, None),
        (4, 144.5, 5.9, 33,   'M',  None),
        (5, 133.2, 5.7, 54,   'F',  None),
        (6, 124.1, 5.2, None, 'F',  None),
        (7, 129.2, 5.3, 42,   'M',  76000),
    ], ['id', 'weight', 'height', 'age', 'gender', 'income'])

第一步，观测每个样本（每行）以及每列的缺失值数量情况

类比于spark的DataFrame的.agg(...)方法作用于列学习。作用于行的需要RDD的.map(lambda...)

我们要综合行和列的缺失值情况来决定是移除还是填充缺失值。

In [15]:
# 统计每行缺失值情况
df_miss.rdd.map(
    lambda row: (row['id'], sum([c == None for c in row]))
).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

样本一共有6列，而id为3的样本就缺失了4列，查看id=3的样本，

然后再根据每列的缺失情况（目的是看看还有没有填充解决的可能性），

再决定是采用移除还是填充的方法。

发现，id=3，缺失的是weight、age、gender和income。

In [16]:
df_miss.where('id == 3').show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



In [17]:
# 统计每列缺失值情况
df_miss.agg(*[
    (1 - (fun.count(c) / fun.count('*'))).alias(c + '_missing')
    for c in df_miss.columns]
).show()

+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+



1. income 列有72%是缺失的，远远大于50%，故要移除income列
2. 发现weight、age和gender缺失情况不严重，可以通过填充来解决，id=3样本的缺失值。

In [19]:
# 移除income列
df_miss_no_income = df_miss.select([
    c for c in df_miss.columns if c != 'income'
])

# 处理完列之后，如果要移除某些样本（某些行记录）的话，可以使用.dropna(thresh=3)的方法。
# 即指定每一行样本缺失值的阈值，不超过阈值的行可以保留，否则移除。

# 处理完列之后，如果决定要填充每行缺失值的话，可以使用.fillna(...)的方法

# missing 填充 样本的 gender
# 样本的其他项用均值填充

# 计算除了gender列之外的数值型列的均值，数据类型转换为字典
# 先是在Spark的DataFrame中计算每一列(除了gender)的均值，
# 返回为数据类型是Spark的DataFrame，有列名和均值，gender只有列名
# 再将返回值转换为Pandas的DataFrame数据类型
# 最后将Pandas的DataFrame数据类型转换为字典
means = df_miss_no_income.agg(
    *[fun.mean(c).alias(c) 
      for c in df_miss_no_income.columns 
      if c != 'gender']
).toPandas().to_dict('records')[0]    #-> toPandas()本质和.collect()一样，但是除非有成千上万个列，否则toPandas不会有问题。
# 定义gender的缺失值
means['gender'] = 'missing'
# 用字典means来填充缺失值
df_miss_no_income.fillna(means).show()

+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+



In [21]:
means

{'id': 4.0,
 'weight': 140.28333333333333,
 'height': 5.471428571428571,
 'age': 40.4,
 'gender': 'missing'}

注意：RDD中的.collect()和DataFrame中的.toPandas()只可以在数据量小的时候使用。

### 1.3 检查离群值
这里的检查离群值是针对非异常分析的情况，如果是异常分析，那么离群值才是最有价值的分析对象。

这里的离群值指的是那些与样本其余部分的分布显著偏离的数据样本。显著性的定义各不相同，但是最普遍的形式是：

如果数据值落在 Q1-1.5IQR 和 Q3 + 1.5IQR范围内，可以认为是没有离群值。否则认为是有离群值。

其中， IQR定义为上分位和下分位之差，即75%（Q3）和25%(Q1)的差。

In [40]:
df_outliers = spark.createDataFrame([
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ], ['id', 'weight', 'height', 'age'])

使用.approxQuantile(...)方法来实现上述离群值的定义，该方法一共有三个参数，

第一个参数指定参数的列名，第二个参数指定0和1之间的一个数，或者是一个列表（本例子就是这样）

第三参数指定每个度量可接受的错误程度，如果设为0，则表示只接受准确值，在这种估计计算中，这样不可取。

In [29]:
cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(
        col, [0.25, 0.75], 0.05
    )

    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [
        quantiles[0] - 1.5 * IQR,
        quantiles[1] + 1.5 * IQR
    ]

In [31]:
bounds

{'weight': [91.69999999999999, 191.7],
 'height': [4.499999999999999, 6.1000000000000005],
 'age': [-11.0, 93.0]}

In [32]:
# 使用bounds来标记离群值
outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c+'_o') for c in cols
])

outliers.show()

+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+



id = 3的样本的weight和agg是离群值。除了过滤掉异常值之外，我们还可以查看这两个异常值。

In [41]:
# 过滤掉异常值:DataFrame的API查询方法
df_outliers = df_outliers.join(outliers, on='id')
df_no_outliers = df_outliers.select("id", "weight", "height", "age").filter("weight_o = false")
df_no_outliers = df_no_outliers.select("id", "weight", "height", "age").filter("height_o = false")
df_no_outliers = df_no_outliers.select("id", "weight", "height", "age").filter("age_o = false")
df_no_outliers.show()

+---+------+------+---+
| id|weight|height|age|
+---+------+------+---+
|  7| 129.2|   5.3| 42|
|  6| 124.1|   5.1| 21|
|  5| 133.2|   5.4| 54|
|  1| 143.5|   5.3| 28|
|  2| 154.2|   5.5| 45|
|  4| 144.5|   5.5| 33|
+---+------+------+---+



In [42]:
# 查看异常数据
df_outliers.filter("weight_o").select("id", "weight").show()
df_outliers.filter("age_o").select("id", "age").show()

+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+

