<a href="https://colab.research.google.com/github/aidan-sc/val_WDCNN_subset/blob/main/val_WDCNN_subset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Data preprocessing

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
print(f"cwd:{os.getcwd()}")

if os.getcwd() != 'content/drive/MyDrive/paderborn':
  os.chdir('./drive/MyDrive/paderborn')

In [None]:
import numpy as np

from paderborn_data_loader_subset import PaderbornData

In [None]:
import time

start_time = time.time()

In [None]:
faults_train = {
    # normal healthy bearings
    'K001': 0,
    #'K002': 0,
    #'K003': 0,
    # 'K004': 0,
    # 'K005': 0,
    # 'K006': 0,
    # artificial damage
    'KA01': 1,
    #'KA03': 1,
    'KA05': 1,
    # 'KA06': 1,
    'KA07': 1,
    #'KA08': 1,
    #'KA09': 1,
    'KI01': 2,
    #'KI03': 2,
    'KI05': 2,
    'KI07': 2,
    #'KI08': 2,
    # real damage
    #'KI04': 1,
    #'KI14': 1,
    # 'KI16': 1,
    # 'KI17': 1,
    # 'KI18': 1,
    # 'KI21': 1,
    # 'KA04': 2,
    # 'KA15': 2,
    #'KA16': 2,
    #'KA22': 2,
    # 'KA30': 2,
    #'KB23': 'IROR',
    #'KB24': 'IROR',
    #'KB27': 'IROR',
}

faults_test = {
    # normal healthy bearings
    #'K001': 0,
    'K002': 0,
    # 'K003': 0,
    #'K004': 0,
    # 'K005': 0,
    # 'K006': 0,
    # artificial damage
    #'KA01': 1,
    #'KA03': 1,
    #'KA05': 1,
    # 'KA06': 1,
    #'KA07': 1,
    # 'KA08': 1,
    # 'KA09': 1,
    #'KI01': 2,
    # 'KI03': 2,
    #'KI05': 2,
    #'KI07': 2,
    # 'KI08': 2,
    # real damage
    #'KI04': 1,
    'KI14': 1,
    'KI16': 1,
    'KI17': 1,
    'KI18': 1,
    'KI21': 1,
    'KA04': 2,
    'KA15': 2,
    'KA16': 2,
    #'KA22': 2,
    'KA30': 2,
    #'KB23': 'IROR',
    #'KB24': 'IROR',
    #'KB27': 'IROR',
}

### Split the Paderborn bearing data into training and testing sets

In [None]:
root_dir = './data/raw/'
experiment = PaderbornData(root_dir, experiment='artificial', datastream='vibration', normalisation='robust-zscore')

x_data, y_data, _, _ = experiment.split_data(250000,
                                                         train_fraction=1,
                                                         window_step=1024,
                                                         window_length=4500,
                                                         faults_idx=faults_train,
                                                         verbose=False)

In [None]:
print(x_data.shape)
print(y_data.shape)

## Train our WDCNN model

### First split the data, scale it, and convert labels to one hot encoding

In [None]:
import pandas as pd
import numpy as np
import os
import time
import matplotlib.pyplot as plt

import tensorflow as tf

In [None]:
from tensorflow.keras.utils import to_categorical

# y_train = to_categorical(y_train)
# y_test = to_categorical(y_test)

### Build our WDCNN model

In [None]:
from tensorflow.keras.models import Model

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import MaxPooling1D

from tensorflow.keras.layers import Dropout

# build the wdcnn model
def generate_model(n_class, n_timesteps, n_variables, first_kernel=64):

    # set up the shape of the input
    ip = Input(shape=(n_timesteps, n_variables))

    # convolutional layers
    y = Conv1D(16, (first_kernel), strides=16, padding='same')(ip)
    y = Activation('relu')(y)
    y = BatchNormalization()(y)
    y = MaxPooling1D(2, strides=2, padding='same')(y)

    y = Conv1D(32, (3), padding='same')(y)
    y = Activation('relu')(y)
    y = BatchNormalization()(y)
    y = MaxPooling1D(2, strides=2, padding='same')(y)

    y = Conv1D(64, (3), padding='same')(y)
    y = Activation('relu')(y)
    y = BatchNormalization()(y)
    y = MaxPooling1D(2, strides=2, padding='same')(y)

    y = Conv1D(64, (3), padding='same')(y)
    y = Activation('relu')(y)
    y = BatchNormalization()(y)
    y = MaxPooling1D(2, strides=2, padding='same')(y)

    y = Conv1D(64, (3), padding='same')(y)
    y = Activation('relu')(y)
    y = BatchNormalization()(y)
    y = MaxPooling1D(2, strides=2, padding='same')(y)

    # flatten
    y = Flatten()(y)

    # dense
    y = Dense(100)(y)
    y = BatchNormalization()(y)

    # add the softmax classification outpuy
    out = Dense(n_class, activation='softmax')(y)

    # join the input and the output and return the model
    model = Model(ip, out)

    return model


In [None]:
# model = generate_model(3, x_train.shape[1], x_train.shape[2], first_kernel=256)

### Train the model

In [None]:
# model.compile(optimizer="adam", loss="categorical_crossentropy", metrics='acc')

In [None]:
# val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
# history = model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=val_dataset)

# Validate the Model

In [None]:
# set the root directory for results
results_dir = ('./weights/wdcnn_loads/' +
               'cross_validation/{0}').format(time.strftime("%Y%m%d_%H%M"))

In [None]:
from pickle import FALSE

import gc
# progress ...
print('doing cross validation ...')

# fix random seed for reproduciblity
seed = 1337
np.random.seed(seed)

# store the accuracy results
accuracies = [list() for i in range(len(x_data))]

# store the ground truths and predictions for target load(s) (for each fold)
# so we can produce confusion plots later
ground_truth = [list() for i in range(len(x_data))]
predictions  = [list() for i in range(len(x_data))]

xval_history = list()
final_accuracy = list()
final_loss = list()

from sklearn.model_selection import StratifiedKFold
from keras.callbacks import ModelCheckpoint
from keras import backend
from numba import cuda

# create the kfold object with 10 splits
n_folds = 10
kf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=seed)

# run the crossvalidation
fold = 0
i=False

for train_index, test_index in kf.split(x_data, y_data):
    if i == True:
      del model, t_data, x_data_t, y_data_t, x_train, y_train, x_test, y_test
      gc.collect()
      tf.keras.backend.clear_session()
    # progress ...
    print('evaluating fold {0}'.format(fold))

    # set up a model checkpoint callback (including making the directory where
    # to save our weights)
    directory = results_dir + 'fold_{0}/'.format(fold)
    os.makedirs(directory, exist_ok=True)
    filename  = 'weights.{epoch:02d}-{val_loss:.2f}.hdf5'
    checkpointer = ModelCheckpoint(filepath=directory+filename,
                                   verbose=0,
                                   save_best_only=True)
    # get train and test data
    x_train, y_train = x_data[train_index], y_data[train_index]
    x_test, y_test   = x_data[test_index], y_data[test_index]

    # one hot encode the labels
    y_train = to_categorical(y_train)
    y_test  = to_categorical(y_test)

    # build and compile the model
    model = generate_model(3, x_train.shape[1], x_train.shape[2], first_kernel=256)

    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics='acc')

    val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    history = model.fit(x_train, y_train, batch_size=64, epochs=10, validation_data=val_dataset)

    # save the final model
    model.save(directory + 'final_model.h5')

    # store the training history
    xval_history.append(history.history)

    # print the validation result
    final_loss.append(history.history['val_loss'][-1])
    final_accuracy.append(history.history['val_acc'][-1])
    print('validation loss is {0} and accuracy is {1}'.format(final_loss[-1],
          final_accuracy[-1]))

    # get the data (using testing sets - i.e. non overlapping windows)
    #t_data = PaderbornData(data_path, experiment, [l], normalisation=normalisation)
    t_data = PaderbornData(root_dir, experiment='real', datastream='vibration', normalisation='robust-zscore')

    # _, _, x_data_t, y_data_t = t_data.split_data(360000,
    #                                                     train_fraction=0.0,
    #                                                     window_length=window_length,
    #                                                     verbose=False)

    experiment = PaderbornData(root_dir, experiment='real', datastream='vibration', normalisation='robust-zscore')
    _, _, x_data_t, y_data_t = experiment.split_data(250000,
                                                            train_fraction=0,
                                                            window_step=2048,
                                                            window_length=4500,
                                                            faults_idx=faults_test,
                                                            verbose=False)

    # reformat the data for use in predictions
    x_t = x_data_t
    y_true_t = y_data_t
    y_t = to_categorical(y_data_t)

    # evaluate
    scores_t = model.evaluate(x=x_t, y=y_t, batch_size=128, verbose=0)
    print('scores = {0}'.format(scores_t[1]))

    # what do we get wrong?
    y_pred_t = model.predict(x_t)

    # # store the results
    # accuracies[ix].append(scores_t[1])
    # ground_truth[ix].append(y_t)
    # predictions[ix].append(y_pred_t)


    # display the results for this fold so we can see how we are doing ...

    # done testing generalisation ...


    # next fold ...
    fold = fold + 1

### Plot some results

In [None]:
# summarize history for accuracy
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
loss, acc = model.evaluate(val_dataset)
print("loss: %.2f" % loss)
print("acc:  %.2f" % acc)

### Get the confusion matrix to see what we struggle with

In [None]:
"""
visualisation_utils.py

make pretty graphs to show classifier performance

(most of these are based on the really useful examples from the
scikit learn user guides!)

author:     alex shenfield
date:       27/04/2018
"""

# numpy is needed for everything :)
import numpy as np
import matplotlib.pyplot as plt

# utilities for managing the data
import itertools

# data analysis functions from scikit learn
from sklearn.metrics import confusion_matrix


# get the classes and actually plot the confusion matrix
def plot_confusion_matrix(y_true, y_pred):

    cm = confusion_matrix(y_true, y_pred)
    classes = np.unique(y_true)
    plot_cm(cm, classes=classes, title=None)


# define a function for plotting a confusion matrix
def plot_cm(cm,
            classes,
            normalize=False,
            title='Confusion matrix',
            cmap=plt.cm.Blues):

    # should we normalise the confusion matrix?
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print('Confusion matrix, with normalization')
    else:
        print('Confusion matrix, without normalization')

    # display in command windows
    print(cm)

    # create a plot for the confusion matrix
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # if we want a title displayed
    if title:
        plt.title(title)

    fmt = '.3f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    #plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
predictions = model.predict(val_dataset)
print(predictions.shape)

In [None]:
y_true = np.argmax(y_test, axis=1)
y_pred = np.argmax(predictions, axis=1)
print(y_true.shape)
print(y_pred.shape)

In [None]:
plot_confusion_matrix(y_true, y_pred)

In [None]:
end_time = time.time()

print(end_time - start_time)