In [1]:
import numpy as np
import pandas as pd
from scipy import io as sio

import warnings

warnings.filterwarnings('ignore')

### Load the Data

In [2]:
data = sio.loadmat('Project_data.mat')
channels = data['Channels']
test_data = data['TestData']
train_data = data['TrainData']
train_labels = data['TrainLabels'].ravel()
fs = data['fs']

In [3]:
train_labels.shape, train_data.shape, test_data.shape

((550,), (59, 5000, 550), (59, 5000, 159))

### Load Pre-Calculated Features

In [4]:
var = np.load('var.npy')
amp_hist = np.load('amp_hist.npy')
ar_model = np.load('ar_model.npy')
cross_corr = np.load('cross_corr.npy')
max_freq = np.load('max_freq.npy')
mean_freq = np.load('mean_freq.npy')
med_freq = np.load('med_freq.npy')
rel_energy = np.load('rel_energy.npy')

var_test = np.load('var_test.npy')
amp_hist_test = np.load('amp_hist_test.npy')
ar_model_test = np.load('ar_model_test.npy')
cross_corr_test = np.load('cross_corr_test.npy')
max_freq_test = np.load('max_freq_test.npy')
mean_freq_test = np.load('mean_freq_test.npy')
med_freq_test = np.load('med_freq_test.npy')
rel_energy_test = np.load('rel_energy_test.npy')

In [5]:
from sklearn.preprocessing import StandardScaler

features_tr = np.concatenate((var, amp_hist, ar_model, cross_corr, max_freq, mean_freq, med_freq, rel_energy), axis = 1)
features_te = np.concatenate((var_test, amp_hist_test, ar_model_test, cross_corr_test, max_freq_test, mean_freq_test, med_freq_test, rel_energy_test), axis = 1)

sc = StandardScaler()

features_tr = sc.fit_transform(features_tr)
features_te = sc.fit_transform(features_te)

np.save('features_train_scaled', features_tr)
np.save('features_test_scaled', features_te)

features_tr.shape, features_te.shape

((550, 5369), (159, 5369))

In [7]:
var_fea = ['variance_' + str(i) for i in range(var.shape[1])]
hist_fea = ['hist_' + str(i) for i in range(amp_hist.shape[1])]
ar_fea = ['ar_' + str(i) for i in range(ar_model.shape[1])]
cross_fea = ['correlation_' + str(i) for i in range(cross_corr.shape[1])]
max_fea = ['max_freq_' + str(i) for i in range(max_freq.shape[1])]
mean_fea = ['mean_freq_' + str(i) for i in range(mean_freq.shape[1])]
med_fea = ['median_freq_' + str(i) for i in range(med_freq.shape[1])]
energ_fea = ['rel_energy_' + str(i) for i in range(rel_energy.shape[1])]

all_fea = var_fea + hist_fea + ar_fea + cross_fea + max_fea + mean_fea + med_fea + energ_fea

def get_feature_name(index):
    return all_fea[index]

len(all_fea), get_feature_name(900)

(5369, 'ar_192')

In [8]:
features_tr.shape, train_labels.shape

((550, 5369), (550,))

### PSO Algorithm

In [31]:
import numpy as np

class Particle:
    def __init__(self, num_features, total_features):
        self.position = np.random.choice(total_features, size=(num_features,), replace=False)
        self.velocity = np.random.rand(num_features)
        self.pbest_position = self.position
        self.pbest_value = -float('inf')

    def update_velocity(self, gbest_position, w=0.5, c1=1, c2=2):
        r1 = np.random.rand(len(self.velocity))
        r2 = np.random.rand(len(self.velocity))
        
        cognitive_velocity = c1 * r1 * (self.pbest_position - self.position)
        social_velocity = c2 * r2 * (gbest_position - self.position)
        
        self.velocity = w * self.velocity + cognitive_velocity + social_velocity

    def update_position(self, total_features):
        old_position = self.position.copy()
        self.position = self.position + self.velocity
        
        clipped_indices = np.where((self.position < 0) | (self.position > total_features-1))
        self.position = np.clip(self.position, 0, total_features-1)  # Ensure the position is within the bounds
        self.position = np.round(self.position).astype(int)  # Ensure the position is an integer
    
        # Negate the velocity for clipped positions
        self.velocity[clipped_indices] = -self.velocity[clipped_indices]


class Swarm:
    def __init__(self, num_particles, num_features, total_features):
        self.particles = [Particle(num_features, total_features) for _ in range(num_particles)]
        self.gbest_value = -float('inf')
        self.gbest_position = np.random.choice(total_features, size=(num_features,), replace=False)

    def update_gbest(self, X, y):
        for particle in self.particles:
            fitness_cadidate = self.fitness(particle.position, X, y)
            if(particle.pbest_value < fitness_cadidate):
                particle.pbest_value = fitness_cadidate
                particle.pbest_position = particle.position

            if(self.gbest_value < fitness_cadidate):
                self.gbest_value = fitness_cadidate
                self.gbest_position = particle.position

    def move_particles(self, total_features):
        for particle in self.particles:
            particle.update_velocity(self.gbest_position)
            particle.update_position(total_features)

    def fitness(self, position, X, y):
        X = X[:, position]
        mu1 = np.mean(X[y == -1, :], axis=0)
        mu2 = np.mean(X[y == 1, :], axis=0)
        mu0 = np.mean(X, axis=0)
    
        S1 = np.sum([(x_i - mu1).reshape(-1, 1) @ (x_i - mu1).reshape(1, -1) for x_i in X[y == -1, :]], axis=0)
        S2 = np.sum([(x_i - mu2).reshape(-1, 1) @ (x_i - mu2).reshape(1, -1) for x_i in X[y == 1, :]], axis=0)
    
        Sb = np.sum([(mu_i - mu0).reshape(-1, 1) @ (mu_i - mu0).reshape(1, -1) for mu_i in [mu1, mu2]], axis=0)
    
        Sw = S1 + S2
    
        J = np.trace(Sb) / np.trace(Sw)
    
        return J

In [32]:
num_features = 50
total_features = 5369
num_particles = 1000
num_iterations = 100

swarm = Swarm(num_particles, num_features, total_features)

for i in range(num_iterations):
    swarm.move_particles(total_features)
    swarm.update_gbest(features_tr, train_labels)

In [37]:
np.save('features_PSO', swarm.gbest_position)

#### Select the given features

In [38]:
X_tr = features_tr[:, swarm.gbest_position]
X_te = features_te[:, swarm.gbest_position]
y_tr = train_labels

### MLP

In [40]:
import torch
from torch import nn, optim
from sklearn.model_selection import KFold
from torch.utils.data import TensorDataset, DataLoader

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class MLPNet(nn.Module):
    def __init__(self, input_size, hidden1_size, hidden2_size):
        super().__init__()
        self.l1 = nn.Linear(input_size, hidden1_size) 
        self.l2 = nn.Linear(hidden1_size, hidden2_size)  
        self.l3 = nn.Linear(hidden2_size, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.relu(self.l1(x))
        out = self.relu(self.l2(out))
        out = self.sigmoid(self.l3(out))
        return out

#### Hyper Parameters 

In [45]:
batch_size = 10
learning_rate = 0.01
input_size = 50
hidden1_size = 60
hidden2_size = 60
num_epochs = 10
num_total_steps = 550 // batch_size

model = MLPNet(input_size, hidden1_size, hidden2_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCELoss()

### Train

In [46]:
X_tr_tensor = torch.tensor(np.float32(X_tr))
y_tr_tensor = torch.tensor(np.float32(np.where(y_tr == 1, 1.0, 0.0)))

kfold = KFold(n_splits=5, shuffle=True)

train_accuracies = []
valid_accuracies = []
for fold, (train_ids, valid_ids) in enumerate(kfold.split(X_tr_tensor)):
    print(f'FOLD {fold}')
    print('--------------------------------')

    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_ids)
    train_loader = DataLoader(TensorDataset(X_tr_tensor, y_tr_tensor), batch_size=batch_size, sampler=train_subsampler)
    valid_loader = DataLoader(TensorDataset(X_tr_tensor, y_tr_tensor), batch_size=batch_size, sampler=valid_subsampler)

    for epoch in range(num_epochs): 
            for i, (inputs, targets) in enumerate(train_loader):
                inputs = inputs.to(device)
                targets = targets.to(device)
                
                outputs = model(inputs)
                
                loss = criterion(outputs.ravel(), targets)
                
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                if (i+1) % num_total_steps-1 == 0:
                    print (f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{num_total_steps}], Loss: {loss.item():.4f}')

    correct, total = 0, 0
    with torch.no_grad():
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            
            outputs = model(inputs)
            
            predicted = torch.where(outputs > 0.5, 1, 0).T.to(device)
            
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    train_accuracies.append(100 * correct / total)
    print(f'Train Accuracy: {100 * correct / total:.2f}%')
    print('--------------------------------')

    correct, total = 0, 0
    with torch.no_grad():
        for inputs, targets in valid_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            
            outputs = model(inputs)
            
            predicted = torch.where(outputs > 0.5, 1, 0).T.to(device)
            
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    valid_accuracies.append(100 * correct / total)
    print(f'Validation Accuracy: {100 * correct / total:.2f}%')
    print('--------------------------------')

print('\nOveral Results')
print(f'Average Training Accurcy is : {np.mean(train_accuracies)}')
print(f'Average Validation Accurcy is : {np.mean(valid_accuracies)}')

FOLD 0
--------------------------------
Epoch [1/10], Step [1/55], Loss: 0.7001
Epoch [2/10], Step [1/55], Loss: 0.4845
Epoch [3/10], Step [1/55], Loss: 0.5434
Epoch [4/10], Step [1/55], Loss: 0.3442
Epoch [5/10], Step [1/55], Loss: 0.3051
Epoch [6/10], Step [1/55], Loss: 0.6284
Epoch [7/10], Step [1/55], Loss: 0.6700
Epoch [8/10], Step [1/55], Loss: 0.4119
Epoch [9/10], Step [1/55], Loss: 0.3153
Epoch [10/10], Step [1/55], Loss: 0.5139
Train Accuracy: 86.59%
--------------------------------
Validation Accuracy: 78.18%
--------------------------------
FOLD 1
--------------------------------
Epoch [1/10], Step [1/55], Loss: 0.4689
Epoch [2/10], Step [1/55], Loss: 0.4388
Epoch [3/10], Step [1/55], Loss: 0.0599
Epoch [4/10], Step [1/55], Loss: 0.2328
Epoch [5/10], Step [1/55], Loss: 0.0398
Epoch [6/10], Step [1/55], Loss: 0.1157
Epoch [7/10], Step [1/55], Loss: 0.1947
Epoch [8/10], Step [1/55], Loss: 0.1810
Epoch [9/10], Step [1/55], Loss: 0.2232
Epoch [10/10], Step [1/55], Loss: 0.1229
T

#### Predict Labels 

In [47]:
test_data_tensor = torch.tensor(np.float32(X_te)).to(device)

model.eval()

with torch.no_grad():
    outputs = model(test_data_tensor)
        
predicted = torch.where(outputs > 0.5, 1, 0).T
np.save('test_labels_MLP_PSO', predicted.cpu())
predicted

tensor([[0, 0, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0,
         0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0,
         1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 0,
         1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0,
         1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0,
         0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 0,
         1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0]], device='cuda:0')

### RBF

In [66]:
import torch
from torch import nn, optim
from torch.autograd import Variable
from sklearn.cluster import KMeans

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class RBF(nn.Module):
    def __init__(self, input_dim, output_dim, X_train):
        super().__init__()
        kmeans = KMeans(n_clusters=output_dim)
        kmeans.fit(X_train)
        self.centers = nn.Parameter(torch.tensor(kmeans.cluster_centers_).float())
        self.beta = nn.Parameter(torch.ones(1, output_dim))

    def radial_function(self, x):
        x = x.unsqueeze(-1)  # add an extra dimension at the end for broadcasting
        return torch.exp(-self.beta.mul((x - self.centers.T).pow(2).sum(1)))

    def forward(self, x):
        return self.radial_function(x)

class RBFNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, X_train):
        super().__init__()
        self.rbf = RBF(input_dim, hidden_dim, X_train)
        self.linear = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = self.rbf(x)
        return self.linear(x)

#### Hyperparameters

In [67]:
batch_size = 10
learning_rate = 0.01
input_dim = 50 
hidden_dim = 20
output_dim = 1
num_epochs = 10
num_total_steps = 550 // batch_size

model = RBFNet(input_dim, hidden_dim, output_dim, torch.tensor(np.float32(X_tr))).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

### Train

In [70]:
X_tr_tensor = torch.tensor(np.float32(X_tr))
y_tr_tensor = torch.tensor(np.float32(np.where(y_tr == 1, 1.0, 0.0)))

kfold = KFold(n_splits=5, shuffle=True)

train_accuracies = []
valid_accuracies = []
for fold, (train_ids, valid_ids) in enumerate(kfold.split(X_tr_tensor)):
    print(f'FOLD {fold}')
    print('--------------------------------')

    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_ids)
    train_loader = DataLoader(TensorDataset(X_tr_tensor, y_tr_tensor), batch_size=batch_size, sampler=train_subsampler)
    valid_loader = DataLoader(TensorDataset(X_tr_tensor, y_tr_tensor), batch_size=batch_size, sampler=valid_subsampler)
    
    for epoch in range(num_epochs): 
            for i, (inputs, targets) in enumerate(train_loader):
                inputs = inputs.to(device)
                targets = targets.to(device)
                
                outputs = model(inputs)
                
                loss = criterion(outputs.ravel(), targets)
                
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                if (i+1) % num_total_steps-1 == 0:
                    print (f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{num_total_steps}], Loss: {loss.item():.4f}')

    correct, total = 0, 0
    with torch.no_grad():
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            
            outputs = model(inputs)
            
            predicted = torch.where(outputs > 0.5, 1, 0).T.to(device)
            
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    train_accuracies.append(100 * correct / total)
    print(f'Train Accuracy: {100 * correct / total:.2f}%')
    print('--------------------------------')

    correct, total = 0, 0
    with torch.no_grad():
        for inputs, targets in valid_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            
            outputs = model(inputs)
            
            predicted = torch.where(outputs > 0.5, 1, 0).T.to(device)
            
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    valid_accuracies.append(100 * correct / total)
    print(f'Validation Accuracy: {100 * correct / total:.2f}%')
    print('--------------------------------')

print('\nOveral Results')
print(f'Average Training Accurcy is : {np.mean(train_accuracies)}')
print(f'Average Validation Accurcy is : {np.mean(valid_accuracies)}')

FOLD 0
--------------------------------
Epoch [1/10], Step [1/55], Loss: 0.1570
Epoch [2/10], Step [1/55], Loss: 0.1117
Epoch [3/10], Step [1/55], Loss: 0.1628
Epoch [4/10], Step [1/55], Loss: 0.1902
Epoch [5/10], Step [1/55], Loss: 0.1396
Epoch [6/10], Step [1/55], Loss: 0.1234
Epoch [7/10], Step [1/55], Loss: 0.1807
Epoch [8/10], Step [1/55], Loss: 0.1752
Epoch [9/10], Step [1/55], Loss: 0.1572
Epoch [10/10], Step [1/55], Loss: 0.1462
Train Accuracy: 70.91%
--------------------------------
Validation Accuracy: 71.82%
--------------------------------
FOLD 1
--------------------------------
Epoch [1/10], Step [1/55], Loss: 0.1577
Epoch [2/10], Step [1/55], Loss: 0.1655
Epoch [3/10], Step [1/55], Loss: 0.1982
Epoch [4/10], Step [1/55], Loss: 0.1837
Epoch [5/10], Step [1/55], Loss: 0.1499
Epoch [6/10], Step [1/55], Loss: 0.1825
Epoch [7/10], Step [1/55], Loss: 0.2034
Epoch [8/10], Step [1/55], Loss: 0.1929
Epoch [9/10], Step [1/55], Loss: 0.1219
Epoch [10/10], Step [1/55], Loss: 0.1700
T

#### Predict Test Labels

In [69]:
test_data_tensor = torch.tensor(np.float32(X_te)).to(device)

model.eval()

with torch.no_grad():
    outputs = model(test_data_tensor)
        
predicted = torch.where(outputs > 0.5, 1, 0).T
np.save('test_labels_RBF_PSO', predicted.cpu())
predicted

tensor([[0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 0,
         0, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0,
         0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0,
         0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0,
         0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0,
         0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0,
         1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1]], device='cuda:0')