## Tutorial: MLlib Classification Modeling

### University of California, Santa Barbara  
### PSTAT 135/235  
### Last Updated: Oct 22, 2018

---  

### Sources 

Learning PySpark, Chapter 5

### OBJECTIVES
- Train a classification model

### PREREQUISITES
- RDDs
- Spark DataFrames
- Schemas

---  

### MODULES

In [1]:
import os
import pyspark.sql.types as typ
import pyspark.sql.functions as F

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("data preprocessing") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext

### PARAMETERS

In [3]:
path_to_data = os.path.join('/home/jovyan/UCSB_BigDataAnalytics/data/infant/births_train.csv.gz')

In [4]:
target='INFANT_ALIVE_AT_REPORT'

In [5]:
# select features for predicting target

selected_features = [
 'INFANT_ALIVE_AT_REPORT',
 'BIRTH_PLACE',
 'MOTHER_AGE_YEARS',
 'FATHER_COMBINED_AGE',
 'CIG_BEFORE',
 'CIG_1_TRI',
 'CIG_2_TRI',
 'CIG_3_TRI',
 'MOTHER_HEIGHT_IN',
 'MOTHER_PRE_WEIGHT',
 'MOTHER_DELIVERY_WEIGHT',
 'MOTHER_WEIGHT_GAIN',
 'DIABETES_PRE',
 'DIABETES_GEST',
 'HYP_TENS_PRE',
 'HYP_TENS_GEST',
 'PREV_BIRTH_PRETERM'
]

In [6]:
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]


In [7]:
schema = typ.StructType([
 typ.StructField(e[0], e[1], False) for e in labels
])

### READ IN DATA

In [8]:
births = spark.read.csv(path_to_data,
 header=True,
 schema=schema)

In [None]:
births.take(1)

In [11]:
# compute distribution of target variable

births.groupBy(target).count().show()

In [12]:
# compute distribution of a selected feature

births.groupBy('INFANT_LIMB_REDUCTION').count().show()

In [13]:
# narrow the dataset to selected features and target

births_trimmed = births.select(selected_features)

In [None]:
print('rows={},columns={}'.format(births_trimmed.count(), len(births_trimmed.columns)))

### DATA PREP

Data issue: cigarettes smoked variable values
- 0-97 is actual number
- 98 is capped value (98+)
- 99 is default value for unknown

We will recode 99 as 0

In [14]:
# if feature value not 99, then return value, else return 0

def correct_cig(feat):
    impute_value = 0
    return F \
         .when(F.col(feat) != 99, F.col(feat))\
         .otherwise(impute_value)

In [15]:
# correct the features related to cigarettes smoked

births_transformed = births_trimmed \
 .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
 .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
 .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
 .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

In [None]:
births_transformed.describe(['CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI']).show()

#### Some variables are coded as strings Yes/No/Unknown. The will need to be recoded as numerics.
Recode dictionary

In [17]:
recode_dictionary = {
 'YNU': {
 'Y': 1,
 'N': 0,
 'U': 0
 }
}

In [18]:
# lookup code in dict

def recode(col, key):
 return recode_dictionary[key][col]

In [19]:
# UDF to allow recode() to be applied to dataframe

rec_integer = F.udf(recode, typ.IntegerType())

In [20]:
# Locate Yes/No/Unknown (YNU) columns

cols = [(col.name, col.dataType) for col in births_trimmed.schema]
YNU_cols = []
for i, s in enumerate(cols):
     if s[1] == typ.StringType():
         dis = births.select(s[0]) \
         .distinct() \
         .rdd \
         .map(lambda row: row[0]) \
         .collect()
     if 'Y' in dis:
         YNU_cols.append(s[0])

In [None]:
YNU_cols

In [21]:
# convert the YNU columns

exprs_YNU = [
 rec_integer(x, F.lit('YNU')).alias(x)
 if x in YNU_cols
 else x
 for x in births_transformed.columns
]

In [22]:
births_transformed = births_transformed.select(exprs_YNU)

### Summarize the data

In [26]:
import pyspark.mllib.stat as st
import numpy as np

In [27]:
numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',
                'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
                 'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',
                 'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'
                ]

In [28]:
numeric_rdd = births_transformed \
             .select(numeric_cols) \
             .rdd \
             .map(lambda row: [e for e in row])

In [29]:
# Use MLlib package for compute stats on columns

mllib_stats = st.Statistics.colStats(numeric_rdd)

In [None]:
# extract some stats, zip and print mean, sd

for col, m, v in zip(numeric_cols,
                     mllib_stats.mean(),
                     mllib_stats.variance()):
 print('{0}: \t{1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))

In [33]:
# narrow the list of features for modeling

features_to_keep = [
 'INFANT_ALIVE_AT_REPORT',
 'MOTHER_AGE_YEARS',
 'FATHER_COMBINED_AGE',
 'CIG_1_TRI',
 'MOTHER_HEIGHT_IN',
 'MOTHER_PRE_WEIGHT',
 'DIABETES_PRE',
 'DIABETES_GEST',
 'HYP_TENS_PRE',
 'HYP_TENS_GEST',
 'PREV_BIRTH_PRETERM'
]

In [34]:
births_transformed = births_transformed.select([e for e in features_to_keep])

### MLlib requires an RDD of LabeledPoints

LabeledPoint consists of two attributes: *label* and *features*  

features can be:

- NumPy array,  
- list,  
- pyspark.mllib.linalg.SparseVector,  
- pyspark.mllib.linalg.DenseVector,  
- scipy.sparse column matrix


In [23]:
import pyspark.mllib.regression as reg

In [24]:
births_lp = births_transformed \
 .rdd \
 .map(lambda row: reg.LabeledPoint(row[0], row[1:]))

### Need to split data into training and testing sets (*Data Splitting*)

In [25]:
births_train, births_test = births_lp.randomSplit([0.6, 0.4])

In [None]:
print('count_total={},count_train={},count_test={}'.format(births_transformed.count(),births_train.count(),births_test.count()))

In [26]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

In [39]:
LR_Model = LogisticRegressionWithLBFGS.train(births_train, iterations=10)

### Predict

In [86]:
# from test set, zip labels with predicted labels and cast to float

act_pred_test_set = births_test.map(lambda p: (p.label, LR_Model.predict(p.features))) \
                                    .map(lambda row: (row[0], row[1] * 1.0))

In [87]:
act_pred_test_set.take(3)

[(0.0, 0.0), (0.0, 0.0), (0.0, 0.0)]

### Evaluate Performance

In [88]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [89]:
metrics = BinaryClassificationMetrics(act_pred_test_set)

In [90]:
print('Area under PR: {0:.2f}'.format(metrics.areaUnderPR))
print('Area under ROC: {0:.2f}'.format(metrics.areaUnderROC))

Area under PR: 0.70
Area under ROC: 0.69
