# 读取数据

In [1]:
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
import pyspark.sql.types as typ

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [2]:
births = spark.read.csv('./births_transformed.csv', header=True, inferSchema=True)

In [3]:
births.printSchema() #原始

root
 |-- INFANT_ALIVE_AT_REPORT: integer (nullable = true)
 |-- BIRTH_PLACE: integer (nullable = true)
 |-- MOTHER_AGE_YEARS: integer (nullable = true)
 |-- FATHER_COMBINED_AGE: integer (nullable = true)
 |-- CIG_BEFORE: integer (nullable = true)
 |-- CIG_1_TRI: integer (nullable = true)
 |-- CIG_2_TRI: integer (nullable = true)
 |-- CIG_3_TRI: integer (nullable = true)
 |-- MOTHER_HEIGHT_IN: integer (nullable = true)
 |-- MOTHER_PRE_WEIGHT: integer (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: integer (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: integer (nullable = true)
 |-- DIABETES_PRE: integer (nullable = true)
 |-- DIABETES_GEST: integer (nullable = true)
 |-- HYP_TENS_PRE: integer (nullable = true)
 |-- HYP_TENS_GEST: integer (nullable = true)
 |-- PREV_BIRTH_PRETERM: integer (nullable = true)



# 创建转换器

In [4]:
import pyspark.ml.feature as ft

由于统计模型只能计算数值数据，所以需要对BIRTH_PLACE进行独热编码进行转换

In [5]:
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE', outputCol='BIRTH_PLACE_VEC')

指定数据类型的方式创建离散特征

In [6]:
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]


整合特征

In [7]:
featuresCreator = ft.VectorAssembler(   #VectorAssembler() 的inputCols 参数为一个列表，包含所有的特征，组成一个outputCol，名称是特定为‘features’
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],  #独热编码对象的输出 调用了getOutputCol()方法——忽略输出列名称改变的影响
    outputCol='features'
)

# 创建评估器

In [8]:
import pyspark.ml.classification as cl

创建逻辑回归评估器

In [9]:
logistic = cl.LogisticRegression(maxIter=10, regParam=0.01, labelCol='INFANT_ALIVE_AT_REPORT')

如果目标列名称为‘label’，则不必指定labelCol；

如果featuresCreator 输出名称不是 ‘features’， 则需要通过featuresCreator 对象调用getOutputCol() 指定featuresCol


# 创建管道

In [10]:
from pyspark.ml import Pipeline

pipeline =  Pipeline(stages=[encoder, featuresCreator, logistic])


# 拟合模型

划分数据集

In [11]:
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)

训练模型

测试数据

In [12]:
model = pipeline.fit(births_train)

test_model = model.transform(births_test)
test_model.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE=1, MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=16, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=180, MOTHER_DELIVERY_WEIGHT=206, MOTHER_WEIGHT_GAIN=26, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 14.0, 1: 16.0, 6: 63.0, 7: 180.0, 8: 206.0, 9: 26.0, 16: 1.0}), rawPrediction=DenseVector([-0.3229, 0.3229]), probability=DenseVector([0.42, 0.58]), prediction=1.0)]

# 评估模型

In [13]:
import pyspark.ml.evaluation as ev

In [14]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='INFANT_ALIVE_AT_REPORT')

print('ROC', evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print('PR', evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))

ROC 0.7343101403374708
PR 0.7169195458786022


# 模型保存

In [15]:
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

用加载的管道训练数据

In [16]:
loadPipeline = Pipeline.load(pipelinePath)
loadPipeline.fit(births_train).transform(births_test).take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE=1, MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=16, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=180, MOTHER_DELIVERY_WEIGHT=206, MOTHER_WEIGHT_GAIN=26, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 14.0, 1: 16.0, 6: 63.0, 7: 180.0, 8: 206.0, 9: 26.0, 16: 1.0}), rawPrediction=DenseVector([-0.3229, 0.3229]), probability=DenseVector([0.42, 0.58]), prediction=1.0)]

In [17]:
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_Pipemodel'
model.write().overwrite().save(modelPath)

用加载的模型测试数据

In [18]:
loadPipelineModel = PipelineModel.load(modelPath)
test_reloadModel = loadPipelineModel.transform(births_test)
test_reloadModel.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE=1, MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=16, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=180, MOTHER_DELIVERY_WEIGHT=206, MOTHER_WEIGHT_GAIN=26, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 14.0, 1: 16.0, 6: 63.0, 7: 180.0, 8: 206.0, 9: 26.0, 16: 1.0}), rawPrediction=DenseVector([-0.3229, 0.3229]), probability=DenseVector([0.42, 0.58]), prediction=1.0)]

# 参数调优

网格搜索法，是根据给定的评估指标，循环遍历定义的参数值列表，估计各个单独的模型，从而选择一个最优的模型。

In [19]:
#导包 tuning
import pyspark.ml.tuning as tune 

In [20]:
# 指定模型和参数列表
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')

ParamGridBuilder对象使用addGrid()方法将参数添加到网格中：参数一为要优化的模型的参数对象，参数二为要循环的列表的值。最后调用对象的build()方法，来构建网络。

In [21]:
grid = tune.ParamGridBuilder().addGrid(logistic.maxIter, [2, 10, 50]).addGrid(logistic.regParam, [0.01, 0.05, 0.3]).build()

创建模型评估的评估器

In [22]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='INFANT_ALIVE_AT_REPORT')

In [23]:
cv = tune.CrossValidator(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)

创建管道

In [24]:
pipeline = Pipeline(stages=[encoder, featuresCreator])
data_transformer = pipeline.fit(births_train)

建立模型


In [25]:
cvModel = cv.fit(data_transformer.transform(births_train))

检验效果

In [26]:
data_train = data_transformer.transform(births_test)
results = cvModel.transform(data_train)

In [27]:
print('ROC', evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print('PR', evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

ROC 0.7353349101843502
PR 0.7193606779239169


查看最优参数

In [28]:
# 两次遍历，提出对应的参数和名称
results = [
    ([
        {key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())
    ], metric) for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]

sorted(results, key=lambda el:el[1], reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.7410335729615861)

训练数据划分

In [29]:
#ChiSqSelector 选择数据的特征数量
selector = ft.ChiSqSelector(numTopFeatures=5, featuresCol=featuresCreator.getOutputCol(), outputCol='selectedFeatures',
                           labelCol='INFANT_ALIVE_AT_REPORT')

创建管道 模型

In [30]:
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT', featuresCol='selectedFeatures')
pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(births_train)

In [31]:
tvs = tune.TrainValidationSplit(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)

拟合数据到模型

In [33]:
tvsModel = tvs.fit(data_transformer.transform(births_train))
data_train = data_transformer.transform(births_test)
results = tvsModel.transform(data_train)

#评估结果
print('ROC', evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print('PR', evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

ROC 0.6067232391648452
PR 0.5823462144916927
