# Introducing ML package of PySpark

## Chapter 6: Introducing the ML package

## Predict chances of infant survival with ML

In [1]:
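try:
    sc.stop()  # stop a SparkContext left over from an earlier run, if any
except Exception:
    pass       # e.g. NameError when no `sc` exists yet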
#---------------------------#
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder \
    .master('spark://master:7077') \
    .appName('Predict chances of infant survival with ML') \
    .getOrCreate()

In [2]:
def prn_attributes(obj):
    '''Print all attributes of an object (the contents of its __dict__).
    '''
    print('\nTYPE is ', type(obj))
    print('-*-', '\n-*- '.join(['%s:%s' % item for item in obj.__dict__.items()]))

def prn_methods(obj, prn_private=False):
    '''Print an object's attributes and methods, four per row;
    names starting with "_" are skipped unless prn_private is True.
    '''
    print('\nTYPE is ', type(obj))
    obj_list = dir(obj)
    if not prn_private:
        obj_list = [i for i in obj_list if not i.startswith('_')]
    for k, name in enumerate(obj_list, start=1):
        print('{:25}'.format(name), end='')
        if k % 4 == 0:
            print('')

### Load the data

First, we load the data.

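At the top level, the package exposes three main abstract classes: a Transformer, an Estimator, and a Pipeline. We will explain each of them with some short examples; more concrete examples of some of the models are provided in the last section of this chapter.

### Transformers

A Transformer is an algorithm that converts one DataFrame into another. For example, a fitted model is a Transformer: it takes a test DataFrame without predicted labels and turns it into a DataFrame that contains them. Technically, a Transformer implements a `transform()` method, which converts one DataFrame into another, generally by appending one or more columns.

At a high level, when deriving from the Transformer abstract class, each new Transformer needs to implement the `.transform(...)` method. The method requires the DataFrame to be transformed, which is usually the first and only mandatory parameter. Beyond that, parameters vary from method to method in the ML package; other commonly used ones are `inputCol` and `outputCol`. Some of them come with predefined defaults; for example, if `outputCol` is not set, it defaults to a name derived from the object's `uid`.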
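The contract is easier to see in a toy, pure-Python model than in Spark code. In the sketch below, a list of dicts stands in for a DataFrame. The hypothetical `ToyBinarizer` class mimics a Binarizer-like `.transform(...)`: it appends an output column and leaves its input untouched.

```python
class ToyBinarizer:
    """Toy stand-in for a Transformer (hypothetical, not the PySpark API)."""

    def __init__(self, threshold, inputCol, outputCol):
        self.threshold = threshold
        self.inputCol = inputCol
        self.outputCol = outputCol

    def getOutputCol(self):
        return self.outputCol

    def transform(self, rows):
        # Build new rows with one extra column; values above the threshold map to 1.0.
        return [
            {**row, self.outputCol: 1.0 if row[self.inputCol] > self.threshold else 0.0}
            for row in rows
        ]


rows = [{'x': 0.2}, {'x': 0.7}]
binarized = ToyBinarizer(threshold=0.5, inputCol='x', outputCol='x_bin').transform(rows)
print(binarized)  # [{'x': 0.2, 'x_bin': 0.0}, {'x': 0.7, 'x_bin': 1.0}]
```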

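Many transformers are available in `pyspark.ml.feature`. We briefly describe them here; some of them will be used later in this chapter:

- Binarizer: Given a threshold, converts a continuous variable into a binary one.
- Bucketizer: Similar to Binarizer, this method takes a list of thresholds (the `splits` parameter) and converts a continuous variable into a multinomial one. In other words, it discretizes the variable into the specified ranges.
- ChiSqSelector: For a categorical target variable (think classification models), this feature allows you to select a predefined number of features (parameterized by `numTopFeatures`) that best explain the variance in the target. As the name suggests, the selection is done using a Chi-Square test. It is a two-step method. First, you need to `.fit(...)` your data so the method can calculate the Chi-Square tests. Calling `.fit(...)` (passing a DataFrame as the parameter) returns a ChiSqSelectorModel object, whose `.transform(...)` method can then be used to transform your DataFrame.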
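- CountVectorizer: Useful for tokenized text (such as `[['Learning', 'PySpark', 'with', 'us'], ['us', 'us', 'us']]`). It is also a two-step method. First, you need to `.fit(...)`, that is, learn the vocabulary from your dataset. Only then can you call `.transform(...)` on the CountVectorizerModel returned by `.fit(...)`. The vocabulary is ordered by term frequency, so 'us' gets index 0. For the tokenized text above, the output of this transformer would look similar to `[(4, [0, 1, 2, 3], [1.0, 1.0, 1.0, 1.0]), (4, [0], [3.0])]`.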
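- DCT: The Discrete Cosine Transform takes a vector of real values and returns a vector of the same length. Its elements are the coefficients of cosine functions oscillating at different frequencies. Such transformations are useful for extracting some underlying frequencies in your data, or for data compression.
- ElementwiseProduct: Returns a vector whose elements are the products of the vector passed to the method and a vector passed as the `scalingVec` parameter. For example, if you had a `[10.0, 3.0, 15.0]` vector and your `scalingVec` was `[0.99, 3.30, 0.66]`, you would get `[9.9, 9.9, 9.9]`.
- HashingTF: A hashing trick transformer that takes a list of tokenized text and returns a vector of predefined length with counts. From PySpark's documentation: "Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns."
- IDF: Computes the Inverse Document Frequency for a list of documents. Note that the documents need to be represented as vectors beforehand (for example, using either HashingTF or CountVectorizer).
- IndexToString: The complement of the StringIndexer method. It uses the encoding from the StringIndexerModel object to reverse the string index to the original values. Note that this sometimes does not work. In that case, you need to specify the values from the StringIndexer (via the `labels` parameter).
- MaxAbsScaler: Rescales the data to be within the [-1.0, 1.0] range (thus, it does not shift the center of the data).
- MinMaxScaler: Similar to MaxAbsScaler, except that it scales the data to the [0.0, 1.0] range.
- NGram: Takes a list of tokenized text and returns n-grams: pairs, triples, or longer runs of consecutive words. For example, for `['good', 'morning', 'Robin', 'Williams']` you would get `['good morning', 'morning Robin', 'Robin Williams']`.
- Normalizer: Scales the data to unit norm using the p-norm (L2 by default).
- OneHotEncoder: Encodes a categorical column into a column of binary vectors.
- PCA: Performs dimensionality reduction using principal component analysis.
- PolynomialExpansion: Performs a polynomial expansion of a vector. For example, for a vector symbolically written as `[x, y, z]`, the method (with its default degree of 2) produces the following expansion: `[x, x*x, y, x*y, y*y, z, x*z, y*z, z*z]`.
- QuantileDiscretizer: Similar to Bucketizer, but instead of the `splits` parameter you pass a `numBuckets` parameter. The method then decides where the splits should be by calculating approximate quantiles of your data.
- RegexTokenizer: A string tokenizer that uses regular expressions.
- RFormula: For avid R users, you can pass a formula such as `vec ~ alpha * 3 + beta` (assuming your DataFrame has the `alpha` and `beta` columns), and it will produce the `vec` column given the expression.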
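- SQLTransformer: Similar to the previous one, but instead of R-like formulas you can use SQL syntax.

  The FROM statement should select from `__THIS__`, indicating that you are accessing the DataFrame. For example: `SELECT alpha * 3 + beta AS vec FROM __THIS__`.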

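- StandardScaler: Standardizes the column to have a 0 mean and a standard deviation equal to 1. Note that centering only happens with `withMean=True`; the default is `False`.
- StopWordsRemover: Removes stop words (such as 'the' or 'a') from tokenized text.
- StringIndexer: Encodes a column of strings (such as category labels) into a column of numeric indices. By default, the most frequent value gets index 0.
- Tokenizer: The default tokenizer, which converts the string to lowercase and then splits it on whitespace.
- VectorAssembler: A very useful transformer that collates multiple numeric columns (vectors included) into a single column with a vector representation. For example, if you had three columns in your DataFrame: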




In [3]:
df = spark.createDataFrame(
    [(12,10,3),(1,4,2)],
    ['a','b','c'])

In [4]:
import pyspark.ml.feature as ft

ft.VectorAssembler(inputCols=['a','b','c'])

VectorAssembler_c8982fd53a14
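Two of the transformers listed above are plain arithmetic, so their examples can be checked by hand. The sketch below is pure Python, not Spark code. The illustrative functions `elementwise_product` and `poly_expand_deg2` reproduce two results from the list: the ElementwiseProduct example and the degree-2 PolynomialExpansion ordering.

```python
def elementwise_product(vec, scaling_vec):
    """Element-by-element (Hadamard) product, as ElementwiseProduct computes."""
    return [a * b for a, b in zip(vec, scaling_vec)]


def poly_expand_deg2(vec):
    """Degree-2 expansion in the order shown in the list: for each variable v_j,
    emit v_j, then v_i * v_j for every earlier v_i, then v_j * v_j."""
    out = []
    for j, vj in enumerate(vec):
        out.append(vj)
        for i in range(j):
            out.append(vec[i] * vj)
        out.append(vj * vj)
    return out


print([round(v, 6) for v in elementwise_product([10.0, 3.0, 15.0], [0.99, 3.30, 0.66])])
# [9.9, 9.9, 9.9]
print(poly_expand_deg2([2, 3, 5]))  # x=2, y=3, z=5
# [2, 4, 3, 6, 9, 5, 10, 15, 25]
```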

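## Estimators

Estimators can be thought of as statistical models that need to be estimated in order to make predictions about, or classify, your observations.

When deriving from the abstract Estimator class, the new model has to implement the `.fit(...)` method. This method fits the model to the data found in a DataFrame, using some default or user-specified parameters.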
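The key difference from a Transformer is that `.fit(...)` learns something from the data and returns a fitted model, which is itself a Transformer. Below is a toy, pure-Python sketch of that contract. The `ToyMinMaxScaler` and `ToyMinMaxModel` classes are hypothetical, and a list of dicts stands in for a DataFrame.

```python
class ToyMinMaxModel:
    """Fitted model: a Transformer that remembers the statistics learned by fit()."""

    def __init__(self, col_min, col_max, inputCol, outputCol):
        self.col_min, self.col_max = col_min, col_max
        self.inputCol, self.outputCol = inputCol, outputCol

    def transform(self, rows):
        span = self.col_max - self.col_min

        def scale(v):
            # A constant column has no range; this sketch maps it to 0.5.
            return (v - self.col_min) / span if span else 0.5

        return [{**row, self.outputCol: scale(row[self.inputCol])} for row in rows]


class ToyMinMaxScaler:
    """Estimator: fit() learns min and max from the data and returns a model."""

    def __init__(self, inputCol, outputCol):
        self.inputCol, self.outputCol = inputCol, outputCol

    def fit(self, rows):
        values = [row[self.inputCol] for row in rows]
        return ToyMinMaxModel(min(values), max(values), self.inputCol, self.outputCol)


train = [{'x': 2.0}, {'x': 6.0}, {'x': 10.0}]
fitted = ToyMinMaxScaler(inputCol='x', outputCol='x_scaled').fit(train)
print(fitted.transform([{'x': 4.0}]))  # [{'x': 4.0, 'x_scaled': 0.25}]
```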

There are many estimators available in PySpark. We will now briefly describe the models available in Spark 2.0.

### Classification

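The ML package provides data scientists with seven classification models to choose from. They range from the simplest (such as logistic regression) to more sophisticated ones. Short descriptions of each follow:

- LogisticRegression: The benchmark model for classification. Logistic regression uses a logit function to calculate the probability of an observation belonging to a particular class. At the time of writing (Spark 2.0), PySpark ML supported only binary classification problems; multinomial logistic regression was added in Spark 2.1.
- DecisionTreeClassifier: A classifier that builds a decision tree to predict a class for an observation. The parameters are:
  - `maxDepth` limits the depth of the tree.
  - `minInstancesPerNode` sets the minimum number of observations each child node must have after a split.
  - `maxBins` specifies the maximum number of bins into which continuous variables will be split.
  - `impurity` specifies the metric used to measure and calculate the information gain from a split.
- GBTClassifier: A gradient-boosted trees model for classification. The model belongs to the family of ensemble models, which combine multiple weak predictive models to form a strong one. At the time of writing, GBTClassifier supports binary labels, and both continuous and categorical features.
- RandomForestClassifier: This model produces multiple decision trees (hence the name, forest) and uses the mode of their outputs to classify observations. RandomForestClassifier supports both binary and multinomial labels.
- NaiveBayes: Based on Bayes' theorem, this model uses conditional probability theory to classify observations. The NaiveBayes model in PySpark ML supports both binary and multinomial labels.
- MultilayerPerceptronClassifier: A classifier that mimics the nature of the human brain. It is deeply rooted in Artificial Neural Network theory. The model is a black box: its internal parameters are not easy to interpret.
  - The model consists of at least three fully connected layers of artificial neurons, specified when the model object is created.
  - The input layer needs the same number of neurons as there are features in your dataset.
  - There are a number of hidden layers (at least one).
  - The output layer has as many neurons as there are categories in your label.
  - The neurons in the hidden layers use a sigmoid activation function, whereas the neurons in the output layer use softmax.
- OneVsRest: Reduces a multiclass classification problem to a set of binary ones. For example, with multinomial labels, the model can train multiple binary logistic regression models. If label == 2, the model builds a logistic regression that converts label == 2 to 1 (all remaining label values are set to 0) and then trains a binary model. All the models are then scored, and the model with the highest probability wins.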
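The OneVsRest strategy boils down to two small steps, sketched here in plain Python. First, relabel the data once per class. Then, at prediction time, pick the class whose binary model scores highest. The function names and scores are hypothetical; in Spark, OneVsRest trains the binary classifiers for you.

```python
def one_vs_rest_labels(labels, positive_class):
    """Relabel a multiclass label list for one binary sub-problem."""
    return [1 if y == positive_class else 0 for y in labels]


def one_vs_rest_predict(scores_per_class):
    """Pick the class whose binary model gives the highest score."""
    return max(scores_per_class, key=scores_per_class.get)


print(one_vs_rest_labels([0, 2, 1, 2], positive_class=2))  # [0, 1, 0, 1]

# Hypothetical scores from three already-trained binary models, for one observation.
scores = {0: 0.20, 1: 0.35, 2: 0.80}
print(one_vs_rest_predict(scores))  # 2
```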

### Regression

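There are seven models available for regression tasks in the PySpark ML package. As with classification, they range from some basic ones (such as the obligatory linear regression) to more complex ones:

- AFTSurvivalRegression: Fits an Accelerated Failure Time regression model. It is a parametric model that assumes a marginal effect of one of the features accelerates or decelerates a life expectancy (or process failure). It is well suited to processes with well-defined stages.
- DecisionTreeRegressor: Similar to the model for classification, with the obvious difference that the label is continuous instead of binary (or multinomial).
- GBTRegressor: As with the DecisionTreeRegressor, the difference is the data type of the label.
- GeneralizedLinearRegression: A family of linear models with differing kernel functions (link functions). Linear regression assumes normality of the error terms; a GLM instead allows the label to have different error term distributions. The GeneralizedLinearRegression model from the PySpark ML package supports the gaussian, binomial, gamma, and poisson families of error distributions, with a host of different link functions.
- IsotonicRegression: Fits a free-form, non-decreasing line to your data. It is useful for fitting datasets with ordered and increasing observations.
- LinearRegression: The simplest of the regression models. It assumes a linear relationship between the features and a continuous label, and normality of the error terms.
- RandomForestRegressor: Similar to either DecisionTreeRegressor or GBTRegressor, the RandomForestRegressor fits a continuous label instead of a discrete one.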
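Isotonic regression is classically fitted with the pool adjacent violators algorithm (PAVA); Spark's documentation describes a parallelized variant of it. Below is a minimal sequential sketch in plain Python. It assumes the labels are already sorted by feature value; `isotonic_fit` is an illustrative name, not a Spark function.

```python
def isotonic_fit(y, weights=None):
    """Pool Adjacent Violators: least-squares non-decreasing fit to y,
    where y is assumed to be ordered by the feature value already."""
    if weights is None:
        weights = [1.0] * len(y)
    blocks = []  # each block: [weighted mean, total weight, number of points]
    for yi, wi in zip(y, weights):
        blocks.append([yi, wi, 1])
        # Pool the last two blocks for as long as they violate the ordering.
        while len(blocks) > 1 and blocks[-2][0] > blocks[-1][0]:
            m2, w2, n2 = blocks.pop()
            m1, w1, n1 = blocks.pop()
            blocks.append([(m1 * w1 + m2 * w2) / (w1 + w2), w1 + w2, n1 + n2])
    fitted = []
    for block_mean, _, n in blocks:
        fitted.extend([block_mean] * n)
    return fitted


print(isotonic_fit([1, 3, 2, 4]))  # [1, 2.5, 2.5, 4]
```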

### Clustering

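Clustering is a family of unsupervised models used to find underlying patterns in your data. The PySpark ML package provides four of the most popular models at the moment:

- BisectingKMeans: A combination of the k-means clustering method and hierarchical clustering. The algorithm begins with all observations in a single cluster and iteratively splits the data into k clusters.
- KMeans: The famous k-means algorithm. It separates data into k clusters by iteratively searching for the centroids that minimize the sum of squared distances between each observation and the centroid of its cluster.
- GaussianMixture: This method uses k Gaussian distributions with unknown parameters to dissect the dataset. The Gaussian parameters are found with the Expectation-Maximization algorithm, which maximizes the log-likelihood function.

  Beware that for datasets with many features, this model might perform poorly due to the curse of dimensionality and numerical issues with Gaussian distributions.

- LDA: This model is used for topic generation in natural language processing applications.

PySpark ML also provides a recommendation model, but we will not describe it here.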
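The iterative search that KMeans performs is Lloyd's algorithm:

1. Assign every point to its nearest centroid.
2. Move each centroid to the mean of its points.
3. Repeat.

Below is a 1-D plain-Python sketch. It assumes hand-picked starting centroids, whereas Spark initializes them with k-means|| by default; `kmeans_1d` is an illustrative name.

```python
def kmeans_1d(points, centroids, n_iter=10):
    """Lloyd's algorithm on 1-D data: assign points, then move centroids."""
    centroids = list(centroids)
    for _ in range(n_iter):
        clusters = [[] for _ in centroids]
        for p in points:
            # Assign each point to the centroid with the smallest squared distance.
            nearest = min(range(len(centroids)), key=lambda i: (p - centroids[i]) ** 2)
            clusters[nearest].append(p)
        # Move each centroid to the mean of its points (keep it if the cluster is empty).
        centroids = [sum(c) / len(c) if c else centroids[i] for i, c in enumerate(clusters)]
    return centroids


print(kmeans_1d([1.0, 2.0, 3.0, 10.0, 11.0, 12.0], [1.0, 2.0]))  # [2.0, 11.0]
```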



## Pipeline

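In PySpark ML, a Pipeline describes an end-to-end flow from transformations to estimation, made up of a series of distinct stages. It takes some raw input data (in DataFrame form), performs the necessary data processing (transformations), and finally estimates a statistical model.

A Pipeline can be thought of as a sequence of distinct stages, given as a list of Transformer and Estimator objects in the `stages` parameter. When you call `.fit(...)` on a `Pipeline` object, all the stages are executed in that order. The pipeline calls `.transform(...)` on each Transformer and `.fit(...)` on each Estimator. The fitted model of an Estimator then transforms the data for the stages that follow it.

Normally, the output of one stage becomes the input of the next. When deriving from either the Transformer or Estimator abstract class, one needs to implement the `.getOutputCol()` method. It returns the value of the `outputCol` parameter specified when the object was created.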
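The sketch below is a toy, pure-Python model of how `Pipeline.fit(...)` chains stages. The classes are hypothetical, and a list of dicts stands in for a DataFrame. Objects with a `fit` method are treated as Estimators, everything else as Transformers. One simplification: this sketch runs every stage's transform during fitting, whereas Spark stops transforming once the last Estimator has been fitted.

```python
class ToyPipeline:
    """Toy model of Pipeline.fit(): each stage consumes the previous stage's output."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; the resulting model acts as a transformer.
            model = stage.fit(rows) if hasattr(stage, 'fit') else stage
            rows = model.transform(rows)  # feed the output to the next stage
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class AddOne:  # a plain transformer
    def transform(self, rows):
        return [{**r, 'x1': r['x'] + 1} for r in rows]


class MeanCenter:  # an estimator: learns the mean of x1 in fit()
    def fit(self, rows):
        return MeanCenterModel(sum(r['x1'] for r in rows) / len(rows))


class MeanCenterModel:
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, 'centered': r['x1'] - self.mean} for r in rows]


pipeline_model = ToyPipeline([AddOne(), MeanCenter()]).fit([{'x': 1}, {'x': 3}])
print(pipeline_model.transform([{'x': 5}]))  # [{'x': 5, 'x1': 6, 'centered': 3.0}]
```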


### Create transformers

We will specify the DataFrame's schema, restricting the dataset to only 17 columns. First, download the data and copy it to HDFS:

```bash
cd jupyternotebook/learningPySpark-master/Chapter06/;
wget http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz;
hadoop fs -put -f ./births_transformed.csv.gz /jupyter-notebook-data/;
```

In [5]:
import pyspark.sql.types as typ

# Specify the DataFrame's schema, restricting the dataset to only 17 columns.
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

schema = typ.StructType([
        typ.StructField(e[0], e[1], False) for e in labels
    ])


In [6]:
# Note: ran into an odd bug here where births.show(5) displayed every value as null.
births = spark.read.csv('/jupyter-notebook-data/births_transformed.csv.gz', 
                        header=True, 
                        schema=schema)
births.show(3)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                     0|          1|              29|                 99|         0|        0|        0|        0|              99|              999|                   999|                99|           0| 

In [7]:
import pyspark.ml.feature as ft

births = births \
    .withColumn(
        'BIRTH_PLACE_INT',
        births['BIRTH_PLACE'].cast(typ.IntegerType())
               )

prn_methods(births)


TYPE is  <class 'pyspark.sql.dataframe.DataFrame'>
agg                      alias                    approxQuantile           cache                    
checkpoint               coalesce                 colRegex                 collect                  
columns                  corr                     count                    cov                      
createGlobalTempView     createOrReplaceGlobalTempViewcreateOrReplaceTempView  createTempView           
crossJoin                crosstab                 cube                     describe                 
distinct                 drop                     dropDuplicates           drop_duplicates          
dropna                   dtypes                   exceptAll                explain                  
fillna                   filter                   first                    foreach                  
foreachPartition         freqItems                groupBy                  groupby                  
head                     hint      

Having done this, we can now create our first `Transformer`.

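We will use the OneHotEncoder method to encode the BIRTH_PLACE column. However, the method cannot accept StringType columns because it only deals with numeric types. That is why we cast the column to an IntegerType in the cell above. In Spark 3.0 and later, OneHotEncoder is an Estimator rather than a Transformer. Used on its own, it therefore needs `.fit(...)` before `.transform(...)`; inside a Pipeline this is handled automatically.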

In [8]:
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT', 
    outputCol='BIRTH_PLACE_VEC')

prn_methods(encoder)


TYPE is  <class 'pyspark.ml.feature.OneHotEncoder'>
copy                     dropLast                 explainParam             explainParams            
extractParamMap          getDropLast              getInputCol              getOrDefault             
getOutputCol             getParam                 hasDefault               hasParam                 
inputCol                 isDefined                isSet                    load                     
outputCol                params                   read                     save                     
set                      setDropLast              setInputCol              setOutputCol             
setParams                transform                uid                      write                    


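Let's now create a single column with all the features collated together, using the `VectorAssembler` method.

The `inputCols` parameter passed to the `VectorAssembler` object lists all the columns to be combined into the `outputCol`, named 'features'. Note that we use the output of the encoder object (by calling `.getOutputCol()`). This way, if we ever change the name of the encoder's output column, we will not have to change the value of this parameter.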


In [9]:
featuresCreator = ft.VectorAssembler(
    inputCols=[ col[0] for col in labels[2:] ] +  [encoder.getOutputCol()],
    outputCol='features'
)

prn_methods(featuresCreator)


TYPE is  <class 'pyspark.ml.feature.VectorAssembler'>
copy                     explainParam             explainParams            extractParamMap          
getHandleInvalid         getInputCols             getOrDefault             getOutputCol             
getParam                 handleInvalid            hasDefault               hasParam                 
inputCols                isDefined                isSet                    load                     
outputCol                params                   read                     save                     
set                      setHandleInvalid         setInputCols             setOutputCol             
setParams                transform                uid                      write                    


### Create an estimator

In this example we will (once again) use the Logistic Regression model.

In [10]:
import pyspark.ml.classification as cl

Once loaded, let's create the model.

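If the target column were named 'label', we would not have to specify the `labelCol` parameter. Also, if the output of `featuresCreator` were not named 'features', we would have to specify `featuresCol`. The most convenient way to do that is to call the `getOutputCol()` method on the `featuresCreator` object.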

In [11]:
logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='INFANT_ALIVE_AT_REPORT')

### Create a pipeline

All that is left now is to create a `Pipeline` and fit the model. First, let's load the `Pipeline` from the package.

Creating a pipeline is easy. Conceptually, the pipeline looks like this:
![](https://img1.doubanio.com/view/ark_works_pic/common-largeshow/public/865752508.jpg)

In [12]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
        encoder, 
        featuresCreator,
        logistic
    ])
prn_methods(pipeline)


TYPE is  <class 'pyspark.ml.pipeline.Pipeline'>
copy                     explainParam             explainParams            extractParamMap          
fit                      fitMultiple              getOrDefault             getParam                 
getStages                hasDefault               hasParam                 isDefined                
isSet                    load                     params                   read                     
save                     set                      setParams                setStages                
stages                   uid                      write                    

### Fit the model

Conveniently, the `DataFrame` API has the `.randomSplit(...)` method, which we use to split the data into training and test sets.

In [13]:
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+---------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|BIRTH_PLACE_INT|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+---------------+
|                     0|          1|              12|                 99|         0|        0|        0|        0|              60|              154|        

Now run our `pipeline` and estimate our model.

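The `.fit(...)` method of the pipeline object takes our training dataset as input. Under the hood, the `births_train` dataset is passed to the `encoder` object first. The DataFrame created at the `encoder` stage then gets passed to `featuresCreator`, which creates the 'features' column. Finally, the output of this stage is passed to the `logistic` object, which estimates the final model.

The `.fit(...)` method returns a `PipelineModel` object (the `model` object in the snippet below) that can be used for prediction. To obtain predictions, we pass the test dataset created earlier to its `.transform(...)` method and store the result in `test_model`: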


In [14]:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)

Here's what the `test_model` looks like.

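As you can see, we get all the columns from the transformers and estimators. The logistic regression model outputs several columns:

- `rawPrediction` is the value of the linear combination of the features and the β coefficients.
- `probability` is the calculated probability for each of the classes.
- `prediction` is the final class assignment.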

In [15]:
test_model.take(3)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0573, -1.0573]), probability=DenseVector([0.7422, 0.2578]), prediction=0.0),
 Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=93, MOTHER_DELIVERY_WEIGHT=100, MOTHER_WEIGHT_GAIN=0, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(
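For binary logistic regression, the second `rawPrediction` entry is the margin for class 1 and the first entry is its negation. `probability` applies the logistic (sigmoid) function to them. Here is a quick plain-Python check against the first row of the output above:

```python
import math


def sigmoid(z):
    """Logistic function: maps a real-valued margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


raw_prediction = [1.0573, -1.0573]  # copied from the first row of test_model
margin = raw_prediction[1]          # margin for class 1
probability = [1.0 - sigmoid(margin), sigmoid(margin)]
print([round(p, 4) for p in probability])  # [0.7422, 0.2578]
```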

### Model performance

Obviously, we would now like to test how well our model did.

In [16]:
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

0.7401301847095617
0.7139354342365674


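The area under the ROC curve is 74% and the area under the PR curve is 71%. This indicates a reasonably well-defined model, but nothing extraordinary. With additional features we could probably improve it, but that is not the purpose of this chapter (nor of this book).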
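The area under the ROC curve has a handy interpretation. It is the probability that a randomly chosen positive observation receives a higher score than a randomly chosen negative one, with ties counting as one half. The plain-Python sketch below computes it that way on made-up toy data; `roc_auc` is an illustrative helper. Spark instead integrates the ROC curve built from score thresholds, which gives the same quantity up to its score binning.

```python
def roc_auc(scores, labels):
    """Pairwise (Mann-Whitney) form of the area under the ROC curve."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # tie counts as half
    return wins / (len(pos) * len(neg))


print(round(roc_auc([0.9, 0.8, 0.7, 0.6], [1, 1, 0, 1]), 4))  # 0.6667
```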

### Saving the model

PySpark allows you to save the `Pipeline` definition for later use.

In [17]:
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

So, you can load it up later and use it straight away to `.fit(...)` and predict.

In [18]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train)\
    .transform(births_test)\
    .take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0573, -1.0573]), probability=DenseVector([0.7422, 0.2578]), prediction=0.0)]

You can also save the whole fitted model (the `PipelineModel`).

In [19]:
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)

In [61]:
prn_methods(test_loadedModel)
test_loadedModel.show(3)


TYPE is  <class 'pyspark.sql.dataframe.DataFrame'>
agg                      alias                    approxQuantile           cache                    
checkpoint               coalesce                 colRegex                 collect                  
columns                  corr                     count                    cov                      
createGlobalTempView     createOrReplaceGlobalTempViewcreateOrReplaceTempView  createTempView           
crossJoin                crosstab                 cube                     describe                 
distinct                 drop                     dropDuplicates           drop_duplicates          
dropna                   dtypes                   exceptAll                explain                  
fillna                   filter                   first                    foreach                  
foreachPartition         freqItems                groupBy                  groupby                  
head                     hint      

## Parameter hyper-tuning

### Hyperparameter tuning (skipped)

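Hyperparameter tuning is the search for the best parameters of a model. Examples include the maximum number of iterations needed to properly estimate a logistic regression model, or the maximum depth of a decision tree.
In this section, we will explore two concepts that allow us to find the best parameters for our models: grid search and train-validation splitting.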


Load the `.tuning` part of the package.

In [20]:
import pyspark.ml.tuning as tune

Next let's specify our model and the list of parameters we want to loop through.

In [21]:
logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT')

grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,  
             [2, 10, 50]) \
    .addGrid(logistic.regParam, 
             [0.01, 0.05, 0.3]) \
    .build()
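`ParamGridBuilder` builds the Cartesian product of the listed values, so the grid above holds 3 × 3 = 9 parameter maps, each one a candidate model. Below is a plain-Python equivalent using `itertools.product`. Spark's maps are keyed by `Param` objects rather than strings, and their order may differ.

```python
from itertools import product

# Same values as the ParamGridBuilder cell above.
grid_values = {'maxIter': [2, 10, 50], 'regParam': [0.01, 0.05, 0.3]}

# Cartesian product: one dict per candidate model.
param_maps = [dict(zip(grid_values, combo)) for combo in product(*grid_values.values())]

print(len(param_maps))  # 9
print(param_maps[0])    # {'maxIter': 2, 'regParam': 0.01}
```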

Next, we need some way of comparing the models.

In [22]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

Create the logic that will do the validation work for us.

In [23]:
cv = tune.CrossValidator(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)
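`CrossValidator` uses k-fold cross-validation (`numFolds` defaults to 3). Every parameter map is fitted k times, each time holding out a different fold for evaluation, and the k metrics are averaged into `avgMetrics`. Below is a plain-Python sketch of the fold assignment; `k_fold_indices` is an illustrative name. This sketch deals rows out exactly, whereas Spark assigns rows to folds by random sampling, so its fold sizes are only approximately equal.

```python
import random


def k_fold_indices(n_rows, k=3, seed=666):
    """Shuffle row indices and deal them into k folds; each fold serves as the
    validation set once, while the remaining k - 1 folds form the training set."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::k] for i in range(k)]
    splits = []
    for i, valid in enumerate(folds):
        train = [j for f, fold in enumerate(folds) if f != i for j in fold]
        splits.append((sorted(train), sorted(valid)))
    return splits


for train_idx, valid_idx in k_fold_indices(9):
    print(len(train_idx), len(valid_idx))  # 6 3 (printed three times)
```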

Create a purely transforming `Pipeline`.

In [24]:
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)

Having done this, we are ready to find the optimal combination of parameters for our model.

In [25]:
cvModel = cv.fit(data_transformer.transform(births_train))

The `cvModel` will return the best model estimated. We can now use it to see if it performed better than our previous model.

In [26]:
data_test = data_transformer \
    .transform(births_test)
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

0.7404959803309813
0.7157971108486731


Which parameters does the best model have? The answer is a little bit convoluted, but here's how you can extract it.

In [27]:
# Pair each parameter map (as a list of {name: value} dicts) with its average CV metric.
results = [
    (
        [{key.name: paramValue} for key, paramValue in params.items()],
        metric
    )
    for params, metric in zip(
        cvModel.getEstimatorParamMaps(),
        cvModel.avgMetrics
    )
]

sorted(results, 
       key=lambda el: el[1], 
       reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.7383162353924938)

### Train-Validation splitting

Use the `ChiSqSelector` to select only the top 5 features, thus limiting the complexity of our model.

In [28]:
selector = ft.ChiSqSelector(
    numTopFeatures=5, 
    featuresCol=featuresCreator.getOutputCol(), 
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

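logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures'
)

# ParamMaps are keyed by the Params of the estimator instance that built them,
# so the grid from In [21] would not apply to this new `logistic` object;
# rebuild it with the same values.
grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter, [2, 10, 50]) \
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3]) \
    .build()

pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(births_train)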

The `TrainValidationSplit` object gets created in the same fashion as the `CrossValidator` model.

In [29]:
tvs = tune.TrainValidationSplit(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)
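`TrainValidationSplit` evaluates each parameter map only once, on a single random split (`trainRatio` defaults to 0.75). That makes it cheaper but noisier than `CrossValidator`. The plain-Python sketch below uses hypothetical helpers: `train_validation_split`, `pick_best`, and `toy_score`. They stand in for fitting on the training part and computing the evaluator's metric on the validation part. Spark splits with `randomSplit`, so its sizes are only approximately 75/25.

```python
import random


def train_validation_split(rows, train_ratio=0.75, seed=666):
    """Shuffle once and cut: the first train_ratio share is used for training."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    cut = int(len(rows) * train_ratio)
    return rows[:cut], rows[cut:]


def pick_best(param_maps, score):
    """Return the parameter map with the highest validation score (e.g. AUC)."""
    return max(param_maps, key=score)


def toy_score(param_map):
    # Hypothetical metric that happens to peak at regParam == 0.05.
    return -abs(param_map['regParam'] - 0.05)


train, valid = train_validation_split(range(100))
print(len(train), len(valid))  # 75 25
candidates = [{'regParam': 0.01}, {'regParam': 0.05}, {'regParam': 0.3}]
print(pick_best(candidates, toy_score))  # {'regParam': 0.05}
```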

As before, we fit our data to the model, and calculate the results.

In [30]:
tvsModel = tvs.fit(
    data_transformer \
        .transform(births_train)
)

data_test = data_transformer \
    .transform(births_test)
results = tvsModel.transform(data_test)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

0.7294296314442145
0.703775950281647


## Other features of PySpark ML in action

### Feature extraction

#### NLP related feature extractors

Simple dataset.

In [31]:
text_data = spark.createDataFrame([
    ['''Machine learning can be applied to a wide variety 
        of data types, such as vectors, text, images, and 
        structured data. This API adopts the DataFrame from 
        Spark SQL in order to support a variety of data types.'''],
    ['''DataFrame supports many basic and structured types; 
        see the Spark SQL datatype reference for a list of 
        supported types. In addition to the types listed in 
        the Spark SQL guide, DataFrame can use ML Vector types.'''],
    ['''A DataFrame can be created either implicitly or 
        explicitly from a regular RDD. See the code examples 
        below and the Spark SQL programming guide for examples.'''],
    ['''Columns in a DataFrame are named. The code examples 
        below use names such as "text," "features," and "label."''']
], ['input'])

First, we need to tokenize this text.

In [32]:
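# Split the text on every run of whitespace and drop commas, periods, and double
# quotes (a raw string avoids Python's invalid "\s" escape warning).
tokenizer = ft.RegexTokenizer(
    inputCol='input', 
    outputCol='input_arr', 
    pattern=r'\s+|[,."]')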

The output of the tokenizer looks similar to this.

In [33]:
tok = tokenizer \
    .transform(text_data) \
    .select('input_arr') 

tok.take(1)

[Row(input_arr=['machine', 'learning', 'can', 'be', 'applied', 'to', 'a', 'wide', 'variety', 'of', 'data', 'types', 'such', 'as', 'vectors', 'text', 'images', 'and', 'structured', 'data', 'this', 'api', 'adopts', 'the', 'dataframe', 'from', 'spark', 'sql', 'in', 'order', 'to', 'support', 'a', 'variety', 'of', 'data', 'types'])]
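RegexTokenizer's defaults are `gaps=True`, `minTokenLength=1`, and `toLowercase=True`. With these, it lowercases the text, splits it on every match of the pattern, and drops tokens shorter than the minimum length. That last step also removes the empty strings produced by adjacent separators. Below is a plain-Python approximation with `re.split`; the `regex_tokenize` helper is illustrative, not part of PySpark.

```python
import re


def regex_tokenize(text, pattern=r'\s+|[,."]', min_token_length=1):
    """Lowercase, split on the pattern, then drop too-short (including empty) tokens."""
    tokens = re.split(pattern, text.lower())
    return [t for t in tokens if len(t) >= min_token_length]


print(regex_tokenize('Columns in a DataFrame are named. Use "text," here.'))
# ['columns', 'in', 'a', 'dataframe', 'are', 'named', 'use', 'text', 'here']
```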

Use the `StopWordsRemover(...)`.

In [34]:
stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(), 
    outputCol='input_stop')

The output of the method looks as follows

In [35]:
stopwords.transform(tok).select('input_stop').take(1)

[Row(input_stop=['machine', 'learning', 'applied', 'wide', 'variety', 'data', 'types', 'vectors', 'text', 'images', 'structured', 'data', 'api', 'adopts', 'dataframe', 'spark', 'sql', 'order', 'support', 'variety', 'data', 'types'])]

Now we are left with only the useful words. Let's build the `NGram` model and the `Pipeline`:

In [36]:
ngram = ft.NGram(n=2, 
    inputCol=stopwords.getOutputCol(), 
    outputCol="nGrams")

pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])

Now that we have the `pipeline`, we proceed in much the same way as before.

In [37]:
data_ngram = pipeline \
    .fit(text_data) \
    .transform(text_data)
    
data_ngram.select('nGrams').take(1)

[Row(nGrams=['machine learning', 'learning applied', 'applied wide', 'wide variety', 'variety data', 'data types', 'types vectors', 'vectors text', 'text images', 'images structured', 'structured data', 'data api', 'api adopts', 'adopts dataframe', 'dataframe spark', 'spark sql', 'sql order', 'order support', 'support variety', 'variety data', 'data types'])]

That's it. We got our n-grams and we can then use them in further NLP processing.
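NGram simply joins every run of n consecutive tokens with a space; when a row has fewer than n tokens, the output is empty. The plain-Python equivalent below (`ngrams` is an illustrative helper) reproduces the example from the transformer list at the start of the chapter:

```python
def ngrams(tokens, n=2):
    """Join every window of n consecutive tokens with a single space."""
    return [' '.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


print(ngrams(['good', 'morning', 'Robin', 'Williams']))
# ['good morning', 'morning Robin', 'Robin Williams']
```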

#### Discretize continuous variables (skipped)

It is sometimes useful to *band* the values into discrete buckets.

In [38]:
import numpy as np

x = np.arange(0, 100)
x = x / 100.0 * np.pi * 4
y = x * np.sin(x / 1.764) + 20.1234

schema = typ.StructType([
    typ.StructField('continuous_var', 
                    typ.DoubleType(), 
                    False
   )
])

data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)

Use the `QuantileDiscretizer` model to split our continuous variable into 5 buckets (see the `numBuckets` parameter).

In [39]:
discretizer = ft.QuantileDiscretizer(
    numBuckets=5, 
    inputCol='continuous_var', 
    outputCol='discretized')

Let's see what we got.

In [40]:
data_discretized = discretizer.fit(data).transform(data)

data_discretized \
    .groupby('discretized')\
    .mean('continuous_var')\
    .sort('discretized')\
    .collect()

[Row(discretized=0.0, avg(continuous_var)=12.314360733007913),
 Row(discretized=1.0, avg(continuous_var)=16.046244793347473),
 Row(discretized=2.0, avg(continuous_var)=20.250799478352594),
 Row(discretized=3.0, avg(continuous_var)=22.040988218437327),
 Row(discretized=4.0, avg(continuous_var)=24.264824657002862)]
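QuantileDiscretizer picks its split points from quantiles of the data. It then buckets values like a Bucketizer, with each bucket including its lower bound. The plain-Python sketch below uses exact quantiles on made-up data, and the helper names are illustrative. Spark instead computes approximate quantiles and adds -inf/+inf as the outer bounds.

```python
from bisect import bisect_right


def quantile_splits(values, num_buckets):
    """Exact quantile cut points (Spark uses approximate quantiles instead)."""
    ordered = sorted(values)
    return [ordered[len(ordered) * i // num_buckets] for i in range(1, num_buckets)]


def bucketize(value, splits):
    """Bucket index where each bucket includes its lower bound: [lo, hi)."""
    return bisect_right(splits, value)


values = list(range(100))  # made-up data: 0, 1, ..., 99
splits = quantile_splits(values, 5)
counts = [0] * 5
for v in values:
    counts[bucketize(v, splits)] += 1
print(splits)  # [20, 40, 60, 80]
print(counts)  # [20, 20, 20, 20, 20]
```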

#### Standardizing continuous variables

Create a vector representation of our continuous variable (as it is only a single float)


In [41]:
vectorizer = ft.VectorAssembler(
    inputCols=['continuous_var'], 
    outputCol= 'continuous_vec')

Build a `normalizer` and a `pipeline`.

In [42]:
normalizer = ft.StandardScaler(
    inputCol=vectorizer.getOutputCol(), 
    outputCol='normalized', 
    withMean=True,
    withStd=True
)

pipeline = Pipeline(stages=[vectorizer, normalizer])
data_standardized = pipeline.fit(data).transform(data)
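With `withMean=True` and `withStd=True`, StandardScaler computes z = (x - mean) / s per feature. According to Spark's documentation, s is the corrected (n - 1) sample standard deviation. Below is a plain-Python sketch on a made-up column; `standardize` is an illustrative helper. It maps a zero-variance column to 0.0, an assumption made here to avoid dividing by zero.

```python
from statistics import mean, stdev


def standardize(values, with_mean=True, with_std=True):
    """Center and/or scale a single column of numbers."""
    m = mean(values) if with_mean else 0.0
    s = stdev(values) if with_std else 1.0  # stdev() is the sample (n - 1) version
    # Zero variance: return 0.0 instead of dividing by zero (sketch assumption).
    return [(v - m) / s if s else 0.0 for v in values]


print(standardize([1.0, 2.0, 3.0]))                   # [-1.0, 0.0, 1.0]
print(standardize([1.0, 2.0, 3.0], with_mean=False))  # [1.0, 2.0, 3.0]
```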

### Classification

We will now use the `RandomForestClassifier` to model the chances of survival for an infant.

First, we need to cast the label feature to `DoubleType`.

In [43]:
import pyspark.sql.functions as func

births = births.withColumn(
    'INFANT_ALIVE_AT_REPORT', 
    func.col('INFANT_ALIVE_AT_REPORT').cast(typ.DoubleType())
)

births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

We are ready to build our model.

In [44]:
classifier = cl.RandomForestClassifier(
    numTrees=5, 
    maxDepth=5, 
    labelCol='INFANT_ALIVE_AT_REPORT')

pipeline = Pipeline(
    stages=[
        encoder,
        featuresCreator, 
        classifier])

model = pipeline.fit(births_train)
test = model.transform(births_test)

Let's now see how the `RandomForestClassifier` model performs compared to the `LogisticRegression`.

In [45]:
evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test, 
    {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test, 
    {evaluator.metricName: "areaUnderPR"}))

0.7386301717380878
0.7087601747579149


Let's test how well a single tree would do, then.

In [46]:
classifier = cl.DecisionTreeClassifier(
    maxDepth=5, 
    labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(stages=[
        encoder,
        featuresCreator, 
        classifier]
)

model = pipeline.fit(births_train)
test = model.transform(births_test)

evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test, 
     {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test, 
     {evaluator.metricName: "areaUnderPR"}))

0.7039183639098666
0.709267119670054


### Clustering

In this example we will use k-means model to find similarities in the births data.

In [47]:
import pyspark.ml.clustering as clus

kmeans = clus.KMeans(k = 5, 
    featuresCol='features')

pipeline = Pipeline(stages=[
        encoder,
        featuresCreator, 
        kmeans]
)

model = pipeline.fit(births_train)

Having estimated the model, let's see if we can find some differences between clusters.

In [48]:
test = model.transform(births_test)

test \
    .groupBy('prediction') \
    .agg({
        '*': 'count', 
        'MOTHER_HEIGHT_IN': 'avg'
    }).collect()

[Row(prediction=1, avg(MOTHER_HEIGHT_IN)=83.91154791154791, count(1)=407),
 Row(prediction=3, avg(MOTHER_HEIGHT_IN)=66.64658634538152, count(1)=249),
 Row(prediction=4, avg(MOTHER_HEIGHT_IN)=65.3889041472123, count(1)=3641),
 Row(prediction=2, avg(MOTHER_HEIGHT_IN)=67.69473684210526, count(1)=475),
 Row(prediction=0, avg(MOTHER_HEIGHT_IN)=63.90993407084591, count(1)=8949)]

In the field of NLP, problems such as topic extraction rely on clustering to detect documents with similar topics. First, let's create our dataset.

In [49]:
text_data = spark.createDataFrame([
    ['''To make a computer do anything, you have to write a 
    computer program. To write a computer program, you have 
    to tell the computer, step by step, exactly what you want 
    it to do. The computer then "executes" the program, 
    following each step mechanically, to accomplish the end 
    goal. When you are telling the computer what to do, you 
    also get to choose how it's going to do it. That's where 
    computer algorithms come in. The algorithm is the basic 
    technique used to get the job done. Let's follow an 
    example to help get an understanding of the algorithm 
    concept.'''],
    ['''Laptop computers use batteries to run while not 
    connected to mains. When we overcharge or overheat 
    lithium ion batteries, the materials inside start to 
    break down and produce bubbles of oxygen, carbon dioxide, 
    and other gases. Pressure builds up, and the hot battery 
    swells from a rectangle into a pillow shape. Sometimes 
    the phone involved will operate afterwards. Other times 
    it will die. And occasionally—kapow! To see what's 
    happening inside the battery when it swells, the CLS team 
    used an x-ray technology called computed tomography.'''],
    ['''This technology describes a technique where touch 
    sensors can be placed around any side of a device 
    allowing for new input sources. The patent also notes 
    that physical buttons (such as the volume controls) could 
    be replaced by these embedded touch sensors. In essence 
    Apple could drop the current buttons and move towards 
    touch-enabled areas on the device for the existing UI. It 
    could also open up areas for new UI paradigms, such as 
    using the back of the smartphone for quick scrolling or 
    page turning.'''],
    ['''The National Park Service is a proud protector of 
    America’s lands. Preserving our land not only safeguards 
    the natural environment, but it also protects the 
    stories, cultures, and histories of our ancestors. As we 
    face the increasingly dire consequences of climate 
    change, it is imperative that we continue to expand 
    America’s protected lands under the oversight of the 
    National Park Service. Doing so combats climate change 
    and allows all American’s to visit, explore, and learn 
    from these treasured places for generations to come. It 
    is critical that President Obama acts swiftly to preserve 
    land that is at risk of external threats before the end 
    of his term as it has become blatantly clear that the 
    next administration will not hold the same value for our 
    environment over the next four years.'''],
    ['''The National Park Foundation, the official charitable 
    partner of the National Park Service, enriches America’s 
    national parks and programs through the support of 
    private citizens, park lovers, stewards of nature, 
    history enthusiasts, and wilderness adventurers. 
    Chartered by Congress in 1967, the Foundation grew out of 
    a legacy of park protection that began over a century 
    ago, when ordinary citizens took action to establish and 
    protect our national parks. Today, the National Park 
    Foundation carries on the tradition of early park 
    advocates, big thinkers, doers and dreamers—from John 
    Muir and Ansel Adams to President Theodore Roosevelt.'''],
    ['''Australia has over 500 national parks. Over 28 
    million hectares of land is designated as national 
    parkland, accounting for almost four per cent of 
    Australia's land areas. In addition, a further six per 
    cent of Australia is protected and includes state 
    forests, nature parks and conservation reserves.National 
    parks are usually large areas of land that are protected 
    because they have unspoilt landscapes and a diverse 
    number of native plants and animals. This means that 
    commercial activities such as farming are prohibited and 
    human activity is strictly monitored.''']
], ['documents'])

Next, we will once again use the `RegexTokenizer` and `StopWordsRemover` models.

In [50]:
tokenizer = ft.RegexTokenizer(
    inputCol='documents', 
    outputCol='input_arr', 
    pattern=r'\s+|[,.\"]')

stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(), 
    outputCol='input_stop')
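To see what these two stages produce, they can be approximated in plain Python. This sketch assumes `RegexTokenizer`'s defaults (`gaps=True`, so the pattern matches separators; lower-casing; `minTokenLength=1`) and uses a tiny, hypothetical stop-word list instead of the much larger English list that `StopWordsRemover` loads by default:

```python
import re

# Same split pattern as the RegexTokenizer above: runs of whitespace, or one of , . "
PATTERN = r'\s+|[,.\"]'

# Hypothetical, deliberately small stop-word list (Spark's default English list is larger)
STOP_WORDS = {'a', 'an', 'the', 'to', 'you', 'it', 'do', 'have'}

def tokenize(text):
    # Split on separators, lower-case, and drop empty tokens (min token length 1)
    return [tok for tok in re.split(PATTERN, text.lower()) if len(tok) >= 1]

def remove_stop_words(tokens):
    # Keep only tokens that are not in the stop-word list
    return [tok for tok in tokens if tok not in STOP_WORDS]

doc = 'To make a computer do anything, you have to write a computer program.'
tokens = tokenize(doc)
print(tokens)
print(remove_stop_words(tokens))
```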

Next in our pipeline is the `CountVectorizer`.

In [51]:
# Note: despite its name, stringIndexer holds a CountVectorizer (per-document term counts)
stringIndexer = ft.CountVectorizer(
    inputCol=stopwords.getOutputCol(), 
    outputCol="input_indexed")

tokenized = stopwords \
    .transform(
        tokenizer\
            .transform(text_data)
    )
    
stringIndexer \
    .fit(tokenized)\
    .transform(tokenized)\
    .select('input_indexed')\
    .take(2)

[Row(input_indexed=SparseVector(257, {2: 7.0, 6: 1.0, 7: 3.0, 8: 3.0, 10: 3.0, 24: 1.0, 29: 2.0, 31: 1.0, 33: 1.0, 37: 2.0, 39: 1.0, 46: 1.0, 58: 1.0, 59: 1.0, 61: 1.0, 64: 1.0, 70: 1.0, 72: 1.0, 81: 1.0, 96: 1.0, 128: 1.0, 133: 1.0, 134: 1.0, 135: 1.0, 142: 1.0, 161: 1.0, 164: 1.0, 169: 1.0, 189: 1.0, 227: 1.0, 249: 1.0, 253: 1.0, 254: 1.0})),
 Row(input_indexed=SparseVector(257, {14: 1.0, 16: 2.0, 23: 2.0, 25: 2.0, 31: 1.0, 42: 2.0, 49: 1.0, 51: 1.0, 55: 1.0, 56: 1.0, 67: 1.0, 73: 1.0, 76: 1.0, 77: 1.0, 84: 1.0, 87: 1.0, 97: 1.0, 105: 1.0, 113: 1.0, 116: 1.0, 117: 1.0, 125: 1.0, 130: 1.0, 139: 1.0, 141: 1.0, 143: 1.0, 151: 1.0, 152: 1.0, 154: 1.0, 157: 1.0, 166: 1.0, 171: 1.0, 182: 1.0, 185: 1.0, 186: 1.0, 187: 1.0, 194: 1.0, 195: 1.0, 199: 1.0, 201: 1.0, 202: 1.0, 204: 1.0, 209: 1.0, 213: 1.0, 234: 1.0, 235: 1.0, 247: 1.0}))]
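Each output row is a `SparseVector` whose size (257) is the vocabulary learned from the six documents; an entry such as `2: 7.0` means the term with index 2 occurs seven times in that document. `CountVectorizer` orders its vocabulary by term frequency across the corpus, so the most frequent terms get the lowest indices. A minimal plain-Python sketch of the same fit/transform idea (tie-breaking alphabetically is an assumption made here for determinism):

```python
from collections import Counter

def fit_vocabulary(docs):
    # Count every term across the corpus; most frequent terms get the lowest indices.
    # Ties are broken alphabetically here (an assumption, not Spark's documented order).
    counts = Counter(tok for doc in docs for tok in doc)
    ordered = sorted(counts, key=lambda term: (-counts[term], term))
    return {term: idx for idx, term in enumerate(ordered)}

def transform(doc, vocab):
    # Sparse representation: (vocabulary size, {index: count}) for known terms
    counts = Counter(tok for tok in doc if tok in vocab)
    return len(vocab), {vocab[term]: float(c) for term, c in counts.items()}

docs = [['learning', 'pyspark', 'with', 'us'], ['us', 'us', 'us']]
vocab = fit_vocabulary(docs)
vectors = [transform(d, vocab) for d in docs]
print(vocab)    # {'us': 0, 'learning': 1, 'pyspark': 2, 'with': 3}
print(vectors)
```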

We will use the Latent Dirichlet Allocation (`LDA`) model to extract the topics.

In [52]:
clustering = clus.LDA(k=2, optimizer='online', featuresCol=stringIndexer.getOutputCol())

Now let's put the pieces together.

In [53]:
pipeline = Pipeline(stages=[
        tokenizer, 
        stopwords,
        stringIndexer, 
        clustering]
)

Let's see if we have properly uncovered the topics.

In [54]:
topics = pipeline \
    .fit(text_data) \
    .transform(text_data)

topics.select('topicDistribution').collect()

[Row(topicDistribution=DenseVector([0.0134, 0.9866])),
 Row(topicDistribution=DenseVector([0.9818, 0.0182])),
 Row(topicDistribution=DenseVector([0.9508, 0.0492])),
 Row(topicDistribution=DenseVector([0.3339, 0.6661])),
 Row(topicDistribution=DenseVector([0.0106, 0.9894])),
 Row(topicDistribution=DenseVector([0.0123, 0.9877]))]
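Each `topicDistribution` is a probability vector over the `k=2` topics. One simple way to summarize it is to assign every document to its most probable topic. The sketch below does that in plain Python on the values printed above; the argmax step is our own summary, not something the `LDA` model itself outputs:

```python
# topicDistribution values copied from the output above (one row per document)
distributions = [
    [0.0134, 0.9866],
    [0.9818, 0.0182],
    [0.9508, 0.0492],
    [0.3339, 0.6661],
    [0.0106, 0.9894],
    [0.0123, 0.9877],
]

def dominant_topic(dist):
    # Index of the most probable topic for one document
    return max(range(len(dist)), key=lambda k: dist[k])

assignments = [dominant_topic(d) for d in distributions]
print(assignments)  # [1, 0, 0, 1, 1, 1]
```

With these numbers, the second and third rows fall into topic 0 and the other four rows into topic 1.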

### Regression

The steps here are straightforward.

In this section, we will try to predict `MOTHER_WEIGHT_GAIN`.

In [55]:
features = ['MOTHER_AGE_YEARS','MOTHER_HEIGHT_IN',
            'MOTHER_PRE_WEIGHT','DIABETES_PRE',
            'DIABETES_GEST','HYP_TENS_PRE', 
            'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
            'CIG_BEFORE','CIG_1_TRI', 'CIG_2_TRI', 
            'CIG_3_TRI'
           ]

First, we will assemble all the features into a single vector and use the `ChiSqSelector` to keep only the six most important ones.

In [56]:
featuresCreator = ft.VectorAssembler(
    # features[1:] leaves out the first entry, 'MOTHER_AGE_YEARS'
    inputCols=features[1:], 
    outputCol='features'
)

selector = ft.ChiSqSelector(
    numTopFeatures=6, 
    outputCol="selectedFeatures", 
    labelCol='MOTHER_WEIGHT_GAIN'
)
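`ChiSqSelector` scores each feature with a chi-square test of independence against the label, treating both as categorical, and keeps the `numTopFeatures` best ones. The plain-Python sketch below computes the Pearson chi-square statistic from a contingency table and keeps the top-k features. As a simplification it ranks by the raw statistic, whereas Spark ranks by the test's p-value; the toy columns are hypothetical:

```python
from collections import Counter

def chi_square(feature_values, labels):
    # Pearson chi-square statistic of independence between one categorical feature and the label
    n = len(labels)
    observed = Counter(zip(feature_values, labels))
    feature_totals = Counter(feature_values)
    label_totals = Counter(labels)
    stat = 0.0
    for f in feature_totals:
        for l in label_totals:
            expected = feature_totals[f] * label_totals[l] / n
            stat += (observed[(f, l)] - expected) ** 2 / expected
    return stat

def select_top_features(columns, labels, k):
    # Rank features by statistic (simplification: Spark ranks by p-value) and keep k names
    scores = {name: chi_square(values, labels) for name, values in columns.items()}
    return sorted(scores, key=scores.get, reverse=True)[:k]

labels = [0, 0, 1, 1]
columns = {
    'perfect': [0, 0, 1, 1],  # hypothetical feature that mirrors the label
    'useless': [0, 1, 0, 1],  # hypothetical feature independent of the label
}
print(select_top_features(columns, labels, 1))  # ['perfect']
```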

To predict the weight gain, we will use the gradient-boosted trees regressor.

In [57]:
import pyspark.ml.regression as reg

regressor = reg.GBTRegressor(
    maxIter=15, 
    maxDepth=3,
    labelCol='MOTHER_WEIGHT_GAIN')
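Gradient boosting builds an ensemble of small trees, each fitted to the residual errors of the ensemble so far; for squared loss the residual is exactly the negative gradient. The sketch below illustrates the idea in plain Python with depth-1 trees (stumps) on a single feature. It is a simplified illustration, not Spark's implementation: it starts from the mean label, uses a fixed `step_size=0.1` (the default learning rate of `GBTRegressor`), and reuses `maxIter=15` from the cell above, while `maxDepth` is fixed at 1. The data are hypothetical:

```python
def fit_stump(xs, residuals):
    # Depth-1 regression tree on one feature: choose the threshold that minimises
    # the squared error of the residuals on each side (needs >= 2 distinct xs)
    best = None
    for t in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= t]
        right = [r for x, r in zip(xs, residuals) if x > t]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, t, left_mean, right_mean)
    _, t, left_mean, right_mean = best
    return lambda x: left_mean if x <= t else right_mean

def fit_gbt(xs, ys, max_iter=15, step_size=0.1):
    base = sum(ys) / len(ys)  # simplified starting point: the mean label
    stumps = []

    def predict(x):
        return base + step_size * sum(stump(x) for stump in stumps)

    for _ in range(max_iter):
        # For squared loss, the negative gradient is the residual y - prediction
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        stumps.append(fit_stump(xs, residuals))
    return predict

def mse(xs, ys, predict):
    return sum((y - predict(x)) ** 2 for x, y in zip(xs, ys)) / len(ys)

xs = [1, 2, 3, 4, 5, 6]
ys = [1.0, 1.0, 1.0, 5.0, 5.0, 5.0]
gbt_predict = fit_gbt(xs, ys)
base_mse = mse(xs, ys, lambda x: sum(ys) / len(ys))
gbt_mse = mse(xs, ys, gbt_predict)
print(base_mse, gbt_mse)
```

Each iteration here shrinks the residuals by a factor of `1 - step_size`, which is why a small learning rate needs more iterations to fit the data.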

Finally, we once again put everything together into a `Pipeline`.

In [58]:
pipeline = Pipeline(stages=[
        featuresCreator, 
        selector,
        regressor])

weightGain = pipeline.fit(births_train)
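Conceptually, `Pipeline.fit` walks through its stages in order: a Transformer is applied as-is, while an Estimator is first fitted on the data as transformed by the earlier stages, and the resulting model (itself a Transformer) takes its place. The fitted stages form the `PipelineModel`. The plain-Python sketch below mimics that flow with hypothetical toy classes (`AddSquare`, `MeanCenterer`, `SimplePipeline`); it is not Spark's code:

```python
class Transformer:
    """Maps a list of row dicts to a new list of row dicts."""
    def transform(self, rows):
        raise NotImplementedError

class Estimator:
    """Learns from rows and returns a fitted Transformer."""
    def fit(self, rows):
        raise NotImplementedError

class AddSquare(Transformer):
    # Hypothetical stateless stage: appends x**2 to every row
    def transform(self, rows):
        return [{**r, 'x2': r['x'] ** 2} for r in rows]

class CenteredModel(Transformer):
    # Fitted stage: subtracts the mean learned during fit
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, 'x2_centered': r['x2'] - self.mean} for r in rows]

class MeanCenterer(Estimator):
    # Hypothetical estimator: learns the mean of 'x2' on the training rows
    def fit(self, rows):
        return CenteredModel(sum(r['x2'] for r in rows) / len(rows))

class SimplePipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class SimplePipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)   # fit on the data produced by earlier stages
            fitted.append(stage)
            rows = stage.transform(rows)  # feed this stage's output to the next stage
        return SimplePipelineModel(fitted)

train = [{'x': 1}, {'x': 2}, {'x': 3}]
toy_model = SimplePipeline([AddSquare(), MeanCenterer()]).fit(train)
result = toy_model.transform([{'x': 0}])
print(result)
```

Note that the mean is learned only from the training rows and then reused on new data, just as `weightGain` applies what it learned from `births_train` to `births_test`.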

Having created the `weightGain` model, let's see if it performs well on our testing data.

In [59]:
import pyspark.ml.evaluation as ev

evaluator = ev.RegressionEvaluator(
    predictionCol="prediction", 
    labelCol='MOTHER_WEIGHT_GAIN')

print(evaluator.evaluate(
    weightGain.transform(births_test), 
    {evaluator.metricName: 'r2'}))

0.49019829267505


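Unfortunately, this model is nothing to write home about. It seems that, without additional and better independent features related to the `MOTHER_WEIGHT_GAIN` label, we will not be able to explain its variance sufficiently.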
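The `r2` metric is the coefficient of determination, R² = 1 - SS_res / SS_tot, where SS_res is the sum of squared prediction errors and SS_tot is the sum of squared deviations of the label from its mean. A value of about 0.49 therefore means the model accounts for roughly half of the variance of `MOTHER_WEIGHT_GAIN` on the test set. A minimal plain-Python version of the formula, with made-up numbers:

```python
def r2_score(labels, predictions):
    # Coefficient of determination: 1 - SS_res / SS_tot
    mean = sum(labels) / len(labels)
    ss_tot = sum((y - mean) ** 2 for y in labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    return 1.0 - ss_res / ss_tot

labels = [1.0, 2.0, 3.0, 4.0]
print(r2_score(labels, [1.0, 2.0, 3.0, 4.0]))  # 1.0: perfect predictions
print(r2_score(labels, [2.5, 2.5, 2.5, 2.5]))  # 0.0: no better than predicting the mean
print(r2_score(labels, [1.5, 2.0, 3.0, 3.5]))
```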

## Official Pipeline demo

In [62]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Unpack into pred so the `prediction` DataFrame above is not shadowed
    rid, text, prob, pred = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000
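In this demo, `HashingTF` turns the tokens into a fixed-length term-frequency vector without building a vocabulary: each term is hashed, and the hash modulo `numFeatures` gives its index, so distinct terms may collide in the same bucket. A plain-Python sketch of this hashing trick, using `zlib.crc32` as a stand-in for the MurmurHash3 function Spark uses, and a deliberately tiny `num_features=16` (Spark's default is 2^18):

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=16):
    # Map each term to a bucket index via hash % num_features; colliding terms add up.
    # zlib.crc32 is a stand-in here for Spark's MurmurHash3.
    counts = Counter(zlib.crc32(t.encode('utf-8')) % num_features for t in tokens)
    return num_features, {idx: float(c) for idx, c in counts.items()}

# Like Tokenizer, lower-case and split on whitespace first
tokens = 'spark hadoop spark'.lower().split()
size, vec = hashing_tf(tokens)
print(size, vec)
```

The trade-off is a fixed memory footprint and no vocabulary pass over the data, at the cost of occasional collisions and no way to map an index back to its term.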
