In [1]:
### This file containes codes written as part of the research "Towards End-to-end Deep Learning Analysis of Electrical Machines"
### The manuscript is available at:
### https://www.techrxiv.org/articles/preprint/Towards_End-to-end_Deep_Learning_Analysis_of_Electrical_Machines/13134932/1
### The datasets are available at:
### https://data.mendeley.com/datasets/j7bbrmmn4k/1

### Codes here are set up to work with datasets that have "average torque" data
### For torque curve datasets please see another file

### Because CNNs trained on average torque datasets performed relatively poorly, this file does not contain a data visualization section
### Please refer to other files for more information

model is created


In [1]:
### Custom shallow 3-layer network

import os, os.path
import cv2
import numpy as np
import time
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import csv
import pandas as pd

img_size = 56

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 5)
        self.conv2 = nn.Conv2d(32, 64, 5)
        self.conv3 = nn.Conv2d(64, 128, 5) 

        x = torch.randn(img_size,img_size).view(-1,1,img_size,img_size) 
        self._to_linear = None
        self.convs(x)

        self.fc1 = nn.Linear(self._to_linear, 512)
        self.fc2 = nn.Linear(512, 1)

    def convs(self, x):
        x = F.max_pool2d(F.relu(self.conv1(x)), (2,2)) 
        x = F.max_pool2d(F.relu(self.conv2(x)), (2,2))
        x = F.max_pool2d(F.relu(self.conv3(x)), (2,2))

        if self._to_linear is None:
            self._to_linear = x[0].shape[0]*x[0].shape[1]*x[0].shape[2]
        return x

    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, self._to_linear)
        x = F.relu(self.fc1(x)) 
        x = self.fc2(x)
        return x 
    
device = 'cuda' if torch.cuda.is_available() else 'cpu'

net = Net().to(device)

print("model is created")

model is created


In [None]:
### ResNet

import os, os.path
import cv2
import numpy as np
import time
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import csv
import pandas as pd

class block(nn.Module):
    def __init__(self, in_channels, intermediate_channels, identity_downsample=None, stride=1):
        super(block, self).__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(in_channels, intermediate_channels, kernel_size = 1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(intermediate_channels)
        self.conv2 = nn.Conv2d(intermediate_channels, intermediate_channels, 
                               kernel_size = 3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(intermediate_channels)
        self.conv3 = nn.Conv2d(intermediate_channels, intermediate_channels*self.expansion, 
                               kernel_size = 1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(intermediate_channels*self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample
        self.stride = stride
        
    def forward(self, x):
        identity = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x
        
class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        self.layer1 = self._make_layer(block, layers[0], intermediate_channels = 64, stride=1)
        self.layer2 = self._make_layer(block, layers[1], intermediate_channels = 128, stride = 2)
        self.layer3 = self._make_layer(block, layers[2], intermediate_channels = 256, stride = 2)
        self.layer4 = self._make_layer(block, layers[3], intermediate_channels = 512, stride = 2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x
        
    def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride):
        identity_downsample = None
        layers = []

        if stride != 1 or self.in_channels != intermediate_channels * 4:
            identity_downsample = nn.Sequential(nn.Conv2d(self.in_channels, intermediate_channels*4, kernel_size=1,
                                                          stride=stride),
                                                nn.BatchNorm2d(intermediate_channels*4))
        
        layers.append(block(self.in_channels, intermediate_channels, identity_downsample, stride))      
        self.in_channels = intermediate_channels * 4

        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, intermediate_channels))
            
        return nn.Sequential(*layers)
    
def ResNet50(img_channel=1,num_classes=1):
    num_classes = num_classes
    return ResNet(block, [3, 4, 6, 3], img_channel, num_classes)
    
device = 'cuda' if torch.cuda.is_available() else 'cpu'
net = ResNet50(img_channel=1,num_classes=1).to(device)

print("model is created")    

In [2]:
MODEL_NAME = f"model-{int(time.time())}"

all_data = np.load("56_average_cust_norm.npy", allow_pickle=True)

VAL_PCT = 0.2
val_size = int(len(all_data)*VAL_PCT)

training_data = all_data[:-val_size].tolist()
validation_data = all_data[-val_size:].tolist()
print("all, training, val lengths: ", len(all_data),len(training_data),len(validation_data))

loader_train = DataLoader(training_data, batch_size=64,num_workers=0,pin_memory = True)
loader_val = DataLoader(validation_data, batch_size=8,num_workers=0,pin_memory = True)

img_size = 56

print("data is loaded")

log_val_acc = []
log_val_loss = []

all, training, val lengths:  4054 3244 810
data is loaded


In [3]:
optimizer = optim.Adam(net.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[70,140,190], gamma=0.1)
loss_function = nn.MSELoss()     

def fwd_pass(X, y, train=False):

    if train:
        net.zero_grad() 
    outputs = net(X)
    error = [abs(abs(i)-abs(j)) / abs(j) for i, j in zip(outputs, y)]
    for ind,er in enumerate(error):
        if er > 1:
            error[ind] = 1
        elif er < 0:
            error[ind] = 0
    acc = 1 - sum(error)/len(error)
    loss = loss_function(outputs, y) #computes losses
    
    loss = loss * 100
    
    if train:
        loss.backward() #backprop
        optimizer.step() #runs the optimizer

    return acc, loss

def train(net):
    BATCH_SIZE = 8
    EPOCHS = 202

    with open("model.log", "a") as f:
        for epoch in range(EPOCHS):
            #print("current epoch: ", epoch)
            if (epoch+1)%1000 == 0:
                print("training {}th epoch(+1)".format(epoch+1))
                val_acc, val_loss = test(size=BATCH_SIZE)
                print(val_acc, val_loss)
                log_val_acc.append(val_acc)
                log_val_loss.append(val_loss)
                for param_group in optimizer.param_groups:
                    print("checking learning rate! epoch {}, lr {}".format(epoch, param_group['lr']))
                    
            if epoch == 0 or epoch%5 == 0 or epoch == (EPOCHS-1):
                torch.save(net.state_dict(), "model_states/state_ResNet_meany_normy_epoch_{}.pt".format(epoch+1))
            
            for i, data in enumerate(tqdm(loader_train)):
                
                batch_X = data[0]
                batch_y = data[1]
                
                batch_X = batch_X.view(-1,1,img_size,img_size)
                batch_y = batch_y.view(-1,1)

                batch_X, batch_y = batch_X.to(device, dtype=torch.float), batch_y.to(device)

                acc, loss = fwd_pass(batch_X, batch_y, train=True) #.detach().item() 
                
                if i % 10 == 0:
                    val_acc, val_loss = test(size=BATCH_SIZE)
                    f.write(f"{MODEL_NAME},{round(time.time(),3)},{round(float(acc),2)},{round(float(loss), 4)},{round(float(val_acc),2)},{round(float(val_loss),4)}\n")
            
            if (epoch+1)%5 == 0:
                print("training loss in {}th epoch(+1)".format(epoch+1))
                print(acc, loss)
                for param_group in optimizer.param_groups:
                    print("checking learning rate! epoch {}, lr {}".format(epoch, param_group['lr']))
            
            scheduler.step()
            
def test(size=8):
    BATCH_SIZE = size
    X = torch.Tensor([i[0] for i in validation_data[:BATCH_SIZE]])
    y = torch.Tensor([i[1] for i in validation_data[:BATCH_SIZE]]) 
    X = X.view(-1,1,img_size,img_size).to(device, dtype = torch.float)
    y = y.view(-1,1).to(device)
    val_acc, val_loss = fwd_pass(X, y)
   
    return val_acc, val_loss

print("functions have been defined")

#lines below are only for validation on a previously trained CNN with parameters stored in a file "dictionary_name.pt"

#net.load_state_dict(torch.load("dictionary_name.pt",map_location="cuda:0"))
#net.eval()

functions have been defined


In [None]:
train(net)
print("finished")

 71%|███████   | 36/51 [00:02<00:00, 16.23it/s]

In [5]:
### TRAINING DATA ACCURACY CHECK ###

all_train = [None] *len(training_data)

for check_one in range(len(training_data)):
    if check_one%500 == 0:
        print("checking {}'th sample".format(check_one))
    X_one = torch.Tensor(training_data[check_one][0]) 
    y_net = net(X_one.view(-1, 1, img_size, img_size).to(device))[0]
    y_one = torch.Tensor([i[1] for i in training_data[check_one:check_one+2]]).to(device)  
    
    error = abs(torch.mean(y_net-y_one)) / torch.mean(y_one) 
    error = error.cpu().detach().numpy()

    if error > 1:
        error = 1
    elif error < 0:
        error = 0

    acc = 1 - error
    
    y_net = y_net.cpu().detach().numpy() #another modification to use numpy 
    y_one = y_one.cpu().detach().numpy()
    
    all_train[check_one] = acc
    
acc_train = sum(all_train)/len(all_train)

print("training accuracy:",acc_train)

checking 0'th sample
checking 500'th sample
checking 1000'th sample
checking 1500'th sample
checking 2000'th sample
checking 2500'th sample
checking 3000'th sample
training accuracy: 0.5879950818301979


In [7]:
### Checking the distribution of errors

y = torch.Tensor([i[1] for i in training_data])
counter = {0.1: 0, 0.5: 0, 2.5:0, 10:0 }
print(len(y))
for ans in y:
    if ans < 0.2:
        counter[0.1] +=1
    if ans > 0.2 and ans <1:
        counter[0.5] += 1
    if ans > 1 and ans <5:
        counter[2.5] += 1
    if ans > 5:
        counter[10] += 1
print(counter)

3244
{0.1: 1576, 0.5: 1660, 2.5: 8, 10: 0}
