## Import Packages

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Import the 'TensorFlow' package and its bundled Keras API
import tensorflow as tf
from tensorflow import keras

# Check the version of tensorflow
print(tf.__version__)

In [None]:
# Verify that a GPU (on the Google server) has been allocated to this runtime.
# The notebook stops here unless TensorFlow reports exactly '/device:GPU:0'.
device_name = tf.test.gpu_device_name()
gpu_allocated = (device_name == '/device:GPU:0')

if gpu_allocated:
    print('Found GPU at: {}'.format(device_name))
else:
    raise SystemError('GPU device not found')

In [None]:
# Access Google Drive: mount it at /content/drive so later cells can read/write files there
from google.colab import drive
drive.mount('/content/drive')

.

.

.
# Load Raw Data and Extract Acceleration Data
- Generate a single array that contains all of the acceleration data (normal and abnormal)

In [None]:
NoOfData = 180  # Number of files per class: Normal_1..Normal_180 and Abnormal_1..Abnormal_180

# URL template of one raw data file; the class name and the 1-based file number are filled in below
DATASET_URL = 'https://github.com/Eunseob/purdue_me597/blob/main/ml_tutorial/Dataset/%s_%d?raw=true'

for file_no in range(1, NoOfData + 1):
    for class_name in ('Normal', 'Abnormal'):
        # Bind the frame directly in the notebook namespace instead of assembling
        # source text for exec(). The same variables as before are created
        # (Normal_1, Abnormal_1, ..., Normal_180, Abnormal_180), so later cells
        # that refer to them keep working.
        globals()['%s_%d' % (class_name, file_no)] = pd.read_csv(
            DATASET_URL % (class_name, file_no), sep=',', header=None)

In [None]:
DataLength = len(Normal_1)  # Number of rows in one recording (taken from the first normal file)

# Column 1 of every recording is collected as one row of the matrix.
# The numbered frames are looked up in the notebook namespace directly (no exec()),
# and each class is stacked in a single step instead of being written row by row
# into a pre-filled zero DataFrame. np.vstack raises if the recordings do not all
# have the same length, rather than silently aligning them.
file_numbers = range(1, NoOfData + 1)

AccData_Nor = pd.DataFrame(np.vstack(
    [globals()[f"Normal_{n}"].iloc[:, 1].to_numpy(dtype=float) for n in file_numbers]))
AccData_Abn = pd.DataFrame(np.vstack(
    [globals()[f"Abnormal_{n}"].iloc[:, 1].to_numpy(dtype=float) for n in file_numbers]))

# Normal rows first, abnormal rows second -> shape (2 * NoOfData, DataLength)
AccData = np.array(pd.concat([AccData_Nor, AccData_Abn], axis=0))
AccData.shape

# Convert Acceleration Data into Spectrogram by STFT

[Tip] 

You can define the size of spectrogram (resolution of time and frequency)

by adjusting 'Number of samples(N) per segment (nperseg)' and 'Number of samples(N) for overlap (noverlap)'

In [None]:
from scipy import signal

Fs = 12800  # Sampling Frequency

# Spectrogram of every row of AccData.
#   f       : frequency bins
#   t       : segment times
#   AccSTFT : spectrogram values, one 2-D map per input row
f, t, AccSTFT = signal.spectrogram(
    AccData,
    fs=Fs,
    nperseg=78,    # samples per segment
    noverlap=10,   # samples shared by neighbouring segments
)
AccSTFT.shape

Compare spectrograms between normal and abnormal

In [None]:
idx = 1  # Select index (1~180)

# (subplot position, class label, row of AccSTFT, draw the y-axis label?)
# Normal recordings occupy the first NoOfData rows, abnormal ones follow.
panels = (
    (1, 'Normal',   idx - 1,            True),
    (2, 'Abnormal', idx + NoOfData - 1, False),
)

plt.figure(figsize=(12,4))

for position, class_label, row, with_ylabel in panels:
    plt.subplot(1, 2, position)
    plt.pcolormesh(t, f, AccSTFT[row], cmap='jet')
    plt.title(f"STFT ({class_label}_{idx})", fontsize=15)
    plt.xlabel('Time(s)', fontsize=12)
    if with_ylabel:
        # Only the left panel carries the frequency label
        plt.ylabel('Frequency(Hz)', fontsize=12)
    plt.colorbar()

plt.show()

.

.

.

.

## Split Training & Test Data
- Use 'train_test_split' function
- It randomly samples the training and testing data according to the designated ratio.

In [None]:
NoOfSensor = 1  # Size of the trailing axis appended to every spectrogram

# The first NoOfData spectrograms are the normal ones, the remainder the abnormal ones
NormalSet   = AccSTFT[:NoOfData]
AbnormalSet = AccSTFT[NoOfData:]

# Append the sensor axis: (samples, dim1, dim2) -> (samples, dim1, dim2, NoOfSensor)
NormalSet   = NormalSet.reshape(NormalSet.shape[:3] + (NoOfSensor,))
AbnormalSet = AbnormalSet.reshape(AbnormalSet.shape[:3] + (NoOfSensor,))

NormalSet.shape, AbnormalSet.shape

In [None]:
from sklearn.model_selection import train_test_split

# Designate test data ratio
TestData_Ratio = 0.2

# Both classes are split with the same ratio and the same fixed random state
split_options = dict(test_size=TestData_Ratio, random_state=777)

TrainData_Nor, TestData_Nor = train_test_split(NormalSet, **split_options)
TrainData_Abn, TestData_Abn = train_test_split(AbnormalSet, **split_options)

for train_part, test_part in ((TrainData_Nor, TestData_Nor),
                              (TrainData_Abn, TestData_Abn)):
    print(train_part.shape, test_part.shape)

## Data Labeling (One-hot Encoding)
- Use 'np.zeros' and 'np.ones'
- '[1,0]' refers to 'Normal' and '[0,1]' refers to 'Abnormal' in this tutorial

In [None]:
# One-hot rows: [1, 0] marks 'Normal', [0, 1] marks 'Abnormal'.
# Each label array has one row per sample of the matching data array.
TrainLabel_Nor = np.tile([1.0, 0.0], (TrainData_Nor.shape[0], 1))  # [1,0]: Normal
TrainLabel_Abn = np.tile([0.0, 1.0], (TrainData_Abn.shape[0], 1))  # [0,1]: Abnormal
TestLabel_Nor  = np.tile([1.0, 0.0], (TestData_Nor.shape[0], 1))   # [1,0]: Normal
TestLabel_Abn  = np.tile([0.0, 1.0], (TestData_Abn.shape[0], 1))   # [0,1]: Abnormal

for train_labels, test_labels in ((TrainLabel_Nor, TestLabel_Nor),
                                  (TrainLabel_Abn, TestLabel_Abn)):
    print(train_labels.shape, test_labels.shape)

## Data and Label Preparation

In [None]:
# Stack normal samples first and abnormal samples second along the sample axis (axis 0).
# Data and labels are stacked in the same order so that row k of a label array
# still belongs to row k of the matching data array.
TrainData  = np.concatenate((TrainData_Nor, TrainData_Abn))
TrainLabel = np.concatenate((TrainLabel_Nor, TrainLabel_Abn))

TestData   = np.concatenate((TestData_Nor, TestData_Abn))
TestLabel  = np.concatenate((TestLabel_Nor, TestLabel_Abn))

print(TrainData.shape,  TestData.shape)
print(TrainLabel.shape, TestLabel.shape)

.

.

.

.

.

## Setting hyperparameters for training CNN(Convolutional Neural Network) 

In [None]:
# Training hyperparameters
learningRate = 1e-4   # learning rate given to the optimizer
Epoch        = 2_000  # number of training epochs

## Designing a CNN architecture (based on Keras)

- Types of Convolution layer: https://keras.io/api/layers/convolution_layers/

- Types of Pooling layer: https://keras.io/api/layers/pooling_layers/

- Flatten layer: https://keras.io/api/layers/reshaping_layers/flatten/

In [None]:
def CNN_model(input_data, learning_rate=None):
    """Build and compile a small CNN classifier with two output classes.

    Architecture: two (Conv2D -> MaxPooling2D) stages, a Flatten layer, one
    hidden Dense layer and a 2-unit softmax output.

    Parameters
    ----------
    input_data : array with a ``.shape`` of at least four entries
        Only the shape is read. Entries 1, 2 and 3 define the shape of one
        model input; entry 0 (the number of samples) is ignored.
    learning_rate : float, optional
        Learning rate for the Adam optimizer. When omitted, the notebook-level
        ``learningRate`` is used, which is what the original one-argument
        call did.

    Returns
    -------
    keras.Sequential
        Model compiled with categorical cross-entropy loss and the
        'accuracy' metric.
    """
    if learning_rate is None:
        learning_rate = learningRate  # fall back to the notebook-level hyperparameter

    # Drop Keras state left over from previously built models
    keras.backend.clear_session()

    model = keras.Sequential()
    # keras.Input(shape=...) is used instead of InputLayer(input_shape=...),
    # whose `input_shape` argument is deprecated in Keras 3.
    model.add(keras.Input(shape=tuple(input_data.shape[1:4])))                                                          # Input layer

    model.add(keras.layers.Conv2D(filters = 2, kernel_size=(3,3), strides=(1,1), padding='same', activation='relu'))    # Convolution layer 1
    model.add(keras.layers.MaxPooling2D(pool_size = (2,2), strides=(2,2)))                                              # Pooling layer 1
    model.add(keras.layers.Conv2D(filters = 4, kernel_size=(3,3), strides=(1,1), padding='same', activation='relu'))    # Convolution layer 2
    model.add(keras.layers.MaxPooling2D(pool_size = (2,2), strides=(2,2)))                                              # Pooling layer 2

    model.add(keras.layers.Flatten())                                                                                   # Flatten layer
    model.add(keras.layers.Dense(units = 10, activation='relu'))                                                        # Dense layer

    model.add(keras.layers.Dense(units = 2, activation='softmax'))                                                      # Output Layer

    model.compile(optimizer= keras.optimizers.Adam(learning_rate = learning_rate),
                  loss=keras.losses.CategoricalCrossentropy(),
                  metrics=['accuracy'])
    return model

In [None]:
# Seed the random generators BEFORE the model is built: the layer weights are
# initialised inside CNN_model(), so a seed that is only set afterwards does not
# make the starting weights repeatable.
# keras.utils.set_random_seed (seeds Python, NumPy and the backend) is used when
# this Keras version provides it; otherwise fall back to TensorFlow's global seed.
# NOTE(review): exact run-to-run reproducibility on GPU is not guaranteed by this alone.
_seed_all = getattr(keras.utils, 'set_random_seed', None)
if _seed_all is not None:
    _seed_all(777)
else:
    tf.random.set_seed(777)

# Check the model architecture and the number of parameters
CnnModel = CNN_model(TrainData)
CnnModel.summary()

## CNN Model Training

In [None]:
# Optional: fix TensorFlow's global random seed before training starts
tf.random.set_seed(777)

# Model training (training data only; no validation data is passed).
# The returned history object is kept for plotting the training curves.
TraingHistory = CnnModel.fit(
    x=TrainData,
    y=TrainLabel,
    epochs=Epoch,
    verbose=1,
)

In [None]:
# Score the model on the test data, which was not used for training.
# A loss close to 0 and an accuracy close to 1 (100%) indicate a better model.
Loss, Accuracy = CnnModel.evaluate(x=TestData, y=TestLabel, verbose=0)
Loss, Accuracy

In [None]:
# Training process: loss on the left y-axis, accuracy on a twin right y-axis

fig, loss_ax = plt.subplots(figsize=(8,6))
acc_ax = loss_ax.twinx()

# (axis, history key / y-label, legend text, colour, legend position)
curves = (
    (loss_ax, 'loss',     'train loss', 'tab:red',  'center left'),
    (acc_ax,  'accuracy', 'train acc',  'tab:blue', 'center right'),
)

for axis, key, legend_text, colour, legend_loc in curves:
    axis.plot(TraingHistory.history[key], label=legend_text, c=colour)
    axis.set_ylabel(key, fontsize=15)
    axis.legend(loc=legend_loc, fontsize=12)

# Both axes share the same x-axis, so it is labelled once
loss_ax.set_xlabel('epoch', fontsize=15)

plt.show()

Save the ML model (CNN) as a file

In [None]:
# Unlike SVM or KNN, no 'Joblib' package is needed.
import os

_model_path = '/content/drive/MyDrive/Colab Notebooks/SavedFiles/ML_Models/CNN_model.h5'

# Create the target folder first: saving fails when the directory does not exist yet
# (e.g. on a Drive where this notebook has never been run).
os.makedirs(os.path.dirname(_model_path), exist_ok=True)

CnnModel.save(_model_path)

Load the saved ML model (CNN) and test

In [None]:
# Read the model back from the file written by the save cell and score it on the test data
saved_model_file = '/content/drive/MyDrive/Colab Notebooks/SavedFiles/ML_Models/CNN_model.h5'
LoadedModel = keras.models.load_model(saved_model_file)

Loss, Accuracy = LoadedModel.evaluate(x=TestData, y=TestLabel, verbose=0)

print('[Performance of CNN model] \n')
accuracy_percent = Accuracy*100  # fraction -> percent
print('Accuracy : {:.2f}%'.format(accuracy_percent))

In [None]:
import seaborn as sns
from sklearn import metrics
from sklearn.metrics import confusion_matrix

# Model output for the test data
Predicted = LoadedModel.predict(TestData)

# Reduce every one-hot / score row to the index of its largest entry so that
# true and predicted labels become plain class-index vectors
TestLabel_rev = TestLabel.argmax(axis=1)
Predicted_rev = Predicted.argmax(axis=1)

# --- Confusion matrix -------------------------------------------------------
cm = confusion_matrix(TestLabel_rev, Predicted_rev)

plt.figure(figsize=(6, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap=plt.cm.Blues, cbar=False, square=True)
plt.xlabel("Predicted label")
plt.ylabel("True label")
plt.title("Confusion Matrix of the CNN Model")
plt.show()

# --- Evaluation metrics -----------------------------------------------------
accuracy  = metrics.accuracy_score(TestLabel_rev, Predicted_rev)
precision = metrics.precision_score(TestLabel_rev, Predicted_rev)
recall    = metrics.recall_score(TestLabel_rev, Predicted_rev)
f1_score  = metrics.f1_score(TestLabel_rev, Predicted_rev)

# Report: captions are padded by hand so that the colons line up
print("\n\n")
print("CNN Model Evaluation:\n")
for caption, value in (("Accuracy ", accuracy),
                       ("Precision", precision),
                       ("Recall   ", recall),
                       ("F1 Score ", f1_score)):
    print(f"{caption}: {value:.2f}")