### Preparation: 1. Read file function from professor's lecture content

In [2]:
# read only cell

import os

# get the databricks runtime version
db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name):    
    # if the databricks env var exists
    if db_env != None:
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

###  Preparation: 2. import neccessary packages and functions

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml import feature
from pyspark.ml import classification
from pyspark.sql import functions as fn
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, \
    RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql import Row
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## Part 1: Exploring

- Data Resource
[https://www.kaggle.com/pitasr/falldata](https://www.kaggle.com/pitasr/falldata)

- Attribute Description:

  - ACTIVITY    :   activity classification  
      - 0- Standing 1- Walking 2- Sitting 3- Falling 4- Cramps 5- Running
  - TIME        :   monitoring time
  - SL          :  sugar level
  - EEGEEG      :  monitoring rate
  - BP          :  Blood pressure
  - HR          :  Heart beat rate
  - CIRCLUATION :  Blood circulation

In [6]:
# Read file from csv file
df = spark.read.csv(get_training_filename('falldeteciton.csv'), header=True, inferSchema=True)

In [7]:
#1. Show first 10 rows
df.show(10)

#2. Print the shape of df (num_rows, num_cols)
print(df.count(),len(df.columns))

In [8]:
# 3. Check Missing Values
from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

## Part2: Visualization

In [10]:
#1. histgram
pd_df = df.toPandas()
help(plt.subplots)
fig, ax = plt.subplots()
ax = sns.distplot(pd_df['ACTIVITY'], kde=True)
fig.set_size_inches(5,5)
display(fig)

In [11]:
#  histgrams of atttibutes
fig = plt.figure(figsize=(18,9))

plt.subplot(231)
sns.distplot(pd_df['TIME'])
plt.title('TIME distribution')

plt.subplot(232)
sns.distplot(pd_df['SL'])
plt.title('SL distribution')

plt.subplot(233)
sns.distplot(pd_df['EEG'])
plt.title('EEG distribution')

plt.subplot(234)
sns.distplot(pd_df['BP'])
plt.title('BP distribution')

plt.subplot(235)
sns.distplot(pd_df['HR'])
plt.title('HR distribution')

plt.subplot(236)
sns.distplot(pd_df['CIRCLUATION'])
plt.title('CIRCLUATION distribution')
display(fig)

In [12]:
#2. Pairplot
ax=sns.pairplot(pd_df[['TIME', 'SL', 'EEG', 'BP', 'HR','CIRCLUATION']], diag_kind = 'kde',
             plot_kws = {'alpha': 0.7, 's': 18, 'edgecolor': 'None'},
             height = 4)
ax.fig.suptitle("Data Pair Plot", y=1.01)
ax.fig.set_size_inches(12,12)
display(ax.fig)

In [13]:
#3. Box plot : Time
fig, ax = plt.subplots()
fig = pd_df.boxplot(column = 'TIME',by='ACTIVITY') 
fig.set_xticklabels(['Standing', 'Walking', 'Sitting', 'Falling','Cramps','Running'])
display(fig)

In [14]:
#4. Box plot : SL
fig, ax = plt.subplots()
fig = pd_df.boxplot(column = 'SL',by='ACTIVITY') 
fig.set_xticklabels(['Standing', 'Walking', 'Sitting', 'Falling','Cramps','Running'])
display(fig)


In [15]:
#5. Box plot : Circulation
fig, ax = plt.subplots()
fig = pd_df.boxplot(column = 'CIRCLUATION',by='ACTIVITY') 
fig.set_xticklabels(['Standing', 'Walking', 'Sitting', 'Falling','Cramps','Running'])
display(fig)

In [16]:
#6. Scatter plot
ax = sns.FacetGrid(pd_df, col="ACTIVITY") 
ax.map(plt.scatter, "TIME", "SL")
ax.fig.suptitle("Fall Detection", y=1.01)
ax.fig.set_size_inches(20,10)
display(ax.fig)

## Part2. Preprocessing

In [18]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import IntegerType

# 1. Transform the target variable into 2 classes: 1(falling) or 0(other activity)
def func(s):
  if s == 3:
    return 1
  else:
    return 0
my_func = udf(func,IntegerType())
df_fall = df.withColumn('FALL', my_func('ACTIVITY'))



In [19]:
# 2. Split the dataset with the split ratio 8:2

training, test = df_fall.randomSplit([0.8, 0.2], 0)
training.groupBy('FALL').agg(fn.count('*')).show()
test.groupBy('FALL').agg(fn.count('*')).show()

## Part4: Modeling and Evaluation

### 4.1 Logistic Regression

In [22]:
# logistic regression pipeline

va = feature.VectorAssembler(inputCols=['TIME', 'SL', 'EEG', 'BP', 'HR','CIRCLUATION'],
                                        outputCol='features')
st = feature.StandardScaler(withMean=True, inputCol='features')
lr = classification.LogisticRegression(labelCol='FALL', featuresCol='features')
model1 = Pipeline(stages=[va,lr])

model1_fitted = model1.fit(training)

In [23]:
# check the AUC score
from pyspark.ml import evaluation
evaluator = evaluation.MulticlassClassificationEvaluator(metricName="accuracy",labelCol='FALL')
bce = BinaryClassificationEvaluator(labelCol='FALL')
bce.evaluate(model1_fitted.transform(test))
print("Test Area Under ROC: " + str(bce.evaluate(model1_fitted.transform(test), {bce.metricName: "areaUnderROC"})))

print("accuracy: " + str(evaluator.evaluate(model1_fitted.transform(test), {evaluator.metricName: "accuracy"})))


In [24]:
# Grid search to tune the model
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0., 0.015, 0.025, 0.05]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()
lr = classification.LogisticRegression(labelCol='FALL', featuresCol='features')

bce = BinaryClassificationEvaluator(labelCol='FALL')

lr_cv = CrossValidator(estimator=lr, evaluator=bce,estimatorParamMaps=paramGrid,numFolds=3)

lr_cv_model =  Pipeline(stages=[va,lr_cv]).fit(training)


In [25]:
# check the AUC score again
print("Test Area Under ROC: " + str(bce.evaluate(lr_cv_model.transform(test), {bce.metricName: "areaUnderROC"})))

In [26]:
# ROC scatter Plot
import matplotlib.pyplot as plt
import numpy as np
tpr_fpr = lr_cv_model.stages[-1].bestModel.summary.roc.toPandas()
fig, ax = plt.subplots()
X = np.linspace(0,1,10)
plt.plot( X, X, color = 'black', linewidth=2, linestyle="-" )
plt.scatter(tpr_fpr['FPR'],tpr_fpr['TPR'],color = 'blue',s=14,alpha=0.6)
plt.xlim(0,1)
plt.ylim(0,1)
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC scatter plot')
display(fig)

In [27]:
bestLRModel = lr_cv_model.stages[-1].bestModel
param_dict = bestLRModel.extractParamMap()
name_dict = {}
for k, v in param_dict.items():
  name_dict[k.name] = v
best_reg = name_dict["regParam"]
best_elastic_net = name_dict["elasticNetParam"]
print('Best Param (regParam):{:.4f} '.format(best_reg))
print('Best Param (elasticNetParam):{:.4f} '.format(best_elastic_net))


### 4.2 SVM (Support Vector Machine)

In [29]:
# SVM
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(labelCol='FALL',maxIter=50, regParam=0.01)

svm_pipeline = Pipeline(stages=[va,st,svm])

svm_pipeline_fitted = svm_pipeline.fit(training)


In [30]:
# check result
print("Test Area Under ROC: " + str(bce.evaluate(svm_pipeline_fitted.transform(test), {bce.metricName: "areaUnderROC"})))
print("accuracy: " + str(evaluator.evaluate(svm_pipeline_fitted.transform(test), {evaluator.metricName: "accuracy"})))

### 4.3 Decistion Tree

In [32]:
#2.1 decision tree (DT)
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'FALL', maxDepth =5)

dt_pipeline = Pipeline(stages=[va,dt])

dt_pipeline_fitted = dt_pipeline.fit(training)



In [33]:
print("Test Area Under ROC: " + str(bce.evaluate(dt_pipeline_fitted.transform(test), {bce.metricName: "areaUnderROC"})))
print("accuracy: " + str(evaluator.evaluate(dt_pipeline_fitted.transform(test), {evaluator.metricName: "accuracy"})))

In [34]:
display(dt_pipeline_fitted.stages[-1])
# feature 1-6: 'TIME', 'SL', 'EEG', 'BP', 'HR', 'CIRCLUATION'

treeNode
"{""index"":23,""featureType"":""continuous"",""prediction"":null,""threshold"":101.5,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":11,""featureType"":""continuous"",""prediction"":null,""threshold"":515.5,""categories"":null,""feature"":5,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":75.5,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":165.5,""categories"":null,""feature"":5,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":2199.3950000000004,""categories"":null,""feature"":1,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":47.5,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [35]:
#maxDepth 30
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'FALL', maxDepth =30)

dt_pipeline_30 = Pipeline(stages=[va,dt])

dt_pipeline_fitted_30 = dt_pipeline_30.fit(training)
print("Test Area Under ROC: " + str(bce.evaluate(dt_pipeline_fitted_30.transform(test), {bce.metricName: "areaUnderROC"})))
print("accuracy: " + str(evaluator.evaluate(dt_pipeline_fitted_30.transform(test), {evaluator.metricName: "accuracy"})))

### 4.4 Random Forest

In [37]:
# random forest
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'FALL')
rf_pipeline = Pipeline(stages=[va,st,rf])

rf_pipeline_fitted = rf_pipeline.fit(training)
bce = BinaryClassificationEvaluator(labelCol='FALL')
print("Test Area Under ROC: " + str(bce.evaluate(rf_pipeline_fitted.transform(test), {bce.metricName: "areaUnderROC"})))

In [38]:
# grid search
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 100, stop = 200, num = 5)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 8, stop = 25, num = 3)]) \
    .build()

bce = BinaryClassificationEvaluator(labelCol='FALL')

cv = CrossValidator(estimator=Pipeline(stages=[va, rf]), evaluator=bce,estimatorParamMaps=paramGrid,numFolds=3)

rn_cv_model = cv.fit(training)


# best hyperparameters:  numTree : 150. maxDepth:16

In [39]:
print("Test Area Under ROC: " + str(bce.evaluate(rn_cv_model.transform(test), {bce.metricName: "areaUnderROC"})))

In [40]:
# fast run( with best hyperparameters)
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'FALL',numTrees=150,maxDepth=16)
rf_pipeline_best = Pipeline(stages=[va,st,rf])

rf_pipeline_best_fitted = rf_pipeline_best.fit(training)
bce = BinaryClassificationEvaluator(labelCol='FALL')
print("Test Area Under ROC: " + str(bce.evaluate(rf_pipeline_best_fitted.transform(test), {bce.metricName: "areaUnderROC"})))

In [41]:
print("accuracy: " + str(evaluator.evaluate(rf_pipeline_best_fitted.transform(test), {evaluator.metricName: "accuracy"})))

In [42]:
# Confusion Matrix
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
y_true = test.select("FALL")
y_true = y_true.toPandas()

y_pred = rf_pipeline_best_fitted.transform(test).select("prediction")
y_pred = y_pred.toPandas()

c_matrix = confusion_matrix(y_true, y_pred)
print(f1_score(y_true, y_pred))


In [43]:
# confusion matrix visulization function
# Method from https://stackoverflow.com/questions/19233771/sklearn-plot-confusion-matrix-with-labels
import matplotlib.pyplot as plt
import numpy as np
import itertools

def plot_confusion_matrix(cm, classes,
                          title='Confusion matrix',
                          cmap=plt.cm.bone_r):

    print('Confusion matrix')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
 

In [44]:
# Plot the Confusion Matrix

class_names = ["No","Yes"]

fig = plt.figure()
plot_confusion_matrix(c_matrix, classes=class_names,
                      title='Confusion matrix')
display(fig)

In [45]:
# Extract Feature Importance
def ExtractFeatureImp(featureImp, dataset, featuresCol):
  list_extract = []
  for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
    list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
  varlist = pd.DataFrame(list_extract)
  varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
  return(varlist.sort_values('score', ascending = False))

feature_importance_rf=ExtractFeatureImp(rn_cv_model.bestModel.stages[-1].featureImportances,va.transform(df_fall),"features").head(20)
feature_importance_rf

In [46]:
# Plot the feature importance
sns_plot = sns.barplot(x = feature_importance_rf['score'], y = feature_importance_rf['name'],palette=sns.color_palette("Set2", 6))
display(sns_plot)

### 4.5 Gradient-Boosted Tree

In [48]:
# Gradient-Boosted Tree Classifier
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10,labelCol = 'FALL')

gbt_pipeline = Pipeline(stages=[va,st,gbt])

gbt_pipeline_fitted = gbt_pipeline.fit(training)

print("Test Area Under ROC: " + str(bce.evaluate(gbt_pipeline_fitted.transform(test), {bce.metricName: "areaUnderROC"})))
print("accuracy: " + str(evaluator.evaluate(gbt_pipeline_fitted.transform(test), {evaluator.metricName: "accuracy"})))

In [49]:
#grid search
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.minInfoGain, [int(x) for x in np.linspace(start = 0, stop = 3, num = 1)]) \
    .addGrid(gbt.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]) \
    .build()


bce = BinaryClassificationEvaluator(labelCol='FALL')

gbt_cv = CrossValidator(estimator=Pipeline(stages=[va,st, gbt]), evaluator=bce,estimatorParamMaps=paramGrid,numFolds=3)

gbt_model = gbt_cv.fit(training)

In [50]:
# auc score
print("Test Area Under ROC: " + str(bce.evaluate(gbt_model.transform(test), {bce.metricName: "areaUnderROC"})))


In [51]:
# Extract feature importance
feature_importance_gbt=ExtractFeatureImp(gbt_model.bestModel.stages[-1].featureImportances,va.transform(df_fall),"features").head(20)
feature_importance_gbt

In [52]:
# Plot feature importance
sns_plot = sns.barplot(x = feature_importance_rf['score'], y = feature_importance_rf['name'],palette=sns.color_palette("Set2", 6))
display(sns_plot)

### 4.6 Multilayer perceptron

In [54]:
#4. Multilayer perceptron classifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [6, 100, 100, 2]
# create the trainer and set its parameters
mp = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234,labelCol='FALL')


mp1 = Pipeline(stages=[va,st,mp])

mp1_fitted = mp1.fit(training)



In [55]:
# auc score
print("Test Area Under ROC: " + str(bce.evaluate(mp1_fitted.transform(test), {bce.metricName: "areaUnderROC"})))

In [56]:
# Other evaluation method
from pyspark.ml import evaluation
evaluator = evaluation.MulticlassClassificationEvaluator(metricName="accuracy",labelCol='FALL')
evaluator.evaluate(mp1_fitted.transform(test))

In [57]:
# try something different 
mp2 = classification.MultilayerPerceptronClassifier(seed=0,labelCol='FALL').\
    setStepSize(0.2).\
    setMaxIter(200).\
    setLayers([6, 100, 3])
mp2_pipeline_fitted = Pipeline(stages=[va,st,mp2]).fit(training)
evaluator.evaluate(mp2_pipeline_fitted.transform(test))

### 5. Other Ideas

In [59]:
#Filter those data with SL higher than 10000
filtered_df = df_fall.filter(df_fall['SL']< 10000)

filtered_training, filtered_test = filtered_df.randomSplit([0.8, 0.2], 0)
filtered_training.groupBy('FALL').agg(fn.count('*')).show()
filtered_test.groupBy('FALL').agg(fn.count('*')).show()

In [60]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'FALL')
rf_pipeline_sl10000 = Pipeline(stages=[va,st,rf])

rf_pipeline_sl10000_fitted = rf_pipeline_sl10000.fit(training)

print("Test Area Under ROC: " + str(bce.evaluate(rf_pipeline_sl10000_fitted.transform(test), {bce.metricName: "areaUnderROC"})))

In [61]:
# auc score comparation:
y = [0.6077,0.5163,0.7364,0.8923,0.8571,0.611]
x = ['LR','SVM','DT','RN','GBT','ANN']
fig = sns.barplot(x=x,y=y,palette=sns.color_palette("Set2", 6))
fig.set_title('AUC score comparation with different models')
fig.set(ylim=(0.5, 1))
display(fig)

In [62]:
# cost time comparation with some models(mins):
y = [4.4,109, 91,2.5]
x = ['LR','RN','GBT','ANN']
fig = sns.barplot(x=x,y=y,palette=sns.color_palette("Set2", 4))
fig.set_title('cost time comparation with some models (mins)')
fig.set_ylabel('mins')
display(fig)

#### Exploring with misclassification

In [64]:
# using rf_pipeline_best_fitted
df_predict = rf_pipeline_best_fitted.transform(test)

In [65]:
df_predict = df_predict.select('TIME','SL','EEG','BP','HR','FALL','prediction').where(df_predict.FALL != df_predict.prediction)
df_predict.show()

In [66]:
# histgrams of misclassification
fig = plt.figure(figsize=(10,5))

pd_df_pred = df_predict.toPandas()
plt.subplot(121)
sns.distplot(pd_df_pred['TIME'])
plt.title('pred_time distribution')

plt.subplot(122)
sns.distplot(pd_df['TIME'])
plt.title('original time distribution')

display(fig)

In [67]:
# histgrams of misclassification
fig = plt.figure(figsize=(10,5))

pd_df_pred = df_predict.toPandas()
plt.subplot(121)
sns.distplot(pd_df_pred['SL'])
plt.title('pred_SL distribution')

plt.subplot(122)
sns.distplot(pd_df['SL'])
plt.title('original SL distribution')

display(fig)

In [68]:
# histgrams of misclassification
fig = plt.figure(figsize=(10,5))

pd_df_pred = df_predict.toPandas()
plt.subplot(121)
sns.distplot(pd_df_pred['EEG'])
plt.title('pred_EEG distribution')

plt.subplot(122)
sns.distplot(pd_df['EEG'])
plt.title('original EEG distribution')

display(fig)

In [69]:
# histgrams of misclassification
fig = plt.figure(figsize=(10,5))

pd_df_pred = df_predict.toPandas()
plt.subplot(121)
sns.distplot(pd_df_pred['BP'])
plt.title('pred_BP distribution')

plt.subplot(122)
sns.distplot(pd_df['BP'])
plt.title('original BP distribution')

display(fig)