## import

# Model Setting

In [None]:
# Import necessary packages
import numpy as np
import os
import pandas as pd
# Import for read data function
from scipy import io
from scipy.io import loadmat
from collections import OrderedDict

import random

## model structure

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Model(nn.Module):
    """Fully connected classifier for the concatenated signal vectors.

    Five double-precision linear layers with ReLU activations in between;
    dropout (p=0.25) follows the first two hidden layers. ``forward`` returns
    raw logits (no softmax), the form ``nn.CrossEntropyLoss`` takes.

    Args:
        numNeurons: Base width. The hidden layers have ``8x``, ``8x``, ``4x``
            and ``2x`` this many units.
        inputSize: Length of each input vector. Defaults to 160, i.e. two
            signals of 80 samples each.
        numClasses: Number of output logits. Defaults to 6 because the labels
            used in this notebook run from 0 to 5; with fewer logits
            ``CrossEntropyLoss`` rejects the higher targets as out of range.
    """

    def __init__(self, numNeurons=16, inputSize=160, numClasses=6):
        super().__init__()

        # .double() keeps the weights in float64, matching the float64
        # arrays produced by the preprocessing cells.
        self.fc1 = nn.Linear(inputSize, numNeurons*8).double()
        self.fc2 = nn.Linear(numNeurons*8, numNeurons*8).double()
        self.fc3 = nn.Linear(numNeurons*8, numNeurons*4).double()
        self.fc4 = nn.Linear(numNeurons*4, numNeurons*2).double()
        self.fc5 = nn.Linear(numNeurons*2, numClasses).double()
        self.dropout = nn.Dropout(p=0.25)

    def forward(self, x):
        """Map a ``(batch, inputSize)`` float64 tensor to ``(batch, numClasses)`` logits."""
        # Only the two widest layers are regularised with dropout.
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.dropout(F.relu(self.fc2(x)))

        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))

        # No activation on the last layer: the loss works on raw logits.
        return self.fc5(x)

## connect to drive

In [None]:
# Mount Google Drive at /content/drive; later cells read the label sheet and
# the .mat files from paths underneath it.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## set GPU

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## set hyperparameter

In [None]:
epochs = 200  # number of passes over the training loader
lr = 0.001  # learning rate handed to the Adam optimizer
lossFunc=nn.CrossEntropyLoss()  # applied to the model output and the integer class labels

In [None]:
# remove all files
# os.chdir('/content')
# !ls
# !rm -rf train
# !ls

# Data Preprocessing

## load labels

In [None]:
# Work from /content so the relative Drive path below resolves.
path = "/content/"
os.chdir(path)
excel_file_path = 'drive/MyDrive/label.xlsx'

df = pd.read_excel(excel_file_path)
# First column, at most the first 6876 rows, as a NumPy array.
label_row = df.iloc[:6876, 0]
labels = label_row.to_numpy()

# Shift every label down by one.
# NOTE(review): presumably the sheet is 1-based and this makes it 0-based - confirm.
labels = labels - 1

print(labels)
print(len(labels))

[1 5 5 ... 0 0 0]
6875


## load .mat file

In [None]:
# Directory on the mounted Drive that holds the recordings.
path = "/content/drive/MyDrive/train/"

# The loading cell opens the files by bare name, so work from inside the directory.
os.chdir(path)
files = os.listdir(path)
# Keep only names that END in .mat; a substring test would also accept
# names such as "x.mat.bak", which the loader cannot read.
files = [ff for ff in files if ff.endswith('.mat')]

def extract_number(filename):
    """Return the text before the first whitespace in ``filename`` as an int.

    Used as the sort key for the recording files, e.g. ``'01 (1~45).mat'``
    yields ``1``.
    """
    leading_token = filename.split(maxsplit=1)[0]
    return int(leading_token)

# Order the files by the integer prefix of their names rather than lexicographically.
files = sorted(files, key=extract_number)

print(files)

['01 (1~45).mat', '02 (46~88).mat', '03 (89~132).mat', '04 (133~189).mat', '05 (190~241).mat', '06 (242~277).mat', '07 (278~308).mat', '08 (309~333).mat', '09 (334~364).mat', '10 (365~404).mat', '11 (405~435).mat', '12 (436~475).mat', '13 (476~509).mat', '14 (510~551).mat', '15 (552~587).mat', '16 (588~625).mat', '17 (626~670).mat', '18 (671~710).mat', '19 (711~740).mat', '20 (741~774).mat', '21 (775~824).mat', '22 (825~863).mat', '23 (864~906).mat', '24 (907~948).mat', '25 (949~1000).mat', '26 (1001~1033).mat', '27 (1034~1058).mat', '28 (1059~1122).mat', '29 (1123~1205).mat', '30 (1206~1250).mat', '31 (1251~1293).mat', '32 (1294~1330).mat', '33 (1331~1372).mat', '34 (1373~1433).mat', '35 (1434~1468).mat', '36 (1469~1516).mat', '37 (1517~1556).mat', '38 (1557~1594).mat', '39 (1595~1626).mat', '40 (1627~1654).mat', '41 (1655~1690).mat', '42 (1691~1730).mat', '43 (1731~1765).mat', '44 (1766~1800).mat', '45 (1801~1831).mat', '46 (1832~1853).mat', '47 (1854~1876).mat', '48 (1877~1901).mat'

In [None]:
# Flat list of raw signals: for every cell, 'signal1' is appended and then
# 'signal2', so entries 2k and 2k+1 belong to the same cell.
Data = []

for i in files:
  f = loadmat(i)
  data = f['cellList']['meshData']
  # NOTE(review): the chain of [0] lookups presumably unwraps the nested
  # object arrays that loadmat produces for MATLAB structs/cells - confirm
  # against the file layout.
  cells_file = data[0][0][0][0][0]
  # print(len(cells_file))
  for j in range (len(cells_file)):
    Base = cells_file[j][0][0]
    Data.append(Base['signal1'])
    Data.append(Base['signal2'])

# print(len(Data))
# print(Data[0])
# print(Data[1])
print(Data[0].dtype)

float32


## data preprocessing

resample each signal to length 80 (160 after concatenating the two signals)

In [None]:
def extrapo(s, extra_length, desired_length=80):
  """Stretch a short signal to ``desired_length`` samples.

  Up to ``extra_length`` leading samples are each followed by the midpoint
  between that sample and its successor. The remaining samples are then
  copied unchanged, and whatever is left of the output is filled with the
  last sample of ``s``.

  When ``extra_length`` is not reached before the signal runs out, the
  midpoint inserted before the final sample is overwritten by that final
  sample, so only ``len(s) - 2`` midpoints survive.

  Args:
    s: 1-D sequence of numbers (list or NumPy array), at least one element.
    extra_length: Number of midpoints to insert; callers pass
      ``desired_length - len(s)``.
    desired_length: Length of the returned vector (default 80).

  Returns:
    A float64 NumPy array of length ``desired_length``.

  Raises:
    IndexError: If ``s`` is empty, or if the stretched signal does not fit
      into ``desired_length`` samples.
  """
  stretched = np.zeros(desired_length)

  # Interleave samples with midpoints until enough points were inserted
  # (or the signal runs out). `pairs_done` ends up as the index of the first
  # source sample that still has to be copied verbatim.
  pairs_done = 0
  for idx in range(len(s) - 1):
    pairs_done = idx
    if idx == extra_length:
      break
    stretched[2 * idx] = s[idx]
    stretched[2 * idx + 1] = (s[idx] + s[idx + 1]) / 2

  # Copy the rest of the signal right behind the interleaved part.
  write_pos = 2 * pairs_done
  for value in s[pairs_done:]:
    stretched[write_pos] = value
    write_pos += 1

  # Pad the tail by repeating the last sample.
  if write_pos < desired_length:
    stretched[write_pos:] = s[len(s) - 1]

  return stretched

In [None]:
def interpo(s, desired_length=80):
  """Shrink a signal to ``desired_length`` samples by dropping random ones.

  ``len(s) - desired_length`` distinct positions are drawn with
  ``random.sample`` and removed; the surviving samples keep their original
  order.

  Args:
    s: 1-D sequence of numbers with at least ``desired_length`` elements.
    desired_length: Number of samples to keep (default 80).

  Returns:
    A list with ``desired_length`` values.

  Raises:
    ValueError: If ``s`` is shorter than ``desired_length`` (raised by
      ``random.sample`` for a negative sample size).
  """
  remove_length = len(s) - desired_length
  # A set gives O(1) membership tests; looking each index up in the sampled
  # list made the filter quadratic in the signal length.
  dropped = set(random.sample(range(len(s)), remove_length))

  # `* 2 / 2` leaves the value numerically unchanged but goes through true
  # division, so integer samples still come back as floats.
  return [value * 2 / 2 for index, value in enumerate(s) if index not in dropped]

concatenate & normalize

In [None]:
def _prepare_signal(raw_signal):
  """Flatten, peak-normalise and resample one raw signal to 80 samples.

  Returns a ``(resampled, original_length)`` pair, where ``original_length``
  is the flattened length before resampling.
  """
  flat = np.array([element for sublist in raw_signal for element in sublist])
  # Scale so the peak becomes 255; an all-zero signal is left untouched.
  peak = max(flat)
  if peak != 0:
    flat = 255 * flat / peak
  original_length = len(flat)
  if original_length < 80:
    return extrapo(flat, 80 - original_length), original_length
  return interpo(flat), original_length


modified_Data = []    # signal1 followed by signal2, 160 values per cell
additional_Data = []  # the same pair with each signal reversed (flip augmentation)
indexes = []          # cells whose first signal has fewer than 3 samples

# Data holds signal1/signal2 alternately, so cell i owns entries 2i and 2i+1.
for i in range(len(Data) // 2):
  first, first_length = _prepare_signal(Data[i * 2])
  # Only the first signal is checked for being too short.
  if first_length < 3:
    indexes.append(i)
  second, _ = _prepare_signal(Data[i * 2 + 1])

  modified_Data.append(np.concatenate((first, second), axis=0))
  additional_Data.append(np.concatenate((np.flip(first), np.flip(second)), axis=0))

# print(modified_Data[4])
print(len(modified_Data))
print(indexes)
print(modified_Data[0].dtype)

6875
[241, 3091, 5587, 6432]
float64


remove some bad data
- length < 3
- label = 7

In [None]:
# On top of the too-short signals already collected, also drop every sample
# whose (shifted) label equals 6.
indexes.extend(i for i in range(len(modified_Data)) if labels[i] == 6)

# Keep only the samples whose position is not flagged; the flipped copies and
# the labels are filtered with the same positions so all three stay aligned.
pruned_modified_Data = [
  sample for i, sample in enumerate(modified_Data) if i not in indexes
]
modified_additional_Data = [
  sample for i, sample in enumerate(additional_Data) if i not in indexes
]
pruned_labels = np.delete(labels, indexes)

print(len(indexes))
print(len(pruned_modified_Data))
print(len(pruned_labels))
print(indexes)
# print(pruned_modified_Data[0])

4
6871
6871
[241, 3091, 5587, 6432]


In [None]:
# Bundle each sample with its flipped copy and its label so that one shuffle
# keeps the three aligned.
combined_data = list(zip(pruned_modified_Data, modified_additional_Data, pruned_labels))
# In-place shuffle; no seed is set in this notebook, so the order differs per run.
np.random.shuffle(combined_data)
# Unzip back into three parallel tuples.
shuffled_data, shuffled_additional_data, shuffled_labels = zip(*combined_data)

print(len(shuffled_data))
# print(shuffled_data[0])
# print(shuffled_data[1])
# print(Data[0].dtype)
print(len(shuffled_labels))

6871
6871


## build dataset

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.dataset import random_split

class MyDataset(Dataset):
    """Pair an indexable collection of samples with a parallel one of labels."""

    def __init__(self, data, labels):
        """Store ``data`` and ``labels`` as given; no copy or conversion is made."""
        self.data, self.labels = data, labels

    def __len__(self):
        """Return the number of samples, taken from ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at position ``idx``."""
        sample = self.data[idx]
        target = self.labels[idx]
        return sample, target

## partitioning dataset

size of each set:
- training set: 0.8
- validation set: 0.1
- testing set: 0.1

In [None]:
# 80 % / 10 % / remainder split of the already shuffled samples.
total_data = len(shuffled_data)
train_size = int(0.8 * total_data)
val_size = int(0.1 * total_data)
test_size = total_data - train_size - val_size

# Position where the validation slice stops and the test slice starts.
val_end = train_size + val_size

training_data, training_label = shuffled_data[:train_size], shuffled_labels[:train_size]
val_data, val_label = shuffled_data[train_size:val_end], shuffled_labels[train_size:val_end]
test_data, test_label = shuffled_data[val_end:], shuffled_labels[val_end:]

print(len(training_data))
print(len(training_label))

5496
5496


Data augmentation

In [None]:
# flip
# Append the reversed copy of every training sample. The first train_size
# entries of shuffled_additional_data line up with the training slice, so the
# labels are simply repeated.
addition = []
for i in range(train_size):
  addition.append(shuffled_additional_data[i])
addition = np.array(addition)
training_data = np.concatenate((training_data, addition), axis=0)
temp = training_label[:train_size]
training_label = np.concatenate((training_label, temp), axis=0)

print(len(training_data))
print(len(training_label))

# # double
# temp_d = [sublist*1.5 for sublist in training_data]
# for i in range(len(temp_d)):
#   training_data.append(temp_d[i])
# temp = np.copy(training_label)
# training_label = np.concatenate((training_label, temp), axis=0)

# noise
# Add a uniform offset drawn from [noise_lower, noise_upper) to every value of
# every training sample (flipped copies included), then rescale each noisy
# sample so its peak is 255 again.
noise_lower = 0.5
noise_upper = 2
augmented_data = [sublist + np.random.uniform(noise_lower, noise_upper, size=len(sublist)) for sublist in training_data]
for i in range(len(augmented_data)):
  if max(augmented_data[i]) != 0:
    augmented_data[i] = 255 * augmented_data[i] / max(augmented_data[i])
# training_data = np.vstack([training_data, augmented_data])
# The noisy copies double the training set; their labels are a copy of the
# current label array.
training_data = np.concatenate((training_data, augmented_data), axis=0)
temp = np.copy(training_label)
training_label = np.concatenate((training_label, temp), axis=0)

print(len(training_data))
print(len(training_label))
# print(training_data[0])

# augmented_data = [sublist + np.random.uniform(noise_lower, noise_upper, size=len(sublist)) for sublist in training_data]
# for i in range(len(augmented_data)):
#   if max(augmented_data[i]) != 0:
#     augmented_data[i] = 255 * augmented_data[i] / max(augmented_data[i])
# training_data = np.vstack([training_data, augmented_data])
# temp = np.copy(training_label)
# training_label = np.concatenate((training_label, temp), axis=0)

# print(len(training_data))
# print(len(training_label))
# print(training_data[0])

10992
10992
21984
21984


In [None]:
# The validation/test labels are still tuples at this point; torch.from_numpy
# needs NumPy arrays.
val_label = np.array(val_label)
test_label = np.array(test_label)

# Samples become a list with one tensor per sample; labels become one tensor per split.
train_data_tensor = list(map(torch.from_numpy, training_data))
val_data_tensor = list(map(torch.from_numpy, val_data))
test_data_tensor = list(map(torch.from_numpy, test_data))

train_label_tensor = torch.from_numpy(training_label)
val_label_tensor = torch.from_numpy(val_label)
test_label_tensor = torch.from_numpy(test_label)

train_set = MyDataset(train_data_tensor, train_label_tensor)
val_set = MyDataset(val_data_tensor, val_label_tensor)
test_set = MyDataset(test_data_tensor, test_label_tensor)

In [None]:
from torch.utils.data import random_split

batch_size = 64  # samples per mini-batch for all three loaders

# Construct the three data loaders. Only the training loader reshuffles;
# validation and test batches keep a fixed order.
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)

# Training

In [None]:
model = Model().to(device)
modelSaveName = "Model.pt"
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# The checkpoint path below is relative to /content.
os.chdir("/content")
l2_lambda = 0.01


def _count_correct(loader):
    """Return how many samples of ``loader`` the current ``model`` classifies correctly."""
    hits = 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        pred = model(x).max(1)[1]  # index of the largest logit per row
        hits += torch.sum(pred == y).item()
    return hits


bestValidAcc = 0
for epoch in range(1, epochs + 1):
    # One optimisation pass over the training set.
    running_loss = 0.0
    for step, (x, y) in enumerate(train_loader):
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()  # drop the gradients of the previous step
        output = model(x)
        loss = lossFunc(output, y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    avg_loss = running_loss / len(train_loader)

    # Accuracy is only measured every fifth epoch.
    if epoch % 5 != 0:
        continue

    model.eval()  # evaluation mode for the accuracy pass
    with torch.no_grad():
        trainAcc = _count_correct(train_loader) / len(train_loader.dataset)
        validAcc = _count_correct(valid_loader) / len(valid_loader.dataset)
    model.train()  # back to training mode for the next epochs

    print("epoch:{}, train acc:{}, valid acc:{}".format(epoch, trainAcc, validAcc))
    # Checkpoint whenever the validation accuracy improves.
    if validAcc > bestValidAcc:
        bestValidAcc = validAcc
        torch.save(model.state_dict(), "drive/MyDrive/" + modelSaveName)
print("best valid acc:", bestValidAcc)

epoch:5, train acc:0.8677674672489083, valid acc:0.8529839883551674
epoch:10, train acc:0.8771834061135371, valid acc:0.8573508005822417
epoch:15, train acc:0.8938318777292577, valid acc:0.8748180494905385
epoch:20, train acc:0.8990629548762736, valid acc:0.868995633187773
epoch:25, train acc:0.9072052401746725, valid acc:0.8850072780203785
epoch:30, train acc:0.9068868267831149, valid acc:0.8748180494905385
epoch:35, train acc:0.9120724163027657, valid acc:0.8602620087336245
epoch:40, train acc:0.9047943959243085, valid acc:0.8544395924308588
epoch:45, train acc:0.9147106986899564, valid acc:0.8544395924308588
epoch:50, train acc:0.925900655021834, valid acc:0.8733624454148472
epoch:55, train acc:0.9259916302765647, valid acc:0.8850072780203785
epoch:60, train acc:0.9338609898107715, valid acc:0.8777292576419214
epoch:65, train acc:0.933542576419214, valid acc:0.8777292576419214
epoch:70, train acc:0.9382732896652111, valid acc:0.8733624454148472
epoch:75, train acc:0.9356804949053857

# Evaluate

In [None]:
# Number of classes tallied below (labels 0..5).
num_classes = 6

model = Model().to(device)
# Load the weights with the highest validation accuracy. map_location lets a
# checkpoint written on a GPU runtime be restored on a CPU-only one.
model.load_state_dict(torch.load("drive/MyDrive/Model.pt", map_location=device))
model.eval()
correctCount=0
class_correct = [0 for _ in range(num_classes)]
class_total = [0 for _ in range(num_classes)]
# Inference only: skip autograd bookkeeping while scoring the test set.
with torch.no_grad():
    for x,y in test_loader:
        x,y=x.to(device),y.to(device)
        pred=model(x).max(1)[1]  # predicted class = index of the largest logit
        correctCount+=torch.sum(pred==y).item()
        for i in range(num_classes):
            # A hit for class i needs both the prediction and the label to be i.
            class_correct[i] += torch.sum((pred == i) & (y == i)).item()
            class_total[i] += torch.sum(y == i).item()

testAcc=correctCount/len(test_loader.dataset)
print("Testing accuracy of the model: ",testAcc)

for i in range(num_classes):
    # Report 0 for a class that has no test samples instead of dividing by zero.
    class_acc = class_correct[i] / class_total[i] if class_total[i] != 0 else 0
    print(f"Class {i+1} accuracy: {class_correct[i]} / {class_total[i]} = {class_acc}")

Testing accuracy of the model:  0.8633720930232558
Class 1 accuracy: 130 / 145 = 0.896551724137931
Class 2 accuracy: 129 / 138 = 0.9347826086956522
Class 3 accuracy: 169 / 180 = 0.9388888888888889
Class 4 accuracy: 75 / 92 = 0.8152173913043478
Class 5 accuracy: 59 / 69 = 0.855072463768116
Class 6 accuracy: 32 / 64 = 0.5
