# Lab: Spark DataFrames and ML Pipeline in Analytics Zoo

### Overview

Analytics Zoo is well integrated with pipelines from Apache Spark Machine Learning (aka MLlib). The preferred API in Spark is the DataFrame-based API (the primary MLlib API since Spark 2.0), which we will use here.

We will use Spark DataFrame processing to prepare the data, and then train an Analytics Zoo model as part of a Spark ML pipeline.


### Runtime
30 mins

## Step 1 - About Data

[About data](https://archive.ics.uci.edu/ml/datasets/heart+Disease)

This is a classification dataset. We have a number of patient attributes that we will use to try to predict the outcome variable, which we call `target`.

The input variables are as follows:
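1. Age (`age`)
2. Sex (`sex`)
3. Chest Pain Type (`cp`)
4. Resting Blood Pressure (`trestbps`)
5. Serum Cholesterol (`chol`)
6. Fasting Blood Sugar > 120 mg/dl (`fbs`)
7. Resting ECG (`restecg`)
8. Maximum Heart Rate Achieved (`thalach`)
9. Exercise-Induced Angina (`exang`)
10. ST Depression Induced by Exercise Relative to Rest (`oldpeak`)
11. Slope of the Peak Exercise ST Segment (`slope`)
12. Number of Major Vessels Colored by Fluoroscopy (`ca`)
13. thal (`thal`)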


Note that the `thal` variable is categorical, having values such as `fixed`, `normal`, or `reversible`.  We are going to need to index this categorical variable.


Sample Data:
```text
age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
63,1,1,145,233,1,2,150,0,2.3,3,0,fixed,0
67,1,4,160,286,0,2,108,1,1.5,2,3,normal,1
67,1,4,120,229,0,2,129,1,2.6,2,2,reversible,0
37,1,3,130,250,0,0,187,0,3.5,3,0,normal,0
41,0,2,130,204,0,2,172,0,1.4,1,0,normal,0

```

## Step 2 - Init

Here we initialize Analytics Zoo, which also gives us a SparkContext (`sc`).

```python
import zoo
from zoo.common.nncontext import init_nncontext

sc = init_nncontext("Cardiac")
print("zoo version : ", zoo.__version__)

## Spark UI
print('Spark UI running on http://localhost:' + sc.uiWebUrl.split(':')[2])
sc
```

## Step 3 - Explore Dataset

### 3.1 - Load Data

We will load the data.  It can be found [here](https://elephantscale-public.s3.amazonaws.com/data/heart/heart.csv).

```python
!wget 'https://elephantscale-public.s3.amazonaws.com/data/heart/heart.csv'
```

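```python
from pyspark.sql import SparkSession

# init_nncontext() returns a SparkContext; get (or create) a SparkSession
# on top of it so we can use the DataFrame API
spark = SparkSession.builder.getOrCreate()

data = spark.read.csv("heart.csv", header=True, inferSchema=True)
print("record count ", data.count())
# drop any rows that contain null values
data = data.na.drop()
print("clean data count ", data.count())
data.printSchema()
data.show()
```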

### 3.2 - Basic Exploration

Let's start with `describe()` to get summary statistics (count, mean, standard deviation, min, max) for each column.

```python
data.describe().toPandas().T
```

```python
data.groupBy('target').count().show()
```

### 3.3 - Graph

Let's look at the frequency of each output class. Based on what we see here, the output is slightly unbalanced, but not dramatically. We will check whether our confusion matrix reflects this imbalance.

```python
## basic frequency graph

%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt

a = data.groupBy("target").count().toPandas()
print(a)
a = a.set_index('target')
a.plot(kind='bar', rot=0)
plt.show()
```
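When classes are unbalanced, a useful reference point is the accuracy of a trivial model that always predicts the most frequent class; a trained model should beat it. The sketch below is minimal plain Python. The counts in it are hypothetical placeholders, so substitute the ones printed by the cell above.

```python
def majority_baseline(class_counts):
    """Accuracy of always predicting the most frequent class.

    class_counts: dict mapping class label -> number of rows.
    """
    total = sum(class_counts.values())
    return max(class_counts.values()) / total

# Hypothetical counts for illustration only; use the output of
# data.groupBy("target").count() instead.
example_counts = {0: 70, 1: 30}
print(majority_baseline(example_counts))  # 0.7
```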

## Step 4 - Create Feature Vectors

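Here, we prepare the data for Analytics Zoo in several steps:

- fix the label values
- index the categorical column
- select the feature columns and cast them to double
- assemble and scale the feature vector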

### 4.1 - No zeroes in Target Label column

First, Analytics Zoo expects class labels in the target column to start at `1` rather than `0` (the BigDL `ClassNLLCriterion` we use later works with 1-based class indices). Since our target column is zero-based, we simply add `1` to it.

```python
# Analytics Zoo expects class labels to start at 1 (not 0),
# so we add 1 to the label
data = data.withColumn("target2", data['target'] + 1)
data.groupBy("target2").count().show()
data.show(10)
```
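A tiny plain-Python sketch of the label shift. Labels move from `{0, 1}` to `{1, 2}` before training. Predictions come back in the same 1-based encoding (as the confusion matrix in Step 9 assumes), and subtracting 1 recovers the original `target` values. The helper names are hypothetical.

```python
def to_one_based(labels):
    """Shift zero-based class labels (0, 1, ...) to one-based (1, 2, ...)."""
    return [label + 1 for label in labels]

def to_zero_based(labels):
    """Undo the shift, e.g. for 1-based predictions returned by the model."""
    return [label - 1 for label in labels]

targets = [0, 1, 0, 0, 1]
shifted = to_one_based(targets)
print(shifted)                 # [1, 2, 1, 1, 2]
print(to_zero_based(shifted))  # [0, 1, 0, 0, 1]
```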

### Convert Categorical Columns

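Here we convert the categorical `thal` column to numbers using the `StringIndexer` class. It maps each distinct string to a numeric index; by default, the most frequent value gets `0.0`. This is not as sophisticated as one-hot encoding (for example with `OneHotEncoder`), because it imposes an arbitrary order on the categories, but it will do for this situation.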


```python
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="thal", outputCol="thalIndex")
data = indexer.fit(data).transform(data)
```
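To see what `StringIndexer` does, here is a plain-Python sketch of its default frequency-descending ordering: the most frequent string gets index `0.0`, the next `1.0`, and so on. Breaking ties alphabetically is an assumption of this sketch, and the `thal` values below are made up. Note that the resulting numbers imply an order (here `normal < reversible < fixed`) that the categories do not really have.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

thal = ["fixed", "normal", "reversible", "normal", "normal", "reversible"]
mapping = frequency_index(thal)
print(mapping)                     # {'normal': 0.0, 'reversible': 1.0, 'fixed': 2.0}
print([mapping[v] for v in thal])  # [2.0, 0.0, 1.0, 0.0, 0.0, 1.0]
```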

### Select Columns of Interest

Now let's select the columns of interest:

1. Age
2. Sex
3. trestbps
4. chol
5. thalach
6. oldpeak
7. slope
8. ca
9. thalIndex (the converted thal column from the last step)



```python
feature_columns = ['age', 'sex', 'trestbps', 'chol', 'thalach', 'oldpeak', 'slope', 'ca', 'thalIndex']
target_columns = ['target2']

data = data.select(feature_columns + target_columns)
data.show()
```

### 4.2 - Convert to Double
Analytics Zoo expects numeric inputs as doubles, so we cast every column to `double`.

```python
from pyspark.sql.functions import col

# convert every column to double
data = data.select([col(c).cast("double") for c in data.columns])
data.printSchema()
data.show(5)
```

### 4.3 - Feature Vector

Here we create the feature vector in a new column called `assembled`, using the `VectorAssembler` class on our feature columns. We also copy `target2` into a `label` column for the classifier.

```python
from pyspark.ml.feature import VectorAssembler

# combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='assembled')
fv = assembler.transform(data)
# the classifier reads its label from this column
fv = fv.withColumn('label', fv['target2'])
fv.show(5)
```
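`VectorAssembler` concatenates the input columns, in the order given by `inputCols`, into one vector column. Below is a plain-Python sketch of the same idea for a single row. A dict stands in for a Spark `Row`, the values come from the first sample record, and `thalIndex = 2.0` is an assumed indexer output.

```python
feature_columns = ['age', 'sex', 'trestbps', 'chol', 'thalach',
                   'oldpeak', 'slope', 'ca', 'thalIndex']

def assemble(row, columns):
    """Concatenate the named numeric fields of a row into one list, in column order."""
    return [float(row[c]) for c in columns]

# First sample record; thalIndex=2.0 is an assumed StringIndexer output.
row = {'age': 63, 'sex': 1, 'trestbps': 145, 'chol': 233, 'thalach': 150,
       'oldpeak': 2.3, 'slope': 3, 'ca': 0, 'thalIndex': 2.0, 'target2': 1}
print(assemble(row, feature_columns))
# [63.0, 1.0, 145.0, 233.0, 150.0, 2.3, 3.0, 0.0, 2.0]
```

The label column (`target2`) is not among `feature_columns`, so it stays out of the vector.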

### 4.4 - Scaling

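We will use the `StandardScaler` class to standardize every feature with z-scoring: subtract the mean, then divide by the standard deviation. This helps avoid problems caused by features with very different magnitudes, such as `chol` versus `oldpeak`. Spark's `StandardScaler` only scales to unit standard deviation by default, so we set `withMean=True` to center the values as well.

```python
from pyspark.ml.feature import StandardScaler

# withMean=True centers each feature; withStd=True (the default) scales it
# to unit standard deviation. Together they give a z-score.
scaler = StandardScaler(inputCol="assembled", outputCol="features",
                        withMean=True, withStd=True)
fv = scaler.fit(fv).transform(fv)
fv.show(5)
```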
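For intuition, here is z-scoring done by hand with NumPy on a few made-up rows. The two features have very different magnitudes, one age-like and one cholesterol-like. The sketch divides by the sample standard deviation (`ddof=1`), which is what Spark documents for `StandardScaler`. A constant column would cause a division by zero here.

```python
import numpy as np

def z_score(X):
    """Standardize each column of X to zero mean and unit sample standard deviation."""
    X = np.asarray(X, dtype=float)
    return (X - X.mean(axis=0)) / X.std(axis=0, ddof=1)

# Two features with very different magnitudes (age-like vs. cholesterol-like).
X = [[63, 233], [67, 286], [37, 250], [41, 204]]
Z = z_score(X)
print(Z.mean(axis=0))         # both columns now have a mean of (about) 0
print(Z.std(axis=0, ddof=1))  # and a sample standard deviation of 1
```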

```python
fv.printSchema()
fv.limit(5).toPandas()
```

## Step 5 - Split training / validation

We will do a simple random split: 70% for training and 30% for validation.

```python
# a fixed seed makes the split reproducible between runs
(training, validation) = fv.randomSplit([0.7, 0.3], seed=42)
print("training set count ", training.count())
print("validation set count ", validation.count())
```
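`randomSplit` does not cut the data at exactly 70%. Each row is assigned to a split at random according to the weights, so the printed counts are only close to 70/30 and change between runs unless a `seed` is fixed. The sketch below illustrates that per-row sampling idea in plain Python. It is not Spark's actual implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability given by weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # cumulative boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # the last split also catches any floating-point rounding gap
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, valid = random_split(range(300), [0.7, 0.3], seed=42)
print(len(train), len(valid))  # roughly 210 and 90, rarely exactly
```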

## Step 6 - Design Network

### 6.1 - Designing the network
Here's a picture of a simple neural network, like the one in this example:

As you can see, we have a total of 4 layers:

1. Input layer (one unit per feature; in this case 9, the columns in `feature_columns`)
2. Hidden layer (size we have to specify as part of the model)
3. Hidden layer (size we also specify)
4. Output layer (one unit per output class we are trying to predict; in this case 2)
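Each fully connected (`Dense`) layer with `n_in` inputs and `n_out` outputs has `n_in * n_out` weights plus `n_out` biases. A quick sketch to count the trainable parameters of a 9 → 128 → 128 → 2 network like the one described above:

```python
def dense_params(sizes):
    """Count weights + biases for fully connected layers with the given sizes."""
    return sum(n_in * n_out + n_out for n_in, n_out in zip(sizes, sizes[1:]))

layer_sizes = [9, 128, 128, 2]  # input, hidden 1, hidden 2, output
print(dense_params(layer_sizes))  # 18050
```

Comparing this number with the training set count printed in Step 5 gives a feel for how easily the network could overfit.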

## Step 7 - Create the Network

### 7.1 - Network HyperParameters

Here we set some of the hyperparameters of our network. If we want to tweak things, this is the place to start.

```python
learning_rate = 0.001
training_epochs = 100
# the batch size should be a multiple of the total number of cores,
# so a power of two is usually a safe choice
batch_size = 16

# Network parameters
n_input = len(feature_columns)
n_classes = 2
n_hidden_1 = 128  # 1st hidden layer number of neurons
n_hidden_2 = 128  # 2nd hidden layer number of neurons
```
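The comment about batch size comes from how BigDL, which Analytics Zoo builds on, spreads each batch across all cores. The batch size should generally be divisible by the total core count (nodes × cores per node). A power of two works for common core counts but not for, say, 6 cores. The hypothetical helper below rounds a desired batch size up to the next valid value.

```python
def round_up_batch_size(desired, total_cores):
    """Smallest multiple of total_cores that is >= desired (hypothetical helper)."""
    if desired <= 0 or total_cores <= 0:
        raise ValueError("desired and total_cores must be positive")
    return ((desired + total_cores - 1) // total_cores) * total_cores

print(round_up_batch_size(16, 4))   # 16
print(round_up_batch_size(16, 6))   # 18
print(round_up_batch_size(100, 8))  # 104
```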

### 7.2 - Set Up the Network

Our network has a total of 2 hidden layers.

```python
# using Keras-style APIs to build the model
from zoo.pipeline.api.keras.models import Sequential
from zoo.pipeline.api.keras.layers import Dense

## 4 layers = input [9] + first hidden [128] + second hidden [128] + output [2]
# ReLU on the hidden layers: without a nonlinearity, stacked Dense layers
# collapse into a single linear transformation
nn = Sequential().add(Dense(n_hidden_1, input_dim=n_input, activation="relu")).\
                  add(Dense(n_hidden_2, activation="relu")).\
                  add(Dense(n_classes, activation="log_softmax"))
```
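For intuition, here is the forward pass of such a network written out in NumPy. It assumes ReLU hidden activations and a `log_softmax` output, and uses random, untrained weights, so it illustrates shapes and activations rather than reproducing Analytics Zoo internals. Each output row holds log-probabilities for the 2 classes, so exponentiating a row gives probabilities that sum to 1.

```python
import numpy as np

def log_softmax(z):
    """Numerically stable log-softmax over the last axis."""
    z = z - z.max(axis=-1, keepdims=True)
    return z - np.log(np.exp(z).sum(axis=-1, keepdims=True))

def forward(x, params):
    """Two ReLU hidden layers followed by a log-softmax output layer."""
    (W1, b1), (W2, b2), (W3, b3) = params
    h1 = np.maximum(0.0, x @ W1 + b1)
    h2 = np.maximum(0.0, h1 @ W2 + b2)
    return log_softmax(h2 @ W3 + b3)

rng = np.random.default_rng(0)
sizes = [9, 128, 128, 2]
# random (untrained) weights and zero biases, for illustration only
params = [(rng.normal(scale=0.1, size=(a, b)), np.zeros(b))
          for a, b in zip(sizes, sizes[1:])]

x = rng.normal(size=(4, 9))  # a batch of 4 standardized feature vectors
log_probs = forward(x, params)
print(log_probs.shape)                # (4, 2)
print(np.exp(log_probs).sum(axis=1))  # each row sums to 1
```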

```python
# Create an NNClassifier (a Spark ML estimator) to train the model
from bigdl.nn.criterion import ClassNLLCriterion
from zoo.pipeline.nnframes import NNClassifier
from zoo.pipeline.api.keras.optimizers import Adam

# ClassNLLCriterion takes log-probabilities (hence the log_softmax output)
# and 1-based labels; [n_input] is the size of each feature vector
estimator = NNClassifier(nn, ClassNLLCriterion(), [n_input])

estimator.setMaxEpoch(training_epochs)\
         .setBatchSize(batch_size)\
         .setLearningRate(learning_rate)

estimator.setLabelCol("label").setFeaturesCol("features")

# optimizer method (the default is SGD); give Adam the learning rate explicitly
estimator.setOptimMethod(Adam(lr=learning_rate))

print("nn \n", nn)
```
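The loss and the predicted class can both be written in a few lines of NumPy. This is a sketch of the idea rather than BigDL's code, and it assumes log-probability inputs, 1-based labels, and averaging over the batch. The negative log-likelihood picks out each row's log-probability for its true class and averages the negatives. The predicted class is the position of the largest log-probability, shifted to be 1-based.

```python
import numpy as np

def class_nll(log_probs, labels):
    """Mean negative log-likelihood with 1-based class labels (1..n_classes)."""
    log_probs = np.asarray(log_probs, dtype=float)
    idx = np.asarray(labels, dtype=int) - 1  # shift to 0-based for indexing
    return -log_probs[np.arange(len(idx)), idx].mean()

def predict(log_probs):
    """1-based predicted class: index of the largest log-probability, plus 1."""
    return np.argmax(log_probs, axis=1) + 1

log_probs = np.log([[0.9, 0.1],
                    [0.2, 0.8]])
labels = [1, 2]
print(class_nll(log_probs, labels))  # -(log 0.9 + log 0.8) / 2, about 0.164
print(predict(log_probs))            # [1 2]
```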

## Step 8 - Train / Predict

### 8.1 - Train

Now let's start training. Notice that the `NNClassifier` class from `nnframes` follows Spark ML estimator semantics: calling `fit()` on a DataFrame returns a trained model.

```python
%%time

## training
print("starting training...")
model = estimator.fit(training)
print("initial model training finished.")
```

### 8.2 -  Prediction

Here we call `.transform()` on the trained model to generate predictions for the validation data. Because the model never saw these rows during training, this gives a reasonable estimate of how it will perform on new data; using the training data here would give an overly optimistic picture.

```python
%%time

predictions = model.transform(validation)
```

```python
predictions.groupBy("prediction").count().show()
predictions.sample(False, 0.1).limit(5).toPandas()
```

## Step 9 - Evaluating

### 9.1 - Basic Eval

Let's look at our matching vs missed predictions:

```python
print("matching predictions ", predictions.filter("prediction == label").count())
print("missed predictions ", predictions.filter("prediction != label").count())
```

### 9.2 - Accuracy, Precision, AUC

Let's compute the accuracy, weighted precision and recall, and AUC (the area under the ROC curve).
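```python
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# BinaryClassificationEvaluator expects 0/1 labels, so shift our 1/2 labels
# and predictions back to 0/1. The model outputs only hard predictions,
# so this AUC is based on a single threshold.
binary = predictions.withColumn("label01", col("label") - 1) \
                    .withColumn("prediction01", col("prediction") - 1)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction01",
                                          labelCol="label01",
                                          metricName="areaUnderROC")
auROC = evaluator.evaluate(binary)
print("Area under ROC curve = ", auROC)

recall = MulticlassClassificationEvaluator(metricName="weightedRecall").evaluate(predictions)
print("recall = ", recall)

precision = MulticlassClassificationEvaluator(metricName="weightedPrecision").evaluate(predictions)
print("Precision = ", precision)

accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                             metricName="accuracy").evaluate(predictions)
print("accuracy = ", accuracy)
```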
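These numbers can be cross-checked by hand. The plain-Python sketch below assumes hard 1/2 labels and predictions (made-up values) and treats class `2` (original `target` 1) as positive. Accuracy is the fraction of matches. With only hard predictions, the ROC curve has a single interior point, so its area reduces to (TPR + TNR) / 2. Weighted recall always equals accuracy, since each class's recall is weighted by that class's share of the rows.

```python
def binary_metrics(labels, preds, positive=2):
    """Accuracy and single-threshold ROC AUC from hard labels and predictions."""
    pairs = list(zip(labels, preds))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    tn = sum(1 for y, p in pairs if y != positive and p != positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    accuracy = (tp + tn) / len(pairs)
    tpr = tp / (tp + fn)  # true positive rate (recall of the positive class)
    tnr = tn / (tn + fp)  # true negative rate
    return accuracy, (tpr + tnr) / 2

labels = [1, 1, 1, 2, 2]
preds  = [1, 1, 2, 2, 1]
print(binary_metrics(labels, preds))  # accuracy 0.6, AUC about 0.583
```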

### 9.3 - Confusion Matrix

```python
# Confusion matrix
# we use Spark to calculate confusion matrix as the prediction set can be rather large
cm = predictions.groupBy('label').pivot('prediction', [1,2]).count().na.fill(0).orderBy('label')
cm.show()
```
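The `groupBy('label').pivot('prediction', ...)` call counts each (true label, predicted label) pair. On a small list of made-up pairs, the same table can be built in plain Python, with rows as true labels and columns as predictions.

```python
from collections import Counter

def confusion_matrix(labels, preds, classes=(1, 2)):
    """Rows = true label, columns = predicted label, like the Spark pivot above."""
    counts = Counter(zip(labels, preds))
    return [[counts.get((y, p), 0) for p in classes] for y in classes]

labels = [1, 1, 1, 2, 2]
preds  = [1, 1, 2, 2, 1]
for row in confusion_matrix(labels, preds):
    print(row)
# [2, 1]
# [1, 1]
```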

```python
# basic imports

import matplotlib.pyplot as plt
import seaborn as sn

cm_pd = cm.toPandas()
# print(cm_pd)
cm_pd = cm_pd.set_index('label')  # make 'label' as index
# print(cm_pd)

plt.figure(figsize = (10,8))
sn.heatmap(cm_pd, annot=True,fmt='d');
```

## Step 10 - Experiment
Try the following:
- increase the number of hidden layers (for example, from 2 to 3 or 4)
- adjust the number of neurons in each hidden layer

See if you can improve the accuracy and the confusion matrix.