In [2]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Spark configuration: run on the local master ("local" = a single worker
# thread) under an application name that identifies this notebook in the
# Spark UI.
conf = SparkConf()
conf.setMaster("local").setAppName("Infant survival logistic regression")
# Create (or reuse) the session through the documented builder entry point so
# re-running this cell does not wrap the context in yet another SparkSession;
# the SparkContext is taken from the session.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Loading the data
Data source: http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz (the code below reads a local copy saved as `births_transformed.csv`).

In [4]:
import pyspark.sql.types as typ
# Column layout of births_transformed.csv, in file order. Every column is an
# integer except BIRTH_PLACE, which is read as a string (it is cast to an
# integer later, before one-hot encoding).
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
] + [
    (column_name, typ.IntegerType())
    for column_name in (
        'MOTHER_AGE_YEARS',
        'FATHER_COMBINED_AGE',
        'CIG_BEFORE',
        'CIG_1_TRI',
        'CIG_2_TRI',
        'CIG_3_TRI',
        'MOTHER_HEIGHT_IN',
        'MOTHER_PRE_WEIGHT',
        'MOTHER_DELIVERY_WEIGHT',
        'MOTHER_WEIGHT_GAIN',
        'DIABETES_PRE',
        'DIABETES_GEST',
        'HYP_TENS_PRE',
        'HYP_TENS_GEST',
        'PREV_BIRTH_PRETERM',
    )
]

# Explicit schema built from `labels`, every field declared non-nullable.
# NOTE(review): Spark file sources may relax user-supplied nullability when
# reading; confirm before relying on nullable=False.
schema = typ.StructType([
    typ.StructField(name, data_type, nullable=False)
    for name, data_type in labels
])
# header=True: the first line of the file holds column names, not data.
births = spark.read.csv(
    'births_transformed.csv',
    schema=schema,
    header=True,
)
                        

In [9]:
births.show(n=5)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                     0|          1|              29|                 99|         0|        0|        0|        0|              99|              999|                   999|                99|           0| 

# Creating transformers

In [16]:
import pyspark.ml.feature as ft

# OneHotEncoder expects a numeric category column, so derive an integer copy
# of the string BIRTH_PLACE column.
# NOTE(review): this cast is applied to the DataFrame, not as a Pipeline
# stage, so it is not captured when the pipeline is saved; reapply it (or move
# it into a stage such as an SQLTransformer) before reusing a saved pipeline.
births = births.withColumn(
    'BIRTH_PLACE_INT',
    births.BIRTH_PLACE.cast(typ.IntegerType()),
)

# One-hot encode the integer birth place into the BIRTH_PLACE_VEC column.
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT',
    outputCol='BIRTH_PLACE_VEC',
)

# Assemble all columns after the label and the raw BIRTH_PLACE string
# (labels[2:]) plus the encoded birth-place vector into one 'features' column.
featuresCreator = ft.VectorAssembler(
    inputCols=[name for name, _dtype in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features',
)

# Creating an estimator

In [17]:
import pyspark.ml.classification as cl
# Logistic regression that predicts INFANT_ALIVE_AT_REPORT from the
# assembler's 'features' column (no featuresCol is set, so the estimator's
# default column name is used). Training is capped at 10 iterations with a
# regularization parameter of 0.01.
logistic = (
    cl.LogisticRegression()
    .setLabelCol('INFANT_ALIVE_AT_REPORT')
    .setMaxIter(10)
    .setRegParam(0.01)
)

# Creating a pipeline

In [18]:
from pyspark.ml import Pipeline
# Stages run in list order: the encoder must come before the assembler, which
# consumes its BIRTH_PLACE_VEC output, and the assembled features then feed
# the logistic regression.
pipeline = Pipeline().setStages([encoder, featuresCreator, logistic])

# Fitting the model

In [19]:
# Random 70% / 30% train/test split; the fixed seed makes the split repeatable
# (for the same input data and partitioning).
births_train, births_test = births.randomSplit(weights=[0.7, 0.3], seed=666)

In [20]:
# Fit every pipeline stage on the training split, then score the held-out split.
model = pipeline.fit(dataset=births_train)
test_model = model.transform(dataset=births_test)

In [40]:
# Preview the first two scored rows (wide columns are truncated).
test_model.show(2)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+---------------+---------------+--------------------+--------------------+--------------------+----------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|BIRTH_PLACE_INT|BIRTH_PLACE_VEC|            features|       rawPrediction|         probability|prediction|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+-------------

In [39]:
# Fetch one scored row as a list of Row objects to see every column untruncated.
test_model.head(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=16, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=180, MOTHER_DELIVERY_WEIGHT=206, MOTHER_WEIGHT_GAIN=26, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 14.0, 1: 16.0, 6: 63.0, 7: 180.0, 8: 206.0, 9: 26.0, 16: 1.0}), rawPrediction=DenseVector([-0.2759, 0.2759]), probability=DenseVector([0.4315, 0.5685]), prediction=1.0)]

# Evaluating the performance of the model

In [41]:
import pyspark.ml.evaluation as ev
# Compare the model's scores on the test split with the true label; the
# 'probability' vector column is used as the score input.
evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT',
    rawPredictionCol='probability',
)
# Print area under the ROC curve, then area under the precision-recall curve.
for metric in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(test_model, {evaluator.metricName: metric}))

0.7353092462007923
0.7194629537417253


# Saving the model
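PySpark lets you save a Pipeline definition for later reuse. The save stores both the pipeline structure and the definitions (including parameters) of all its Transformers and Estimators.
Note that the cell below saves the unfitted `pipeline`, not the trained `model`; a fitted `PipelineModel` can be persisted the same way via `model.write()`. The `BIRTH_PLACE_INT` cast is done on the DataFrame before the pipeline runs, so it is not part of the saved definition.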

In [None]:
# Persist the unfitted pipeline definition, overwriting any earlier save at
# this path.
# NOTE(review): the fitted `model` is not saved by this cell, and the
# BIRTH_PLACE -> BIRTH_PLACE_INT cast happens before the pipeline, so it is not
# part of the saved definition either.
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(path=pipelinePath)