In [1]:
#import modules
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql import SQLContext
from pyspark.sql import functions as sqlFunc

#set schema: age is read as an integer, every other column as a nullable string
df_schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("sex", StringType(), True),
    StructField("lang", StringType(), True),
    StructField("ethnicity", StringType(), True),
    StructField("diabetes", StringType(), True),
    StructField("alcoholism", StringType(), True),
    StructField("hf", StringType(), True),
    StructField("liver", StringType(), True),
    StructField("lung", StringType(), True),
    StructField("smoking", StringType(), True),
    StructField("prior_pneumo", StringType(), True),
    StructField("prior_tdap", StringType(), True),
    StructField("post_pneumo", StringType(), True),
    StructField("post_tdap", StringType(), True),
])

#load data: the CSV has a header row and is parsed with the explicit schema above
directory = 's3://applifier-mapreduce/elia/vaccines/vaccinations.csv'
df = spark.read.csv(directory, header=True, schema=df_schema)
df.persist()

##Clean Data

In [3]:
#clean non ordinal columns: sex, lang, ethnicity
# sex: trim first, then lower-case, in a single column expression
df = df.withColumn("sex", sqlFunc.lower(sqlFunc.trim(col("sex"))))

In [4]:
# Display the distinct values of `sex` to check the result of the trim/lower-case step.
df.select('sex').distinct().show()

In [5]:
# lang: trim and lower-case once, then recode in a single expression.
#   'unreported' / 'unknown' -> 'other'
#   'haitian creole'         -> 'haitian'
# Every other value (including null) is kept as the trimmed, lower-cased text.
lang_clean = sqlFunc.lower(sqlFunc.trim(col("lang")))
df = df.withColumn(
    "lang",
    sqlFunc.when(lang_clean.isin('unreported', 'unknown'), 'other')
    .when(lang_clean == 'haitian creole', 'haitian')
    .otherwise(lang_clean)
)

In [6]:
# Display the distinct values of `lang` to check the recoding.
df.select('lang').distinct().show()

In [7]:
# ethnicity: trim and lower-case once, then collapse the spelling variants
# into one label per group. Values matching no group (including null) are
# kept as the trimmed, lower-cased text.
ethnicity_clean = sqlFunc.lower(sqlFunc.trim(col("ethnicity")))
df = df.withColumn(
    "ethnicity",
    sqlFunc.when(ethnicity_clean.isin('hisp', 'hispanic or latino'), 'hispanic')
    .when(
        ethnicity_clean.isin(
            'not hispanic or latino(black)',
            'not hispanic (black)',
            'not hispanic or latino (black)'
        ),
        'black/african american'
    )
    .when(
        ethnicity_clean.isin('not hispanic (white)', 'not hispanic or latino (white)'),
        'white'
    )
    .when(
        ethnicity_clean.isin('not hispanic or latino (asian)', 'not hispanic (asian)'),
        'asian'
    )
    .when(ethnicity_clean == 'not hispanic or latino', 'not hispanic')
    .when(ethnicity_clean.isin('not hispanic (other)', 'unknown', 'unreported'), 'other')
    .when(ethnicity_clean == 'not hispanic (native american)', 'native american')
    .otherwise(ethnicity_clean)
)


In [8]:
# Display the distinct `ethnicity` values (n=15 rows requested, truncation of long strings disabled).
df.select('ethnicity').distinct().show(n=15, truncate = False)

In [9]:
#impute missing categorical values
def convert_space_nulls(c):
    '''
    Build a column expression that replaces missing categorical values with "NA".

    A value is treated as missing when it is null or when it is an empty
    string after trimming. Any other value is passed through unchanged.

    c: name of the column to wrap.
    Returns a Column aliased back to the name `c`.
    '''
    source = col(c)
    is_missing = source.isNull() | (sqlFunc.trim(source) == '')
    return sqlFunc.when(is_missing, "NA").otherwise(source).alias(c)

# String columns are replaced by their NA-imputed expression; columns of any
# other type are selected by name, untouched.
exprs = []
for field_name, field_type in df.dtypes:
    if field_type == 'string':
        exprs.append(convert_space_nulls(field_name))
    else:
        exprs.append(field_name)

df = df.select(*exprs)

In [10]:
#bucketize age for every 10 years starting at min
def age_bucket(x):
    '''
    Map an age in years to a ten-year bucket label.

    bin1 : missing (None), zero, negative, or below 18
    bin2 : 18-27
    bin3 : 28-37
    bin4 : 38-47
    bin5 : 48-57
    bin6 : 58-67
    bin7 : 68-77
    bin8 : 78-87
    bin9 : 88 and above

    x: age as a number, or None when the value is missing.
    Returns the bucket label as a string.
    '''
    # None has to be ruled out before any ordering comparison: `None < 0`
    # raises TypeError on Python 3.
    # Ages 1-17 previously matched no branch and fell through to 'bin9'
    # together with 88+; they are grouped with the other out-of-range
    # values in 'bin1' instead.
    if x is None or x < 18:
        return 'bin1'
    # Each test below only needs the upper bound because every lower range
    # has already returned.
    if x < 28:
        return 'bin2'
    if x < 38:
        return 'bin3'
    if x < 48:
        return 'bin4'
    if x < 58:
        return 'bin5'
    if x < 68:
        return 'bin6'
    if x < 78:
        return 'bin7'
    if x < 88:
        return 'bin8'
    return 'bin9'

# Wrap the Python bucketing function as a string-returning Spark UDF and
# store its result in a new `age_bucket` column.
udf_age = sqlFunc.udf(age_bucket, StringType())
df = df.withColumn("age_bucket", udf_age(col("age")))

##Preprocess and set up pipeline

In [12]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline

# Build the model pipeline with post_pneumo as the target.
columns = ['age_bucket', 'sex', 'lang', 'ethnicity', 'diabetes', 'alcoholism', 'hf', 'liver', 'lung', 'smoking', 'prior_pneumo', 'prior_tdap']

# Stage 1: index the target string into a numeric `label` column.
label_indexer = StringIndexer(inputCol='post_pneumo', outputCol='label')
stages = [label_indexer]

# Next stages: for every categorical feature, index the strings and then
# one-hot encode the index. Each indexer is followed directly by its encoder.
for feature in columns:
    indexed_name = feature + 'Index'
    encoded_name = feature + 'Vect'
    stages.append(StringIndexer(inputCol=feature, outputCol=indexed_name))
    stages.append(OneHotEncoder(inputCol=indexed_name, outputCol=encoded_name))

# Final stage: concatenate all encoded vectors into a single `features` column.
assemblerInput = [feature + 'Vect' for feature in columns]
assemble_feat = VectorAssembler(inputCols=assemblerInput, outputCol='features')
stages.append(assemble_feat)

# Fit the pipeline on the cleaned frame and run the same frame through it.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
dataset = pipeline_model.transform(df)

In [13]:
# Drop the cache on the cleaned frame and cache the pipeline output, which is
# what the modelling cells below read from.
# NOTE(review): persist() was called on `df` right after loading, but `df` has
# been rebound to new DataFrames by every cleaning step since. This unpersist()
# therefore targets the derived frame, not the one that was persisted; confirm
# the raw cache is actually released (e.g. keep a reference to the loaded frame).
df.unpersist()
dataset.persist()

In [14]:
# Fetch one transformed row to inspect the columns produced by the pipeline.
dataset.take(1)

##Set up Logistic Regression (Binary)
- This predicts the probability of an individual receiving a pneumo vaccine given all other information about that individual, excluding whether they received a tdap at that time. Prior tdap and prior pneumo vaccine information is included.

In [16]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

# Split the transformed data 70/30 into train and test with a fixed seed.
train_fraction = 0.7
split_weights = [train_fraction, 1-train_fraction]
trainingData, testData = dataset.randomSplit(split_weights, seed=100)

# Fit a logistic regression on the training split (at most 10 iterations).
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10
)
lr_model = lr.fit(trainingData)


In [17]:
# Number of rows that ended up in the training split.
trainingData.count()

In [18]:
# Number of rows that ended up in the test split.
testData.count()

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics

#test model with test data
test_predictions = lr_model.transform(testData)

# Confusion matrix built from the hard (thresholded) predictions.
predictionAndLabels = test_predictions.select("prediction", "label").rdd
metrics = MulticlassMetrics(predictionAndLabels)
cm = metrics.confusionMatrix().toArray()

# Area under ROC has to be computed from continuous scores. Feeding the hard
# 0/1 predictions to BinaryClassificationMetrics (as was done before) yields a
# curve with a single operating point, not the model's real AUC. The evaluator
# reads the `rawPrediction` column that the logistic regression model emits.
auc_metric = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
).evaluate(test_predictions)

# These formulas read only the top-left 2x2 corner of the matrix, i.e. they
# assume a binary label.
# NOTE(review): precision and recall below are taken around cm[0][0], so they
# describe label 0 (whichever post_pneumo value the label indexer mapped to 0),
# assuming rows are actual labels and columns predicted labels - confirm that
# this is the class of interest.
accuracy = (cm[0][0] + cm[1][1]) / (cm[0][1] + cm[1][0] + cm[0][0] + cm[1][1])
precision = cm[0][0] / (cm[0][0] + cm[1][0])
recall = cm[0][0] / (cm[0][0] + cm[0][1])

print("****Summary Stats****")
print("Confusion Matrix: ")
# print() call form works on both Python 2 and Python 3 (was `print cm`).
print(cm)

print("Area under ROC = %s" % auc_metric)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("Accuracy = %s" % accuracy)
