In [11]:
from pyspark.sql import SparkSession
# Build (or fetch the already-running) Spark session with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("TestExample")
    .enableHiveSupport()
    .getOrCreate()
)

## 归一化
将特征属性缩放到一个指定范围，也即将属性缩放到一个指定的最大和最小值之间。
- 这里有两种方法，首先介绍的是MaxAbsScaler，这种方法是计算dataframe中每列的最大绝对值，然后将每列中的每个元素除以该列的最大绝对值，得到每列元素值都落在[-1, 1]区间内的dataframe。这种方法不会使得数据偏移或者中心化（shift/center the data），因此不会破坏数据的稀疏性（thus does not destroy any sparsity）

In [4]:
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

In [6]:
# Three sample rows: an integer id plus a dense 3-element feature vector.
dataFrame = spark.createDataFrame(
    [
        (0, Vectors.dense([1.0, 0.1, -8.0])),
        (1, Vectors.dense([2.0, 1.0, -4.0])),
        (2, Vectors.dense([4.0, 10.0, 8.0])),
    ],
    schema=["id", "features"],
)
dataFrame.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.1,-8.0]|
|  1|[2.0,1.0,-4.0]|
|  2|[4.0,10.0,8.0]|
+---+--------------+



In [7]:
# Configure the max-abs scaler: read the "features" column, write "scaledFeatures".
scaler = MaxAbsScaler(
    outputCol="scaledFeatures",
    inputCol="features",
)

In [8]:
# Fitting scans the data for the per-feature statistics and yields a MaxAbsScalerModel.
scalerModel = scaler.fit(dataset=dataFrame)

In [10]:
# Apply the fitted model; every scaled feature value ends up inside [-1, 1].
scaledData = scalerModel.transform(dataset=dataFrame)
(
    scaledData
    .select("features", "scaledFeatures")
    .show()
)

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+



- 另外一种方法是MinMaxScaler，这种方法是将dataframe的每列数据的每个元素缩放到指定的数值区间$[min, max]$内（默认是$[0, 1]$），其每个元素的计算公式是\begin{equation}
  Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} \times (max - min) + min
\end{equation}
其中$E_{max}$是每列的最大值，$E_{min}$是每列的最小值，$max$和$min$是目标区间的上下界，默认$max=1$，$min=0$。当$E_{max} = E_{min}$时，$Rescaled(e_i) = 0.5 \times (max + min)$，也即某列的所有元素都相等时，在默认区间下归一化的结果是每个元素的值都等于0.5

In [12]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [25]:
# Sample rows with 4-element feature vectors; the last feature is the same (3.0) in every row.
dataFrame = spark.createDataFrame(
    [
        (0, Vectors.dense([1.0, 0.1, -1.0, 3.0])),
        (1, Vectors.dense([2.0, 1.1, 1.0, 3.0])),
        (2, Vectors.dense([3.0, 10.1, 3.0, 3.0])),
    ],
    schema=["id", "features"],
)
dataFrame.show()

+---+------------------+
| id|          features|
+---+------------------+
|  0|[1.0,0.1,-1.0,3.0]|
|  1| [2.0,1.1,1.0,3.0]|
|  2|[3.0,10.1,3.0,3.0]|
+---+------------------+



In [26]:
# Configure the min-max scaler: read the "features" column, write "scaledFeatures".
scaler = MinMaxScaler(
    outputCol="scaledFeatures",
    inputCol="features",
)

In [27]:
# Fitting scans the data for the per-feature statistics and yields a MinMaxScalerModel.
scalerModel = scaler.fit(dataset=dataFrame)

In [28]:
# Apply the fitted model, report the configured target range, then show
# the original vectors next to their rescaled counterparts.
scaledData = scalerModel.transform(dataset=dataFrame)
print(
    "Features scaled to range: [%f, %f]"
    % (scaler.getMin(), scaler.getMax())
)
(
    scaledData
    .select("features", "scaledFeatures")
    .show()
)

Features scaled to range: [0.000000, 1.000000]
+------------------+-----------------+
|          features|   scaledFeatures|
+------------------+-----------------+
|[1.0,0.1,-1.0,3.0]|[0.0,0.0,0.0,0.5]|
| [2.0,1.1,1.0,3.0]|[0.5,0.1,0.5,0.5]|
|[3.0,10.1,3.0,3.0]|[1.0,1.0,1.0,0.5]|
+------------------+-----------------+



## 标准化
将属性特征标准化，使得每列的元素具有单位标准差和/或零均值。具体是将数据按属性（按列进行）减去其均值，并除以其标准差。得到的结果是，对于每个属性/每列来说所有数据都聚集在0附近，标准差为1。使用z-score方法规范化$(x-mean(x))/std(x)$，spark采用StandardScaler方法，其有两个参数：withStd，默认是True；withMean，默认是False。如果某列特征属性的标准差是0，则该列所有元素的结果都会返回0

In [30]:
from pyspark.ml.feature import StandardScaler

In [32]:
# Sample rows with 4-element feature vectors; the last feature is 0.0 in every row.
dataFrame = spark.createDataFrame(
    [
        (0, Vectors.dense([1.0, 0.1, -1.0, 0.0])),
        (1, Vectors.dense([2.0, 1.1, 1.0, 0.0])),
        (2, Vectors.dense([3.0, 10.1, 3.0, 0.0])),
    ],
    schema=["id", "features"],
)

In [39]:
# Configure the standard scaler with both centering (withMean) and
# scaling (withStd) switched on explicitly.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features",
    outputCol="scaledFeatures",
)

In [40]:
# Fitting the StandardScaler gathers the summary statistics it needs from the data.
scalerModel = scaler.fit(dataset=dataFrame)

In [41]:
# Standardize each feature. The scaler above was created with withMean=True and
# withStd=True, so values are centered as well as scaled to unit standard deviation.
scaledData = scalerModel.transform(dataset=dataFrame)
scaledData.show(
    truncate=False,
)

+---+------------------+-----------------------------------+
|id |features          |scaledFeatures                     |
+---+------------------+-----------------------------------+
|0  |[1.0,0.1,-1.0,0.0]|[-1.0,-0.6657502859356826,-1.0,0.0]|
|1  |[2.0,1.1,1.0,0.0] |[0.0,-0.4841820261350419,0.0,0.0]  |
|2  |[3.0,10.1,3.0,0.0]|[1.0,1.1499323120707245,1.0,0.0]   |
+---+------------------+-----------------------------------+



## 正则化
主要思想是对每个样本(每行数据)计算其p-范数（这里的p是大于等于1的实数，也可以取无穷大），然后对该样本中每个元素除以该范数，这样处理的结果是使得每个处理后样本的p-范数（l1-norm,l2-norm等等）等于1。
![](https://wikimedia.org/api/rest_v1/media/math/render/svg/9f2d83bfa397bdf021046004b9a365079cab6a22)
当p趋近于无穷大的时候，即![](https://wikimedia.org/api/rest_v1/media/math/render/svg/b0e215b4456f5edc7e31f009440ccc2cc3f61ad4)
此时范数的值是该样本所有元素绝对值中的最大值![](https://wikimedia.org/api/rest_v1/media/math/render/svg/8c4a4408043912b9b66fd58977a30db576bdf735)

In [42]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

In [43]:
# Three sample rows: an integer id plus a dense 3-element feature vector.
dataFrame = spark.createDataFrame(
    [
        (0, Vectors.dense([1.0, 0.5, -1.0])),
        (1, Vectors.dense([2.0, 1.0, 1.0])),
        (2, Vectors.dense([4.0, 10.0, 2.0])),
    ],
    schema=["id", "features"],
)

In [44]:
# Row-wise normalization with p=1: each vector is rescaled by its L1 norm.
normalizer = Normalizer(
    p=1.0,
    inputCol="features",
    outputCol="normFeatures",
)
l1NormData = normalizer.transform(dataset=dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+



In [45]:
# Reuse the same normalizer but override p with infinity for this one call,
# so each vector is rescaled by its L-infinity norm.
lInfNormData = normalizer.transform(
    dataFrame,
    params={normalizer.p: float("inf")},
)
print("Normalized using L^inf norm")
lInfNormData.show()

Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+

