In [1]:
from pyspark import SparkConf, SparkContext
# Spark configuration for this notebook: 'local' master, application name 'myApp'.
conf = SparkConf()
conf.setMaster('local')
conf.setAppName('myApp')

# Spark context built from that configuration; later cells use it as `sc`.
sc = SparkContext(conf=conf)

In [2]:
# Directory and file name of a JSON input file.
# NOTE(review): neither name is referenced by the other cells visible in this
# notebook - confirm whether they are still needed.
path = '/home/userx/Downloads/2018/09/19/'
json1 = '13:00:00_13:59:59_S0.json'

In [2]:
from pyspark.sql import SQLContext
# SQL entry point built on the existing SparkContext; later cells reuse it as `sQ`.
sQ = SQLContext(sc)

# Small hand-written sample frame. Some rows repeat each other
# (for example, id 3 appears twice with identical values).
df = sQ.createDataFrame(
    data=[
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ],
    schema=['id', 'weight', 'height', 'age', 'gender'],
)

In [4]:
# Print the column names, inferred Spark types and nullability of `df`.
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



In [5]:
# Read the births CSV as an RDD of raw text lines (one string per line).
ff = 'file:///home/userx/Downloads/births_transformed.csv'
rdd = sc.textFile(ff)
# Peek at the first two lines: the header row and the first record.
rdd.take(2)

['INFANT_ALIVE_AT_REPORT,BIRTH_PLACE,MOTHER_AGE_YEARS,FATHER_COMBINED_AGE,CIG_BEFORE,CIG_1_TRI,CIG_2_TRI,CIG_3_TRI,MOTHER_HEIGHT_IN,MOTHER_PRE_WEIGHT,MOTHER_DELIVERY_WEIGHT,MOTHER_WEIGHT_GAIN,DIABETES_PRE,DIABETES_GEST,HYP_TENS_PRE,HYP_TENS_GEST,PREV_BIRTH_PRETERM',
 '0,1,29,99,0,0,0,0,99,999,999,99,0,0,0,0,0']

In [6]:
# The first line of the file is the CSV header; keep every line that differs from it.
head = rdd.first()
data = rdd.filter(lambda line: line != head)

# Preview the first five remaining lines.
data.take(5)

['0,1,29,99,0,0,0,0,99,999,999,99,0,0,0,0,0',
 '0,1,22,29,0,0,0,0,65,180,198,18,0,0,0,0,0',
 '0,1,38,40,0,0,0,0,63,155,167,12,0,0,0,0,0',
 '0,1,39,42,0,0,0,0,60,128,152,24,0,0,0,0,1',
 '0,1,18,99,6,4,2,2,61,110,130,20,0,0,0,0,0']

In [13]:
# Load the .csv.gz file as a DataFrame, taking column names from the header row.
# No schema is supplied here.
datX = sQ.read.csv('file:///home/userx/Downloads/births_transformed.csv.gz',header=True)

In [14]:
# Show the first row of the schema-less load.
datX.head()

Row(INFANT_ALIVE_AT_REPORT='0', BIRTH_PLACE='1', MOTHER_AGE_YEARS='29', FATHER_COMBINED_AGE='99', CIG_BEFORE='0', CIG_1_TRI='0', CIG_2_TRI='0', CIG_3_TRI='0', MOTHER_HEIGHT_IN='99', MOTHER_PRE_WEIGHT='999', MOTHER_DELIVERY_WEIGHT='999', MOTHER_WEIGHT_GAIN='99', DIABETES_PRE='0', DIABETES_GEST='0', HYP_TENS_PRE='0', HYP_TENS_GEST='0', PREV_BIRTH_PRETERM='0')

## This is an example from the tutorial

In [3]:
import pyspark.sql.types as typ
# (column name, Spark SQL type) pairs, listed in the same order as the CSV header.
# BIRTH_PLACE is the only column kept as a string.
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType()),
]

# One StructField per label; the third argument (nullable) is False for every field.
schema = typ.StructType(
    [typ.StructField(name, dtype, False) for name, dtype in labels]
)

# Load the same .csv.gz file again, this time with the explicit schema.
births = sQ.read.csv(
    'file:///home/userx/Downloads/births_transformed.csv.gz',
    header=True,
    schema=schema,
)

In [4]:
# Show the first row of the typed load.
births.head()

Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=29, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=99, MOTHER_PRE_WEIGHT=999, MOTHER_DELIVERY_WEIGHT=999, MOTHER_WEIGHT_GAIN=99, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0)

In [5]:
# Expose the DataFrame to Spark SQL under the name 'births'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and, like it, overwrites any existing view of that name.
births.createOrReplaceTempView('births')

In [6]:
# Row count per BIRTH_PLACE value; 'group by 1' refers to the first select-list column.
sQ.sql('select BIRTH_PLACE,count(*) from births group by 1 ').show()

+-----------+--------+
|BIRTH_PLACE|count(1)|
+-----------+--------+
|          7|      91|
|          3|     224|
|          5|      74|
|          6|      11|
|          9|       8|
|          1|   44558|
|          4|     327|
|          2|     136|
+-----------+--------+



In [10]:
# BIRTH_PLACE has to be one-hot encoded. It was loaded as a string, so first
# add an integer copy of it for the encoder to work on.
births = births.withColumn(
    'BIRTH_PLACE_INT',
    births['BIRTH_PLACE'].cast(typ.IntegerType()),
)


In [13]:
# Re-register the view so that SQL queries see the new BIRTH_PLACE_INT column.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable.
births.createOrReplaceTempView('births')

# Row count per integer birth place, ordered by the birth-place code.
sQ.sql('select BIRTH_PLACE_INT,count(*) from births group by 1 order by 1 ').show()

+---------------+--------+
|BIRTH_PLACE_INT|count(1)|
+---------------+--------+
|              1|   44558|
|              2|     136|
|              3|     224|
|              4|     327|
|              5|      74|
|              6|      11|
|              7|      91|
|              9|       8|
+---------------+--------+



In [15]:
import pyspark.ml.feature as ft
# One-hot encoder stage: reads the integer birth-place column and writes a vector column.
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT',
    outputCol='BIRTH_PLACE_VEC',
)

In [25]:
# Show every parameter of the encoder together with its current value.
# (`encoder.extract` is not an attribute; the param-map output this cell
# displays comes from `extractParamMap()`.)
encoder.extractParamMap()

{Param(parent='OneHotEncoder_8463df2221d7', name='outputCol', doc='output column name.'): 'BIRTH_PLACE_VEC',
 Param(parent='OneHotEncoder_8463df2221d7', name='dropLast', doc='whether to drop the last category'): True,
 Param(parent='OneHotEncoder_8463df2221d7', name='inputCol', doc='input column name.'): 'BIRTH_PLACE_INT'}

In [21]:
# Assemble one 'features' vector from every label after the first two
# (the target and the raw BIRTH_PLACE string) plus the encoder's output column.
featureCreator = ft.VectorAssembler(
    inputCols=[name for name, _ in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features',
)

In [37]:

# List the columns that feed the assembled feature vector.
featureCreator.getInputCols()

['MOTHER_AGE_YEARS',
 'FATHER_COMBINED_AGE',
 'CIG_BEFORE',
 'CIG_1_TRI',
 'CIG_2_TRI',
 'CIG_3_TRI',
 'MOTHER_HEIGHT_IN',
 'MOTHER_PRE_WEIGHT',
 'MOTHER_DELIVERY_WEIGHT',
 'MOTHER_WEIGHT_GAIN',
 'DIABETES_PRE',
 'DIABETES_GEST',
 'HYP_TENS_PRE',
 'HYP_TENS_GEST',
 'PREV_BIRTH_PRETERM',
 'BIRTH_PLACE_VEC']

In [28]:
# Logistic Regression
import pyspark.ml.classification as cl
# Logistic-regression stage. It reads the vector column produced by
# `featureCreator` and predicts the INFANT_ALIVE_AT_REPORT label.
# The keyword is `featuresCol`, and getOutputCol must be called to obtain the
# column name (the previous `featureCols = ...getOutputCol,(),` did not parse).
logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    featuresCol=featureCreator.getOutputCol(),
    labelCol='INFANT_ALIVE_AT_REPORT',
)

In [31]:
# Create a pipeline
from pyspark.ml import Pipeline
# Chain the encoder, the feature assembler and the classifier into one estimator.
# NOTE(review): the name is spelled `pipline`; the cell that fits the model uses
# the same spelling, so rename both together if at all.
pipline = Pipeline(stages = [ encoder, featureCreator, logistic])

In [32]:
# Train and test splits 
# Random split with weights 0.7 / 0.3; the seed is fixed.
train, test = births.randomSplit([0.7, 0.3], seed = 123)

In [33]:
# Fit the three pipeline stages on the training split.
model = pipline.fit(train)
# Apply the fitted pipeline to the test split to obtain predictions.
model_test = model.transform(test)

In [35]:
# Inspect one scored row, including the columns added by the pipeline.
model_test.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=62, MOTHER_PRE_WEIGHT=145, MOTHER_DELIVERY_WEIGHT=152, MOTHER_WEIGHT_GAIN=7, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 12.0, 1: 99.0, 6: 62.0, 7: 145.0, 8: 152.0, 9: 7.0, 16: 1.0}), rawPrediction=DenseVector([1.101, -1.101]), probability=DenseVector([0.7504, 0.2496]), prediction=0.0)]

In [39]:
#Model assessment
import pyspark.ml.evaluation as ev

# Binary-classification evaluator: scores rows by the 'probability' column
# against the INFANT_ALIVE_AT_REPORT label.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

In [40]:
# Report both metrics on the scored test split, ROC first and then PR.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(model_test, {evaluator.metricName: metric_name}))

0.7187355793173213
0.6819691176245866


In [42]:
#Tuning
import pyspark.ml.tuning as tune


In [43]:
# Fresh classifier with only the label column set; maxIter and regParam are
# supplied by the parameter grid below.
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')

# Three values for each of the two parameters.
grid = (
    tune.ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])
    .build()
)

In [None]:
# Evaluator used by the cross-validator; it is constructed with the same
# arguments as the evaluator in the model-assessment cell.
evaluator = ev.BinaryClassificationEvaluator(
                rawPredictionCol='probability',
                labelCol='INFANT_ALIVE_AT_REPORT')

In [46]:
# Cross-validator that tries each parameter map in `grid` on `logistic` and
# scores it with `evaluator`. numFolds and seed are not passed.
cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

In [47]:
# Fit only the data-preparation stages (encoder + feature assembler) on the
# training split; the classifier itself is fitted by the cross-validator.
pipeline = Pipeline(stages=[encoder, featureCreator])
data_transformer = pipeline.fit(train)

# Start tuning: run the cross-validated grid search on the prepared training data.
cvModel = cv.fit(data_transformer.transform(train))

In [54]:
# Print the average cross-validation metric of each parameter combination.
# (A dangling `cvModel.` left after the loop was a syntax error and is removed.)
for avg_metric in cvModel.avgMetrics:
    print(avg_metric)

0.7006044687905117
0.7007246496676794
0.7014474539413368
0.73377554680586
0.7316258506352412
0.7239219682794967
0.7404103462475178
0.7352273403679913
0.7211737166059197


In [49]:
# Despite its name, `train2` holds the *test* split after feature preparation.
train2 = data_transformer.transform(test)
results = cvModel.transform(train2)

# Report both metrics for the tuned model, ROC first and then PR.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: metric_name}))

0.735848884034915
0.6959036715961695


In [56]:
# Pair every parameter map with its average cross-validation metric. Each map
# is rendered as a list of single-entry {parameter name: value} dicts.
results = [
    ([{param.name: value} for param, value in params.items()], metric)
    for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]

# Highest average metric first.
sorted(results, key=lambda entry: entry[1], reverse=True)

[([{'maxIter': 50}, {'regParam': 0.01}], 0.7404103462475178),
 ([{'maxIter': 50}, {'regParam': 0.05}], 0.7352273403679913),
 ([{'maxIter': 10}, {'regParam': 0.01}], 0.73377554680586),
 ([{'maxIter': 10}, {'regParam': 0.05}], 0.7316258506352412),
 ([{'maxIter': 10}, {'regParam': 0.3}], 0.7239219682794967),
 ([{'maxIter': 50}, {'regParam': 0.3}], 0.7211737166059197),
 ([{'maxIter': 2}, {'regParam': 0.3}], 0.7014474539413368),
 ([{'maxIter': 2}, {'regParam': 0.05}], 0.7007246496676794),
 ([{'maxIter': 2}, {'regParam': 0.01}], 0.7006044687905117)]