<h1>Predict chances of infant survival with Spark ML</h1>

<h3>Schema Prep and Data Loading</h3>

In [3]:
import pyspark.sql.types as typ

# Column names paired with their Spark SQL types, in the order the schema
# fields are created. Later cells slice this list to pick feature names.
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

# Build one StructField per (name, type) pair; the third positional argument
# (False) is the field's nullable flag. The fields together form the schema.
schema = typ.StructType([
    typ.StructField(column_name, column_type, False)
    for column_name, column_type in labels
])

# Read the gzipped CSV with the explicit schema; the first row is a header.
births_csv_path = '/FileStore/tables/m10h45tz1490702575759/births_transformed_csv-da688.gz'
births = spark.read.csv(births_csv_path, header=True, schema=schema)

In [4]:
# Render the freshly loaded births DataFrame with the notebook's display helper.
display(births)

<h3>Create Transformers</h3>

In [6]:
import pyspark.ml.feature as ft

# BIRTH_PLACE is read as a string; add an integer-typed copy named
# BIRTH_PLACE_INT, which is the column the one-hot encoder reads later on.
birth_place_as_int = births['BIRTH_PLACE'].cast(typ.IntegerType())
births = births.withColumn('BIRTH_PLACE_INT', birth_place_as_int)

In [7]:
# List the column names to confirm BIRTH_PLACE_INT was added.
births.columns

In [8]:
# Render the DataFrame again, now including the BIRTH_PLACE_INT column.
display(births)

In [9]:
# Print the column names and types of the DataFrame.
births.printSchema()

<p>Having done this, we can now create our first Transformer.</p>

In [11]:
# One-hot encoding, conceptually: every distinct category gets its own slot.
# With the example categories UP, MP, AP, GUJ, AS, the value "MP" would be
# represented as [0, 1, 0, 0, 0].
# NOTE(review): Spark's OneHotEncoder emits a sparse vector, and its dropLast
# setting can leave out the final category, so the real output may be one
# slot shorter than this illustration. Confirm against the Spark version in use.

# Encode the integer BIRTH_PLACE_INT column into the vector column BIRTH_PLACE_VEC.
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT', 
    outputCol='BIRTH_PLACE_VEC')

<p>Let's now create a single column with all the features collated together.</p>

In [13]:
# VectorAssembler illustration on a toy two-row DataFrame: the numeric
# columns a, b and c are combined into one vector column named 'features'.
df = spark.createDataFrame([(12, 3, 4), (10, 5, 6)], ['a', 'b', 'c'])
demo_assembler = ft.VectorAssembler(inputCols=['a', 'b', 'c'], outputCol='features')
display(demo_assembler.transform(df))

In [14]:
# Collate the model inputs into a single vector column called 'features'.
# The inputs are every column listed in `labels` from index 2 onwards (so the
# label and the raw BIRTH_PLACE string are skipped) plus the encoder's output.
featuresCreator = ft.VectorAssembler(
    inputCols=[name for name, _ in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features',
)

<h3>Create an estimator</h3>

In [16]:
import pyspark.ml.classification as cl
# Logistic-regression estimator that predicts INFANT_ALIVE_AT_REPORT,
# limited to 10 iterations with a regularisation parameter of 0.01.
logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    regParam=0.01,
    maxIter=10,
)

<h3>Create a pipeline</h3>

<p>All that is left now is to create a Pipeline and fit the model. First, let's load the Pipeline from the package.</p>

In [18]:
from pyspark.ml import Pipeline

# Chain the three stages so they run in order when the pipeline is fitted.
pipeline = Pipeline(stages=[
        encoder,            # one-hot encodes BIRTH_PLACE_INT into the new column BIRTH_PLACE_VEC
        featuresCreator,    # assembles the numeric columns plus BIRTH_PLACE_VEC into the single vector column 'features'
        logistic            # logistic regression estimator, trained on the assembled feature vector
    ])

<h3>Fit the model </h3>
<p>Split the data into training and test sets</p>

In [20]:
# 70/30 random split into training and test sets; the fixed seed is passed
# so the same split can be requested on every run.
split_weights = [0.7, 0.3]
births_train, births_test = births.randomSplit(split_weights, seed=666)

<p>Run pipeline and estimate model</p>

In [22]:
# Fit all pipeline stages on the training set; the result is the fitted pipeline model.
model = pipeline.fit(births_train)
# Despite its name, `test_model` is a DataFrame: the test set run through the
# fitted pipeline (the evaluator below reads its 'probability' column).
test_model = model.transform(births_test)

In [23]:
# Peek at the first scored row of the test set.
test_model.take(1)

<h3>Model performance</h3>
<p>Checking model performance</p>

In [25]:
import pyspark.ml.evaluation as ev

# Binary-classification evaluator that reads scores from the 'probability'
# column and compares them with the INFANT_ALIVE_AT_REPORT label.
evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT',
    rawPredictionCol='probability',
)

# Print the area under the ROC curve, then the area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(test_model, {evaluator.metricName: metric_name}))

<h2>Persistence - Saving the pipeline and model</h2>
<p>PySpark allows you to save the Pipeline definition for later use.</p>

In [27]:
# Save the (unfitted) pipeline definition, replacing anything already stored at this path.
pipelinePath = '/FileStore/tables/m10h45tz1490702575759/infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

<p>You can load it later and use it straight away to .fit(...) and predict.</p>

In [29]:
# Load the saved pipeline definition, fit it on the training set, score the
# test set, and show the first scored row.
loadedPipeline = Pipeline.load(pipelinePath)
refitted_model = loadedPipeline.fit(births_train)
refitted_predictions = refitted_model.transform(births_test)
refitted_predictions.take(1)

In [30]:
from pyspark.ml import PipelineModel

# Save the fitted pipeline model, replacing anything already stored at this path.
modelPath = '/FileStore/tables/m10h45tz1490702575759/infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

# Load the fitted model back; no re-fitting is needed before calling transform.
loadedPipelineModel = PipelineModel.load(modelPath)
# Score the test dataset with the reloaded model.
test_loadedModel = loadedPipelineModel.transform(births_test)

<h2>Parameter hyper-tuning</h2>

In [32]:
import pyspark.ml.tuning as tune
# A fresh logistic regression with only the label column set; maxIter and
# regParam are supplied by the parameter grid below.
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')

# Parameter grid: every combination of 3 maxIter values and 3 regParam values.
grid = (
    tune.ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])
    .build()
)

In [33]:
# Evaluator used to score each parameter combination during tuning: it reads
# scores from the 'probability' column and labels from INFANT_ALIVE_AT_REPORT.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

In [34]:
# Cross-validator that fits `logistic` for every parameter map in `grid` and
# scores each one with `evaluator`.
# The seed fixes how rows are assigned to folds, so the reported metrics are
# reproducible across runs (the same value seeds the train/test split).
cv = tune.CrossValidator(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator,
    seed=666
)

In [35]:
# Rebind `pipeline` to a feature-preparation-only pipeline (encoder and
# assembler, no classifier stage); the classifier is fitted separately by the
# cross-validator.
pipeline = Pipeline(stages=[encoder,featuresCreator])
# Fit the feature-preparation stages on the training set.
data_transformer = pipeline.fit(births_train)

In [36]:
# Run the cross-validated grid search on the transformed training set.
cvModel = cv.fit(data_transformer.transform(births_train))

In [37]:
# Prepare features for the held-out TEST set (the original name `data_train`
# was misleading) and score it with the cross-validated model.
data_test = data_transformer.transform(births_test)
results = cvModel.transform(data_test)

# Print the area under the ROC curve, then the area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: metric_name}))

In [38]:
# Show the notebook representation of the scored test-set DataFrame.
results

In [39]:
# `results` is a Spark DataFrame at this point, so it cannot be sorted by
# `el[1]`. What this cell needs is the list of (parameters, metric) pairs from
# the grid search, so build it from the fitted CrossValidatorModel: each
# parameter map the validator tried, paired with its average metric over folds.
cv_results = [
    ({param.name: value for param, value in param_map.items()}, avg_metric)
    for param_map, avg_metric
    in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]

# Show the best-scoring parameter combination (a higher metric is better).
max(cv_results, key=lambda el: el[1])

In [40]:
# Chi-squared feature selector: keeps the top 5 features of the assembled
# vector, judged against the label, and writes them to 'selectedFeatures'.
selector = ft.ChiSqSelector(
    featuresCol=featuresCreator.getOutputCol(),
    labelCol='INFANT_ALIVE_AT_REPORT',
    outputCol='selectedFeatures',
    numTopFeatures=5,
)

# Logistic regression that trains on the selector's output column.
logistic = cl.LogisticRegression(
    featuresCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

# Feature-preparation pipeline: encode, assemble, then select; fitted on the
# training set only.
feature_stages = [encoder, featuresCreator, selector]
pipeline = Pipeline(stages=feature_stages)
data_transformer = pipeline.fit(births_train)

In [41]:
# Train/validation-split tuner: fits `logistic` for every parameter map in
# `grid` and scores each one with `evaluator` on a held-out validation part.
# The seed fixes the train/validation split, so the reported metrics are
# reproducible across runs (the same value seeds the train/test split).
tvs = tune.TrainValidationSplit(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator,
    seed=666
)

In [42]:
# Tune on the transformed training set.
tvsModel = tvs.fit(data_transformer.transform(births_train))

# Prepare features for the held-out TEST set (the original name `data_train`
# was misleading) and score it with the tuned model.
data_test = data_transformer.transform(births_test)
results = tvsModel.transform(data_test)

# Print the area under the ROC curve, then the area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: metric_name}))