In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas as pd
# Obtain a SparkSession: reuses the active one if present, otherwise builds a new one.
spark = SparkSession.builder.getOrCreate()
# SparkContext that backs the session.
sc = spark.sparkContext
# Legacy SQLContext wrapper around the same context; none of the cells visible
# in this section reference it again.
sqlContext = SQLContext(sc)
# Data source: https://data.nber.org/data/vital-statistics-natality-data.html

In [2]:
import pyspark.sql.types as typ
# Column layout of nat18.csv, in file order, as (column name, Spark type class).
# Author's notes carried over from the original cell:
#   * '_c0' has to be declared, otherwise the remaining columns are shifted.
#   * 'prepregnancy_weight' has to be Decimal, otherwise its values load as null.
_column_types = [
    ('_c0', typ.IntegerType),
    ('birth_place', typ.IntegerType),
    ('mothers_age', typ.IntegerType),
    ('fathers_age', typ.IntegerType),
    ('prental_care', typ.IntegerType),
    ('cigarettes_before_pregnancy', typ.IntegerType),
    ('cigarettes_1_trimester', typ.IntegerType),
    ('cigarettes_2_trimester', typ.IntegerType),
    ('cigarettes_3_trimester', typ.IntegerType),
    ('mothers_height', typ.IntegerType),
    ('bmi', typ.DecimalType),
    ('prepregnancy_weight', typ.DecimalType),
    ('delivery_weight', typ.IntegerType),
    ('weight_gain', typ.IntegerType),
    ('prepregnancy_diabetes', typ.StringType),
    ('gestational_diabetes', typ.StringType),
    ('prepregnancy_hypertension', typ.StringType),
    ('gestational_hypertension', typ.StringType),
    ('hypertension_eclampsia', typ.StringType),
    ('previous_preterm_birth', typ.StringType),
    ('infant_sex', typ.StringType),
    ('infant_live', typ.StringType),
]
# One fresh type instance per column, built with default type arguments.
labels = [(col_name, type_cls()) for col_name, type_cls in _column_types]

# Every field is declared with nullable=False (third StructField argument).
schema = typ.StructType(
    [typ.StructField(col_name, col_type, False) for col_name, col_type in labels]
)

# Databricks variant of the load:
#births = spark.read.csv('/FileStore/tables/nat18.csv', header = True, schema = schema)
# Local Jupyter variant (path is relative to the working directory):
births = spark.read.csv('nat18.csv', schema=schema, header=True)

In [3]:
# Sanity checks right after loading.
# bool(births.head(1)) only tells us whether the DataFrame has at least one row;
# it does NOT say anything about missing values, so report the two facts separately.
import pyspark.sql.functions as fn  # imported here because no earlier cell brings `fn` into scope

print('Dataset is non-empty: {0}'.format(bool(births.head(1))))

# Number of NULL cells per column (one pass over the data).
# NOTE(review): values that cannot be parsed into the declared schema type are
# presumably loaded as NULL by the CSV reader - confirm against the Spark docs.
null_counts = births.select(
    [fn.sum(fn.col(c).isNull().cast('int')).alias(c) for c in births.columns]
)
# Transposed so that each source column becomes one row of the displayed table.
null_counts.toPandas().T

In [4]:
# Rows are dropped when a column holds the listed code.
# NOTE(review): these look like the "unknown / not stated" codes of the NBER
# natality codebook - confirm against the data dictionary.
# The filters are applied one per column, in exactly this order.
excluded_codes = [
    ('infant_live', 'U'),
    ('prepregnancy_diabetes', 'U'),
    ('gestational_diabetes', 'U'),
    ('prepregnancy_hypertension', 'U'),
    ('gestational_hypertension', 'U'),
    ('hypertension_eclampsia', 'U'),
    ('previous_preterm_birth', 'U'),
    ('fathers_age', 99),
    ('cigarettes_before_pregnancy', 99),
    ('cigarettes_1_trimester', 99),
    ('cigarettes_2_trimester', 99),
    ('cigarettes_3_trimester', 99),
    ('mothers_height', 99),
    ('bmi', 100),
    ('prepregnancy_weight', 999),
    ('delivery_weight', 999),
    ('weight_gain', 99),
]
for column_name, excluded_value in excluded_codes:
    births = births.filter(births[column_name] != excluded_value)

In [5]:
# Row count per value of the target column; the author notes the classes are imbalanced.
label_counts = births.groupBy('infant_live').count()
label_counts.show()

In [6]:
# Basic statistical checks on the dataset.
# Step 1: does the dataset contain duplicate rows? Compare total vs. distinct row counts.
total_rows = births.count()
distinct_rows = births.distinct().count()
print('Count of rows: {0}'.format(total_rows))
print('Count of distinct rows: {0}'.format(distinct_rows))
# Author's observation from the recorded run: the dataset contains no duplicate rows.

In [7]:
# Step 3: statistical summary (describe) of the numerical columns.
numerical = [
    'mothers_age',
    'fathers_age',
    'cigarettes_before_pregnancy',
    'cigarettes_1_trimester',
    'cigarettes_2_trimester',
    'cigarettes_3_trimester',
    'mothers_height',
    'bmi',
    'prepregnancy_weight',
    'delivery_weight',
    'weight_gain',
]
desc = births.describe(*numerical)
# The summary is small, so it is converted to pandas for a readable table.
desc.toPandas()

In [8]:
# Feature engineering: turn the categorical target 'infant_live' into a 0/1 column.
import pyspark.sql.functions as fn

# The guard makes the cell safe to re-run: once 'infant_live' has been replaced
# there is nothing left to encode.
if 'infant_live' in births.columns:
    # 1 when infant_live == 'Y', 0 for every other value.
    # Built directly, so no distinct()/collect() job is needed and the result
    # does not depend on which category values happen to be present.
    infant_live_encoded = (
        fn.when(fn.col('infant_live') == 'Y', 1)
          .otherwise(0)
          .alias('infant_live_encoded')
    )
    # Keep every other column in its original order; drop the CSV index column
    # '_c0' and the raw target, which the encoded column replaces.
    remaining_columns = [c for c in births.columns if c not in ('_c0', 'infant_live')]
    # Encoded target first, then the remaining columns.
    births = births.select([infant_live_encoded] + remaining_columns)

In [9]:
# Print column names and types of `births` as it stands after the preceding cells.
births.printSchema()

In [10]:
# Mother's age vs. cigarettes smoked before pregnancy.
import seaborn as sns  # imported here because no earlier cell brings `sns` into scope

# Select the two plotted columns in Spark BEFORE collecting, so only those two
# columns (not the whole table) are pulled onto the driver as a pandas frame.
smoking_pdf = births.select('mothers_age', 'cigarettes_before_pregnancy').toPandas()
ax = sns.pairplot(smoking_pdf)
# y slightly above 1 places the title just over the top row of panels.
ax.fig.suptitle("Vital Data Smoking Pair Plot", y=1.01)

In [11]:
import pyspark.ml.feature as ft
import pyspark.sql.types as typ
from pyspark.ml import Pipeline
# Feature engineering on categorical data: one small pipeline per column,
# each made of a StringIndexer followed by a OneHotEncoder.
def _index_then_onehot(source_col, indexed_col):
    """Build the two-stage pipeline for one categorical column.

    source_col  -- name of the raw string column.
    indexed_col -- name of the intermediate column written by the StringIndexer
                   (configured with handleInvalid='skip') and read by the encoder.

    The encoder writes its output to '<source_col>_encoded'.
    """
    indexer = ft.StringIndexer(inputCol=source_col, handleInvalid='skip', outputCol=indexed_col)
    encoder = ft.OneHotEncoder(inputCol=indexed_col, outputCol=source_col + '_encoded')
    return Pipeline(stages=[indexer, encoder])

#1. (note: the intermediate column is 'indexed_sex', not 'indexed_infant_sex')
infant_sex_pipe = _index_then_onehot('infant_sex', 'indexed_sex')
#2.
prepregnancy_diabetes_pipe = _index_then_onehot('prepregnancy_diabetes', 'indexed_prepregnancy_diabetes')
#3.
gestational_diabetes_pipe = _index_then_onehot('gestational_diabetes', 'indexed_gestational_diabetes')
#4.
prepregnancy_hypertension_pipe = _index_then_onehot('prepregnancy_hypertension', 'indexed_prepregnancy_hypertension')
#5.
gestational_hypertension_pipe = _index_then_onehot('gestational_hypertension', 'indexed_gestational_hypertension')
#6.
hypertension_eclampsia_pipe = _index_then_onehot('hypertension_eclampsia', 'indexed_hypertension_eclampsia')
#7.
previous_preterm_birth_pipe = _index_then_onehot('previous_preterm_birth', 'indexed_previous_preterm_birth')


In [12]:
# Feature engineering on numerical data: assemble the columns into a single
# vector ('num_features'), then run it through a StandardScaler with default
# settings to produce 'scaledFeatures'.
numerical_inputs = [
    'birth_place',
    'mothers_age',
    'fathers_age',
    'prental_care',
    'cigarettes_before_pregnancy',
    'cigarettes_1_trimester',
    'cigarettes_2_trimester',
    'cigarettes_3_trimester',
    'mothers_height',
    'bmi',
    'prepregnancy_weight',
    'delivery_weight',
    'weight_gain',
]
numerical_assembler = ft.VectorAssembler(inputCols=numerical_inputs, outputCol='num_features')
numerical_scaler = ft.StandardScaler(inputCol="num_features", outputCol="scaledFeatures")
numerical_pipe = Pipeline(stages=[numerical_assembler, numerical_scaler])
                                    

In [13]:
# Combine the numerical pipeline and the categorical pipelines, then assemble
# their outputs into the single 'features' vector used by the classifier.
feature_stages = [
    numerical_pipe,
    infant_sex_pipe,
    prepregnancy_diabetes_pipe,
    gestational_diabetes_pipe,
    prepregnancy_hypertension_pipe,
    gestational_hypertension_pipe,
    hypertension_eclampsia_pipe,
    previous_preterm_birth_pipe,
]
# Output columns of the stages above, in the order they enter the final vector.
assembled_inputs = [
    'scaledFeatures',
    'infant_sex_encoded',
    'prepregnancy_diabetes_encoded',
    'gestational_diabetes_encoded',
    'prepregnancy_hypertension_encoded',
    'gestational_hypertension_encoded',
    'hypertension_eclampsia_encoded',
    'previous_preterm_birth_encoded',
]
features_assembler = ft.VectorAssembler(inputCols=assembled_inputs, outputCol='features')
all_features = Pipeline(stages=feature_stages + [features_assembler])

In [14]:
# Build a logistic regression estimator.
import pyspark.ml.classification as cl
# Label column is the 0/1 encoded target; the features column is left at the
# estimator's default name.
logistic = cl.LogisticRegression(labelCol ='infant_live_encoded')

In [15]:
# Build the end-to-end pipeline: feature engineering (all_features) followed by
# the logistic regression estimator.
pipeline = Pipeline(stages = [all_features, logistic])

In [16]:
births_v1 = births
# Split into training / validation / test sets with weights 0.7 / 0.2 / 0.1;
# the fixed seed keeps the split repeatable.
split_weights = [0.7, 0.2, 0.1]
births_train, births_val, births_test = births_v1.randomSplit(split_weights, seed=666)
# Fit the whole pipeline on the training split ...
model = pipeline.fit(births_train)
# ... and score the validation split (despite its name, `test_model` holds the
# validation-set predictions).
test_model = model.transform(births_val)
# Peek at one scored row to inspect the columns the model produced.
test_model.take(1)

In [17]:
import pyspark.ml.evaluation as ev
# Binary-classification evaluation of the validation predictions.
# Author's note: rawPredictionCol may point at either the 'rawPrediction' or the
# 'probability' column; 'probability' is used here.
evaluator = ev.BinaryClassificationEvaluator(
    labelCol='infant_live_encoded',
    rawPredictionCol='probability',
)
# Print area under the ROC curve first, then area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(test_model, {evaluator.metricName: metric_name}))

In [18]:
# Average accuracy: mean of a per-row indicator that is 1.0 when the predicted
# label equals the true label and 0.0 otherwise.
is_correct = fn.expr('prediction = infant_live_encoded ').cast('float')
test_model.select(fn.avg(is_correct)).show()

In [19]:
births_v2 = births
# Split the rows by label: label 1 is treated as the majority class, label 0 as
# the minority class.
encoded_label = fn.col("infant_live_encoded")
major_df = births_v2.filter(encoded_label == 1)
minor_df = births_v2.filter(encoded_label == 0)
# Whole-number majority:minority ratio (the fractional part is truncated).
majority_rows = major_df.count()
minority_rows = minor_df.count()
ratio = int(majority_rows / minority_rows)
print("ratio: {}".format(ratio))

In [20]:
# Since the ratio and the dataset size are fairly large, undersample the majority class.
# Keep roughly 1/ratio of the majority rows. If ratio truncated to 0 (majority
# not actually larger than minority) keep everything instead of dividing by zero.
majority_fraction = 1.0 / ratio if ratio > 0 else 1.0
# Sampling without replacement; the seed (same value as the randomSplit calls in
# this notebook) makes the undersampled set reproducible across runs.
sampled_majority_df = major_df.sample(False, majority_fraction, seed=666)
combined_df = sampled_majority_df.unionAll(minor_df)
# Show the resulting class counts to confirm the two classes are now comparable.
combined_df.groupby('infant_live_encoded').count().show()

In [21]:
# Evaluate the pipeline when trained on the undersampled dataset.
train, val, test = combined_df.randomSplit([0.7,0.2,0.1],seed = 666)
model2 = pipeline.fit(train)
# Score the validation split with model2 - the model fitted on the undersampled
# training split just above - so the metrics below measure the effect of
# undersampling rather than re-scoring the earlier `model`.
test_model2 = model2.transform(val)
# Print area under ROC, then area under PR, for the undersampled model.
print(evaluator.evaluate(test_model2, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model2, {evaluator.metricName: 'areaUnderPR'}))

In [22]:
# Average accuracy on the undersampled validation predictions: mean of a per-row
# 1.0/0.0 indicator of prediction == label.
is_correct2 = fn.expr('prediction = infant_live_encoded ').cast('float')
test_model2.select(fn.avg(is_correct2)).show()

In [23]:
# Author's conclusion from the runs above: undersampling appeared to lower model
# performance, so the remaining work continues with the original dataset.
# Build a new feature pipeline for the grid search.
# 1. Numerical data: same input columns as before, but the scaler is configured
#    explicitly with withMean=True and withStd=True.
numerical_inputs2 = [
    'birth_place',
    'mothers_age',
    'fathers_age',
    'prental_care',
    'cigarettes_before_pregnancy',
    'cigarettes_1_trimester',
    'cigarettes_2_trimester',
    'cigarettes_3_trimester',
    'mothers_height',
    'bmi',
    'prepregnancy_weight',
    'delivery_weight',
    'weight_gain',
]
numerical_assembler2 = ft.VectorAssembler(inputCols=numerical_inputs2, outputCol='num_features')
numerical_scaler2 = ft.StandardScaler(
    inputCol="num_features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True,
)
numerical_pipe2 = Pipeline(stages=[numerical_assembler2, numerical_scaler2])

# 2. Full feature pipeline: the new numerical pipe, the existing categorical
#    pipes, and a final assembler producing the 'features' vector.
feature_stages2 = [
    numerical_pipe2,
    infant_sex_pipe,
    prepregnancy_diabetes_pipe,
    gestational_diabetes_pipe,
    prepregnancy_hypertension_pipe,
    gestational_hypertension_pipe,
    hypertension_eclampsia_pipe,
    previous_preterm_birth_pipe,
]
assembled_inputs2 = [
    'scaledFeatures',
    'infant_sex_encoded',
    'prepregnancy_diabetes_encoded',
    'gestational_diabetes_encoded',
    'prepregnancy_hypertension_encoded',
    'gestational_hypertension_encoded',
    'hypertension_eclampsia_encoded',
    'previous_preterm_birth_encoded',
]
features_assembler2 = ft.VectorAssembler(inputCols=assembled_inputs2, outputCol='features')
all_features2 = Pipeline(stages=feature_stages2 + [features_assembler2])

In [24]:
import pyspark.ml.tuning as tune
# Logistic regression estimator to be tuned by cross-validation.
lr = cl.LogisticRegression(labelCol='infant_live_encoded')

# Parameter grid: 3 values for each of 3 parameters = 27 candidate settings.
grid = (
    tune.ParamGridBuilder()
        .addGrid(lr.maxIter, [2, 5, 10])
        .addGrid(lr.regParam, [0.01, 0.05, 0.3])
        .addGrid(lr.elasticNetParam, [0.2, 0.5, 0.8])
        .build()
)
# Evaluator used to score every candidate; it reads the 'probability' column.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='infant_live_encoded',
)
# Seeded so the fold assignment - and therefore the averaged metrics and the
# selected parameters - are repeatable, matching the seed used for the
# randomSplit calls in this notebook.
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    seed=666,
)

In [25]:
# Fit the feature pipeline on the training split only, then run the
# cross-validated grid search on the transformed training data.
data_transformer = all_features2.fit(births_train)
train_features = data_transformer.transform(births_train)
cvModel = cv.fit(train_features)

In [26]:
# Check the cross-validated model on the held-out test split.
# Note: despite its name, `data_train` holds the transformed TEST split.
data_train = data_transformer.transform(births_test)
results = cvModel.transform(data_train)
# Print area under ROC, then area under PR.
for cv_metric in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: cv_metric}))

In [27]:
from handyspark import *
import matplotlib.pyplot as plt  # imported here because no earlier cell brings `plt` into scope

# The metrics object needs the model's predictions (which carry the
# 'probability' column). `data_train` holds only the transformed features, so
# score it with the cross-validated model first.
cv_predictions = cvModel.transform(data_train)
bcm = BinaryClassificationMetrics(cv_predictions, scoreCol='probability', labelCol='infant_live_encoded')
# Two panels side by side: ROC curve on the left, precision-recall curve on the right.
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=axs[0])
bcm.plot_pr_curve(ax=axs[1])

In [28]:
# Find the best hyper-parameter combination from the cross-validation run.
# Each entry pairs a list of {parameter name: value} dicts with the average
# metric that parameter map obtained.
# Note: this rebinds `results`, which previously held the prediction DataFrame.
results = [
    ([{param.name: value} for param, value in param_map.items()], avg_metric)
    for param_map, avg_metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]
# Highest average metric first; the first entry is displayed as the cell output.
ranked_results = sorted(results, key=lambda entry: entry[1], reverse=True)
ranked_results[0]

In [29]:
#second model: random forest