<a href="https://colab.research.google.com/github/LaZoark/FaultNet/blob/main/cwru_fault_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# !wget https://github.com/LaZoark/FaultNet/raw/main/CWRU.zip
!unzip /content/CWRU.zip

Archive:  /content/CWRU.zip
   creating: CWRU files/
  inflating: CWRU files/featurized_data.npy  
  inflating: CWRU files/featurized_data_labels.npy  
  inflating: CWRU files/signal_data.npy  
  inflating: CWRU files/signal_data_labels.npy  


In [2]:
import numpy as np
import pandas as pd

In [3]:
import torch
import torch.nn as nn
from torchvision.datasets import CIFAR10
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.autograd import Variable
import torch.nn.functional as F

In [4]:
# data = np.load('CWRU_dataset.npy')
# labels = np.load('CWRU_lables.npy')
data = np.load('/content/CWRU files/signal_data.npy')
labels = np.load('/content/CWRU files/signal_data_labels.npy')

In [5]:
x=data[:, 0:1600]

In [6]:
def mean(data, no_elements):
    X=np.zeros((data.shape[0], data.shape[1]))
    for i in range(data.shape[1]-no_elements+1):
        X[:, i]=np.mean(data[:, i:i+no_elements], axis=1)
    return X.astype(np.float16)
def median(data, no_elements):
    X=np.zeros((data.shape[0], data.shape[1]))
    for i in range(data.shape[1]-no_elements+1):
        X[:, i]=np.median(data[:, i:i+no_elements], axis=1)
    return X.astype(np.float16)
def sig_image(data, size):
    X=np.zeros((data.shape[0], size, size))
    for i in range(data.shape[0]):
        X[i]=(data[i, :].reshape(size, size))
    return X.astype(np.float16)

In [7]:
channel_mean=(mean(x, 10)).astype(np.float16)
x_m=sig_image(channel_mean, 40)
channel_median=(median(x, 10)).astype(np.float16)
x_md=sig_image(x, 40)

In [8]:
x_n=sig_image(x, 40)

In [9]:
x_n.shape

(2800, 40, 40)

In [10]:
x_m.shape

(2800, 40, 40)

In [11]:
X=np.stack((x_n, x_m, x_md), axis=1).astype(np.float16)

In [12]:
X.shape

(2800, 3, 40, 40)

In [13]:
from sklearn.model_selection import train_test_split
trainx,  testx,  trainlabel,  testlabel = train_test_split(X,  labels,  test_size=0.2,  random_state=20)

In [14]:
sig_train, sig_test = trainx, testx
lab_train, lab_test = trainlabel, testlabel

In [15]:
sig_train = torch.from_numpy(sig_train)
sig_test = torch.from_numpy(sig_test)
lab_train= torch.from_numpy(lab_train)
lab_test = torch.from_numpy(lab_test)

In [16]:
import torch.utils.data as data_utils
batch_size = 128 
train_tensor = data_utils.TensorDataset(sig_train, lab_train) 
train_loader = data_utils.DataLoader(dataset=train_tensor, batch_size=batch_size, shuffle=True)

In [17]:
batch_size = 1024
test_tensor = data_utils.TensorDataset(sig_test, lab_test) 
test_loader = data_utils.DataLoader(dataset=test_tensor, batch_size=batch_size, shuffle=False)

In [18]:
sig_train.size()

torch.Size([2240, 3, 40, 40])

In [19]:
sig_test.size()

torch.Size([560, 3, 40, 40])

In [20]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=4, stride=1, padding=1)
        self.mp1 = nn.MaxPool2d(kernel_size=4, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=1)
        self.mp2 = nn.MaxPool2d(kernel_size=4, stride=2)
        self.fc1= nn.Linear(2304, 256)
        self.dp1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        in_size = x.size(0)
        x = F.relu(self.mp1(self.conv1(x)))
        x = F.relu(self.mp2(self.conv2(x)))
        x = x.view(in_size, -1)
        x = F.relu(self.fc1(x))
        x = self.dp1(x)
        x = self.fc2(x)
        
        return F.log_softmax(x, dim=1)

In [21]:
cnn = CNN().double()

In [22]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(cnn.parameters(), lr=0.001)

In [23]:
num_epochs = 100

In [24]:
total_step = len(train_loader)
loss_list = []
acc_list = []
for epoch in range(num_epochs):
    for i, (signals, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        # Run the forward pass
        signals=signals
        labels=labels
        outputs = cnn(signals.double())
        loss = criterion(outputs,  labels.long())
        
        loss_list.append(loss.item())

        # Backprop and perform Adam optimisation
        
        loss.backward()
        optimizer.step()
        # Track the accuracy
        total = labels.size(0)
        _,  predicted = torch.max(outputs.data,  1)
        correct = (predicted == labels.long()).sum().item()
        acc_list.append(correct / total)

        if (epoch+1) % 5 == 0 or epoch==0:
            print('Epoch [{}/{}],  Step [{}/{}],  Loss: {:.4f},  Train Accuracy: {:.2f}%'
                  .format(epoch + 1,  num_epochs,  i + 1,  total_step,  loss.item(), 
                          (correct / total) * 100))
        

Epoch [1/100],  Step [1/18],  Loss: 2.3034,  Train Accuracy: 7.03%
Epoch [1/100],  Step [2/18],  Loss: 2.1506,  Train Accuracy: 9.38%
Epoch [1/100],  Step [3/18],  Loss: 2.1171,  Train Accuracy: 14.06%
Epoch [1/100],  Step [4/18],  Loss: 2.0692,  Train Accuracy: 11.72%
Epoch [1/100],  Step [5/18],  Loss: 2.0657,  Train Accuracy: 18.75%
Epoch [1/100],  Step [6/18],  Loss: 1.8881,  Train Accuracy: 21.09%
Epoch [1/100],  Step [7/18],  Loss: 2.0054,  Train Accuracy: 8.59%
Epoch [1/100],  Step [8/18],  Loss: 2.0169,  Train Accuracy: 11.72%
Epoch [1/100],  Step [9/18],  Loss: 1.9566,  Train Accuracy: 17.19%
Epoch [1/100],  Step [10/18],  Loss: 1.7607,  Train Accuracy: 27.34%
Epoch [1/100],  Step [11/18],  Loss: 1.8369,  Train Accuracy: 21.09%
Epoch [1/100],  Step [12/18],  Loss: 1.8315,  Train Accuracy: 20.31%
Epoch [1/100],  Step [13/18],  Loss: 1.7869,  Train Accuracy: 32.81%
Epoch [1/100],  Step [14/18],  Loss: 1.8298,  Train Accuracy: 36.72%
Epoch [1/100],  Step [15/18],  Loss: 1.6931,  

In [25]:
total_step = len(test_loader)
print(total_step)
loss_list_test = []
acc_list_test = []
with torch.no_grad():
    for i,  (signals,  labels) in enumerate(test_loader):
        # Run the forward pass
        signals=signals
        labels=labels
        outputs = cnn(signals.double())
        loss = criterion(outputs,  labels.long())
        loss_list_test.append(loss.item())
        if epoch%10 ==0:
            print(loss)
        total = labels.size(0)
        _,  predicted = torch.max(outputs.data,  1)
        correct = (predicted == labels.long()).sum().item()
        acc_list_test.append(correct / total)
        if (epoch) % 1 == 0:
            print('Epoch [{}/{}],  Step [{}/{}],  Loss: {:.4f},  Accuracy: {:.2f}%'
                  .format(epoch + 1,  num_epochs,  i + 1,  total_step,  loss.item(), 
                          (correct / total) * 100))

1
Epoch [100/100],  Step [1/1],  Loss: 0.0667,  Accuracy: 98.04%


In [26]:
# if you need to save
torch.save(cnn, 'cwru_9804.pth')