<a href="https://colab.research.google.com/github/aidan-sc/WDCNN_classifier_subset/blob/main/WDCNN_classifier_subset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Data preprocessing

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
print(f"cwd:{os.getcwd()}")

# Absolute path of the project folder on the mounted drive. Comparing against
# (and changing to) the same absolute path keeps this cell safe to re-run: the
# previous check compared against a path without the leading '/', so it was
# always true, and the relative chdir then failed once the cwd had moved.
PROJECT_DIR = '/content/drive/MyDrive/paderborn'
if os.getcwd() != PROJECT_DIR:
  os.chdir(PROJECT_DIR)

In [None]:
import numpy as np

from paderborn_data_loader_subset import PaderbornData
import time

# wall-clock start time; the last cell subtracts this to report the elapsed time
t_start = time.time()

In [None]:
# Bearing codes used for TRAINING, mapped to integer class labels; passed to
# split_data() as faults_idx.
# Active entries use: K0xx -> 0, KAxx -> 1, KIxx -> 2.
# Commented-out entries are bearings that are currently excluded.
# NOTE(review): the commented-out "real damage" entries below map KIxx -> 1 and
# KAxx -> 2, the opposite of the active artificial-damage entries - confirm the
# intended label before enabling any of them.
faults_train = {
    # normal healthy bearings
    'K001': 0,
    #'K002': 0,
    #'K003': 0,
    # 'K004': 0,
    # 'K005': 0,
    # 'K006': 0,
    # artificial damage
    'KA01': 1,
    #'KA03': 1,
    'KA05': 1,
    # 'KA06': 1,
    'KA07': 1,
    #'KA08': 1,
    #'KA09': 1,
    'KI01': 2,
    #'KI03': 2,
    'KI05': 2,
    'KI07': 2,
    #'KI08': 2,
    # real damage
    #'KI04': 1,
    #'KI14': 1,
    # 'KI16': 1,
    # 'KI17': 1,
    # 'KI18': 1,
    # 'KI21': 1,
    # 'KA04': 2,
    # 'KA15': 2,
    #'KA16': 2,
    #'KA22': 2,
    # 'KA30': 2,
    #'KB23': 'IROR',
    #'KB24': 'IROR',
    #'KB27': 'IROR',
}

# Bearing codes used for TESTING, mapped to integer class labels; passed to
# split_data() as faults_idx.
# The labels must follow the same convention as the training dictionary
# (K0xx -> 0, KAxx -> 1, KIxx -> 2). The real-damage entries previously used
# KIxx -> 1 and KAxx -> 2, which swapped the two damage classes between
# training and evaluation.
# Commented-out entries are bearings that are currently excluded.
faults_test = {
    # normal healthy bearings
    #'K001': 0,
    'K002': 0,
    # 'K003': 0,
    #'K004': 0,
    # 'K005': 0,
    # 'K006': 0,
    # artificial damage
    #'KA01': 1,
    #'KA03': 1,
    #'KA05': 1,
    # 'KA06': 1,
    #'KA07': 1,
    # 'KA08': 1,
    # 'KA09': 1,
    #'KI01': 2,
    # 'KI03': 2,
    #'KI05': 2,
    #'KI07': 2,
    # 'KI08': 2,
    # real damage
    #'KI04': 2,
    'KI14': 2,
    'KI16': 2,
    'KI17': 2,
    'KI18': 2,
    'KI21': 2,
    'KA04': 1,
    'KA15': 1,
    'KA16': 1,
    #'KA22': 1,
    'KA30': 1,
    #'KB23': 'IROR',
    #'KB24': 'IROR',
    #'KB27': 'IROR',
}

### Split the Paderborn bearing data into training and testing sets

In [None]:
# Build the training set from the 'artificial' experiment using the bearings
# listed in faults_train.
root_dir = './data/raw/'
experiment = PaderbornData(
    root_dir,
    experiment='artificial',
    datastream='vibration',
    normalisation='robust-zscore',
)

# Only the first two outputs are kept; with train_fraction=1 the test outputs
# are presumably empty (confirm against the loader).
x_train, y_train, _, _ = experiment.split_data(
    250000,
    train_fraction=1,
    window_step=1024,
    window_length=4500,
    faults_idx=faults_train,
    verbose=False,
)

In [None]:
# sanity check: shapes of the training inputs and labels
print(x_train.shape, y_train.shape, sep='\n')

In [None]:
# Build the test set from the 'real' experiment using the bearings listed in
# faults_test.
root_dir = './data/raw/'
experiment = PaderbornData(
    root_dir,
    experiment='real',
    datastream='vibration',
    normalisation='robust-zscore',
)

# Only the last two outputs are kept; with train_fraction=0 the training
# outputs are presumably empty (confirm against the loader).
_, _, x_test, y_test = experiment.split_data(
    250000,
    train_fraction=0,
    window_step=1024,
    window_length=4500,
    faults_idx=faults_test,
    verbose=False,
)

In [None]:
# sanity check: shapes of the test inputs and labels
print(x_test.shape, y_test.shape, sep='\n')

## Train our WDCNN model

### Import the libraries and convert the labels to one-hot encoding (the data was already split and normalised above)

In [None]:
import pandas as pd
import numpy as np
import os

import matplotlib.pyplot as plt

import tensorflow as tf

In [None]:
from tensorflow.keras.utils import to_categorical

# Use one shared class count for both splits. Left to its default,
# to_categorical infers the width from each array's own maximum label, so the
# train and test encodings would differ in width whenever one split lacks the
# highest class.
# NOTE: this cell overwrites the integer labels, so run it once per kernel.
n_classes = int(max(np.max(y_train), np.max(y_test))) + 1
y_train = to_categorical(y_train, num_classes=n_classes)
y_test = to_categorical(y_test, num_classes=n_classes)

### Build our WDCNN model

In [None]:
from tensorflow.keras.models import Model

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import MaxPooling1D

from tensorflow.keras.layers import Dropout


# build the wdcnn model
def generate_model(n_class, n_timesteps, n_variables, first_kernel=64):
    """Build the (uncompiled) WDCNN classifier.

    The network is five Conv1D -> ReLU -> BatchNormalization -> MaxPooling1D
    stages, followed by a flatten, a 100-unit dense layer with batch
    normalisation, and a softmax output.

    Args:
        n_class: number of output classes (width of the softmax layer).
        n_timesteps: length of each input window.
        n_variables: number of channels per timestep.
        first_kernel: kernel size of the first (stride 16) convolution.

    Returns:
        A Keras ``Model`` mapping (n_timesteps, n_variables) inputs to
        n_class softmax probabilities.
    """
    # (filters, kernel size, stride) of each convolutional stage; only the
    # first stage uses the wide kernel and a stride of 16
    conv_stages = [
        (16, first_kernel, 16),
        (32, 3, 1),
        (64, 3, 1),
        (64, 3, 1),
        (64, 3, 1),
    ]

    # set up the shape of the input
    inputs = Input(shape=(n_timesteps, n_variables))

    # convolutional stages, each halving the sequence length via pooling
    features = inputs
    for n_filters, kernel_size, stride in conv_stages:
        features = Conv1D(n_filters, kernel_size, strides=stride, padding='same')(features)
        features = Activation('relu')(features)
        features = BatchNormalization()(features)
        features = MaxPooling1D(2, strides=2, padding='same')(features)

    # flatten and pass through the dense layer
    features = Flatten()(features)
    features = Dense(100)(features)
    features = BatchNormalization()(features)

    # add the softmax classification output
    outputs = Dense(n_class, activation='softmax')(features)

    # join the input and the output and return the model
    return Model(inputs, outputs)



In [None]:
# three output classes (labels 0, 1 and 2); the input shape is taken from the
# training windows
model = generate_model(
    n_class=3,
    n_timesteps=x_train.shape[1],
    n_variables=x_train.shape[2],
    first_kernel=256,
)

### Train the model

In [None]:
# metrics must be a list (or tuple/dict): newer Keras versions reject a bare
# string. The 'acc' name is kept so the history keys stay 'acc' / 'val_acc'.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["acc"])

In [None]:
# batch the test data once; the same dataset is reused later for evaluation
# and prediction
val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
val_dataset = val_dataset.batch(64)

history = model.fit(
    x_train,
    y_train,
    batch_size=64,
    epochs=10,
    validation_data=val_dataset,
)

### Plot some results

In [None]:
# per-epoch accuracy on the training data and on the validation (test) data
for history_key, curve_label in (('acc', 'train'), ('val_acc', 'test')):
    plt.plot(history.history[history_key], label=curve_label)
plt.title('model accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(loc='upper left')
plt.show()

In [None]:
# per-epoch loss on the training data and on the validation (test) data
for history_key, curve_label in (('loss', 'train'), ('val_loss', 'test')):
    plt.plot(history.history[history_key], label=curve_label)
plt.title('model loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(loc='upper left')
plt.show()

In [None]:
# final loss and accuracy on the batched test data
loss, acc = model.evaluate(val_dataset)
print(f"loss: {loss:.2f}")
print(f"acc:  {acc:.2f}")

### Get the confusion matrix to see what we struggle with

In [None]:
"""
visualisation_utils.py

make pretty graphs to show classifier performance

(most of these are based on the really useful examples from the
scikit learn user guides!)

author:     alex shenfield
date:       27/04/2018
"""

# numpy is needed for everything :)
import numpy as np
import matplotlib.pyplot as plt

# utilities for managing the data
import itertools

# data analysis functions from scikit learn
from sklearn.metrics import confusion_matrix


# get the classes and actually plot the confusion matrix
def plot_confusion_matrix(y_true, y_pred):
    """Compute and plot the confusion matrix for a set of predictions.

    Args:
        y_true: 1-D array-like of true integer class labels.
        y_pred: 1-D array-like of predicted integer class labels.
    """
    # The axis labels must cover every class that appears in EITHER array.
    # confusion_matrix() sizes its output from the union of both, so taking
    # the classes from y_true alone mislabels the ticks whenever the model
    # predicts a class that has no true samples.
    classes = np.union1d(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=classes)
    plot_cm(cm, classes=classes, title=None)


# define a function for plotting a confusion matrix
def plot_cm(cm,
            classes,
            normalize=False,
            title='Confusion matrix',
            cmap=plt.cm.Blues):
    """Print a confusion matrix and draw it as an annotated heat map.

    Args:
        cm: square numpy array of counts (rows = true labels, columns =
            predicted labels).
        classes: tick labels, one per row/column of ``cm``.
        normalize: if True, divide each row by its total so the cells show
            per-class fractions instead of raw counts.
        title: plot title; pass None (or '') to omit it.
        cmap: matplotlib colour map used for the heat map.
    """
    # should we normalise the confusion matrix?
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # rows with no true samples are left at 0 rather than dividing by
        # zero, which would fill the row with NaN and break the text colour
        # threshold below
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print('Confusion matrix, with normalization')
    else:
        print('Confusion matrix, without normalization')

    # display in command windows
    print(cm)

    # create a plot for the confusion matrix
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # if we want a title displayed
    if title:
        plt.title(title)

    # annotate every cell, using white text on dark cells so it stays readable
    fmt = '.3f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    #plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')


In [None]:
# softmax outputs of the model for the batched test data
predictions = model.predict(val_dataset)
print(np.shape(predictions))

In [None]:
# collapse the one-hot / probability rows back to integer class labels
y_true = y_test.argmax(axis=1)
y_pred = predictions.argmax(axis=1)
print(y_true.shape, y_pred.shape, sep='\n')

In [None]:
# rows are the true classes, columns the predicted classes
plot_confusion_matrix(y_true=y_true, y_pred=y_pred)

In [None]:
# Elapsed wall-clock seconds since t_start was recorded near the top of the
# notebook. Despite its name, t_infer covers everything executed in between
# (data loading, training, evaluation and plotting), not only inference.
t_end = time.time()
t_infer = t_end - t_start
print(t_infer)