#### Installing Dependencies

In [None]:
!pip install torchmetrics



In [None]:
!sudo apt install octave

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
octave is already the newest version (6.4.0-2).
0 upgraded, 0 newly installed, 0 to remove and 20 not upgraded.


In [None]:
!pip install oct2py



#### Importing Pkgs

In [None]:
import torch;
import torch.nn as nn;
import torch.nn.functional as F;
from torch.utils.data import Dataset, DataLoader;

import matplotlib.pyplot as plt;
import numpy as np;

from tqdm import tqdm;

In [None]:
from scipy import signal;
from skimage.transform import resize;

In [None]:
import os;
import os.path;

In [None]:
import numpy as np;
import pandas as pd;
import matplotlib.pyplot as plt;

In [None]:
import re;

In [None]:
import json;

In [None]:
from torchmetrics.classification import Accuracy,Precision, Recall, F1Score, ConfusionMatrix;

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu");
print(device);

cuda


In [None]:
from oct2py import Oct2Py

oc = Oct2Py();

#### Dataset Creation

In [None]:
def min_max_norm(ary):
    ary = (ary - ary.min()) / np.abs(ary.max() - ary.min())
    return ary

In [None]:
def convertSignalToSpectrogram2d(filename, field = "vibration"):

    fieldToIndex = {
        "force" : 0,
        "current_1" : 1,
        "current_2" : 2,
        "speed" : 3,
        "temp" : 4,
        "torque" : 5,
        "vibration" : 6,
    };


    data = oc.load(filename);
    ex = list(data.keys())[0];

    x = data[ex]['Y']['Data'][0][fieldToIndex[field]].flatten();
    fs = 64000;

    f,t,sxx = signal.spectrogram(x, fs);
    sxx_db = 10*np.log10(sxx + 1e-10);

    sxx_db = resize(sxx_db, (256,256), mode="constant", anti_aliasing=True);
    sxx_db_nrm = (sxx_db - np.mean(sxx_db)) / np.std(sxx_db);

    return sxx_db_nrm

In [None]:
def getYLabel(filename):
    output = {
            "K" : 0,
            "KA" : 1,
            "KI" : 2
    };

    code = filename.split("_")[3];

    if code.startswith("KA"):
        return output["KA"];
    elif code.startswith("KI"):
        return output["KI"];
    else:
        return output["K"];

In [None]:
class CustomDataset(Dataset):
    def __init__(self, root_dir, code, healty_end_points, faulty_end_points,oc, index, width, signal="vibration"):
        self.root_dir = root_dir;
        self.healty_end_points = healty_end_points;
        self.faulty_end_points = faulty_end_points;
        self.oc = oc;
        self.index = index;

        self.width = width;

        self.X = [];
        self.Y = [];

        self.signal = signal;

        for healty_end_point in self.healty_end_points:
            for mat_healthy_end_point in os.listdir(os.path.join(self.root_dir, healty_end_point)):
                if mat_healthy_end_point.endswith(".mat") and mat_healthy_end_point.startswith(code):
                    self.X.append(os.path.join(self.root_dir, healty_end_point, mat_healthy_end_point));
                    self.Y.append(getYLabel(mat_healthy_end_point));

        for faulty_end_point in self.faulty_end_points:
            for mat_faulty_end_point in os.listdir(os.path.join(self.root_dir, faulty_end_point)):
                if mat_faulty_end_point.endswith(".mat") and mat_faulty_end_point.startswith(code):
                    self.X.append(os.path.join(self.root_dir, faulty_end_point, mat_faulty_end_point));
                    self.Y.append(getYLabel(mat_faulty_end_point));

    def __len__(self):
        return len(self.X);

    def __getitem__(self, idx):
        x = convertSignalToSpectrogram2d(self.X[idx], field=self.signal);
        y = torch.tensor(self.Y[idx]);

        X = torch.tensor(x).to(torch.float32);

        return X.unsqueeze(dim=0),y;

In [None]:
def getCode(N, M, F):
    return f"N{N:02}" + "_" + f"M{M:02}" + "_" + f"F{F:02}";

In [None]:
# ratios = {2:5, 6:5, 8:5}
def getDataset(code, signal = "vibration", width=256000, ratios="6:5"):
    root_dir = "/content/drive/MyDrive/PaderbornExtracted";
    healty_end_points = ["K001","K002","K003","K004","K005","K006"];
    faulty_end_points_inner = ["KI04","KI14","KI16","KI17","KI18","KI21"];
    faulty_end_points_outer = ["KA04","KA15","KA16","KA22","KA30"];

    f,h = ratios.split(":");

    f,h = int(f), int(h);

    healthy_training_end_points = healty_end_points[:h];
    healthy_testing_end_points = healty_end_points[h:];

    faulty_training_end_points = faulty_end_points_inner[:f//2] + faulty_end_points_outer[:f//2];
    faulty_testing_end_points = faulty_end_points_inner[f//2:] + faulty_end_points_outer[f//2:];

    train_dataset = CustomDataset(root_dir, code, healthy_training_end_points, faulty_training_end_points, oc, width, signal);
    test_dataset = CustomDataset(root_dir, code, healthy_testing_end_points, faulty_testing_end_points, oc, width, signal);

    return train_dataset, test_dataset;

In [None]:
N = [9,15];
M = [7,1];
Fa = [10,4];

code = getCode(15,7,10);

In [None]:
training_dataset, testing_dataset = getDataset(code,signal="vibration",width=256000,ratios="6:5");

N15_M07_F10_K001_1.mat
N15_M07_F10_K001_2.mat
N15_M07_F10_K001_3.mat
N15_M07_F10_K001_4.mat
N15_M07_F10_K001_5.mat
N15_M07_F10_K001_6.mat
N15_M07_F10_K001_7.mat
N15_M07_F10_K001_8.mat
N15_M07_F10_K001_9.mat
N15_M07_F10_K001_10.mat
N15_M07_F10_K001_11.mat
N15_M07_F10_K001_12.mat
N15_M07_F10_K001_13.mat
N15_M07_F10_K001_14.mat
N15_M07_F10_K001_15.mat
N15_M07_F10_K001_16.mat
N15_M07_F10_K001_17.mat
N15_M07_F10_K001_18.mat
N15_M07_F10_K001_20.mat
N15_M07_F10_K001_19.mat
N15_M07_F10_K002_1.mat
N15_M07_F10_K002_2.mat
N15_M07_F10_K002_4.mat
N15_M07_F10_K002_5.mat
N15_M07_F10_K002_6.mat
N15_M07_F10_K002_11.mat
N15_M07_F10_K002_14.mat
N15_M07_F10_K002_15.mat
N15_M07_F10_K002_17.mat
N15_M07_F10_K002_20.mat
N15_M07_F10_K002_3.mat
N15_M07_F10_K002_7.mat
N15_M07_F10_K002_8.mat
N15_M07_F10_K002_10.mat
N15_M07_F10_K002_13.mat
N15_M07_F10_K002_16.mat
N15_M07_F10_K002_9.mat
N15_M07_F10_K002_12.mat
N15_M07_F10_K002_19.mat
N15_M07_F10_K002_18.mat
N15_M07_F10_K003_1.mat
N15_M07_F10_K003_2.mat
N15_M07_F10_

In [None]:
training_dataloader = DataLoader(training_dataset, batch_size=32, shuffle=True);
testing_dataloader = DataLoader(testing_dataset, batch_size=32, shuffle=True);

#### Model Creation

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=5, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=5, out_channels=10, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=10, out_channels=10, kernel_size=3, padding=1)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(10 * 64 * 64, 10)  # Adjusted for 256x256 input
        self.fc2 = nn.Linear(10, 5)
        self.fc3 = nn.Linear(5, 3)

        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        x = F.leaky_relu(self.conv1(x), negative_slope=0.01)
        x = F.leaky_relu(self.conv2(x), negative_slope=0.01)
        x = self.pool(x)
        x = F.leaky_relu(self.conv3(x), negative_slope=0.01)
        x = self.pool(x)

        x = torch.flatten(x, start_dim=1)
        x = F.leaky_relu(self.fc1(x), negative_slope=0.01)
        x = F.leaky_relu(self.fc2(x), negative_slope=0.01)
        x = self.dropout(x)
        x = self.fc3(x)

        return F.softmax(x, dim=1)

In [None]:
model = CNNModel();
model = model.to(device);

print(model);

CNNModel(
  (conv1): Conv2d(1, 5, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(5, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=40960, out_features=10, bias=True)
  (fc2): Linear(in_features=10, out_features=5, bias=True)
  (fc3): Linear(in_features=5, out_features=3, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)


In [None]:
loss = nn.CrossEntropyLoss();
optimizer = torch.optim.AdamW(model.parameters(), lr = 0.0001);

In [None]:
accuracy = Accuracy(task="multiclass", num_classes=3).to(device);
precision = Precision(task="multiclass", num_classes=3).to(device);
recall = Recall(task="multiclass", num_classes=3).to(device);
f1_score = F1Score(task="multiclass", num_classes=3).to(device);
conf_matrix_metric = ConfusionMatrix(task="multiclass", num_classes=3).to(device);

In [None]:
result_dir = "/content/drive/MyDrive/Results/CNN2D";
if not os.path.exists(result_dir):
    os.makedirs(result_dir);

#### Model Training

In [None]:
epochs = 25;

In [None]:
metrics_list = [];

for epoch in range(epochs):

    train_loss_per_epoch = [];
    test_loss_per_epcoh = [];

    train_acc_per_epoch = [];
    test_acc_per_epoch = [];

    train_precision_per_epoch = [];
    test_precision_per_epoch = [];

    train_recall_per_epoch = [];
    test_recall_per_epoch = [];

    train_f1_per_epoch = [];
    test_f1_per_epoch = [];

    traing_cf = np.zeros((3,3));
    test_cf = np.zeros((3,3));

    model.train();
    for x, y in training_dataloader:
        optimizer.zero_grad();
        x = x.to(device);
        y = y.to(device);

        y_pred = model(x);
        loss_value = loss(y_pred, y);

        loss_value.backward();
        optimizer.step();

        acc = accuracy(y_pred, y);
        prec = precision(y_pred, y);
        rec = recall(y_pred, y);
        f1 = f1_score(y_pred, y);
        conf_matrix = conf_matrix_metric(y_pred, y);

        traing_cf += conf_matrix.cpu().numpy();

        train_loss_per_epoch.append(loss_value.item());
        train_acc_per_epoch.append(acc.item());
        train_precision_per_epoch.append(prec.item());
        train_recall_per_epoch.append(rec.item());
        train_f1_per_epoch.append(f1.item());

    model.eval();
    with torch.no_grad():
        for x, y in testing_dataloader:
            x = x.to(device);
            y = y.to(device);

            y_pred = model(x);
            loss_value = loss(y_pred, y);

            acc = accuracy(y_pred, y);
            prec = precision(y_pred, y);
            rec = recall(y_pred, y);
            f1 = f1_score(y_pred, y);
            conf_matrix = conf_matrix_metric(y_pred, y);

            test_cf += conf_matrix.cpu().numpy();

            test_loss_per_epcoh.append(loss_value.item());
            test_acc_per_epoch.append(acc.item());
            test_precision_per_epoch.append(prec.item());
            test_recall_per_epoch.append(rec.item());
            test_f1_per_epoch.append(f1.item());


    metrics_list.append({
        "train_loss": np.mean(train_loss_per_epoch),
        "test_loss": np.mean(test_loss_per_epcoh),
        "train_acc": np.mean(train_acc_per_epoch),
        "test_acc": np.mean(test_acc_per_epoch),
        "train_precision": np.mean(train_precision_per_epoch),
        "test_precision": np.mean(test_precision_per_epoch),
        "train_recall": np.mean(train_recall_per_epoch),
        "test_recall": np.mean(test_recall_per_epoch),
        "train_f1": np.mean(train_f1_per_epoch),
        "test_f1": np.mean(test_f1_per_epoch),
        "train_cf": json.dumps(traing_cf.tolist()),
        "test_cf": json.dumps(test_cf.tolist())
    });

    maxx = max(metrics_list, key=lambda x: x["test_f1"]);

    if maxx["test_f1"] <= metrics_list[-1]["test_f1"]:
        torch.save(model.state_dict(), os.path.join(result_dir,"bb_mm.pth"));

    print(f"Epoch {epoch+1}/{epochs} | Train Loss: {metrics_list[-1]['train_loss']:.4f} | Test Loss: {metrics_list[-1]['test_loss']:.4f} | Train Acc: {metrics_list[-1]['train_acc']:.4f} | Test Acc: {metrics_list[-1]['test_acc']:.4f} | Train F1: {metrics_list[-1]['train_f1']:.4f} | Test F1: {metrics_list[-1]['test_f1']:.4f}");

Epoch 1/25 | Train Loss: 0.8457 | Test Loss: 0.9639 | Train Acc: 0.7462 | Test Acc: 0.4948 | Train F1: 0.7462 | Test F1: 0.4948
Epoch 2/25 | Train Loss: 0.8387 | Test Loss: 0.9030 | Train Acc: 0.7385 | Test Acc: 0.6641 | Train F1: 0.7385 | Test F1: 0.6641
Epoch 3/25 | Train Loss: 0.8097 | Test Loss: 0.8800 | Train Acc: 0.7634 | Test Acc: 0.6068 | Train F1: 0.7634 | Test F1: 0.6068
Epoch 4/25 | Train Loss: 0.8357 | Test Loss: 0.9205 | Train Acc: 0.7411 | Test Acc: 0.6406 | Train F1: 0.7411 | Test F1: 0.6406
Epoch 5/25 | Train Loss: 0.8633 | Test Loss: 0.8641 | Train Acc: 0.7385 | Test Acc: 0.6224 | Train F1: 0.7385 | Test F1: 0.6224
Epoch 6/25 | Train Loss: 0.7982 | Test Loss: 0.9798 | Train Acc: 0.7761 | Test Acc: 0.4870 | Train F1: 0.7761 | Test F1: 0.4870
Epoch 7/25 | Train Loss: 0.8273 | Test Loss: 0.8820 | Train Acc: 0.6939 | Test Acc: 0.5443 | Train F1: 0.6939 | Test F1: 0.5443
Epoch 8/25 | Train Loss: 0.8124 | Test Loss: 0.9323 | Train Acc: 0.7277 | Test Acc: 0.6510 | Train F1: 0

#### Storing Results

In [None]:
df_epochs = pd.DataFrame(metrics_list);
df_epochs.to_csv(os.path.join(result_dir,"result.csv"),index=False);