## Prepare Library

In [48]:
import os
import torch
import torchmetrics
from torchmetrics.classification.accuracy import Accuracy
import pandas as pd 
import numpy as np
import random
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from torch.autograd import Variable 
from tqdm.auto import tqdm
from scipy import integrate
from model import LSTM
from random import sample
import torch.nn as nn
from scipy.signal import find_peaks
from livelossplot import PlotLosses


## Prepare CPU

In [None]:
print(torch.__version__ )
print(torch.version.cuda)
print(torch.cuda.is_available())
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## Dataset

In [None]:
Index = ['Channel1', 'Channel2', 'Channel3', 'Channel4', 'Channel5', 'Channel6']

Data_path = 'D:/Workspace/TENG-Signal-Classification/dataset/preprocessed'

cases = os.listdir(Data_path)

random.shuffle(cases)

## Load data

In [None]:
test_set = cases[:50]

valid_set = cases[50:100]

train_set = cases[100:]

print('lenght of train set:', len(train_set))
print('lenght of valid set:', len(valid_set))
print('valid set:', valid_set)
print('lenght of test set:', len(test_set))
print('test set:', test_set)

In [None]:
x_train_data = [] 
y_train_data = []
x_valid_data = []
y_valid_data = []
x_test_data = []
y_test_data = []

print('| loading train set data..... |')
for case in train_set:
    label = case.split('_')[0]
    #   Read csv
    file = case + '.csv'
    data = pd.read_csv(Data_path + '/' + case + '/' + file, usecols = Index)
    #   Convert lable into int
    Encode_label = {
            'C1': 0,
            'C2': 1,
            'C3': 2,
            'C4': 3,
            'C5': 4,
            'C6': 5, 
    }
    x_train_data.append(data)
    y_train_data.append(Encode_label[label])
print('| done |')

print('| loading valid set data..... |')
for case in valid_set:
    label = case.split('_')[0]
    #   Read csv
    file = case + '.csv'
    data = pd.read_csv(Data_path + '/' + case + '/' + file, usecols = Index)
    #   Convert lable into int
    Encode_label = {
            'C1': 0,
            'C2': 1,
            'C3': 2,
            'C4': 3,
            'C5': 4,
            'C6': 5, 
    }
    x_valid_data.append(data)
    y_valid_data.append(Encode_label[label])
print('| done |')

print('| loading test set data..... |')
for case in test_set:
    label = case.split('_')[0]
    #   Read csv
    file = case + '.csv'
    data = pd.read_csv(Data_path + '/' + case + '/' + file, usecols = Index)
    #   Convert lable into int
    Encode_label = {
            'C1': 0,
            'C2': 1,
            'C3': 2,
            'C4': 3,
            'C5': 4,
            'C6': 5, 
    }
    x_test_data.append(data)
    y_test_data.append(Encode_label[label])
print('| done |')

ss = StandardScaler()

X_train_tensors = []
Y_train_tensors = []
X_valid_tensors = []
Y_valid_tensors = []
X_test_tensors = []
Y_test_tensors = []


print('| train set data to tensor..... |')
## To tensors
for i in range(len(train_set)):
    X = x_train_data[i]
    Y = y_train_data[i]
    X_ss = ss.fit_transform(X)
    
    X_tensors = torch.Tensor(abs(X_ss))
    Y_tensors = torch.tensor(Y)

    X_train_tensors.append(X_tensors.to(device))
    Y_train_tensors.append(Y_tensors.to(device))
print('| done |')

print('| valid data to tensor..... |')
for i in range(len(valid_set)):
    X = x_valid_data[i]
    Y = y_valid_data[i]
    X_ss = ss.fit_transform(X)
    
    X_tensors = torch.Tensor(abs(X_ss))
    Y_tensors = torch.tensor(Y)

    X_valid_tensors.append(X_tensors.to(device))
    Y_valid_tensors.append(Y_tensors.to(device))
print('| done |')

print('| test data to tensor..... |')
for i in range(len(test_set)):
    X = x_test_data[i]
    Y = y_test_data[i]
    X_ss = ss.fit_transform(X)
    
    X_tensors = torch.Tensor(abs(X_ss))
    Y_tensors = torch.tensor(Y)

    X_test_tensors.append(X_tensors.to(device))
    Y_test_tensors.append(Y_tensors.to(device))
print('| done |')

## Training 

In [None]:
num_epochs = 100
learning_rate = 1e-4

n_features = len(Index) #number of features
n_hidden = 32 #number of features in hidden state
n_layers = 2 #number of stacked lstm layers
n_classes = 6 #number of output classes 
dropout = 0.2
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
lstm = LSTM(n_features, n_classes, n_hidden, n_layers, dropout, device) #our lstm class
lstm.to(device)
criterion = torch.nn.CrossEntropyLoss() 
optimizer = torch.optim.Adam(lstm.parameters(), lr=learning_rate) 
accuracy = Accuracy(task="multiclass", num_classes=6)
train_loss = []
train_acc = []

trainer_path = 'G:/Workspace/TENG-Signal-Classification/trainer'

valid_loss = []

valid_acc = []

to_one_hot = [[1,0,0,0,0,0],[0,1,0,0,0,0],[0,0,1,0,0,0],[0,0,0,1,0,0],[0,0,0,0,1,0],[0,0,0,0,0,1]]

for epoch in tqdm(range(num_epochs)):
    l = []
    pre = []
    # mix = []
    log = {}
    liveloss = PlotLosses()
    
    for index, case in enumerate(train_set):
        # rand_index = random.randint(0,len(train_set) - 1)
        x1 = X_train_tensors[index]
        # x2 = X_train_tensors[rand_index]
        label1 = Y_train_tensors[index]
        # label2 = Y_train_tensors[rand_index]
        one_hot1 = to_one_hot[label1]
        # one_hot2 = to_one_hot[label2]

        # lam = np.random.beta(1, 1)
        # x_mixed_up = lam * x1 + (1 - lam) * x2
        # y_mixed_up = lam * torch.FloatTensor(one_hot1) + (1 - lam) * torch.FloatTensor(one_hot2)

        outputs = lstm.forward(x1) #forward pass
        optimizer.zero_grad() #caluclate the gradient, manually setting to 0

        # obtain the loss function
        step_loss = criterion(torch.unsqueeze(outputs,0), torch.unsqueeze(torch.FloatTensor(one_hot1),0).to(device))
        l.append(step_loss.item())

        outputs = outputs.softmax(dim = 0)
        prediction = torch.argmax(outputs)
        pre.append(prediction)
        step_loss.backward() #calculates the loss of the loss function
        optimizer.step() #improve from loss, i.e backprop

    acc = accuracy(torch.tensor(pre), torch.tensor(Y_train_tensors))
    train_acc.append(acc.item())
    loss = np.mean(l) 
    train_loss.append(loss)
    best_train_loss = min(train_loss)
    
    if epoch < 10:
        print("Epoch: %d | train loss: %1.5f, best: %1.5f | accuracy: %1.5f, best: %1.5f" % (epoch, loss, best_train_loss, acc, max(train_acc))) 
    else:
        print("Epoch: %d| train loss: %1.5f, best: %1.5f | accuracy: %1.5f, best: %1.5f" % (epoch, loss, best_train_loss, acc, max(train_acc))) 

    # if epoch % 5 == 0 or epoch == num_epochs - 1 :
    _l = []
    _pre = []
    _acc = []
    for index, case in enumerate(valid_set):

        label = Y_valid_tensors[index]
        one_hot = to_one_hot[label]
        
        outputs = lstm(X_valid_tensors[index]) #forward pass
        step_loss = criterion(torch.unsqueeze(outputs,0), torch.unsqueeze(torch.FloatTensor(one_hot),0).to(device))
        _l.append(step_loss.item())
         
        outputs = outputs.softmax(dim = 0)
        prediction = torch.argmax(outputs)
        
        _pre.append(prediction)
    
    _acc = accuracy(torch.tensor(_pre), torch.tensor(Y_valid_tensors)) 
    valid_acc.append(_acc.item())
    _loss = np.mean(_l)
    if  acc >= 0.8:
        torch.save(lstm.state_dict(), trainer_path + 'epoch_' + str(epoch) +
                   '_loss_' + str(_loss) + '_acc_' + str(_acc.item() * 100) + '%' + '.pt')
    
    valid_loss.append(_loss)
    best_valid_loss = min(valid_loss)
    
    if epoch == num_epochs - 1:
        print("         | valid loss: %1.5f, best: %1.5f | accuracy: %1.5f, best: %1.5f" % (loss, best_valid_loss, acc, max(valid_acc))) 
    else:
         print("         | valid loss: %1.5f, best: %1.5f | accuracy: %1.5f, best: %1.5f" % (loss, best_valid_loss, acc, max(valid_acc)))


  0%|          | 0/100 [00:00<?, ?it/s]

Epoch: 0 | train loss: 1.79638, best: 1.79638 | accuracy: 0.16500, best: 0.16500
         | valid loss: 1.79638, best: 1.78532 | accuracy: 0.16500, best: 0.20000
Epoch: 1 | train loss: 1.79562, best: 1.79562 | accuracy: 0.16500, best: 0.16500
         | valid loss: 1.79562, best: 1.78532 | accuracy: 0.16500, best: 0.22000
Epoch: 2 | train loss: 1.79480, best: 1.79480 | accuracy: 0.16500, best: 0.16500
         | valid loss: 1.79480, best: 1.78532 | accuracy: 0.16500, best: 0.22000
Epoch: 3 | train loss: 1.79453, best: 1.79453 | accuracy: 0.15500, best: 0.16500
         | valid loss: 1.79453, best: 1.78532 | accuracy: 0.15500, best: 0.22000
Epoch: 4 | train loss: 1.79373, best: 1.79373 | accuracy: 0.17500, best: 0.17500
         | valid loss: 1.79373, best: 1.78532 | accuracy: 0.17500, best: 0.22000
Epoch: 5 | train loss: 1.79319, best: 1.79319 | accuracy: 0.19500, best: 0.19500
         | valid loss: 1.79319, best: 1.78532 | accuracy: 0.19500, best: 0.22000
Epoch: 6 | train loss: 1.793

## Save Loss

In [None]:
path = 'D:/Workspace/Teng-Signal-Classification/output/'

csv_file = path + 'train_loss' + '.csv'
img_file = path + 'train_loss' + '.jpg'

epoch_scale = list(range(0, len(train_loss)))

plt.figure(figsize=(10, 6))
plt.plot(epoch_scale, train_loss, epoch_scale, valid_loss)
label = ['train loss', 'valid loss']
plt.legend(label, loc='upper right')
plt.savefig(path + 'loss.jpg')
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(epoch_scale, train_acc, epoch_scale, valid_acc)
label = ['train acc', 'valid acc']
plt.legend(label, loc='upper right')
plt.savefig(path + 'acc.jpg')
plt.show()

Data = {
    'train_loss': [],
    'valid_loss': [],
    'train_acc': [],
    'valid_acc': [],

}

Data['train_loss'] = train_loss
Data['valid_loss'] = valid_loss
Data['train_acc'] = train_acc
Data['valid_acc'] = valid_acc

savedata = pd.DataFrame(data=Data)
savedata.to_csv(path + 'loss&acc.csv')