In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import math
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.callbacks import CSVLogger
from numpy import mean
from numpy import std
from keras import backend as K
from keras.models import load_model
from tensorflow.keras.callbacks import Callback
import six, csv
from collections import OrderedDict, Iterable
from sklearn.model_selection import cross_val_score
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import KFold
from keras.utils import np_utils
from sklearn.preprocessing import LabelEncoder
from keras.layers import Dropout
from keras.layers import SpatialDropout1D
from sklearn.metrics import precision_score

# CNN Model (Classification)

### Import dataset

In [None]:
# loading the datasets
sc_x_train = np.load('sc_x_train_class.npy')
sc_y_train = np.load('sc_y_train_class.npy')
sc_x_test = np.load('sc_x_test.npy')
sc_y_test = np.load('sc_y_test_class.npy')
y_test = np.load('y_test_class.npy', allow_pickle = True)

In [None]:
print(sc_x_train.shape)
print(sc_y_train.shape)
print(sc_x_test.shape)
print(sc_y_test.shape)
print(y_test)

### Define and evaluate model

In [None]:
def evaluate_model(trainX, trainy, testX, testy, rawy):

    # define model architecture
    verbose, epochs, batch_size = 0, 100, 100
    n_timesteps, n_features = trainX.shape[1], trainX.shape[2]
    model_class = Sequential()
    model_class.add(Conv1D(filters=20, kernel_size=2, activation='relu', input_shape=(n_timesteps,n_features), kernel_initializer = 'he_normal'))
    model_class.add(SpatialDropout1D(0.5))
    model_class.add(Conv1D(filters=10, kernel_size=2, activation='relu', kernel_initializer = 'he_normal'))
    model_class.add(MaxPooling1D(pool_size=3))
    model_class.add(Flatten())
    model_class.add(Dropout(0.5))
    model_class.add(Dense(20, activation='relu', kernel_initializer = 'he_normal'))
    model_class.add(Dense(6, activation = 'softmax'))
    model_class.compile(loss='categorical_crossentropy', metrics =['accuracy'], optimizer='adam')
    
    # fit network
    csv_logger = CSVLogger('training_data_class.log')
    history = model_class.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, validation_data = (testX, testy), callbacks=[csv_logger], verbose=verbose)
    
    # save everything for later
    wfname = "weights_class.hdf5"
    fname = "model_class.hdf5"
    model_class.save_weights(wfname,overwrite = True)
    model_class.save(fname, overwrite = True)
    
    # evaluate model
    loss , _ = model_class.evaluate(testX, testy, batch_size=batch_size, verbose=verbose)
    print('> %.3f' % (loss))
    
    #predict model
    prediction = model_class.predict(testX, verbose = 0)
    report = np.argmax(prediction, axis = -1)
        # inverse transform model predictions
    encoder = LabelEncoder()
    encoder.fit(rawy)
    report1 = encoder.inverse_transform(report)
    pre = precision_score(rawy, report1, labels=['good', 'hazardous', 'moderate', 'unhealthy', 'unhealthy for sensitive groups', 'very unhealthy'], average='micro')
    print('> %.3f' % (pre * 100.0))
    
    return loss, pre

In [None]:
# run simulations
scores, error = list(), list()

def run_experiment(repeats=10):
    # repeat experiment
    for r in range(repeats):
        loss, pre = evaluate_model(sc_x_train, sc_y_train, sc_x_test, sc_y_test, y_test)
        scores.append(pre)
        error.append(loss)

run_experiment()

### Presenting results

In [None]:
# loading last repetition of model
fname = "model_class.hdf5"
model_class = load_model(fname)

# Loading model training history 
history=pd.read_csv("training_data_class.log")

In [None]:
# plot loss
plt.subplot(211)
plt.title('Cross Entropy Loss')
plt.plot(history['loss'], color='g', label='Training loss')
plt.plot(history['val_loss'], color='b', label='Validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show

In [None]:
# plot accuracy
plt.subplot(212)
plt.title('Classification Accuracy')
plt.plot(history['accuracy'], color='g', label='Training accuracy')
plt.plot(history['val_accuracy'], color='b', label='Validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.show

In [None]:
# summarize model performance
def summarize_performance(scores, loss):
    # print summary
    print('mean=%.3f std=%.3f, n=%d' % (mean(loss), std(loss), len(loss)))
    print('mean=%.3f std=%.3f, n=%d' % (mean(scores)*100, std(scores)*100, len(scores)))

In [None]:
# summarize estimated loss
summarize_performance(scores = scores ,loss = error)

### Checking

In [None]:
# loading last repetition of model
fname = "model_class.hdf5"
model_class = load_model(fname)

# Loading model training history 
history=pd.read_csv("training_data_class.log")

In [None]:
print(model_class.summary())

In [None]:
top_layer = model_class.layers[0]
plt.imshow(top_layer.get_weights()[0][:, :, 0].squeeze(), cmap='gray')

In [None]:
#checking output of the model
model_prediction = model_class.predict(sc_x_test, verbose = 0)

In [None]:
model_prediction

In [None]:
som = np.sum(model_prediction[0,:])
som

In [None]:
# convert back to numbers

report = np.argmax(model_prediction, axis = -1)

In [None]:
report

In [None]:
# inverse transform model predictions

encoder = LabelEncoder()
encoder.fit(y_test)
report2 = encoder.inverse_transform(report)

In [None]:
# plotting predictions

target = y_test
prediction = report2
targe = target[0:21]
pred =prediction[0:21]
patterns = range(0,21)
plt.plot(patterns, pred, 'g', label='Model predictions')
plt.plot(patterns, targe, 'b', label='Target values')
plt.xticks(patterns)
plt.title('Target vs Predicted values')
plt.xlabel('Patterns')
plt.ylabel('Classes')
plt.legend()
plt.show()

### Precision Score

In [None]:
len(target)

In [None]:
from sklearn.metrics import precision_score
precision = precision_score(target, prediction, labels=['good', 'hazardous', 'moderate', 'unhealthy', 'unhealthy for sensitive groups', 'very unhealthy'], average='micro')
print('Precision: %.3f' % precision)

### Custom test

In [None]:
correct = 0
for a in range(0,len(y_test)):
    if y_test[a] == report2[a]:
        correct  = correct +1
    else:
        pass

accuracy_test = correct/len(y_test)*100
accuracy_test

### Extreme cases

In [None]:
# extreme cases from 150 onwards
x_test_ex = np.reshape(sc_x_test, (sc_x_test.shape[0],sc_x_test.shape[1]*sc_x_test.shape[2]))
print(x_test_ex.shape)
y_test_ex = np.reshape(y_test, (y_test.shape[0],1))
print(y_test_ex.shape)
y_test_ex

In [None]:
checking = np.concatenate((x_test_ex, y_test_ex), axis = 1)
print(checking.shape)
extreme = checking[:,105:106]
extreme

In [None]:
checking2 = checking[(extreme[:,0]=='hazardous')|(extreme[:,0]=='very unhealthy')]
print(checking2.shape)
checking3 = checking2[:,0:105]
print(checking3.shape)

In [None]:
checking4 = np.reshape(checking3, (checking3.shape[0],sc_x_test.shape[1], sc_x_test.shape[2]))
checking4.shape

In [None]:
extreme_predict = model_class.predict(checking4, verbose = 0)

In [None]:
report3 = np.argmax(extreme_predict, axis = -1)

In [None]:
report4 = encoder.inverse_transform(report3)

In [None]:
checking5 = checking2[:,105:106]

In [None]:
# plotting extreme predictions

target = list(checking5.flatten())
prediction = list(report2.flatten())
targe = target[0:21]
pred =prediction[0:21]
patterns = range(0,21)
plt.plot(patterns, pred, 'g', label='Model predictions')
plt.plot(patterns, targe, 'b', label='Target values')
plt.xticks(patterns)
plt.title('Target vs Predicted values')
plt.xlabel('Patterns')
plt.ylabel('Classes')
plt.legend()
plt.show()