### 特征转换

In [1]:
from pyspark.sql import SparkSession

# Build the SparkSession shared by every example in this notebook.
# getOrCreate() hands back an already-running session when there is one,
# so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("TransformerExample")
    .getOrCreate()
)

#### 1. MinMaxScaler

MinMaxScaler 对由向量行组成的数据集进行变换，将每个特征（即向量的每一维）线性缩放到指定的范围（通常为 [0, 1]）。

In [12]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [14]:
# Three sample rows: an integer id plus a 3-dimensional dense feature vector.
# This is the input that the MinMaxScaler below is fitted on and applied to.
dataFrame = spark.createDataFrame(
    [
        (0, Vectors.dense([1.0, 0.1, -1.0])),
        (1, Vectors.dense([2.0, 1.1, 1.0])),
        (2, Vectors.dense([3.0, 10.1, 3.0])),
    ],
    schema=["id", "features"],
)

In [15]:
# Estimator that reads the "features" vector column and writes the rescaled
# vectors to "scaledFeatures". No min/max is passed, so the library defaults
# for the target range are used.
scaler = MinMaxScaler(
    inputCol="features",
    outputCol="scaledFeatures",
)

In [16]:
# Fit the estimator on the sample data; the returned model carries the
# statistics needed to rescale each feature.
scalerModel = scaler.fit(dataset=dataFrame)

In [17]:
# Apply the fitted model: the result keeps the input columns and gains the
# "scaledFeatures" column configured on the scaler.
scaledData = scalerModel.transform(dataset=dataFrame)

In [18]:
# Report the target range configured on the scaler (its min and max params).
print(
    "Features scaled to range: [%f, %f]"
    % (scaler.getMin(), scaler.getMax())
)

Features scaled to range: [0.000000, 1.000000]


In [19]:
# Show the original vectors side by side with their rescaled counterparts.
(
    scaledData
    .select("features", "scaledFeatures")
    .show()
)

+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]|     (3,[],[])|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+



#### 2. Normalizer

In [2]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

In [3]:
# Sample input for the Normalizer: an id plus a 3-dimensional dense vector.
# Note: this rebinds the notebook-level name `dataFrame`.
dataFrame = spark.createDataFrame(
    [
        (0, Vectors.dense([1.0, 0.5, -1.0])),
        (1, Vectors.dense([2.0, 1.0, 1.0])),
        (2, Vectors.dense([4.0, 10.0, 2.0])),
    ],
    schema=["id", "features"],
)

In [7]:
# Normalizer is a plain transformer (no fit step). With p=1.0 each row vector
# is divided by its L1 norm, i.e. the sum of the absolute values of its
# components, so the absolute values of each output vector sum to 1.
normalizer = Normalizer(
    inputCol="features",
    outputCol="normFeatures",
    p=1.0,
)
l1NormData = normalizer.transform(dataset=dataFrame)

print("Normalized using L1 norm")
l1NormData.show()

Normalized using L1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+



In [8]:
# Reuse the same Normalizer, but override its `p` param with infinity for this
# single transform() call via the param map. Each row vector is then divided
# by its L-infinity norm, i.e. its largest absolute component, so the largest
# magnitude in every output vector is 1.
# The result is named lInfNormData (not "l1...") because it holds
# L-infinity-normalised data.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})

print("Normalized using Linf norm")
lInfNormData.show()

Normalized using Linf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+



#### 3. OneHotEncoder

In [9]:
from pyspark.ml.feature import OneHotEncoder

In [10]:
# Two columns of category indices (stored as doubles) to be one-hot encoded.
# Each column takes the values 0.0, 1.0 and 2.0.
df = spark.createDataFrame(
    [
        (0.0, 1.0),
        (1.0, 0.0),
        (2.0, 1.0),
        (0.0, 2.0),
        (0.0, 1.0),
        (2.0, 0.0),
    ],
    schema=["categoryIndex1", "categoryIndex2"],
)

In [11]:
# One estimator encodes both index columns at once; inputCols and outputCols
# are matched by position (categoryIndex1 -> categoryVec1, and so on).
encoder = OneHotEncoder(
    inputCols=["categoryIndex1", "categoryIndex2"],
    outputCols=["categoryVec1", "categoryVec2"],
)

In [12]:
# Fit the encoder so it learns how many categories each input column has.
model = encoder.fit(dataset=df)

In [13]:
# Append the one-hot vector columns (categoryVec1, categoryVec2) to the input.
encoded = model.transform(dataset=df)

In [14]:
# Display the index columns next to their one-hot vectors. The vectors are
# printed in sparse form (size, [indices], [values]); in the recorded output
# the highest index 2.0 shows up as the all-zero vector (2,[],[]).
encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           1.0|           0.0|(2,[1],[1.0])|(2,[0],[1.0])|
|           2.0|           1.0|    (2,[],[])|(2,[1],[1.0])|
|           0.0|           2.0|(2,[0],[1.0])|    (2,[],[])|
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           2.0|           0.0|    (2,[],[])|(2,[0],[1.0])|
+--------------+--------------+-------------+-------------+



#### 4. PCA

In [16]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

In [21]:
# Three 5-dimensional feature vectors, each wrapped in a 1-tuple so that every
# row has exactly one column. The first row is sparse: only indices 1 and 3
# are non-zero (given here as an index -> value mapping).
data = [
    (Vectors.sparse(5, {1: 1.0, 3: 7.0}),),
    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),),
]

In [22]:
# Single-column DataFrame holding the vectors above.
# Note: this rebinds the notebook-level name `df`.
df = spark.createDataFrame(data, schema=["features"])

In [24]:
# Project the 5-dimensional "features" vectors onto k=3 principal components,
# written to "pcaFeatures". fit() derives the components from df.
pca = PCA(
    k=3,
    inputCol="features",
    outputCol="pcaFeatures",
)
model = pca.fit(dataset=df)

In [25]:
# Keep only the projected vectors, and print them untruncated so that all
# three components of every row stay visible.
result = (
    model
    .transform(dataset=df)
    .select("pcaFeatures")
)
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



#### 5. QuantileDiscretizer

In [26]:
from pyspark.ml.feature import QuantileDiscretizer

In [27]:
# Five (id, hour) rows; "hour" is the continuous column to be bucketed.
# Note: this rebinds the notebook-level names `data` and `df`.
data = [
    (0, 18.0),
    (1, 19.0),
    (2, 8.0),
    (3, 5.0),
    (4, 2.2),
]
df = spark.createDataFrame(data, schema=["id", "hour"])

In [28]:
# Bin the continuous "hour" column into 3 quantile-based buckets; the bucket
# index is written to "result" as a double (0.0, 1.0, 2.0).
discretizer = QuantileDiscretizer(
    numBuckets=3,
    inputCol="hour",
    outputCol="result",
)

In [29]:
# Fit on df to derive the bucket boundaries, then apply them to the same data.
# Note: this rebinds the notebook-level name `result`.
result = (
    discretizer
    .fit(dataset=df)
    .transform(dataset=df)
)
result.show()

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4| 2.2|   0.0|
+---+----+------+

