In [None]:
# Mount Google Drive at /content/drive so files stored there are reachable
# from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd drive/MyDrive/C247-NNDL/

# IMPORTS

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
from tqdm import tqdm

from matplotlib.collections import LineCollection

from sklearn.metrics import accuracy_score

%pylab inline

import torch
import torch.nn as nn
import torchvision
import torchvision.models as models
import torchvision.transforms as T
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

# TensorBoard writer used by both trainers; event files go under Results/runs.
writer = SummaryWriter(log_dir="Results/runs")

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# DATA PREPROCESSING AND RESHAPING

In [None]:
## Loading the numpy arrays corresponding to the EEG dataset

# Train/valid arrays first, then the held-out test arrays.
X_train_valid = np.load("./Dataset/X_train_valid.npy")
y_train_valid = np.load("./Dataset/y_train_valid.npy")
person_train_valid = np.load("./Dataset/person_train_valid.npy")

X_test = np.load("./Dataset/X_test.npy")
y_test = np.load("./Dataset/y_test.npy")
person_test = np.load("./Dataset/person_test.npy")

## Printing the shapes of the numpy arrays

print(f'Training/Valid data shape: {X_train_valid.shape}')
print(f'Test data shape: {X_test.shape}')
print(f'Training/Valid target shape: {y_train_valid.shape}')
print(f'Test target shape: {y_test.shape}')
print(f'Person train/valid shape: {person_train_valid.shape}')
print(f'Person test shape: {person_test.shape}')

In [None]:
## Adjusting the labels to {0,1,2,3}

# Cue onset left - 0
# Cue onset right - 1
# Cue onset foot - 2
# Cue onset tongue - 3

# The raw labels are shifted down by 769 so that they become 0..3.
LABEL_OFFSET = 769

# Guard on the current minimum so that re-running this cell does not subtract
# the offset a second time (the previous in-place `-=` was not idempotent).
if y_train_valid.min() >= LABEL_OFFSET:
    y_train_valid = y_train_valid - LABEL_OFFSET
if y_test.min() >= LABEL_OFFSET:
    y_test = y_test - LABEL_OFFSET

# Everything downstream (one-hot encoding with 4 classes) relies on this range.
assert np.isin(y_train_valid, [0, 1, 2, 3]).all(), "unexpected train/valid label values"
assert np.isin(y_test, [0, 1, 2, 3]).all(), "unexpected test label values"

In [None]:
# Class balance check: distinct label values and how often each one occurs.
class_ids, class_counts = np.unique(y_train_valid, return_counts=True)
print((class_ids, class_counts))

In [None]:
## Creating the training and validation sets (disabled: kept for reference only; the active split is done after augmentation, further below)

# First generating the training and validation indices using random splitting
# ind_valid = np.random.choice(2115, 100, replace=False)
# ind_train = np.array(list(set(range(2115)).difference(set(ind_valid))))
# 
# # Creating the training and validation sets using the generated indices
# (x_train, x_valid) = X_train_valid[ind_train], X_train_valid[ind_valid] 
# (y_train, y_valid) = y_train_valid[ind_train], y_train_valid[ind_valid]
# print('Shape of training set:',x_train.shape)
# print('Shape of validation set:',x_valid.shape)
# print('Shape of training labels:',y_train.shape)
# print('Shape of validation labels:',y_valid.shape)

# DATA PREPROCESSING AND AUGMENTATION

In [None]:
## Visualizing the data

# Average one EEG channel over all train/valid trials of each class and
# overlay the four class averages on a single axis.
CHANNEL_TO_PLOT = 8
CLASS_NAMES = ["Cue Onset left", "Cue Onset right", "Cue onset foot", "Cue onset tongue"]

ch_data = X_train_valid[:, CHANNEL_TO_PLOT, :]

# Derive the x axis from the data instead of assuming exactly 1000 samples.
time_axis = np.arange(ch_data.shape[1])

# One loop replaces the four copy-pasted per-class stanzas; the position of a
# name in CLASS_NAMES is the class id it belongs to.
for class_id, class_name in enumerate(CLASS_NAMES):
    class_average = np.mean(ch_data[y_train_valid == class_id], axis=0)
    plt.plot(time_axis, class_average, label=class_name)

plt.axvline(x=500, label='line at t=500', c='cyan')

plt.xlabel('Time sample')
plt.ylabel('Mean amplitude')
plt.title('Class-averaged signal, channel index {}'.format(CHANNEL_TO_PLOT))

# Labels are attached to the artists above, so the marker line also shows up
# in the legend (an explicit four-item label list silently dropped it).
plt.legend()

In [None]:
def data_prep(X,y,sub_sample,average,noise):
    """Trim, down-sample and augment a batch of trials.

    Only the first 500 samples of the last axis are kept. Several down-sampled
    copies of the trimmed data are then stacked along axis 0:

    1. max over non-overlapping windows of ``sub_sample`` samples,
    2. mean over non-overlapping windows of ``average`` samples,
    3. ``sub_sample`` strided copies ``X[:, :, i::sub_sample]`` (one per offset).

    Gaussian noise (mean 0, std 0.5) is added to (2) and (3) only when
    ``noise`` is truthy. Previously the averaged copy always received noise,
    even with ``noise=False``.

    Args:
        X: array indexed as (trial, channel, time).
        y: one label per trial.
        sub_sample: window size / stride of the max-pooled and strided copies.
        average: window size of the mean-pooled copy; must equal ``sub_sample``
            so that all copies have the same length and can be stacked.
        noise: whether to add Gaussian noise to the augmented copies.

    Returns:
        ``(total_X, total_y)`` where ``total_X`` stacks ``2 + sub_sample``
        copies along axis 0 and ``total_y`` repeats ``y`` once per copy.

    Raises:
        ValueError: if ``sub_sample != average`` or the trimmed length is not
            divisible by ``sub_sample``.
    """
    noise_std = 0.5

    if sub_sample != average:
        raise ValueError(
            'sub_sample ({}) and average ({}) must be equal so that the pooled '
            'copies can be stacked'.format(sub_sample, average))

    # Trimming the data (sample,22,1000) -> (sample,22,500)
    X = X[:,:,0:500]
    print('Shape of X after trimming:',X.shape)

    if X.shape[2] % sub_sample != 0:
        raise ValueError(
            'trimmed length {} is not divisible by sub_sample={}'.format(X.shape[2], sub_sample))

    def _maybe_noisy(block):
        # Noise is drawn in the same order as before (average first, then each
        # strided copy), so a fixed NumPy seed reproduces earlier noisy runs.
        if not noise:
            return block
        return block + np.random.normal(0.0, noise_std, block.shape)

    # Maxpooling the data (sample,22,500) -> (sample,22,500/sub_sample)
    X_max = np.max(X.reshape(X.shape[0], X.shape[1], -1, sub_sample), axis=3)
    print('Shape of X after maxpooling:',X_max.shape)

    # Averaging (+ noise when requested)
    X_average = _maybe_noisy(np.mean(X.reshape(X.shape[0], X.shape[1], -1, average),axis=3))

    parts = [X_max, X_average]
    print('Shape of X after averaging+noise and concatenating:',
          (X_max.shape[0] + X_average.shape[0],) + X_max.shape[1:])

    # Subsampling: one strided copy per phase offset
    for i in range(sub_sample):
        parts.append(_maybe_noisy(X[:, :, i::sub_sample]))

    # Stack once at the end instead of re-allocating inside the loop.
    total_X = np.vstack(parts)
    total_y = np.hstack([y] * len(parts))

    print('Shape of X after subsampling and concatenating:',total_X.shape)
    return total_X,total_y

In [None]:
## Preprocessing the dataset

# Both splits go through the same augmentation: window/stride of 2 and noise on.
# NOTE(review): the test split is augmented (noise included) exactly like the
# training data -- confirm that this is intended for evaluation.
X_train_valid_prep, y_train_valid_prep = data_prep(
    X_train_valid, y_train_valid, sub_sample=2, average=2, noise=True
)
X_test_prep, y_test_prep = data_prep(
    X_test, y_test, sub_sample=2, average=2, noise=True
)

print(
    X_train_valid_prep.shape,
    y_train_valid_prep.shape,
    X_test_prep.shape,
    y_test_prep.shape,
    sep='\n',
)

In [None]:
## Creating the training and validation sets

# Number of augmented samples held out for validation.
N_VALID = 1500

# Take the population size from the array itself instead of the literal 8460,
# so the split stays valid if the dataset or the augmentation factor changes.
n_total = X_train_valid_prep.shape[0]

# First generating the training and validation indices using random splitting
# NOTE(review): the split happens after augmentation, so several augmented
# copies of one trial can land on both sides of the split -- confirm that this
# is acceptable for the validation metric.
ind_valid = np.random.choice(n_total, N_VALID, replace=False)
ind_train = np.setdiff1d(np.arange(n_total), ind_valid)

# Creating the training and validation sets using the generated indices
(x_train, x_valid) = X_train_valid_prep[ind_train], X_train_valid_prep[ind_valid]
(y_train, y_valid) = y_train_valid_prep[ind_train], y_train_valid_prep[ind_valid]
assert len(x_train) + len(x_valid) == n_total
print('Shape of training set:',x_train.shape)
print('Shape of validation set:',x_valid.shape)
print('Shape of training labels:',y_train.shape)
print('Shape of validation labels:',y_valid.shape)

# From here on X_test / y_test refer to the preprocessed test split.
X_test = X_test_prep
y_test = y_test_prep

print('Shape of test set:',X_test.shape)
print('Shape of test labels:',y_test.shape)

# VISUALIZING A SAMPLE EEG DATA

In [None]:
# Taken from: https://notebook.community/joannekoong/neuroscience_tutorials/basic/1.%20Load%20EEG%20data%20and%20plot%20ERP

def plot_eeg(EEG, vspace=100, color='k', samplerate=2048):
    """Plot every channel of one trial as a vertically offset trace.

    Args:
        EEG: 2-D array indexed as (channel, time).
        vspace: vertical offset between consecutive channel traces.
        color: matplotlib colour used for all traces.
        samplerate: samples per second, used only to scale the time axis.
            NOTE(review): the default of 2048 Hz comes from the tutorial this
            function was taken from -- confirm the recording's real rate.

    Draws on the current matplotlib figure and returns nothing.
    """
    # One baseline per channel, derived from the data rather than fixed at 22.
    bases = vspace * np.arange(EEG.shape[0])
    EEG = EEG.T + bases

    # Calculate a timeline in seconds from the sample index.
    time = np.arange(EEG.shape[0]) / samplerate

    # Plot EEG versus time. The np./plt. prefixes are explicit so the function
    # does not depend on names star-imported by `%pylab inline`.
    plt.plot(time, EEG, color=color)
    plt.grid()
    plt.xlabel('Time (s)')
    plt.ylabel('Channels')
    plt.gca().yaxis.set_ticks(bases)
    plt.title('EEG data for a single sample')

In [None]:
# Explicit plt.figure: does not depend on names star-imported by `%pylab inline`.
plt.figure(figsize=(15,22))
plot_eeg(X_train_valid[0])

# DATALOADER

In [None]:
# Exchange axes 1 and 2 of every split so the models receive
# (batch, length, channels) instead of (batch, channels, length).
# NOTE(review): re-running this cell swaps x_train / x_valid back again.
x_train = x_train.swapaxes(1, 2)
x_valid = x_valid.swapaxes(1, 2)
x_test = X_test.swapaxes(1, 2)

print(f'Shape of training set after dimension reshaping: {x_train.shape}')
print(f'Shape of validation set after dimension reshaping: {x_valid.shape}')
print(f'Shape of test set after dimension reshaping: {x_test.shape}')

In [None]:
# Normalize each channel to have mean 0 and std 1
def standardize(x):
    """Standardize ``x`` along axis 1, independently for every other index.

    The mean and (population) standard deviation are computed over axis 1 and
    broadcast back, so for input indexed as (sample, time, channel) every
    channel of every sample ends up with zero mean and unit variance over time.

    Slices that are constant along axis 1 have zero variance; they are mapped
    to all zeros instead of NaN/inf from a division by zero.

    Args:
        x: array with at least two dimensions.

    Returns:
        Array of the same shape as ``x``.
    """
    mean = np.mean(x, axis=1, keepdims=True)
    std = np.sqrt(np.var(x, axis=1, keepdims=True))

    # Replace a zero std by 1: the numerator is already 0 there, so the result
    # is 0 rather than NaN. Non-degenerate inputs are unaffected.
    safe_std = np.where(std == 0, 1.0, std)

    return (x - mean) / safe_std

# Each split is standardized on its own, using its own statistics.
x_train, x_valid, x_test = (
    standardize(split) for split in (x_train, x_valid, x_test)
)

In [None]:
class LoadData(Dataset):
    """In-memory dataset pairing each sample with a one-hot label.

    ``data`` is converted to a float32 tensor. ``labels`` are expected to be
    integer class ids in 0..3; they are stored one-hot encoded over 4 classes
    as an int64 tensor.
    """

    def __init__(self, data, labels):
        self.data = torch.Tensor(data)
        # one_hot needs integer indices, hence the cast to long.
        class_indices = torch.Tensor(labels).long()
        self.labels = F.one_hot(class_indices, num_classes=4)

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int):
        # Returns (sample, one-hot label) for the requested index.
        return (self.data[index], self.labels[index])

In [None]:
# Wrap the train / validation splits as datasets.
train_dataset = LoadData(data=x_train, labels=y_train)
val_dataset = LoadData(data=x_valid, labels=y_valid)

# Both loaders share everything except shuffling (training only).
loader_kwargs = dict(batch_size=16, num_workers=1, pin_memory=True)
train_dl = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_dl = DataLoader(val_dataset, **loader_kwargs)

# Pull one batch as a sanity check of the tensor layout.
data = next(iter(train_dl))
input_data, input_labels = data
print(input_data.size())

# Autoencoder

In [None]:
class EEGEncoder(nn.Module):
  """Convolutional encoder mapping one EEG chunk to an embedding vector.

  Expects input of shape (batch, 1, chunk_len, n_channels) and returns
  (batch, encoding_dim).

  Args:
    encoding_dim: size of the output embedding.
    chunk_len: number of time steps per chunk (height of the input map).
    n_channels: number of EEG channels (width of the input map).

  The size of the flattened conv output is derived from ``chunk_len`` and
  ``n_channels``. It used to be hard-coded to 2560, which only matches
  chunk_len=10 and n_channels=22; those values still give exactly 2560.
  """
  def __init__(self, encoding_dim, chunk_len, n_channels):
    super().__init__()

    if chunk_len < 3 or n_channels < 3:
      raise ValueError("chunk_len and n_channels must be at least 3 for the unpadded 3x3 convolution")

    self.encoder_conv = nn.Sequential(
        nn.Conv2d(1, 4, 3, padding=1),
        nn.ReLU(),
        nn.BatchNorm2d(4),
        nn.Conv2d(4, 8, 3, padding=1),
        nn.ReLU(),
        nn.BatchNorm2d(8),
        nn.Conv2d(8, 16, 3, padding=0),
        nn.ReLU(),
    )

    self.flatten = nn.Flatten()

    # Only the last (unpadded) 3x3 convolution shrinks the map, by 2 per side,
    # and it produces 16 feature maps.
    flat_features = 16 * (chunk_len - 2) * (n_channels - 2)

    self.encoder_lin = nn.Sequential(
        nn.Linear(flat_features, 128),
        nn.ReLU(),
        nn.Linear(128, encoding_dim)
    )

  def forward(self, x):
    x = self.encoder_conv(x)
    x = self.flatten(x)
    x = self.encoder_lin(x)
    return x

class EEGDecoder(nn.Module):
  """Decoder mapping an embedding back to a (1, chunk_len, n_channels) map.

  Expects input of shape (batch, encoding_dim) and returns
  (batch, 1, chunk_len, n_channels).

  Args:
    encoding_dim: size of the input embedding.
    chunk_len: number of time steps of the reconstructed chunk. The default
      of 10 reproduces the previously hard-coded (16, 8, 20) feature map.
    n_channels: number of reconstructed EEG channels (default 22, same reason).
  """
  def __init__(self, encoding_dim, chunk_len=10, n_channels=22):
    super().__init__()

    if chunk_len < 3 or n_channels < 3:
      raise ValueError("chunk_len and n_channels must be at least 3 for the unpadded 3x3 transposed convolution")

    # The first (unpadded) transposed convolution grows the map by 2 per side,
    # so the linear stack must produce a map that is 2 smaller in each one.
    feat_h, feat_w = chunk_len - 2, n_channels - 2

    self.decoder_lin = nn.Sequential(
        nn.Linear(encoding_dim, 128),
        nn.ReLU(),
        nn.Linear(128, 16 * feat_h * feat_w),
        nn.ReLU()
    )

    self.unflatten = nn.Unflatten(dim=1, unflattened_size=(16, feat_h, feat_w))

    self.decoder_conv = nn.Sequential(
        nn.ConvTranspose2d(16, 8, 3, padding=0),
        nn.BatchNorm2d(8),
        nn.ReLU(),
        nn.ConvTranspose2d(8, 4, 3, padding=1),
        nn.BatchNorm2d(4),
        nn.ReLU(),
        nn.ConvTranspose2d(4, 1, 3, padding=1),
    )
  
  def forward(self, x):
    x = self.decoder_lin(x)
    x = self.unflatten(x)
    x = self.decoder_conv(x)
    return x

class EEGAutoencoder(nn.Module):
  """Encoder/decoder pair that reconstructs one EEG chunk.

  ``forward`` takes a 3-D tensor (batch, steps, channels), adds a singleton
  image-channel axis for the 2-D convolutions, encodes and decodes it, and
  returns the reconstruction viewed back to the input's 3-D shape.

  ``chunk_len`` and ``n_channels`` are forwarded to the encoder only; the
  decoder is constructed from ``encoding_dim`` alone.
  """
  def __init__(self, encoding_dim, chunk_len, n_channels):
    super().__init__()
    self.encoder = EEGEncoder(encoding_dim, chunk_len, n_channels)
    self.decoder = EEGDecoder(encoding_dim)

  def forward(self, x):
    batch, steps, channels = x.shape
    # (batch, steps, channels) -> (batch, 1, steps, channels)
    latent = self.encoder(x.view(batch, 1, steps, channels))
    reconstruction = self.decoder(latent)
    # Drop the singleton axis again so the output matches the input shape.
    return reconstruction.view(batch, steps, channels)

# Transformer/Attention

**Note**: The input data has shape (batch_size, length, num_channels).

In [None]:
class MultiheadAttention(nn.Module):
  """Multi-head scaled dot-product self-attention over a batch-first sequence.

  Input and output both have shape (batch, seq_len, hidden_size). Queries,
  keys and values are linear projections of the same input; the attention
  weights and the output projection each pass through their own dropout.
  """
  def __init__(self, n_heads, hidden_size, dropout=0.1):
    super().__init__()
    assert hidden_size % n_heads == 0, "Hidden size must be divisable by n_heads"

    self.n_heads = n_heads
    self.hidden_size = hidden_size
    self.dropout = dropout

    self.attention_head_size = int(hidden_size / self.n_heads)
    self.all_head_size = self.n_heads * self.attention_head_size

    # Projections are created in the order query, key, value, out.
    self.query = nn.Linear(hidden_size, self.all_head_size)
    self.key = nn.Linear(hidden_size, self.all_head_size)
    self.value = nn.Linear(hidden_size, self.all_head_size)
    self.out = nn.Linear(hidden_size, hidden_size)

    self.attn_dropout = nn.Dropout(dropout)
    self.proj_dropout = nn.Dropout(dropout)
    self.softmax = nn.Softmax(dim=-1)

  def transpose_for_scores(self, x):
    # (batch, seq, all_head) -> (batch, heads, seq, head_size)
    split_shape = x.size()[:-1] + (self.n_heads, self.attention_head_size)
    return x.view(*split_shape).permute(0, 2, 1, 3)

  def forward(self, x):
    # Project, then split every projection into heads:
    # [Batch_size x Num_of_heads x Seq_length x Head_size]
    queries = self.transpose_for_scores(self.query(x))
    keys = self.transpose_for_scores(self.key(x))
    values = self.transpose_for_scores(self.value(x))

    # Scaled dot-product attention weights over the key positions.
    scores = torch.matmul(queries, keys.transpose(-1, -2))
    scores = scores / np.sqrt(self.attention_head_size)
    weights = self.attn_dropout(self.softmax(scores))

    # Weighted sum of the values, then merge the heads back together.
    context = torch.matmul(weights, values).permute(0, 2, 1, 3).contiguous()
    merged_shape = context.size()[:-2] + (self.all_head_size, )
    context = context.view(*merged_shape)

    return self.proj_dropout(self.out(context))

class MLP(nn.Module):
  """Two-layer feed-forward block: Linear -> ReLU -> Dropout -> Linear -> Dropout.

  Maps (..., hidden_size) to (..., hidden_size) through a hidden layer of
  width ``mlp_dim``. Weights use Xavier-uniform initialisation and biases are
  drawn from a normal distribution with std 1e-6.
  """
  def __init__(self, hidden_size, mlp_dim, dropout=0.1):
    super().__init__()
    self.fc1 = nn.Linear(hidden_size, mlp_dim)
    self.fc2 = nn.Linear(mlp_dim, hidden_size)
    self.act_fn = nn.ReLU()
    self.dropout = nn.Dropout(dropout)

    self._init_weights()

  def _init_weights(self):
    # Weights first, then biases, always fc1 before fc2.
    for layer in (self.fc1, self.fc2):
      nn.init.xavier_uniform_(layer.weight)
    for layer in (self.fc1, self.fc2):
      nn.init.normal_(layer.bias, std=1e-6)

  def forward(self, x):
    hidden = self.dropout(self.act_fn(self.fc1(x)))
    return self.dropout(self.fc2(hidden))

class EncoderBlock(nn.Module):
  """Self-attention followed by a feed-forward block, each with a residual.

  In both halves the sub-layer output is layer-normalised first and the
  block input is added afterwards: ``x + LayerNorm(sublayer(x))``.
  The feed-forward hidden width equals ``hidden_size``.
  """
  def __init__(self, n_heads, hidden_size, dropout=0.1):
    super().__init__()
    self.hidden_size = hidden_size
    self.attn_norm = nn.LayerNorm(hidden_size)
    self.ffn_norm = nn.LayerNorm(hidden_size)
    # The feed-forward block is constructed before the attention block.
    self.ffn = MLP(hidden_size, hidden_size, dropout)
    self.attn = MultiheadAttention(n_heads, hidden_size, dropout)

  def forward(self, x):
    # Residual Self-Attention
    attended = self.attn_norm(self.attn(x)) + x

    # Residual Feed-forward
    return self.ffn_norm(self.ffn(attended)) + attended

class ClassifierBlock(nn.Module):
  """Classification head: LayerNorm -> Linear -> ReLU -> Dropout -> LayerNorm -> Linear.

  ``forward`` returns unnormalised class scores (logits) of shape (..., 4).
  It previously ended with a softmax, but the training code feeds the output
  to ``nn.CrossEntropyLoss``, which applies log-softmax itself; softmaxing
  twice squashes the scores and weakens the gradients. ``argmax`` over the
  logits picks the same class as ``argmax`` over the probabilities.
  Use ``predict_proba`` when normalised probabilities are needed.

  Args:
    hidden_size: size of the input features.
    mlp_dim: width of the hidden layer.
    dropout: dropout probability applied after the activation.
  """
  def __init__(self, hidden_size, mlp_dim, dropout=0.1):
    super().__init__()
    self.fc1 = nn.Linear(hidden_size, mlp_dim)
    self.act_fn = nn.ReLU()
    self.fc2 = nn.Linear(mlp_dim, 4)
    # Kept as an attribute for predict_proba; no longer applied in forward.
    self.softmax = nn.Softmax(dim=-1) 
    self.dropout = nn.Dropout(dropout)
    self.ln1 = nn.LayerNorm(hidden_size)
    self.ln2 = nn.LayerNorm(mlp_dim)

  def forward(self, x):
    x = self.ln1(x)
    x = self.fc1(x)
    x = self.act_fn(x)
    x = self.dropout(x)
    x = self.ln2(x)
    x = self.fc2(x)

    return x

  def predict_proba(self, x):
    """Return class probabilities (softmax over the logits from ``forward``)."""
    return self.softmax(self.forward(x))

class PositionalEncoding(nn.Module):
  """Sinusoidal positional encoding for batch-first input.

  Adds a fixed sine/cosine pattern that depends on the position along axis 1
  to an input of shape (batch, seq_len, d_model), then applies dropout.

  The table used to be laid out sequence-first, (max_len, 1, d_model), and was
  sliced with ``x.size(0)``. The model in this file feeds batch-first tensors,
  so the encoding was selected by batch index and was constant along the
  sequence. The table is now (1, max_len, d_model) and sliced by ``x.size(1)``.

  Args:
    d_model: feature size of the input.
    dropout: dropout probability applied after adding the encoding.
    max_len: longest sequence length that can be encoded.
  """
  def __init__(self, d_model, dropout=0.1, max_len=1000):
    super().__init__()
    self.dropout = nn.Dropout(dropout)

    position = torch.arange(max_len).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
    pe = torch.zeros(1, max_len, d_model)
    pe[0, :, 0::2] = torch.sin(position * div_term)
    # For an odd d_model there is one cosine column fewer than sine columns.
    pe[0, :, 1::2] = torch.cos(position * div_term[: d_model // 2])
    self.register_buffer('pe', pe)

  def forward(self, x):
    seq_len = x.size(1)
    if seq_len > self.pe.size(1):
      raise ValueError(
          f"sequence length {seq_len} exceeds max_len {self.pe.size(1)} of the positional encoding")
    # Broadcast the (1, seq_len, d_model) slice over the batch axis.
    x = x + self.pe[:, :seq_len]
    return self.dropout(x)


class TransformerClassifier(nn.Module):
  """Patch-embedding transformer classifier for EEG sequences.

  The input (batch, seq_len, channels) is cut into consecutive patches of
  ``patch_size`` time steps. Each patch is embedded by ``encoder.encoder``
  (so ``encoder`` must expose an ``.encoder`` sub-module whose output size is
  ``hidden_size``), positional information is added, the tokens go through
  ``n_blocks`` encoder blocks, are averaged over the patch axis and passed to
  the classification head. Returns the head's output of shape (batch, 4).

  ``input_size`` is accepted but not used.

  NOTE(review): ``self.encoder.eval()`` only lasts until ``.train()`` is
  called on this model, which switches every sub-module back to training
  mode; the encoder's parameters are also part of ``self.parameters()``.
  Confirm whether the encoder was meant to stay frozen.
  """
  def __init__(self, input_size, n_heads, hidden_size, encoder, n_blocks=2, dropout=0.1, max_len=1000, patch_size=10):
    super().__init__()

    assert max_len % patch_size == 0, "sequence length must be divisible by patch size"

    self.patch_size = patch_size
    self.max_len = max_len
    self.hidden_size = hidden_size

    # One position per patch, hence max_len // patch_size positions.
    self.pos_encoder = PositionalEncoding(hidden_size, dropout=dropout, max_len=max_len // patch_size)

    self.encoder = encoder
    self.encoder.eval()

    blocks = [EncoderBlock(n_heads, hidden_size, dropout) for _ in range(n_blocks)]
    self.encoder_blocks = nn.ModuleList(blocks)
    self.classifier = ClassifierBlock(hidden_size, hidden_size, dropout)

  def forward(self, x):
    batch, seq_len, n_channels = x.shape
    n_patches = seq_len // self.patch_size

    # some dimensionality tweaking to do embeddings
    # input is (B, S, C); every patch becomes its own single-channel "image"
    patches = x.view(batch * seq_len // self.patch_size, 1, self.patch_size, n_channels)
    tokens = self.encoder.encoder(patches)
    tokens = tokens.view(batch, n_patches, self.hidden_size)

    tokens = self.pos_encoder(tokens)

    for block in self.encoder_blocks:
      tokens = block(tokens)

    # Average over the patch axis, then classify.
    pooled = tokens.mean(dim=1)
    return self.classifier(pooled)


# Training

## Autoencoder

In [None]:
class AutoencoderTrainer:
    """Trains an ``EEGAutoencoder`` to reconstruct fixed-length chunks of the input.

    Each batch (batch, seq_len, channels) is cut along the time axis into
    consecutive chunks of ``chunk_len`` steps; every chunk is one optimisation
    step with an MSE reconstruction loss. Labels are loaded but ignored.

    Relies on the module-level ``device`` and ``writer`` objects.

    Fixes compared with the earlier version:
      * the chunk count is derived from the sequence length (axis 1) rather
        than from ``len(batch)``, which is the batch size;
      * the reported training loss averages every chunk step, not only the
        last chunk of each batch;
      * TensorBoard scalars use an increasing step instead of writing every
        point of an epoch at the same x position;
      * the validation loss is averaged over the number of evaluated chunks;
      * the training loader is shuffled;
      * no backslash continuations inside f-string braces (a syntax error on
        Python versions before 3.12).

    ``patience`` is stored but not used: this trainer does no early stopping.
    """

    def __init__(self, epochs, batch_size, learning_rate, num_workers, train_data, valid_data, train_labels, valid_labels, patience, validate_after):
        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.num_workers = num_workers
        self.train_data = train_data
        self.valid_data = valid_data
        self.train_labels = train_labels
        self.valid_labels = valid_labels
        self.patience = patience
        self.validate_after = validate_after
        self.chunk_len = 10

    def _make_loader(self, data, labels, shuffle):
        """Build a DataLoader over ``data``/``labels`` with this trainer's settings."""
        dataset = LoadData(data=data, labels=labels)
        return DataLoader(dataset, batch_size=self.batch_size, num_workers=self.num_workers,
                          pin_memory=True, shuffle=shuffle)

    def _chunks(self, batch):
        """Yield consecutive ``chunk_len``-step slices of ``batch`` along axis 1."""
        n_chunks = batch.shape[1] // self.chunk_len
        for c in range(n_chunks):
            yield batch[:, c * self.chunk_len:(c + 1) * self.chunk_len, ...]

    def train(self):
        """Run the training loop and return the trained autoencoder.

        Returns:
            The ``EEGAutoencoder`` after the last epoch.
        """
        train_dataloader = self._make_loader(self.train_data, self.train_labels, shuffle=True)

        # Change class
        model = EEGAutoencoder(128, self.chunk_len, 22).to(device)

        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(),lr=self.learning_rate, weight_decay=1e-6)

        global_step = 0  # x axis for the per-step training curve

        for epoch in range(self.epochs):
            print("Starting Training Epoch " + str(epoch + 1))
            loss_sum = 0.0
            n_steps = 0
            model.train()
            for inputs_all, _ in tqdm(train_dataloader):
                inputs_all = inputs_all.to(device)

                for inputs in self._chunks(inputs_all):
                    optimizer.zero_grad()

                    outputs = model(inputs)
                    loss = criterion(outputs, inputs)

                    loss.backward()
                    optimizer.step()

                    step_loss = loss.item()
                    writer.add_scalar("Loss/train", step_loss, global_step)
                    global_step += 1

                    loss_sum += step_loss
                    n_steps += 1

            mean_train_loss = loss_sum / max(n_steps, 1)
            print(f'Epoch {epoch + 1} \t\t Training Loss: {mean_train_loss}')

            if (epoch + 1) % self.validate_after == 0:
                val_loss, val_len = self.validate(model, criterion, epoch)
                mean_val_loss = val_loss / max(val_len, 1)
                print(f'Epoch {epoch + 1} \t\t Validation Loss: {mean_val_loss}')

            # torch.save(model.state_dict(), './Results/Models/saved_model_' + str(epoch + 1) + '.pth')

        return model


    def validate(self, model, criterion, epoch):
        """Evaluate the reconstruction loss on the validation data.

        Args:
            model: autoencoder to evaluate (switched to eval mode here).
            criterion: loss applied to (reconstruction, input).
            epoch: epoch index, used as the TensorBoard step.

        Returns:
            ``(loss_sum, n_chunks)``: the summed loss over all evaluated
            chunks and the number of chunks, so that ``loss_sum / n_chunks``
            is the mean loss per chunk.
        """
        model.eval()
        valid_loss = 0.0
        n_chunks_seen = 0
        valid_dataloader = self._make_loader(self.valid_data, self.valid_labels, shuffle=False)

        with torch.no_grad():
            for inputs_all, _ in valid_dataloader:
                inputs_all = inputs_all.to(device)

                for inputs in self._chunks(inputs_all):
                    outputs = model(inputs)
                    valid_loss += criterion(outputs, inputs).item()
                    n_chunks_seen += 1

        # One point per validation pass: the epoch mean.
        if n_chunks_seen:
            writer.add_scalar("Loss/validation", valid_loss / n_chunks_seen, epoch)

        return valid_loss, n_chunks_seen

In [None]:
# Autoencoder pre-training configuration.
autoencodertrainer = AutoencoderTrainer(
    epochs=100,
    batch_size=32,
    learning_rate=0.0001,
    num_workers=2,
    train_data=x_train,
    valid_data=x_valid,
    train_labels=y_train,
    valid_labels=y_valid,
    patience=5,
    validate_after=1,
)

In [None]:
# train() returns the trained EEGAutoencoder (encoder and decoder), so despite
# its name `best_encoder` holds the whole autoencoder.
best_encoder = autoencodertrainer.train()

In [None]:
# torch.save on a module object pickles the whole module instance, not just a
# state_dict of its weights.
torch.save(best_encoder, "./encoder_weights")

In [None]:
# map_location lets a checkpoint written on a GPU runtime be restored on a
# CPU-only runtime (and vice versa).
# NOTE(review): the file contains a pickled module object, and unpickling can
# execute arbitrary code -- only load checkpoints you created yourself. Recent
# PyTorch releases may additionally require weights_only=False for such files.
best_encoder = torch.load("./encoder_weights", map_location=device)

## Transformer

In [None]:
class Trainer:
    """Trains a ``TransformerClassifier`` with validation-based early stopping.

    Relies on the module-level ``best_encoder`` (passed to the classifier as
    its patch encoder), ``device`` and ``writer`` objects.
    NOTE(review): ``best_encoder`` is handed over by reference, so training
    this model also updates that object's parameters.

    Fixes compared with the earlier version:
      * the early-stopping logic sat outside the epoch loop, so it ran once
        after training and could never stop early; it also raised NameError
        if validation never ran. It now runs after every validation pass;
      * the best model is a snapshot of the weights (``state_dict`` copy), not
        an alias of the live model, and is restored before returning;
      * one-hot targets are converted to class indices for the loss, which is
        equivalent for one-hot labels and avoids mixing float64 targets with
        float32 outputs;
      * TensorBoard scalars use increasing steps; the training loader is
        shuffled; no backslash continuations inside f-string braces.
    """

    def __init__(self, epochs, batch_size, learning_rate, num_workers, train_data, valid_data, train_labels, valid_labels, patience, validate_after):
        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.num_workers = num_workers
        self.train_data = train_data
        self.valid_data = valid_data
        self.train_labels = train_labels
        self.valid_labels = valid_labels
        self.patience = patience
        self.validate_after = validate_after

    def train(self):
        """Run the training loop.

        Training stops early once the mean validation loss has failed to
        improve for ``patience`` consecutive validation passes.

        Returns:
            The model, in eval mode, carrying the weights of the validation
            pass with the lowest mean loss (or the final weights if no
            validation pass ran).
        """
        import copy  # local import: only needed for the best-weights snapshot

        train_dataset = LoadData(data=self.train_data, labels=self.train_labels)
        train_dataloader = DataLoader(train_dataset, batch_size=self.batch_size, num_workers=self.num_workers,
                                      pin_memory=True, shuffle=True)

        # Change class
        model = TransformerClassifier(22, 4, 128, best_encoder, n_blocks=4, dropout=0.5).to(device)

        criterion = nn.CrossEntropyLoss().to(device)
        optimizer = torch.optim.Adam(model.parameters(),lr=self.learning_rate, weight_decay=1e-6)

        best_val_loss = float("inf")
        best_state = None
        trigger_times = 0
        global_step = 0  # x axis for the per-batch training curve

        for epoch in range(self.epochs):
            print("Starting Training Epoch " + str(epoch + 1))
            avg_loss = 0.0
            model.train()
            for inputs, targets in tqdm(train_dataloader):
                inputs = inputs.to(device)
                # The dataset yields one-hot rows; the loss takes class indices.
                targets = targets.to(device).argmax(dim=1)
                optimizer.zero_grad()

                outputs = model(inputs)
                loss = criterion(outputs, targets)

                loss.backward()
                optimizer.step()

                batch_loss = loss.item()
                writer.add_scalar("Loss/train", batch_loss, global_step)
                global_step += 1
                avg_loss += batch_loss

            mean_train_loss = avg_loss / max(len(train_dataloader), 1)
            print(f'Epoch {epoch + 1} \t\t Training Loss: {mean_train_loss}')

            if (epoch + 1) % self.validate_after == 0:
                val_loss, val_len = self.validate(model, criterion, epoch)
                mean_val_loss = val_loss / max(val_len, 1)
                print(f'Epoch {epoch + 1} \t\t Validation Loss: {mean_val_loss}')

                if mean_val_loss < best_val_loss:
                    # New best: snapshot the weights and reset the counter.
                    best_val_loss = mean_val_loss
                    best_state = copy.deepcopy(model.state_dict())
                    trigger_times = 0
                    print('trigger times: 0')
                else:
                    trigger_times += 1
                    print('trigger times:', trigger_times)

                    if trigger_times >= self.patience:
                        print('Early stopping!\nStart to test process.')
                        break

            # torch.save(model.state_dict(), './Results/Models/saved_model_' + str(epoch + 1) + '.pth')

        # Hand back the best validated weights, not whatever the last epoch left.
        if best_state is not None:
            model.load_state_dict(best_state)
        model.eval()
        return model


    def validate(self, model, criterion, epoch):
        """Evaluate the classification loss on the validation data.

        Args:
            model: classifier to evaluate (switched to eval mode here).
            criterion: loss applied to (outputs, class indices).
            epoch: epoch index, used as the TensorBoard step.

        Returns:
            ``(loss_sum, n_batches)``: the summed per-batch loss and the
            number of validation batches.
        """
        model.eval()
        valid_loss = 0.0
        valid_dataset = LoadData(data=self.valid_data, labels=self.valid_labels)
        valid_dataloader = DataLoader(valid_dataset, batch_size=self.batch_size, num_workers=self.num_workers,pin_memory=True)

        with torch.no_grad():
            for inputs, targets in valid_dataloader:
                inputs = inputs.to(device)
                targets = targets.to(device).argmax(dim=1)

                outputs = model(inputs)
                valid_loss += criterion(outputs, targets).item()

        n_batches = len(valid_dataloader)
        # One point per validation pass: the epoch mean.
        if n_batches:
            writer.add_scalar("Loss/validation", valid_loss / n_batches, epoch)

        return valid_loss, n_batches

In [None]:
# Transformer classifier training configuration.
trainer = Trainer(
    epochs=25,
    batch_size=64,
    learning_rate=0.0001,
    num_workers=2,
    train_data=x_train,
    valid_data=x_valid,
    train_labels=y_train,
    valid_labels=y_valid,
    patience=5,
    validate_after=1,
)

In [None]:
# torch.cuda.empty_cache()
# Train the transformer classifier; train() returns the model it trained.
best_model = trainer.train()
# Push any TensorBoard scalars that are still buffered to disk.
writer.flush()

In [None]:
from torchsummary import summary

# summary() reports on the model; its return value is not the model, so it
# must not be assigned back to `best_model` -- the test cells further down
# still need the trained module. The input size is taken from the training
# data instead of the literal (1000, 22), which is not the preprocessed length.
summary(best_model, tuple(x_train.shape[1:]), 64)

# TESTING

USING BEST MODEL FROM EARLY STOPPING

In [None]:
# Alias used by the evaluation loop below.
model_test = best_model

In [None]:
# Print whatever `best_model` currently holds as a quick sanity check before
# evaluation.
print(best_model)

In [None]:
# Test split in its stored order (no shuffling), in batches of 64.
test_dataset = LoadData(data=x_test, labels=y_test)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=64,
    num_workers=2,
    pin_memory=True,
)

In [None]:
correct = 0
total = 0

# Move the model once, and switch to eval mode so dropout is disabled and
# batch-norm layers use their running statistics. Evaluating in training mode
# makes the reported accuracy noisy and pessimistic.
model_test.to(device)
model_test.eval()

# No gradients are needed for evaluation.
with torch.no_grad():
    for i, (inputs, targets) in enumerate(test_dataloader):
        # evaluate the model on the test set
        inputs = inputs.to(device)
        targets = targets.to(device)
        outputs = model_test(inputs)
        # Targets are one-hot rows, so argmax recovers the class index.
        y = torch.argmax(targets, dim=1)
        pred_y = torch.argmax(outputs, dim=1)
        if i == 0:
            print(y[:10])
            print(pred_y[:10])
        total += targets.size(0)
        correct += (y == pred_y).sum().item()

print(f'Accuracy of the network on the test images: {100 * correct // total} %')

# TENSORBOARD

In [None]:
!tensorboard --logdir=Results