In [1]:
import os
import sys

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import random
import networkx as nx
import glob
from pymatreader import read_mat

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, ConcatDataset

from sklearn.metrics import mean_squared_error
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

import warnings
warnings.filterwarnings('ignore')
%matplotlib widget

In [2]:
# dataset class

class SeqDataset(Dataset):
    
    def __init__(self, root, seq_length, is_train, transform=None):
        self.transform = transform
        self.seqs = []
        self.seq_labels = []
        self.class_names = os.listdir(root)
        self.class_names.sort()
        self.numof_classes = len(self.class_names)
        self.seq_length = seq_length
        self.is_train = is_train

        for (i,x) in enumerate(self.class_names):
            temp = glob.glob(os.path.join(root, x, '*'))
            temp.sort()
            self.seq_labels.extend([i]*len(temp))
            for t in temp:
                df = pd.read_csv(t, header=None)
                tensor = preprocess(df)
                self.seqs.append(tensor)

    def __getitem__(self, index):
        seq = self.seqs[index]
        if self.transform is not None:
            seq = self.transform(seq, is_train=self.is_train, seq_length=self.seq_length)
        return {'seq':seq, 'label':self.seq_labels[index]}

    def __len__(self):
        return len(self.seqs)

def preprocess(df: pd.DataFrame)->np.ndarray:
    mat = df.T.values
    mat = standardization(mat, axis=1)

    return mat

def standardization(a, axis=None, ddof=0):
    a_mean = a.mean(axis=axis, keepdims=True)
    a_std = a.std(axis=axis, keepdims=True, ddof=ddof)
    a_std[np.where(a_std==0)] = 1

    return (a - a_mean) / a_std

def add_noise(data, noise_level=0.01):
    noise = np.random.normal(0, noise_level, data.shape)
    data_noisy = data + noise

    return data_noisy.astype(np.float32)

def time_shift(data, shift):
    data_shifted = np.roll(data, shift)

    return data_shifted

def transform(array, is_train, seq_length):
    if is_train:
        _, n = array.shape
        s = random.randint(0, n-seq_length)
        ts = array[:,s:s+seq_length]
        ts = add_noise(ts).astype(np.float32)
        if random.randint(0,1):
            ts_r = ts[:,::-1].copy()
            return ts_r
        return ts
    else:
        ts = array[:,:seq_length].astype(np.float32)
        return ts

In [3]:
# train dataset

train_datasets = []
train_ids = ['subject0', 'subject1', 'subject2', 'subject3']

for train_id in train_ids:
    train_dir = os.path.join('Motion Decoding Using Biosignals', 'dataset', 'train', train_id)
    dataset = SeqDataset(root=train_dir, seq_length=250, is_train=True, transform=transform)
    train_datasets.append(dataset)

train_dataset = ConcatDataset(train_datasets)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=10, shuffle=True)

In [4]:
# test dataset

batch_size = 32

test_dir = os.path.join('Motion Decoding Using Biosignals', 'dataset', 'val', 'subject4')

test_dataset = SeqDataset(root=test_dir, seq_length=250, is_train=False, transform=transform)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=10, shuffle=False)

In [14]:
# modeling lstm_reservoir

class LSTM_RCLayer(nn.Module):
    def __init__(self, input_size, reservoir_size, spectral_radius=0.9, sparsity=0.1):
        super(LSTM_RCLayer, self).__init__()
        self.reservoir_size = reservoir_size
        self.input_size = input_size

        self.Win = torch.empty(reservoir_size, input_size).uniform_(-1.0, 1.0)

        self.W = torch.empty(reservoir_size, reservoir_size).uniform_(-1.0, 1.0)
        mask = torch.rand_like(self.W) < sparsity
        self.W = self.W * mask

        eigvals = torch.linalg.eigvals(self.W)
        max_eigval = torch.max(torch.abs(eigvals))
        self.W = self.W * (spectral_radius / max_eigval)

        self.state = None

    def forward(self, x):
        device = x.device
        batch_size, seq_len, _ = x.size()
        self.Win = self.Win.to(device)
        self.W = self.W.to(device)
        self.state = torch.zeros(batch_size, self.reservoir_size, dtype=torch.float32).to(device)
        outputs = []

        for t in range(seq_len):
            u = x[:, t, :] 
            new_state = torch.tanh(torch.matmul(u, self.Win.T) + torch.matmul(self.state, self.W.T))
            self.state = new_state
            outputs.append(self.state.unsqueeze(1))

        return torch.cat(outputs, dim=1)

class LSTM_Reservoir(nn.Module):
    def __init__(self, num_channels, num_classes, reservoir_size=400, hidden_size=64, num_layers=2):
        super(LSTM_Reservoir, self).__init__()

        self.lstm = nn.LSTM(input_size=num_channels, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.reservoir = LSTM_RCLayer(input_size=hidden_size * 2, reservoir_size=reservoir_size)
        self.fc = nn.Linear(reservoir_size, num_classes)

    def forward(self, x):
        x = x.permute(0, 2, 1)  
        lstm_output, _ = self.lstm(x)
        reservoir_output = self.reservoir(lstm_output)

        out = reservoir_output[:, -1, :]
        out = self.fc(out)

        return out

num_channels = 72
num_classes = 3

model = LSTM_Reservoir(num_channels, num_classes)

input_data = torch.randn(32, num_channels, 300) 
output_data = model(input_data)

In [15]:
# model training

def train(log_interval, model, device, train_loader, optimizer, epoch, iteration):

    model.train()
    criterion = torch.nn.CrossEntropyLoss()

    for sample_batched in train_loader:
        data, target = sample_batched['seq'].to(device), sample_batched['label'].to(device)

        optimizer.zero_grad()
        output = model(data)
        pred = output.max(1, keepdim=True)[1]
        correct = pred.eq(target.view_as(pred)).sum().item()
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        iteration += 1

    #    if iteration % log_interval == 0:
    #        print('Train Accracy: {3:5.2f}%  train_loss: {2:.6f} \n'.format(epoch, iteration, loss.item(), 100.*correct/float(len(sample_batched['label']))))
            
    return iteration

def val(model, device, test_loader):

    model.eval()
    criterion = torch.nn.CrossEntropyLoss(reduction='sum')
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for sample_batched in test_loader:
            data, target = sample_batched['seq'].to(device), sample_batched['label'].to(device)

            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= float(len(test_loader.dataset))
    correct /= float(len(test_loader.dataset))
    
    print('Validation Accuracy: {0:.2f}%  Test Loss: {1:.6f} \n'.format(100. * correct, test_loss))

    return test_loss, 100. * correct

def evaluate(model, device, test_loader):

    preds = []
    trues = []
    model.eval()

    with torch.no_grad():
        for sample_batched in test_loader:
            data, target = sample_batched['seq'].to(device), sample_batched['label'].to(device)

            output = model(data)
            pred = [test_loader.dataset.class_names[i] for i in list(output.max(1)[1].cpu().detach().numpy())]
            preds += pred
            true = [test_loader.dataset.class_names[i] for i in list(target.cpu().detach().numpy())]
            trues += true

    labels = test_loader.dataset.class_names

    cm = confusion_matrix(trues, preds, labels=labels)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    cr = classification_report(trues, preds, target_names=labels)
    print(cr)
    correct = 0

    for pred, true in zip(preds, trues):
        if pred == true:
            correct += 1
            
    df = pd.DataFrame({'pred': preds, 'true': trues})

    return correct/len(trues), df

def train_evaluate(train_loader, test_loader, log_interval, num_epoches, seq_length, transform=None, num_channels=72, num_classes=3):

    model = LSTM_Reservoir(num_channels=num_channels, num_classes=num_classes)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters())
    iteration = 0

    for epoch in range(1, 1+num_epoches):
        iteration = train(log_interval, model, device, train_loader, optimizer, epoch, iteration)
        if epoch%10==0:
            test_loss, test_acc = val(model, device, test_loader)
    acc, df = evaluate(model, device, test_loader)

    print(f'Final Acccuracy: {acc}')
    return model

log_interval = 1000
num_epoches = 50
seq_length = 250

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=20, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=20, shuffle=False)

model = train_evaluate(train_loader, test_loader, log_interval, num_epoches, seq_length, transform)

Validation Accuracy: 24.27%  Test Loss: 2.148120 

Validation Accuracy: 23.22%  Test Loss: 3.079083 

Validation Accuracy: 24.06%  Test Loss: 5.034041 

Validation Accuracy: 28.24%  Test Loss: 4.910425 

Validation Accuracy: 26.36%  Test Loss: 5.363013 

                    precision    recall  f1-score   support

 backside_kickturn       0.36      0.32      0.34       123
frontside_kickturn       0.15      0.30      0.20       115
           pumping       0.36      0.22      0.27       240

          accuracy                           0.26       478
         macro avg       0.29      0.28      0.27       478
      weighted avg       0.31      0.26      0.27       478

Final Acccuracy: 0.26359832635983266
