## Model Selection, Training & Evaluation
We will select the model that is most appropriate for predicting the extent of earthquake damage from the building characteristics defined in the data model.

Load the processed data saved earlier to IBM Cloud Object Storage.

In [1]:
import ibmos2spark
# @hidden_cell
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'service_id': 'iam-ServiceId-1ffa7090-b46d-409d-a4c2-d72608483d6a',
    'iam_service_endpoint': 'https://iam.bluemix.net/oidc/token',
    'api_key': '<YOUR_API_KEY>'  # placeholder: supply your own key
}

configuration_name = 'os_e69b22554ae141c3a275fabb55b0f50e_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
train_proc = spark.read.parquet(cos.url('Train_Proc.parquet', 'advanceddatasciencewithibm-donotdelete-pr-z8s5dzzkvq4bck'))
train_proc.createOrReplaceTempView("train_proc")

# display first 5 rows, with scroll like in Pandas df
train_proc.limit(5).toPandas().head()




   building_id                                           features  label
0       299215  (1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...      3
1       377492  (1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...      1
2       294802  (0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...      3
3       843787  (1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...      3
4       343815  (1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...      3


In [2]:
# how many rows & columns? 
print((train_proc.count(), len(train_proc.columns)))

(260601, 3)


In [3]:
# show column types
train_proc.dtypes

[('building_id', 'int'), ('features', 'vector'), ('label', 'int')]

In [4]:
import scipy.sparse
from pyspark.ml.linalg import Vectors, _convert_to_vector, VectorUDT
from pyspark.sql.functions import udf, col

# convert an ml Vector to a SparseVector via a scipy column matrix
# (_convert_to_vector is a private PySpark helper that accepts scipy sparse column vectors)
def dense_to_sparse(vector):
    return _convert_to_vector(scipy.sparse.csc_matrix(vector.toArray()).T)

to_sparse = udf(dense_to_sparse, VectorUDT())
train_proc = train_proc.withColumn('features_sparse', to_sparse(col('features')))

train_proc.dtypes

[('building_id', 'int'),
 ('features', 'vector'),
 ('label', 'int'),
 ('features_sparse', 'vector')]

### Split into Training and Test Datasets
Use 60% of the rows for training and 40% for testing.

In [5]:
# split into training (60%) and test (40%) sets;
# no seed is passed, so the split differs between runs (add seed=... to make it reproducible)
df_train, df_test = train_proc.randomSplit([0.6, 0.4])

In [6]:
# row and column counts of the training and test sets
print('Training Set ',(df_train.count(), len(df_train.columns)))
print('Test Set ',(df_test.count(), len(df_test.columns)))

Training Set  (156181, 4)
Test Set  (104420, 4)


### Choice of Model
The use case is multiclass classification, for which various algorithms are available. I selected two supervised machine learning models and one deep learning model:
1. Logistic Regression
2. Random Forest Classifier
3. Feedforward Neural Network

### 1. Logistic Regression

In [8]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

logr = LogisticRegression(featuresCol='scaledFeatures', labelCol='label',standardization=False)

# Chain the scaler and the logistic regression in a Pipeline
pipeline = Pipeline(stages=[scaler, logr])

# Train model.  
model = pipeline.fit(df_train)

# Make predictions.
predictions = model.transform(df_test)

# Select example rows to display.
predictions.select("features","label","prediction").show(5)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(59,[0,3,8,11,17,...|    3|       3.0|
|(59,[0,3,8,11,16,...|    2|       2.0|
|(59,[0,3,9,12,16,...|    2|       2.0|
|(59,[1,3,8,11,16,...|    2|       2.0|
|(59,[0,5,10,13,18...|    2|       2.0|
+--------------------+-----+----------+
only showing top 5 rows



#### Evaluation
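Display the confusion matrix as counts of (label, prediction) pairs.

Evaluate the predictions using the F1 metric, the harmonic mean of precision and recall, where a perfect score is 1.0. For multiclass data, Spark's `f1` metric averages the per-class F1 scores, weighting each class by how often its label occurs.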
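For reference, the weighted F1 that Spark's `MulticlassClassificationEvaluator` reports for `metricName="f1"` can be written as follows, where $TP_k$, $FP_k$ and $FN_k$ are the true positives, false positives and false negatives for class $k$, $n_k$ is the number of rows with label $k$, and $N$ is the total number of rows:

$$
F1_k = \frac{2\,P_k R_k}{P_k + R_k} = \frac{2\,TP_k}{2\,TP_k + FP_k + FN_k},
\qquad P_k = \frac{TP_k}{TP_k + FP_k},\quad R_k = \frac{TP_k}{TP_k + FN_k}
$$

$$
F1_{\text{weighted}} = \sum_{k} \frac{n_k}{N}\, F1_k
$$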

In [9]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions.groupBy('label','prediction').count().show()

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

evaluator.evaluate(predictions)


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    2|       1.0| 2011|
|    1|       3.0|  283|
|    3|       1.0|  184|
|    1|       2.0| 6605|
|    2|       3.0| 5662|
|    3|       2.0|27785|
|    3|       3.0| 6987|
|    2|       2.0|51735|
|    1|       1.0| 3168|
+-----+----------+-----+



0.5417550673320801

We use the F1 score because we want to reduce both false positives and false negatives.
The F1 score of the first logistic regression model is 0.54.
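As a sanity check on what the `f1` metric measures, the sketch below recomputes it by hand from the confusion matrix printed in In [9], assuming the weighted definition (per-class F1 averaged with weights proportional to each label's row count). The `weighted_f1` helper and the `lr_counts` dictionary are illustrative names, not part of the notebook.

```python
from collections import defaultdict

def weighted_f1(counts):
    """Weighted F1 from {(label, prediction): count}, weighting each label by its support."""
    actual = defaultdict(int)      # row totals: true count per label
    predicted = defaultdict(int)   # column totals: predicted count per label
    correct = defaultdict(int)     # diagonal: true positives per label
    for (label, pred), n in counts.items():
        actual[label] += n
        predicted[pred] += n
        if label == pred:
            correct[label] += n
    total = sum(actual.values())
    score = 0.0
    for label, support in actual.items():
        # F1 = 2TP / (2TP + FP + FN) = 2TP / (actual + predicted)
        denom = actual[label] + predicted[label]
        f1_label = 2 * correct[label] / denom if denom else 0.0
        score += f1_label * support / total
    return score

# confusion-matrix counts from the unweighted logistic regression (In [9])
lr_counts = {
    (1, 1): 3168, (1, 2): 6605, (1, 3): 283,
    (2, 1): 2011, (2, 2): 51735, (2, 3): 5662,
    (3, 1): 184, (3, 2): 27785, (3, 3): 6987,
}
print(round(weighted_f1(lr_counts), 4))  # 0.5418
```

The result agrees with the 0.5417550673320801 returned by the evaluator, which supports reading Spark's `f1` as a support-weighted average.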

In [10]:
# look at distribution of labels and see if they are balanced
train_proc.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1| 25124|
|    3| 87218|
|    2|148259|
+-----+------+



#### Imbalance Handling
The dataset is imbalanced: label 2 accounts for more than half of the rows (56.89%), label 3 for 33.47%, and label 1, the smallest class, for 9.64%. We rerun logistic regression with per-row weights that give more weight to the smaller classes and less to the larger ones.
The weights for labels 1, 2 and 3 are 0.4518, 0.2155 and 0.3327 respectively. Each equals (1 - p)/2, where p is the label's share of the rows, so the three weights sum to 1.
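The class weights can be reproduced from the label counts in In [10] with the rule (1 - p)/(K - 1), which for K = 3 classes is (1 - p)/2. This is a minimal sketch; `class_weights` and `counts` are illustrative names, and the rule is inferred from the fact that it matches all three published weights.

```python
# label counts from In [10]
counts = {1: 25124, 2: 148259, 3: 87218}

def class_weights(counts):
    """Weight each class by (1 - share) / (K - 1); the weights sum to 1."""
    total = sum(counts.values())
    k = len(counts)
    return {label: (1 - n / total) / (k - 1) for label, n in counts.items()}

weights = class_weights(counts)
for label in sorted(weights):
    print(label, round(weights[label], 4))
# 1 0.4518
# 2 0.2155
# 3 0.3327
```

This is a fairly gentle correction: label 1 gets about 2.1 times the weight of label 2. A common alternative, inverse-frequency weighting N / (K * n_k), would give label 1 about 5.9 times the weight of label 2.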

In [11]:
from pyspark.sql.functions import when

df_train = df_train.withColumn("classWeights",
                               when(df_train.label == 1, 0.4518)
                               .when(df_train.label == 2, 0.2155)
                               .otherwise(0.3327))

df_train.select("classWeights").show(5)


+------------+
|classWeights|
+------------+
|      0.2155|
|      0.3327|
|      0.3327|
|      0.2155|
|      0.2155|
+------------+
only showing top 5 rows



In [12]:
# re-run Logistic Regression with weights

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

logr = LogisticRegression(featuresCol='scaledFeatures', labelCol='label',weightCol='classWeights',standardization=False)

# Chain the scaler and the weighted logistic regression in a Pipeline
pipeline = Pipeline(stages=[scaler, logr])

# Train model.  
model = pipeline.fit(df_train)

# Make predictions.
predictions = model.transform(df_test)

# Select example rows to display.
predictions.select("features","label","prediction").show(5)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(59,[0,3,8,11,17,...|    3|       3.0|
|(59,[0,3,8,11,16,...|    2|       2.0|
|(59,[0,3,9,12,16,...|    2|       2.0|
|(59,[1,3,8,11,16,...|    2|       2.0|
|(59,[0,5,10,13,18...|    2|       2.0|
+--------------------+-----+----------+
only showing top 5 rows



In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions.groupBy('label','prediction').count().show()

evaluatorf1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    2|       1.0| 5002|
|    1|       3.0| 1547|
|    3|       1.0|  533|
|    1|       2.0| 3117|
|    2|       3.0|23449|
|    3|       2.0|13207|
|    3|       3.0|21216|
|    2|       2.0|30957|
|    1|       1.0| 5392|
+-----+----------+-----+

f1 = 0.554662


The F1 score improved from 0.54 to 0.55 with class weighting, which compensates for the imbalanced label distribution.
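The small change in overall F1 hides a larger shift between classes. The sketch below computes per-class recall (correct predictions divided by the label's row count) from the two confusion matrices in In [9] and In [13]; `recall_by_label`, `unweighted` and `weighted` are illustrative names.

```python
def recall_by_label(counts):
    """Recall per true label from {(label, prediction): count}."""
    support, correct = {}, {}
    for (label, pred), n in counts.items():
        support[label] = support.get(label, 0) + n
        if label == pred:
            correct[label] = correct.get(label, 0) + n
    return {label: correct.get(label, 0) / support[label] for label in support}

# confusion matrices without (In [9]) and with (In [13]) class weights
unweighted = {(1, 1): 3168, (1, 2): 6605, (1, 3): 283,
              (2, 1): 2011, (2, 2): 51735, (2, 3): 5662,
              (3, 1): 184, (3, 2): 27785, (3, 3): 6987}
weighted = {(1, 1): 5392, (1, 2): 3117, (1, 3): 1547,
            (2, 1): 5002, (2, 2): 30957, (2, 3): 23449,
            (3, 1): 533, (3, 2): 13207, (3, 3): 21216}

before, after = recall_by_label(unweighted), recall_by_label(weighted)
for label in sorted(before):
    print(label, round(before[label], 3), round(after[label], 3))
```

Recall on label 1 rises from about 0.32 to 0.54 and on label 3 from about 0.20 to 0.61, while recall on the majority label 2 falls from about 0.87 to 0.52.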

#### Tuning
We vary some of the pipeline parameters to see whether the F1 score can be improved.
We set up a hyperparameter grid with ParamGridBuilder and run an exhaustive grid search over it with CrossValidator, which scores every combination using k-fold cross-validation (k = 3 here) and the F1 evaluator defined earlier.
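Exhaustive grid search tries every combination of the listed values, and 3-fold cross-validation trains each combination three times. The plain-Python sketch below does the bookkeeping for the grid defined in In [15]; the dictionary keys mirror the `regParam`, `maxIter` and `elasticNetParam` parameters, and the code does not call Spark.

```python
from itertools import product

# the values passed to ParamGridBuilder in In [15]
grid = {
    "regParam": [0.01, 0.5, 2.0],
    "maxIter": [10, 20, 50],
    "elasticNetParam": [0.0, 0.5],
}
num_folds = 3

# one dict per combination, analogous to the ParamMaps returned by ParamGridBuilder.build()
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

# each combination is fit once per fold; CrossValidator then refits the best one on all training data
fits = len(combinations) * num_folds + 1
print(len(combinations), fits)  # 18 55
```

The pipeline is therefore trained 55 times, which is why the cross-validation cell takes much longer than a single fit.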

In [15]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (ParamGridBuilder()
             .addGrid(logr.regParam, [0.01, 0.5, 2.0])
             .addGrid(logr.maxIter, [10, 20, 50])
             .addGrid(logr.elasticNetParam, [0.0, 0.5])
             .build())

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [16]:
model2 = crossval.fit(df_train)

In [17]:
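predictions2 = model2.transform(df_test)

evaluatorf1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
# evaluate the cross-validated model's predictions, not the earlier `predictions`
f1 = evaluatorf1.evaluate(predictions2)
print("f1 = %g" % f1)

f1 = 0.554662


The recorded output above (f1 = 0.554662) came from evaluating `predictions`, the untuned weighted model from In [12], rather than `predictions2`, which is why it matches the previous score exactly. It therefore does not show whether tuning helped; the corrected cell has to be re-run to obtain the tuned model's F1 score.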

### 2. Classification using RandomForest classifier

Define a random forest classifier with 10 trees and train it.

In [18]:
from pyspark.ml.classification import RandomForestClassifier

# Define a random forest with 10 trees.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# Train model.  
model = rf.fit(df_train)

# Make predictions.
predictions = model.transform(df_test)

# Select example rows to display.
predictions.select("features","label","prediction").show(5)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(59,[0,3,8,11,17,...|    3|       2.0|
|(59,[0,3,8,11,16,...|    2|       2.0|
|(59,[0,3,9,12,16,...|    2|       2.0|
|(59,[1,3,8,11,16,...|    2|       2.0|
|(59,[0,5,10,13,18...|    2|       2.0|
+--------------------+-----+----------+
only showing top 5 rows



#### Evaluation

In [19]:
predictions.groupBy('label','prediction').count().show()

evaluatorf1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    2|       1.0| 1497|
|    1|       3.0|   22|
|    3|       1.0|   66|
|    1|       2.0| 7618|
|    2|       3.0| 1629|
|    3|       2.0|29827|
|    3|       3.0| 5063|
|    2|       2.0|56282|
|    1|       1.0| 2416|
+-----+----------+-----+

f1 = 0.532707


The F1 score for the random forest is 0.53, slightly below both logistic regression models.
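The random forest's confusion matrix suggests it leans heavily toward the majority label. The sketch below compares how often each label actually occurs in the test set with how often it is predicted, using the counts printed in In [19]; `rf_counts` is an illustrative name.

```python
from collections import Counter

# (label, prediction) counts from the random forest confusion matrix in In [19]
rf_counts = {(1, 1): 2416, (1, 2): 7618, (1, 3): 22,
             (2, 1): 1497, (2, 2): 56282, (2, 3): 1629,
             (3, 1): 66, (3, 2): 29827, (3, 3): 5063}

actual, predicted = Counter(), Counter()
for (label, pred), n in rf_counts.items():
    actual[label] += n
    predicted[pred] += n

total = sum(actual.values())
for label in sorted(actual):
    # share of test rows with this true label vs. share of rows predicted as this label
    print(label, round(actual[label] / total, 3), round(predicted[label] / total, 3))
```

Label 2 makes up about 57% of the test rows but about 90% of the random forest's predictions, whereas labels 1 and 3 are predicted far less often than they occur.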

### 3. Feedforward Neural Network

In [20]:
from pyspark.ml.linalg import Vectors

# convert the features to dense vectors, keeping only the label and features columns
def transData(data):
    return (data.rdd
            .map(lambda r: [r['label'], Vectors.dense(r['features'].toArray())])
            .toDF(['label', 'features']))

data = transData(train_proc)
data.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    3|[1.0,0.0,0.0,1.0,...|
|    1|[1.0,0.0,0.0,1.0,...|
|    3|[0.0,0.0,1.0,0.0,...|
|    3|[1.0,0.0,0.0,1.0,...|
|    3|[1.0,0.0,0.0,1.0,...|
|    2|[0.0,0.0,1.0,1.0,...|
|    3|[1.0,0.0,0.0,1.0,...|
|    2|[1.0,0.0,0.0,1.0,...|
|    3|[0.0,0.0,1.0,1.0,...|
|    3|[1.0,0.0,0.0,1.0,...|
+-----+--------------------+
only showing top 10 rows



In [21]:
# MultilayerPerceptronClassifier expects labels 0..numClasses-1, so shift labels 1-3 to 0-2
data = data.withColumn('label', data.label - 1)

In [22]:
# split into training and test sets; this is a separate unseeded split,
# so df2_test is not the same set of rows as df_test above
(df2_train, df2_test) = data.randomSplit([0.6, 0.4])

df2_train.show(5)
df2_test.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
+-----+--------------------+
only showing top 5 rows

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
|    0|[0.0,0.0,1.0,0.0,...|
+-----+--------------------+
only showing top 5 rows



In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier

# specify layers for the neural network:
# input layer of size 59 (features), two hidden layers of size 25 and 12,
# and an output layer of size 3 (classes)
layers = [59, 25, 12, 3]

#scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# create the trainer and set its parameters
FNN = MultilayerPerceptronClassifier(labelCol="label",
                                     featuresCol="features",
                                     maxIter=100, layers=layers,
                                     blockSize=128, seed=1234)

# wrap the classifier in a single-stage Pipeline
pipeline = Pipeline(stages=[FNN])

# Train model.  
model = pipeline.fit(df2_train)
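Each layer in `layers = [59, 25, 12, 3]` is fully connected to the next, so the network has one weight per connection plus one bias per non-input unit. Assuming that standard fully connected layout, the sketch below counts the trainable parameters; `count_parameters` is an illustrative helper, not a Spark API.

```python
def count_parameters(layers):
    """Weights plus biases for a fully connected feedforward network."""
    return sum(n_in * n_out + n_out for n_in, n_out in zip(layers, layers[1:]))

layers = [59, 25, 12, 3]
print(count_parameters(layers))  # 1851
```

If the fitted model is inspected, `model.stages[-1].weights.size` is expected to match this count, since Spark stores all of the network's weights and biases in a single vector.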

In [24]:
# Make predictions
predictions = model.transform(df2_test)

#### Evaluation

In [25]:
predictions.groupBy('label','prediction').count().show()

evaluatorf1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       1.0| 9937|
|    1|       1.0|59496|
|    2|       1.0|34794|
+-----+----------+-----+

f1 = 0.414873


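The F1 score for the neural network is 0.41, and its confusion matrix shows that it predicts class 1 (original label 2) for every test row.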
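A classifier that always predicts one class has a weighted F1 of 2p²/(1 + p), where p is that class's share of the test rows: the other classes contribute F1 = 0, and the predicted class has precision p, recall 1 and weight p. The sketch below plugs in the label totals from In [25]; `counts_by_label` is an illustrative name.

```python
# true-label totals from In [25]; every row is predicted as class 1
counts_by_label = {0: 9937, 1: 59496, 2: 34794}

total = sum(counts_by_label.values())
p = counts_by_label[1] / total   # share of the predicted (majority) class

# only the predicted class has nonzero F1: precision = p, recall = 1
f1_majority = 2 * p / (1 + p)
weighted_f1 = p * f1_majority    # weighted by that class's share of rows
print(round(weighted_f1, 4))     # 0.4149
```

This matches the reported 0.414873, so the network scores exactly what always predicting the majority class would. On the logistic regression test split (59,408 of 104,420 rows with label 2), the same baseline is about 0.413, which gives a floor for comparing the three models.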

The best F1 score (0.55) is achieved by the logistic regression model with class weighting to handle the label imbalance.