***jointly training 1 layer base 0.5 with only first segment training data***

In [None]:
# Mount Google Drive at /content/drive; the checkpoint and result paths used later in
# this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
pip install mne



In [None]:
import numpy as np

In [None]:
from dataset import Raw_PhysionNet,PSD_PhysioNet
from torch.utils.data import  random_split,DataLoader
import torch
import os
import config
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import time

In [None]:
# Dimensions handed to LSTM_Classifier when the model is built further down.
num_channels = 64  # passed as input_dim; presumably the EEG channel count — confirm against the dataset
hidden_size = 128  # passed as hidden_dim (LSTM hidden-state width)
num_layers = 1  # passed as enc_layers (number of stacked LSTM layers)
NUM_SUBJS=106  # passed as num_classes (one output class per subject)

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Folder on the mounted Drive that receives this experiment's checkpoints and results.
base_dir = os.path.join("/content/drive/MyDrive","EEG_AUTH_Experiments","exp22")
# exist_ok=True makes this a no-op when the folder is already there and avoids the
# check-then-create race of a separate isdir() test.
os.makedirs(base_dir, exist_ok=True)

In [None]:
# Checkpoint written whenever validation accuracy improves (see the training loops).
chk_point_best = os.path.join(base_dir,"best_model.pth")
# Checkpoint overwritten at the end of every epoch.
chk_point_last = os.path.join(base_dir,"last_model.pth")

In [None]:
chk_point_best

'/content/drive/MyDrive/EEG_AUTH_Experiments/exp22/best_model.pth'

In [None]:
def collate_fn(batch):
    """Collate a batch, cropping every signal to one shared random length.

    One duration between 0.1 s and 2.0 s (in 0.1 s steps, at 160 samples per
    second) is drawn per batch. Every signal is cut to that many leading samples
    along its second axis and the results are stacked into a single tensor, which
    is what the training and evaluation loops expect (they call ``.permute`` on it).

    Args:
        batch: sequence of ``(signal, label)`` pairs. Each signal is sliced as
            ``sig[:, :n]`` and all cropped signals must end up with the same shape
            to be stackable.
            NOTE(review): assumes signals are (channels, time) arrays/tensors and
            labels are scalars or one-element arrays — confirm against the dataset.

    Returns:
        ``(signals, labels)``: ``signals`` is a tensor with the batch on axis 0 and
        the cropped time axis last; ``labels`` is a 1-D tensor with one entry per
        example.
    """
    signals, labels = zip(*batch)

    # The crop length comes from torch's global RNG. It is deliberately not re-seeded
    # here: re-seeding from the wall clock on every batch would also reset the random
    # stream used elsewhere (dropout, shuffling) and override any seed the caller set.
    tenths = torch.randint(low=1, high=21, size=(1,)).item()  # high is exclusive -> 1..20

    # Integer arithmetic: tenths of a second * (160 samples per second / 10).
    num_samples = tenths * 16

    # Stack into real tensors instead of returning tuples of per-example arrays.
    signals = torch.stack([torch.as_tensor(sig)[:, :num_samples] for sig in signals])
    labels = torch.stack([torch.as_tensor(lbl) for lbl in labels]).reshape(-1)

    return signals, labels

In [None]:
def gen_dataloader(dataset,split_ratios,batch_size=64,seed=37):
    """Split ``dataset`` into train/validation subsets and wrap each in a DataLoader.

    Args:
        dataset: torch dataset to split.
        split_ratios: two lengths or fractions ``[train, val]`` forwarded to
            ``random_split``. Exactly two entries are required because the result
            is unpacked into two subsets.
        batch_size: batch size of both loaders (default 64, the value that used to
            be hard-coded).
        seed: seed of the generator that drives the split, so the same examples
            land in the same subset on every run (default 37, as before).

    Returns:
        ``(train_loader, val_loader)``. Only the training loader shuffles; both
        build their batches with ``collate_fn``.
    """
    split_gen = torch.Generator().manual_seed(seed)

    train_set, val_set = random_split(dataset, lengths=split_ratios, generator=split_gen)

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    return train_loader, val_loader

In [None]:
def train(model, train_loader, val_loader, optimizer, criterion, num_epochs, loss_train, loss_val, acc_train, acc_val):
    """Train ``model`` for ``num_epochs`` epochs, validating and checkpointing after each.

    Relies on the notebook-level globals ``device``, ``chk_point_best``,
    ``chk_point_last`` and ``best_eval_acc``; ``best_eval_acc`` must be defined
    before the call and is updated whenever validation accuracy improves.

    Args:
        model: network to optimise; it is called on float batches that have been
            rearranged with ``permute(0, 2, 1)``.
        train_loader: iterable of ``(batch_data, batch_labels)`` tensors;
            ``batch_data`` must be 3-D.
        val_loader: iterable with the same structure, used for validation.
        optimizer: optimizer stepped once per training batch.
        criterion: loss called as ``criterion(outputs, labels)``.
        num_epochs: number of passes over ``train_loader``.
        loss_train, loss_val, acc_train, acc_val: lists that receive one value per
            epoch. They are mutated in place and also returned.

    Returns:
        ``(loss_train, loss_val, acc_train, acc_val)``.
    """
    global best_eval_acc

    for epoch in range(num_epochs):

        # ---- training pass ----
        model.train()
        epoch_loss = 0.0
        correct_train = 0
        total_train = 0
        train_batches = 0  # batches that actually contributed to epoch_loss

        for batch_data, batch_labels in train_loader:
            # Single-example batches are skipped. The original note cites batch
            # normalization as the reason; checked before moving data to the device.
            if batch_data.shape[0] == 1:
                continue

            batch_data = batch_data.permute(0, 2, 1).float().to(device)
            batch_labels = batch_labels.long().to(device)

            optimizer.zero_grad()
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            train_batches += 1

            _, predicted = torch.max(outputs, 1)
            total_train += batch_labels.size(0)
            correct_train += (predicted == batch_labels).sum().item()

        # Average over the batches that were really processed, not len(train_loader),
        # so skipped batches do not deflate the reported loss. max(..., 1) keeps an
        # empty epoch from raising ZeroDivisionError.
        epoch_loss /= max(train_batches, 1)
        train_accuracy = correct_train / max(total_train, 1)
        loss_train.append(epoch_loss)
        acc_train.append(train_accuracy)

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0
        val_batches = 0
        with torch.no_grad():
            for batch_data, batch_labels in val_loader:
                batch_data = batch_data.permute(0, 2, 1).float().to(device)
                batch_labels = batch_labels.long().to(device)

                outputs = model(batch_data)
                _, predicted = torch.max(outputs, 1)
                total_val += batch_labels.size(0)
                correct_val += (predicted == batch_labels).sum().item()
                val_loss += criterion(outputs, batch_labels).item()
                val_batches += 1

        val_loss /= max(val_batches, 1)
        val_accuracy = correct_val / max(total_val, 1)

        loss_val.append(val_loss)
        acc_val.append(val_accuracy)

        # Keep the weights of the best epoch so far in a separate file.
        if val_accuracy > best_eval_acc:
            print(f"---- new best val acc achieved {val_accuracy} ----")
            torch.save(model.state_dict(), chk_point_best)
            best_eval_acc = val_accuracy

        # Always refresh the "last" checkpoint so an interrupted run can be resumed.
        torch.save(model.state_dict(), chk_point_last)

        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_loss:.4f}, Train Acc: {train_accuracy:.2f}, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}')

    return loss_train, loss_val, acc_train, acc_val


In [None]:
def train_variable_length(model, train_loaders, val_loaders, optimizer, criterion, num_epochs, loss_train, loss_val, acc_train, acc_val):
    """Jointly train ``model`` on several loaders, taking one optimizer step per epoch.

    Gradients of every batch from every loader are accumulated and applied in a
    single ``optimizer.step()`` at the end of the epoch. This is the same update
    as back-propagating the sum of all batch losses, but ``backward()`` is called
    per batch so each autograd graph is freed immediately instead of being kept
    alive until the end of the epoch.
    NOTE(review): one step per epoch is preserved from the original code — confirm
    that per-batch stepping was not the intent.

    Relies on the notebook-level globals ``device``, ``chk_point_best``,
    ``chk_point_last`` and ``best_eval_acc`` (updated when validation improves).

    Args:
        model: network to optimise; called on float batches rearranged with
            ``permute(0, 2, 1)``.
        train_loaders: iterable of loaders, each yielding
            ``(batch_data, batch_labels)`` tensors with 3-D ``batch_data``.
        val_loaders: iterable of loaders with the same structure, for validation.
        optimizer: optimizer stepped once per epoch.
        criterion: loss called as ``criterion(outputs, labels)``.
        num_epochs: number of epochs to run.
        loss_train, loss_val, acc_train, acc_val: lists that receive one value per
            epoch. They are mutated in place and also returned.

    Returns:
        ``(loss_train, loss_val, acc_train, acc_val)``.
    """
    global best_eval_acc

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        total_epoch_loss = 0.0
        correct_train = 0
        total_train = 0
        train_batches = 0  # batches that actually contributed a loss

        # Cleared once per epoch; every batch below adds its gradient on top.
        optimizer.zero_grad()

        for train_loader in train_loaders:
            for batch_data, batch_labels in train_loader:
                if batch_data.shape[0] == 1:
                    continue  # Skip batches with only one example

                batch_data = batch_data.permute(0, 2, 1).float().to(device)
                batch_labels = batch_labels.long().to(device)

                outputs = model(batch_data)
                loss = criterion(outputs, batch_labels)
                # Accumulate into .grad now; equivalent to summing the losses and
                # calling backward once, without holding every graph in memory.
                loss.backward()

                total_epoch_loss += loss.item()
                train_batches += 1

                _, predicted = torch.max(outputs, 1)
                total_train += batch_labels.size(0)
                correct_train += (predicted == batch_labels).sum().item()

        # Step after processing all loaders; nothing to apply if no batch ran.
        if train_batches:
            optimizer.step()

        # Average over processed batches so skipped ones do not deflate the loss.
        total_epoch_loss /= max(train_batches, 1)
        train_accuracy = correct_train / max(total_train, 1)
        loss_train.append(total_epoch_loss)
        acc_train.append(train_accuracy)

        # ---- validation pass ----
        model.eval()
        total_val_loss = 0.0
        correct_val = 0
        total_val = 0
        val_batches = 0
        with torch.no_grad():
            for val_loader in val_loaders:
                for batch_data, batch_labels in val_loader:
                    batch_data = batch_data.permute(0, 2, 1).float().to(device)
                    batch_labels = batch_labels.long().to(device)

                    outputs = model(batch_data)
                    total_val_loss += criterion(outputs, batch_labels).item()
                    val_batches += 1

                    _, predicted = torch.max(outputs, 1)
                    total_val += batch_labels.size(0)
                    correct_val += (predicted == batch_labels).sum().item()

        total_val_loss /= max(val_batches, 1)
        val_accuracy = correct_val / max(total_val, 1)

        loss_val.append(total_val_loss)
        acc_val.append(val_accuracy)

        # Keep the weights of the best epoch so far in a separate file.
        if val_accuracy > best_eval_acc:
            print(f"---- new best val acc achieved {val_accuracy:.4f} ----")
            torch.save(model.state_dict(), chk_point_best)
            best_eval_acc = val_accuracy

        # Always refresh the "last" checkpoint.
        torch.save(model.state_dict(), chk_point_last)

        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {total_epoch_loss:.4f}, Train Acc: {train_accuracy:.2f}, Val Loss: {total_val_loss:.4f}, Val Acc: {val_accuracy:.2f}')

    return loss_train, loss_val, acc_train, acc_val


In [None]:
def test(model,test_loader,criterion):

    model.eval()

    prdicted_labels = [] # list of all predicted labels

    with torch.no_grad():
        val_loss = 0.0
        correct = 0
        total = 0
        for batch_data, batch_labels in test_loader:
            batch_data = batch_data.permute(0,2,1)
            outputs = model(batch_data.float().to(device))
            _, predicted = torch.max(outputs, 1)
            total += batch_labels.size(0)
            correct += (predicted == batch_labels.to(device)).sum().item()

            val_loss += criterion(outputs, batch_labels.long().to(device)).item()

            prdicted_labels = prdicted_labels + list(predicted)

        val_loss /= len(test_loader)
        accuracy = correct / total


    print(f'Test Loss: {val_loss:.4f}, Test Acc: {accuracy:.2f}')

    return val_loss , accuracy , prdicted_labels


In [None]:
class LSTM_Encoder(nn.Module):
    """Encode a sequence as the final hidden state of the last LSTM layer.

    Args:
        input_dim: number of features per time step.
        hidden_dim: LSTM hidden-state width.
        num_layers: number of stacked LSTM layers.
        bi: when true the LSTM is bidirectional and the encoding is the
            concatenation of the last layer's forward and backward final states.

    Attributes:
        lstm: the underlying ``nn.LSTM`` (``batch_first=True``).
        num_layers: ``num_layers`` times the number of directions, i.e. the size
            of the first axis of the LSTM's hidden state (kept for compatibility
            with code that reads it).
    """

    def __init__(self, input_dim, hidden_dim, num_layers=1,bi=False):
        super(LSTM_Encoder, self).__init__()
        self.hidden_dim = hidden_dim
        self.bi = bi

        # Define the LSTM layer
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True,bidirectional=bi)

        self.num_layers = num_layers * 2 if bi else num_layers

    def forward(self, x):
        """Return the encoding of ``x``.

        Args:
            x: tensor of shape ``(batch_size, sequence_length, input_dim)``.

        Returns:
            Tensor of shape ``(batch_size, hidden_dim)``, or
            ``(batch_size, 2 * hidden_dim)`` when bidirectional.
        """
        # nn.LSTM starts from all-zero hidden and cell states when none are given,
        # which is what the explicit zero tensors used to provide.
        _, (hn, _) = self.lstm(x)

        if self.bi:
            # hn is ordered layer by layer with the two directions of each layer
            # adjacent, so the last two slices are the final layer's forward and
            # backward states. Indexing hn[0]/hn[1] would pick the first layer and
            # the former extra hn[-1, :, :] on the 2-D result raised IndexError.
            return torch.cat((hn[-2], hn[-1]), dim=-1)

        return hn[-1]


class LSTM_Classifier(nn.Module):
    """LSTM encoder followed by a small fully connected classification head.

    Pipeline: ``LSTM_Encoder`` -> ReLU -> dropout -> ``fc1`` -> ReLU -> ``classifier``.

    Args:
        input_dim: number of features per time step fed to the encoder.
        hidden_dim: encoder hidden size; the head is twice as wide when ``bi`` is True.
        num_classes: number of output logits.
        dropout: dropout probability applied to the activated encoding.
        enc_layers: number of LSTM layers in the encoder.
        bi: whether the encoder is bidirectional.
    """

    def __init__(self, input_dim, hidden_dim, num_classes,dropout=0.7,enc_layers=2,bi=False):
        super().__init__()

        self.lstm_enc = LSTM_Encoder(input_dim, hidden_dim, enc_layers, bi)

        # A bidirectional encoder emits forward and backward states side by side.
        head_width = hidden_dim * 2 if bi == True else hidden_dim

        self.fc1 = nn.Linear(head_width, head_width)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(head_width, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map input sequences ``x`` to unnormalised class scores."""
        encoding = self.relu(self.lstm_enc(x))
        hidden = self.relu(self.fc1(self.dropout(encoding)))
        return self.classifier(hidden)

**Signature Extractor based on the Classifier**


In [None]:
# Training split (train=True). The meaning of the keyword arguments is defined in the
# project's `dataset` module, which is not visible here.
# NOTE(review): window_length=2 presumably means 2-second windows (320 samples at the
# 160 samples/s used by collate_fn) — confirm against Raw_PhysionNet.
raw_dataset_train = Raw_PhysionNet(activity="fist_real",sample_windows=False,include_rest=False,extract_delta=False,train=True,window_length=2,slide_delta=0.1)



.... found 327 edf files ....
---- data from subject 88 is being excluded because of lesser sampling rate ---- 
---- data from subject 92 is being excluded because of lesser sampling rate ---- 
---- data from subject 100 is being excluded because of lesser sampling rate ---- 
---- data loaded from total of 106 -----


In [None]:
# raw_dataset_train_1 = Raw_PhysionNet(activity="fist_real",include_rest=False,extract_delta=False,train=True,window_length=1.0,slide_delta=0.1)

In [None]:
# raw_dataset_train_1_5 = Raw_PhysionNet(activity="fist_real",include_rest=False,extract_delta=False,train=True,window_length=1.5,slide_delta=0.1)

In [None]:
# raw_dataset_train_2_0 = Raw_PhysionNet(activity="fist_real",include_rest=False,extract_delta=False,train=True,window_length=2.0,slide_delta=0.1)

In [None]:
# Held-out split (train=False), built with the same options as the training set except
# for the window length.
# NOTE(review): window_length is 1.25 here, but test_incremental_segements slices this
# data up to 2.0 s (320 samples at 160 samples/s). If windows really are 1.25 s long,
# every slice beyond that length returns the same data — confirm the intended value.
raw_dataset_test = Raw_PhysionNet(activity="fist_real",sample_windows=False,include_rest=False,extract_delta=False,train=False,window_length=1.25,slide_delta=0.1)



.... found 327 edf files ....
---- data from subject 88 is being excluded because of lesser sampling rate ---- 
---- data from subject 92 is being excluded because of lesser sampling rate ---- 
---- data from subject 100 is being excluded because of lesser sampling rate ---- 
---- data loaded from total of 106 -----


In [None]:
# Was misspelled `deivce`, which silently created an unused variable instead of
# (re)defining the `device` that the rest of the notebook reads.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Unidirectional LSTM classifier sized by the constants defined near the top of the
# notebook; dropout keeps the class default.
model = LSTM_Classifier(input_dim=num_channels,hidden_dim=hidden_size,enc_layers=num_layers,num_classes=NUM_SUBJS,bi=False)
# Multi-class cross-entropy on the classifier's raw scores.
criterion =nn.CrossEntropyLoss()

In [None]:
# Move the parameters to the selected device; as the last expression of the cell this
# also displays the module structure.
model.to(device)

LSTM_Classifier(
  (lstm_enc): LSTM_Encoder(
    (lstm): LSTM(64, 128, batch_first=True)
  )
  (fc1): Linear(in_features=128, out_features=128, bias=True)
  (dropout): Dropout(p=0.7, inplace=False)
  (classifier): Linear(in_features=128, out_features=106, bias=True)
  (relu): ReLU()
)

In [None]:
# Warm-start from a previously trained checkpoint. map_location remaps the stored
# tensors onto the current device, so a checkpoint saved on a GPU also loads on a
# CPU-only runtime.
# NOTE(review): this path points at a different experiment folder (exp10) than
# base_dir (exp22) — confirm this is the intended starting point.
model.load_state_dict(torch.load("/content/drive/MyDrive/exp10/Copy of best_model.pth", map_location=device)) # loading best model

<All keys matched successfully>

In [None]:
# model.load_state_dict(torch.load(chk_point_best))

In [None]:
# train_loaders = []
# val_loaders = []
# for dataset in (raw_dataset_train_5,raw_dataset_train_1_5,raw_dataset_train_1,raw_dataset_train_2_0):
#  train_loader,val_loader = gen_dataloader(dataset,[0.85,0.15])

#  train_loaders.append(train_loader)
#  val_loaders.append(val_loader)

In [None]:
# 80/20 train/validation split; both loaders build their batches with collate_fn
# (see gen_dataloader).
train_loader,val_loader = gen_dataloader(raw_dataset_train,[0.8,0.2])
# The held-out loader uses PyTorch's default collation, so its windows are not cropped.
test_loader = DataLoader(raw_dataset_test,batch_size=32,shuffle=False)

In [None]:
# Adam over all model parameters with a learning rate of 0.01.
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

In [None]:
# Per-epoch history; the training function appends one value to each list per epoch.
loss_train=[]
loss_val=[]
acc_val= []
acc_train = []

In [None]:
# Peek at the first training batch. Indexing the first example works whether the
# collate function yields one stacked tensor or a tuple of per-example tensors
# (the previous `i[0].shape` raised AttributeError on a tuple).
for batch in train_loader:
    batch_signals = batch[0]
    print(len(batch_signals), tuple(batch_signals[0].shape))
    break

AttributeError: 'tuple' object has no attribute 'shape'

In [None]:
# Actual sizes of the train / validation subsets. The previous expression multiplied
# by 0.85, which does not match the [0.8, 0.2] split passed to gen_dataloader.
len(train_loader.dataset), len(val_loader.dataset)

In [None]:
len(raw_dataset_test)

In [None]:
# Best validation accuracy seen so far; the training functions read and update this
# global to decide when to overwrite the "best" checkpoint.
best_eval_acc = 0


In [None]:
# train_variable_length(model,train_loaders,val_loaders,optimizer,criterion,1000,loss_train,loss_val,acc_train,acc_val)


In [None]:
# Train for 1000 epochs. Per-epoch metrics are appended to the history lists and the
# checkpoints are written to chk_point_best / chk_point_last.
train(model,train_loader,val_loader,optimizer,criterion,1000,loss_train,loss_val,acc_train,acc_val)

In [None]:
# Training vs. validation loss per epoch (epochs numbered from 0).
x = np.arange(start=0,stop=len(loss_train))
plt.title("Losses")
plt.plot(x,loss_train,label="train")
plt.plot(x,loss_val,label="val")
plt.xlabel("epochs")
plt.ylabel("losses")
plt.legend()
plt.grid()
plt.show()

In [None]:
# Training vs. validation accuracy per epoch; reuses the epoch axis `x` built for the
# loss plot. The y-axis label previously read "losses" (copied from that plot).
plt.title("Accuracies")
plt.plot(x,acc_train,label="train")
plt.plot(x,acc_val,label="val")
plt.xlabel("epochs")
plt.ylabel("accuracy")
plt.legend()
plt.grid()
plt.show()

In [None]:
def test_on_segements():
  """Evaluate the global ``model`` on test sets rebuilt at each window length.

  For every window length from 0.1 s to 2.0 s in 0.1 s steps (the same grid used by
  ``test_incremental_segements``) a fresh ``Raw_PhysionNet`` test set is created and
  evaluated with ``test`` using the notebook-level ``model`` and ``criterion``.

  Returns:
    ``(losses, accs, labels)``: three lists with one entry per window length,
    holding the values returned by ``test`` (mean loss, accuracy, predicted labels).
  """
  losses = []
  accs = []
  labels = []

  # range(1, 21) so the 2.0 s window is included; the old range(1, 20) stopped at
  # 1.9 s. The divisor used to be stored in a local called `max`, shadowing the builtin.
  for tenths in range(1, 21):
    sig_len = tenths / 10

    segment_dataset = Raw_PhysionNet(activity="fist_real",include_rest=False,extract_delta=False,train=False,window_length=sig_len,slide_delta=0.1)
    segment_loader = DataLoader(segment_dataset,batch_size=32,shuffle=False)

    seg_loss, seg_acc, seg_labels = test(model,segment_loader,criterion)
    losses.append(seg_loss)
    accs.append(seg_acc)
    labels.append(seg_labels)

    # Drop the per-length dataset before building the next one.
    del segment_loader
    del segment_dataset

  return losses,accs,labels


In [None]:
def test_incremental_segements(raw_dataset_test):
  """Evaluate the global ``model`` on growing prefixes of each test window.

  Every sample of ``raw_dataset_test.eeg_data_x`` is standardised once with
  ``raw_dataset_test.standardize_rows``. The first 0.1 s, 0.2 s, ... 2.0 s of each
  window (at 160 samples per second) are then evaluated with ``test`` using the
  notebook-level ``model`` and ``criterion``.

  Args:
    raw_dataset_test: dataset object exposing ``eeg_data_x`` (indexed as
      ``[sample, :, time]``), ``eeg_data_y`` and ``standardize_rows``.
      NOTE(review): the standardised copy takes the dtype of ``eeg_data_x``; if that
      is an integer array the standardised values are truncated — confirm it is float.

  Returns:
    ``(losses, accs, labels, truth)``: four lists with one entry per prefix length.
    The first three hold what ``test`` returns (mean loss, accuracy, predicted
    labels); ``truth`` holds the ground-truth labels as a list of ints. It was
    previously always returned empty.
  """
  losses = []
  accs = []
  labels = []
  truth = []

  sample_rate = 160

  x_raw = raw_dataset_test.eeg_data_x
  x_standard = np.zeros_like(x_raw)
  for idx,sample in enumerate(x_raw):
    x_standard[idx] = raw_dataset_test.standardize_rows(sample)

  # The labels do not depend on the prefix length, so build them once.
  y_true = torch.tensor(raw_dataset_test.eeg_data_y)
  y_true = y_true.view(y_true.shape[0])
  y_true_list = y_true.tolist()

  for tenths in range(1,21):
    # Integer arithmetic for the number of samples in `tenths` tenths of a second.
    num_samples = tenths * sample_rate // 10

    x = torch.tensor(x_standard[:,:,:num_samples])
    print(x.shape)

    segment_dataset = torch.utils.data.TensorDataset(x,y_true)
    segment_loader = DataLoader(segment_dataset,batch_size=32,shuffle=False)

    seg_loss, seg_acc, seg_labels = test(model,segment_loader,criterion)
    losses.append(seg_loss)
    accs.append(seg_acc)
    labels.append(seg_labels)
    truth.append(list(y_true_list))

    # Release the per-length tensors before the next, larger slice is created.
    del segment_loader
    del segment_dataset
    del x

  return losses,accs,labels,truth


In [None]:
# Evaluate the model on 0.1 s ... 2.0 s prefixes of the held-out windows.
losses,accs,labels,truth = test_incremental_segements(raw_dataset_test)

In [None]:
# Signal lengths in seconds: 0.1, 0.2, ..., 2.0 (always exactly 20 points). Built from
# integers because np.arange with a fractional step can gain or lose an element
# through floating-point rounding.
x = np.arange(1, 21) / 10

In [None]:
# Held-out accuracy as a function of how many seconds of each window the model sees.
# The title previously read "Signal Length Vs Time", which described neither axis.
plt.title("Accuracy Vs Signal Length")
plt.plot(x,accs)
plt.xlabel("time in seconds")
plt.ylabel("accuracy")
plt.grid()
plt.show()

In [None]:
# Persist the per-length test losses, accuracies and predicted labels next to the
# checkpoints in base_dir.
torch.save({"losses_test":losses,"accs":accs,"labels":labels},os.path.join(base_dir,"test_variable_results.pth"))

In [None]:
# `val_loaders` exists only in the commented-out multi-loader setup, so indexing it
# raised NameError. Evaluate on the single validation loader that is actually built.
y = test(model,val_loader,criterion)


In [None]:
# Loss curves again, this time with epochs numbered from 1.
# NOTE(review): these imports repeat ones already made in the import cells at the top.
import numpy as np
import matplotlib.pyplot as plt
x = list(np.arange(1, len(loss_train)+1))
plt.plot(np.arange(1, len(loss_val)+1),loss_val,label='loss validation')
plt.plot(x,loss_train,label='loss train')
plt.legend()

In [None]:
# Accuracy curves with epochs numbered from 1.
plt.plot(np.arange(1,len(acc_val)+1),acc_val,label="validation accuracy")
plt.plot(np.arange(1,len(acc_train)+1),acc_train,label="training accuracy")

plt.legend()
plt.show()

In [None]:
chk_point_best

In [None]:
# Persist the per-epoch training history next to the checkpoints in base_dir.
torch.save({"loss_val":loss_val,"loss_train":loss_train,"acc_val":acc_val,"acc_train":acc_train},os.path.join(base_dir,"last_lists.pth"))