In [27]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
import pyspark.sql.types as typ
import pyspark.sql.functions as fn

from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

In [28]:
spark = SparkSession.builder.appName('Intro to MLlib').getOrCreate()

In [29]:
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

schema = typ.StructType([typ.StructField(e[0], e[1], False) for e in labels])

births = spark.read.csv('births_transformed.csv.gz',
                        header=True,
                        schema=schema)

In [30]:
births = births.withColumn('BIRTH_PLACE_INT',fn.col('BIRTH_PLACE').cast(typ.IntegerType()) )

In [31]:
encoder = OneHotEncoder(inputCol='BIRTH_PLACE_INT', outputCol='BIRTH_PLACE_VEC')

In [32]:
vec_assemble = VectorAssembler(inputCols=[col for col in births.columns[2:]]+[encoder.getOutputCol()], outputCol='features')

In [33]:
births.printSchema()

root
 |-- INFANT_ALIVE_AT_REPORT: integer (nullable = true)
 |-- BIRTH_PLACE: string (nullable = true)
 |-- MOTHER_AGE_YEARS: integer (nullable = true)
 |-- FATHER_COMBINED_AGE: integer (nullable = true)
 |-- CIG_BEFORE: integer (nullable = true)
 |-- CIG_1_TRI: integer (nullable = true)
 |-- CIG_2_TRI: integer (nullable = true)
 |-- CIG_3_TRI: integer (nullable = true)
 |-- MOTHER_HEIGHT_IN: integer (nullable = true)
 |-- MOTHER_PRE_WEIGHT: integer (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: integer (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: integer (nullable = true)
 |-- DIABETES_PRE: integer (nullable = true)
 |-- DIABETES_GEST: integer (nullable = true)
 |-- HYP_TENS_PRE: integer (nullable = true)
 |-- HYP_TENS_GEST: integer (nullable = true)
 |-- PREV_BIRTH_PRETERM: integer (nullable = true)
 |-- BIRTH_PLACE_INT: integer (nullable = true)



In [34]:
vec_assemble.getInputCols()

['MOTHER_AGE_YEARS',
 'FATHER_COMBINED_AGE',
 'CIG_BEFORE',
 'CIG_1_TRI',
 'CIG_2_TRI',
 'CIG_3_TRI',
 'MOTHER_HEIGHT_IN',
 'MOTHER_PRE_WEIGHT',
 'MOTHER_DELIVERY_WEIGHT',
 'MOTHER_WEIGHT_GAIN',
 'DIABETES_PRE',
 'DIABETES_GEST',
 'HYP_TENS_PRE',
 'HYP_TENS_GEST',
 'PREV_BIRTH_PRETERM',
 'BIRTH_PLACE_INT',
 'BIRTH_PLACE_VEC']

In [40]:
logistic = LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT')

In [41]:
pipeline = Pipeline(stages=[encoder, vec_assemble, logistic])

In [42]:
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)

In [43]:
train, test, val = births.randomSplit([0.7, 0.2, 0.1], seed=666)

In [44]:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)

In [46]:
births_test.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1)]

In [49]:
# tuning model
import pyspark.ml.evaluation as ev
import pyspark.ml.tuning as tune

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='INFANT_ALIVE_AT_REPORT')

In [51]:
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

0.732649151771797
0.7094885863079984


In [53]:
pipelinePath = './infant_model'
model.write().overwrite().save(pipelinePath)

In [61]:
# model tuning
grid = tune.ParamGridBuilder().addGrid(logistic.maxIter, [2,10,50]).addGrid(logistic.regParam, [0.01,0.05,0.3]).build()
cv = tune.CrossValidator(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)

pipeline = Pipeline(stages=[encoder, vec_assemble])
piped_data = pipeline.fit(births).transform(births)

train, test = piped_data.randomSplit([.7, .3], seed=123)

cvModel = cv.fit(train)
bestModel = cvModel.bestModel


In [63]:
acc = bestModel.evaluate(dataset=test)

In [64]:
acc.accuracy

0.6828872668288727

In [68]:
results = bestModel.transform(train)
print(evaluator.evaluate(results,
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results,
    {evaluator.metricName: 'areaUnderPR'}))

0.7414689085108581
0.7183306759879492
