In [5]:
# -*- coding: utf-8 -*-
"""
19:28:32, Thur, 30 March,  2023

By Gr

Edition time: 2023:4.5
"""

import numpy as np
import numpy.random as rnd
import torch





# setting up GPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# load the data from given dataset


print(torch.__version__)


print(torch.cuda.device_count())

2.0.0+cu118
1


In [6]:
#Define funtion that Get data and create train and test dataset
from torch.utils.data import DataLoader,random_split
class myDataset():
    def __init__(self,data_dir,label_dir):
    #data structure is 100x256xN
    #Label structure is N(0~7)
        data = np.load(data_dir)
        label = np.load(label_dir)

    # now convert integer to multinomial rerepsentation for classfication
    # one-hot code
    
        label_multi = np.zeros((label.shape[0],7))
        for i in range(label.shape[0]):
            tmp = round(label[i])-1
            label_multi[i,tmp]=1

        self.data = torch.tensor(data).float()
        self.label = torch.tensor(label_multi).float()
        
        del label_multi,data
    
    def __len__(self):
        return len(self.label)
    
    def __getitem__(self, index):
        return self.data[:,:,index],self.label[index,:]

def get_dataloader(data_dir, label_dir, batch_size, n_workers, train_len):
#In this lab, every feature has same length, so don't need padding
    dataset = myDataset(data_dir,label_dir)
    #for example, train_len = 0.8 means 80% of data will be assigned to training set
    trainlen = int(train_len * len(dataset))
    lengths = [trainlen, len(dataset)-trainlen]
    #split trainset and validset
    trainset, validset = random_split(dataset,lengths)

    train_loader = DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=n_workers,
        pin_memory=True,
  )
    valid_loader = DataLoader(
        validset,
        batch_size=batch_size,
        num_workers=n_workers,
        drop_last=True,
        pin_memory=True,
  )
    return train_loader, valid_loader

In [7]:
from torch import torch
import torch.nn as nn
        
class TransformerClassifier(nn.Module):
    def __init__(self, d_model, n_class, dropout):
        super().__init__()
        #input = [Batch_Size, Signal_length, Channel_nums] = 200 * 100 * 256
        #Transform the input become B * 1 * 100 * 256   (B = Batch_size = 200)
        self.Conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1,out_channels=14,kernel_size=3,stride=1,padding=1),  # 1*100*256 => 8*100*256
            #nn.Conv2d(in_channels=14,out_channels=14,kernel_size=3,stride=1,padding=1), #8*100*256 => 16*100*256
            nn.ELU(), #Swish
            nn.AvgPool2d(kernel_size=2,stride=2),   #16*50*128
            nn.Dropout(0.5)
        )
 
        
        #For selfencoder layer, input = [Batch_size, length, 256] (label 256 * 7 )
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model = d_model, dim_feedforward = 256, nhead = 8, 
            dropout = dropout
        )
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=1)

        #Decoder Part need a Memory as an input
        '''
        self.decoder_layer = nn.TransformerDecoderLayer(
            d_model = d_model, nhead = 8
        )
        '''
        
        self.pred_layer = nn.Sequential(
            #nn.Linear(d_model, d_model),
            #nn.ReLU(),
            nn.Linear(d_model, n_class),
        )
    
    
    
    def forward(self, mels):
        """
        args:
            mels: (batch size, length, 256)
            
        """
        
        input = mels.unsqueeze(1) # => (batch size, 1, length, 256)
        x = self.Conv1(input)
        x = x.reshape(x.shape[0],-1,256) # => (batch_size, 256, signal_length)
        #Transform mels: (batch size, length, 256)
        #x = x.transpose(1,2)

        x = self.encoder(x)
        stats = x.mean(dim=1)
        

        # out: (batch, n_spks)
        out = self.pred_layer(stats)
        return out

In [8]:
#Define train function
from tqdm import tqdm
def train(model, train_loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    total_correct = 0

    train_iter = iter(train_loader)
    
    #pbar = tqdm(total = 199, ncols = 100 )
    #Looping all the train_loader
    '''
    #There is also an alternative for you to write

    for mel, label in range emunerate(train_loader):
        optimzer.zero_grad()
    '''
    
    for i in range(len(train_iter)):
        mel, label = next(train_iter)
        mel, label = mel.to(device), label.to(device)
        optimizer.zero_grad()
        output = model(mel)
        loss = criterion(output,label)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * mel.shape[0]
        total_correct += (output.argmax(dim=1) == label.argmax(dim=1)).sum().item()

        '''
        pbar.update()
        pbar.set_postfix(
            loss = f"{loss.item() * mel.shape[0]/len(mel):.2f}",
            accuracy = f"{(output.argmax(dim=1) == label.argmax(dim=1)).sum().item()/len(mel):.2f}",
        )
        '''    
            
        #if (i%10 == 0):
            #print(f'Iteration = {i},Accuracy = {(output.argmax(dim=1) == label.argmax(dim=1)).sum().item()/ len(label)}')
    #pbar.close()
    print(f'Training_loss = {total_loss / len(train_loader.dataset):.4f}, Training_Accuracy = {total_correct / len(train_loader.dataset) * 100 :.2f}%',end = ' ')
    return total_loss / len(train_loader.dataset), total_correct / len(train_loader.dataset)

In [9]:
#Define valid function, return loss, accuracy, best_loss
def valid(model,valid_loader,criterion,save_path):
    model.eval()
    total_loss = 0
    total_correct = 0
    #Save best model    
    best_loss = 2


    valid_iter = iter(valid_loader)
    for i in range(len(valid_iter)):
        with torch.no_grad():
            mel, label = next(valid_iter)
            mel, label = mel.to(device), label.to(device)
            output = model(mel)
            loss = criterion(output,label)

            if (loss < best_loss):
                best_loss = loss
                torch.save(model.state_dict(), save_path)
                
        total_loss += loss.item() * mel.shape[0]
        total_correct += (output.argmax(dim=1) == label.argmax(dim=1)).sum().item()
    model.train()

    print(f'Valid_loss = {total_loss / len(valid_loader.dataset):.4f}, Valid_Accuracy = {total_correct / len(valid_loader.dataset) * 100 :.2f}%')
    return total_loss / len(valid_loader.dataset), total_correct / len(valid_loader.dataset), best_loss

In [10]:
#Load data, using dataloader spilt into training set and test set(validation set)
# mel, label = batch (N, 100, 256)
data_dir = 'data/condn_data_new.npy'  #TODO condn_data_new.npy path
label_dir = 'data/Y.npy'              #TODO Y.npy path
batch_size = 200
n_workers = 0
train_len = 0.8
train_loader, valid_loader = get_dataloader(data_dir, label_dir, batch_size, n_workers, train_len)

In [11]:
from torch import torch
import torch.nn as nn
class ConvModule(nn.Module):
    def __init__(self):
        super().__init__()
        #input = [Batch_Size, Signal_length, Channel_nums] = 200 * 100 * 256
        #Transform the input become B * 1 * 100 * 256   (B = Batch_size = 200)
        self.Conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1,out_channels=8,kernel_size=3,stride=1,padding=1),  # 1*100*256 => 8*100*256
            nn.ReLU(), #Swish
            nn.MaxPool2d(kernel_size=2,stride=2),   #8*50*128
        )
        self.Conv2 = nn.Sequential(
            nn.Conv2d(8,out_channels=16,kernel_size=3,stride=1,padding=1), #8*50*128 => 16*50*128
            nn.ReLU(), #Swish
            nn.AvgPool2d(kernel_size=2,stride=2), #16*25*64
        )
    def forward(self, mels):
        """
        args:
            mels: (batch size, length, 256)
            
        """
        input = mels.unsqueeze(1) # => (batch size, 1, length, 256)
        x = self.Conv1(input)
        x = self.Conv2(x) # =>(batch_size,16, 25, 64)

        return x

In [12]:
#Transformer model
data_dir = 'data/condn_data_new.npy'  #TODO condn_data_new.npy path
label_dir = 'data/Y.npy'              #TODO Y.npy path
weight_path = 'model/conformer/save.pt'
batch_size = 200
n_workers = 0
train_len = 0.8
d_model = 256
n_class = 7
dropout = 0.3
lr = 1e-3
num_epochs = 100
in_channel = 1
model = TransformerClassifier(d_model,n_class,dropout)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = torch.nn.CrossEntropyLoss()



In [13]:
#Training
num_epochs = 100
train_loss = []
train_accuracy = []
valid_loss = []
valid_accuracy = []
for epoch in range(num_epochs):
    loss,accuracy = train(model,train_loader,optimizer,criterion,device)
    train_loss.append(loss), train_accuracy.append(accuracy)
    loss,accuracy,best_loss = valid(model,valid_loader,criterion,weight_path)
    valid_loss.append(loss), valid_accuracy.append(accuracy)


Training_loss = 1.6964, Training_Accuracy = 28.94% Valid_loss = 1.5915, Valid_Accuracy = 35.94%
Training_loss = 1.3491, Training_Accuracy = 46.85% Valid_loss = 1.2878, Valid_Accuracy = 50.18%
Training_loss = 1.1896, Training_Accuracy = 54.64% Valid_loss = 1.2454, Valid_Accuracy = 53.75%
Training_loss = 1.0777, Training_Accuracy = 59.86% Valid_loss = 1.1371, Valid_Accuracy = 57.95%
Training_loss = 1.0229, Training_Accuracy = 61.82% Valid_loss = 1.0573, Valid_Accuracy = 60.97%
Training_loss = 0.9808, Training_Accuracy = 63.71% Valid_loss = 1.1183, Valid_Accuracy = 61.51%
Training_loss = 0.9519, Training_Accuracy = 65.19% Valid_loss = 1.0641, Valid_Accuracy = 62.27%
Training_loss = 0.9049, Training_Accuracy = 67.09% Valid_loss = 1.0265, Valid_Accuracy = 63.75%
Training_loss = 0.8883, Training_Accuracy = 67.41% Valid_loss = 1.2339, Valid_Accuracy = 60.40%
Training_loss = 0.8487, Training_Accuracy = 69.26% Valid_loss = 0.9845, Valid_Accuracy = 66.39%
Training_loss = 0.8219, Training_Accurac

KeyboardInterrupt: 

In [None]:
#Load model dict
model = TransformerClassifier(d_model,n_class,dropout)
model.load_state_dict(torch.load(weight_path))
model.eval()

In [None]:
#valid(model,valid_loader,criterion)
import matplotlib.pyplot as plt
plt.figure()
plt.plot(train_loss)
plt.plot(valid_loss)


In [None]:
plt.figure()
plt.plot(train_accuracy)
plt.plot(valid_accuracy)

In [None]:
#Save to Excel
import numpy as np
import pandas as pd
def  writeto_excel(train_loss, train_accuracy,valid_loss,valid_accuracy):
    '''
    ##Loss: List, consist of num_epochs results
    ##Accuracy: List, consist of num_epochs results
    To be noted, results would be consist model's result from scratch
    '''
    #Confer list to array for pandas process
    data = []
    for i in range(len(train_loss)): 
        data.append([train_loss[i], train_accuracy[i],
                     valid_loss[i], valid_accuracy[i]])
    data = np.array(data)
    data = pd.DataFrame(data)
    writer = pd.ExcelWriter('Conformer_Result.xlsx')
    data.to_excel(writer, 'page_1', float_format='%.2f')
    writer.save()

    print('The Loading Process has been done')
    writer.close()
writeto_excel(train_loss,train_accuracy,valid_loss,valid_accuracy)
    