In [1]:
import os
import math
import pandas as pd
import numpy as np
from pathlib import Path
import shutil
from typing import List, Union
import matplotlib.pyplot as plt

# import torchonn as onn
# from torchonn.models import ONNBaseModel
# from torchonn.op.mzi_op import project_matrix_to_unitary


import torch
from torch import Tensor, nn
from torch.types import Device, _size
from torch.nn.parameter import Parameter, UninitializedParameter
from torch.nn import init
from torch.utils.data import Dataset
from torch.utils.data import ConcatDataset
from torch.utils.data import DataLoader
# from torchonn.layers import MZILinear
# from torchonn.models import ONNBaseModel
from collections import OrderedDict

from sklearn.preprocessing import StandardScaler
# from sklearn.model_selection import train_test_split


### Initialization

In [2]:
# Init logging
import logging

# Notebook-wide logger. DEBUG level so that logger.debug(...) calls made by
# later cells are emitted as well.
logger = logging.getLogger(__name__)  # Use the current module's name
logger.setLevel(logging.DEBUG)

# Attach the stream handler only once: re-running this cell would otherwise
# stack one more handler on the same logger and print every message repeatedly.
if logger.handlers:
    handler = logger.handlers[0]
else:
    handler = logging.StreamHandler()
    logger.addHandler(handler)

# Smoke test of the logging setup with a placeholder value.
acc_example = 0.95  # Replace with your actual accuracy calculation
# Pass the value as a logging argument (formatted lazily) instead of an
# f-string with a stray '%', which printed "%0.95".
logger.info("Current accuracy: %.2f", acc_example)  # Log as info

Current accuracy: %0.95


## Auto-Encoder

#### Load raw data

In [3]:
class customDataset(Dataset):
    """Dataset yielding one CSV recording per item together with its label.

    Every entry returned by ``os.listdir(data_dir)`` is treated as one sample.
    The label of sample ``index`` is taken from the second column of row
    ``index`` of the CSV at ``label_dir`` and converted to an integer through
    the module-level ``label_dic`` mapping.

    NOTE(review): samples and labels are paired purely by position, and
    ``os.listdir`` gives no ordering guarantee -- confirm that the rows of the
    label file line up with the directory listing.

    Args:
        data_dir: directory holding one CSV file per sample.
        label_dir: path of the CSV file holding the labels.
        transform: optional callable applied to the sample tensor before it
            is transposed.
    """

    def __init__(self, data_dir, label_dir, transform=None):
        self.transform = transform
        self.data_dir = data_dir   # e.g. './data/origin_csv/train'
        self.files = os.listdir(self.data_dir)
        self.annotations = pd.read_csv(label_dir)

    def __len__(self):
        """Number of entries found in ``data_dir``."""
        return len(self.files)

    def __getitem__(self, index):
        """Return ``(sample, label, file_name)`` for position ``index``.

        ``sample`` is the float32 content of the CSV file, transposed so that
        the CSV columns become the first dimension.
        """
        file_name = self.files[index]
        frame = pd.read_csv(os.path.join(self.data_dir, file_name))
        sample = torch.tensor(frame.values, dtype=torch.float32)

        # Second column of the annotation row holds the textual class name.
        raw_label = self.annotations.iloc[index, 1]
        label = torch.tensor(int(label_dic[raw_label]))

        if self.transform:
            sample = self.transform(sample)

        return (sample.t(), label, file_name)

In [4]:
# Locations of the training split (one CSV per sample + one label CSV).
train_label_dir = './data/train_label.csv'
train_data_dir = './data/origin_csv/train/'

# Locations of the evaluation split.
eval_label_dir = './data/eval_label.csv'
eval_data_dir = './data/origin_csv/eval/'

# Maps the textual class name read from the label CSVs to a class index;
# customDataset looks this name up at item-access time.
label_dic = {'normal': 0, 'abnormal': 1}

train_dataset = customDataset(
    data_dir=train_data_dir,
    label_dir=train_label_dir,
)
eval_dataset = customDataset(
    data_dir=eval_data_dir,
    label_dir=eval_label_dir,
)

# Both splits are concatenated into a single dataset (labels are still
# returned per item, but the splits are no longer kept apart).
combined_dataset = ConcatDataset([train_dataset, eval_dataset])

#### Define auto-encoder model

In [5]:


# define

class Mat_mul(nn.Module):
    r"""Affine layer computing :math:`y = xW + b` with an un-transposed weight.

    Unlike :class:`torch.nn.Linear`, the weight is stored with shape
    ``(in_features, out_features)`` and is applied as ``input @ weight``
    (no transpose).

    Args:
        in_features: size of each input sample
        out_features: size of each output sample
        bias: If set to ``False``, the layer will not learn an additive bias.
            Default: ``True``
        device: device on which the parameters are allocated (passed to
            ``torch.empty``)
        dtype: dtype of the parameters (passed to ``torch.empty``)

    Shape:
        - Input: :math:`(*, H_{in})` where :math:`*` means any number of
          leading dimensions and :math:`H_{in} = \text{in\_features}`.
        - Output: :math:`(*, H_{out})` where all but the last dimension
          are the same shape as the input and :math:`H_{out} = \text{out\_features}`.

    Attributes:
        weight: the learnable weights of the module of shape
            :math:`(\text{in\_features}, \text{out\_features})`. The values are
            initialized from :math:`\mathcal{U}(-\sqrt{k}, \sqrt{k})`, where
            :math:`k = \frac{1}{\text{in\_features}}`
        bias:   the learnable bias of the module of shape :math:`(\text{out\_features})`,
                or ``None`` when the layer was built with ``bias=False``.
                When present, the values are initialized from
                :math:`\mathcal{U}(-\sqrt{k}, \sqrt{k})` where
                :math:`k = \frac{1}{\text{in\_features}}`

    Examples::

        >>> m = Mat_mul(20, 30)
        >>> input = torch.randn(128, 20)
        >>> output = m(input)
        >>> print(output.size())
        torch.Size([128, 30])
    """
    __constants__ = ['in_features', 'out_features']
    in_features: int
    out_features: int
    weight: Tensor

    def __init__(self, in_features: int, out_features: int, bias: bool = True,
                 device=None, dtype=None) -> None:
        factory_kwargs = {'device': device, 'dtype': dtype}
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        # Stored as (in_features, out_features) so forward needs no transpose.
        self.weight = Parameter(torch.empty((in_features, out_features), **factory_kwargs))
        if bias:
            self.bias = Parameter(torch.empty(out_features, **factory_kwargs))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()

    def reset_parameters(self) -> None:
        """Re-initialize weight and bias from U(-1/sqrt(in_features), 1/sqrt(in_features))."""
        # kaiming_uniform_ with a=sqrt(5) yields a bound of 1/sqrt(fan). For a
        # 2-D tensor PyTorch takes dim 1 as fan-in and dim 0 as fan-out; since
        # this weight is (in_features, out_features), the input dimension is
        # dim 0, so mode='fan_out' is what selects in_features as the fan.
        # For details on the a=sqrt(5) convention, see
        # https://github.com/pytorch/pytorch/issues/57109
        init.kaiming_uniform_(self.weight, a=math.sqrt(5), mode='fan_out')
        if self.bias is not None:
            fan_in = self.in_features
            bound = 1 / math.sqrt(fan_in) if fan_in > 0 else 0
            init.uniform_(self.bias, -bound, bound)

    def forward(self, input: Tensor) -> Tensor:
        """Return ``input @ weight`` plus the bias when the layer has one."""
        output = input @ self.weight
        # bias is None when the layer was created with bias=False.
        if self.bias is not None:
            output = output + self.bias
        return output
    
    
class AutoEncoder(nn.Module):
    """Single-hidden-layer auto-encoder built from ``Mat_mul`` + ReLU stages.

    ``forward`` only uses ``encoder`` (input_size -> hidden_size) followed by
    ``decoder`` (hidden_size -> input_size). ``encoder_2`` and ``decoder_2``
    are constructed and registered (so they appear in ``state_dict``) but are
    not called by ``forward``.

    NOTE(review): ``encoder_2`` takes ``int(input_size/2)`` input features
    while ``encoder`` emits ``hidden_size`` features, so the two could only be
    chained when those sizes coincide -- confirm the intended wiring before
    enabling the second stage.

    Args:
        input_size: number of features of each input vector.
        hidden_size: number of features of the latent code.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()

        def stage(n_in, n_out):
            # One affine layer followed by a ReLU non-linearity.
            return nn.Sequential(Mat_mul(n_in, n_out), nn.ReLU())

        half_size = int(input_size / 2)

        # Creation order is kept: it fixes both the state_dict key order and
        # the order in which the random initializations are drawn.
        self.encoder = stage(input_size, hidden_size)
        self.encoder_2 = stage(half_size, hidden_size)
        self.decoder = stage(hidden_size, input_size)
        self.decoder_2 = stage(half_size, input_size)

    def forward(self, x):
        """Encode ``x`` and return its reconstruction."""
        return self.decoder(self.encoder(x))


# x = torch.randn(100, 10)

# Instantiate the auto-encoder (1000 input features, 256 latent features) and
# move its parameters to the GPU; this line requires a CUDA device.
ae = AutoEncoder(input_size=1000, hidden_size=256).to('cuda')


# use ae encoder
# z = ae.encoder(x)

# print(z.shape)

In [6]:
# Optimisation hyper-parameters for the auto-encoder.
init_lr = 1e-4      # base learning rate handed to Adam and to the poly schedule
epochs = 100        # number of passes over the combined dataset
batch_size = 4096
step = 0

# Shuffled mini-batches drawn from the concatenated train + eval dataset.
dataloader = DataLoader(
    dataset=combined_dataset,
    batch_size=batch_size,
    shuffle=True,
)

# Reconstruction error between the auto-encoder output and its input.
criterion = nn.MSELoss()

# Adam with both beta coefficients set to 0.9.
optimizer = torch.optim.Adam(ae.parameters(), lr=init_lr, betas=(0.9, 0.9))

def poly_lr_scheduler(optimizer, init_lr, iter, lr_decay_iter=1,
                      max_iter=0, power=0.9):
    """Apply polynomial learning-rate decay to ``optimizer`` in place.

    On a decay step every parameter group receives
    ``init_lr * (1 - iter / max_iter) ** power``.

        :param optimizer: optimizer whose ``param_groups`` are updated
        :param init_lr: base learning rate
        :param iter: current iteration
        :param lr_decay_iter: how frequently decay occurs, default is 1
        :param max_iter: maximum number of iterations (must not be 0)
        :param power: polynomial power
        :returns: the new learning rate when it was updated; the untouched
            ``optimizer`` object when the update was skipped (``iter`` is not
            a multiple of ``lr_decay_iter`` or exceeds ``max_iter``).
            NOTE(review): the two branches return different types.
        :raises Exception: if ``max_iter`` is 0
    """
    if max_iter == 0:
        raise Exception("MAX ITERATION CANNOT BE ZERO!")

    # Only act on multiples of lr_decay_iter that lie within the schedule.
    on_decay_step = iter % lr_decay_iter == 0
    if not on_decay_step or iter > max_iter:
        return optimizer

    decay_factor = (1 - iter / max_iter) ** power
    lr = init_lr * decay_factor
    logger.debug(f'lr=: {lr}')

    for group in optimizer.param_groups:
        group['lr'] = lr
    return lr

## Train auto-encoder

In [7]:
def _snapshot_checkpoint():
    """Collect the current model/optimizer state into a checkpoint dict.

    ``optimizer.state_dict()`` and the learning rate are captured at call
    time, so the dict has to be rebuilt whenever a checkpoint is written;
    a dict built once before training would keep the pre-training optimizer
    state and the initial learning rate.
    """
    return {
        'model_state_dict': ae.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'learning_rate': optimizer.param_groups[0]['lr']
    }


# torch.save does not create missing directories.
os.makedirs('./weights', exist_ok=True)

model_params = _snapshot_checkpoint()
# A "best" checkpoint is only written once the loss drops below this value.
min_loss = 0.5

for epoch in range(epochs):

    # Polynomial learning-rate decay, applied once per epoch.
    poly_lr_scheduler(optimizer, init_lr=init_lr, iter=epoch, max_iter=epochs)
    for batch_index, (data,_,_) in enumerate(dataloader, 0):
        data = data.to('cuda')
        x_hat = ae(data)
        # Reconstruction loss: the input is its own target.
        loss = criterion(x_hat, data)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Loss of the last batch of the epoch, as a plain float so that no CUDA
    # tensor is kept alive in min_loss between epochs.
    epoch_loss = loss.item()
    logger.info(f'epoch={epoch}, loss={epoch_loss}')

    # Refresh the snapshot so the saved optimizer state and learning rate
    # match the weights being written.
    model_params = _snapshot_checkpoint()
    torch.save(model_params, './weights/model_params_latest.pth')
    if min_loss > epoch_loss:
        torch.save(model_params, './weights/model_params_best.pth')
        min_loss = epoch_loss

lr=: 0.0001
epoch=0, loss=0.11845755577087402
lr=: 9.909954834128343e-05
epoch=1, loss=0.11250562220811844
lr=: 9.819818665965754e-05
epoch=2, loss=0.11015499383211136
lr=: 9.729590473501306e-05
epoch=3, loss=0.10887461155653
lr=: 9.63926921258551e-05
epoch=4, loss=0.10820388048887253
lr=: 9.548853816214998e-05
epoch=5, loss=0.10739194601774216
lr=: 9.458343193786322e-05
epoch=6, loss=0.10710445791482925
lr=: 9.367736230317176e-05
epoch=7, loss=0.1076795905828476
lr=: 9.277031785633283e-05
epoch=8, loss=0.10783097892999649
lr=: 9.186228693518995e-05
epoch=9, loss=0.10684892535209656
lr=: 9.095325760829622e-05
epoch=10, loss=0.10539495199918747
lr=: 9.004321766563289e-05
epoch=11, loss=0.10772541165351868
lr=: 8.91321546089e-05
epoch=12, loss=0.10658790916204453
lr=: 8.822005564135439e-05
epoch=13, loss=0.10598741471767426
lr=: 8.73069076571686e-05
epoch=14, loss=0.10725656151771545
lr=: 8.639269723028191e-05
epoch=15, loss=0.10709816217422485
lr=: 8.547741060271343e-05
epoch=16, loss=0

## Save auto-encoder

## Load auto-encoder

## auto-encoder inference for transformer

In [None]:
def ae_infer(data_path:str, result_path:str, label_dir:str,
             weights_path:str='./weights/ae_model_weights.pth',
             device:str='cuda'):
    """Encode every sample of a dataset and write each latent code as a CSV.

    Any existing ``result_path`` directory is deleted and recreated. For every
    item of ``customDataset(data_path, label_dir)`` the encoder half of the
    auto-encoder is applied and the transposed code is saved under the same
    file name inside ``result_path``.

    Args:
        data_path: directory with one CSV file per sample.
        result_path: output directory (removed first if it already exists).
        label_dir: path of the label CSV required by ``customDataset``.
        weights_path: file holding the auto-encoder ``state_dict``.
        device: device used for inference; also used as ``map_location``
            when loading the weights.
    """
    if os.path.exists(result_path):
        shutil.rmtree(result_path)
    # makedirs also creates missing parent directories.
    os.makedirs(result_path)
    enc_dataset = customDataset(data_dir=str(data_path), label_dir=label_dir)
    # Column names of the written CSV files.
    # NOTE(review): assumes every sample has exactly 19 channels -- DataFrame
    # construction fails otherwise.
    channels = ['Fp1', 'Fp2', 'F3','F4', 'C3', 'C4', 'P3', 'P4', 'O1', 'O2', 'F7', 'F8', 'T3', 'T4',
                'T5', 'T6', 'Fz', 'Cz', 'Pz']
    ae_inference = AutoEncoder(input_size=1000, hidden_size=256).to(device)
    # torch.load unpickles the file: only load checkpoints from a trusted source.
    ae_inference.load_state_dict(torch.load(weights_path, map_location=device))
    ae_inference.eval()

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        for index in range(len(enc_dataset)):
            data, _label, file_name = enc_dataset[index]
            # Encoder maps the last dimension from 1000 to 256 features.
            z = ae_inference.encoder(data.to(device))
            # Transpose so that each channel becomes one CSV column.
            z = z.t().cpu().numpy()

            df = pd.DataFrame(z, columns=channels)

            # Save as CSV file; join works with or without a trailing slash.
            df.to_csv(os.path.join(result_path, file_name), index=False)

# Encode the training split: raw CSVs in, latent-code CSVs out.
train_label_dir = './data/train_label.csv'
train_data_dir = './data/origin_csv/train/'
train_result_dir = './data/encodered_csv/train/'
ae_infer(
    data_path=train_data_dir,
    result_path=train_result_dir,
    label_dir=train_label_dir,
)

# Encode the evaluation split the same way.
eval_label_dir = './data/eval_label.csv'
eval_data_dir = './data/origin_csv/eval/'
eval_result_dir = './data/encodered_csv/eval/'
ae_infer(
    data_path=eval_data_dir,
    result_path=eval_result_dir,
    label_dir=eval_label_dir,
)

#### Train transformer

### Normalize dataset

### Positional encoding

### raw data to tensor

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=42)

# NOTE(review): X_train, X_test, train_labels and val_labels are not defined
# in any visible cell -- they must come from the (currently empty) sections
# above; confirm before relying on Restart & Run All.

# Split each feature matrix column-wise into a first and a second half.
train_mid = X_train.shape[1] // 2
X_train_1, X_train_2 = X_train[:, :train_mid], X_train[:, train_mid:]

test_mid = X_test.shape[1] // 2
X_test_1, X_test_2 = X_test[:, :test_mid], X_test[:, test_mid:]

y_train = train_labels
y_test = val_labels

# Features become float tensors, labels integer (long) tensors, all on the GPU.
X_train_1 = torch.FloatTensor(X_train_1).to('cuda')
X_train_2 = torch.FloatTensor(X_train_2).to('cuda')
X_test_1 = torch.FloatTensor(X_test_1).to('cuda')
X_test_2 = torch.FloatTensor(X_test_2).to('cuda')

y_train = torch.LongTensor(y_train).to('cuda')
y_test = torch.LongTensor(y_test).to('cuda')

# Width of one input half, and the number of output classes.
input_dim = X_train_1.shape[1]
output_dim = 2

logger.debug(f'X_train_1.shape: {X_train_1.shape}, X_train_2.shape: {X_train_2.shape}, y_train.shape: {y_train.shape},X_test_1.shape: {X_test_1.shape}, X_test_2.shape: {X_test_2.shape}, y_test.shape: {y_test.shape}')

### Build Optimizer and lr

### Define training

### train

In [None]:
# Number of training epochs; the two loss buffers below hold one slot per epoch.
num_epochs = 500
train_losses = np.zeros(num_epochs)
test_losses  = np.zeros(num_epochs)

# NOTE(review): `train_network` and `model` are not defined in any visible
# cell -- presumably they come from the empty "define training" section; confirm.
# The loss buffers are passed in, presumably to be filled in place (they are
# plotted afterwards) -- verify against train_network.
train_network(model,optimizer,criterion,
              X_train_1,X_train_2,y_train,
              X_test_1,X_test_2,y_test,
              num_epochs,train_losses,test_losses)

In [None]:
# Plot the two loss buffers on one explicit Axes so the figure carries its
# own axis labels and title and can be read without the surrounding cells.
fig, ax = plt.subplots(figsize=(10, 10))
ax.plot(train_losses, label='train loss')
ax.plot(test_losses, label='test loss')
ax.set_xlabel('epoch')  # buffers are allocated with one slot per epoch
ax.set_ylabel('loss')
ax.set_title('Train vs. test loss')
ax.legend()
plt.show()

In [None]:
# Placeholders, replaced below by the model outputs.
predictions_train, predictions_test = [], []

# Forward passes only: gradient tracking is switched off.
# NOTE(review): `model` is not defined in any visible cell -- confirm.
with torch.no_grad():
    predictions_train = model(X_train_1, X_train_2)
    predictions_test = model(X_test_1, X_test_2)

In [None]:
def _as_numpy(values):
    """Convert a tensor (any device, with or without grad) or array-like to numpy."""
    if isinstance(values, torch.Tensor):
        return values.detach().cpu().numpy()
    return np.asarray(values)


def get_accuracy_multiclass(pred_arr,original_arr):
    """Fraction of samples whose highest-scoring class equals the true label.

    Args:
        pred_arr: per-sample class scores, one row per sample, e.g.
            ``[32.1680, 12.9350, -58.4877]`` for a 3-class problem. May be a
            torch tensor, a numpy array or a nested sequence.
        original_arr: true class index of every sample, same length as
            ``pred_arr``.

    Returns:
        The accuracy as a float in ``[0, 1]``; ``0.0`` for empty input; and
        ``False`` (kept for backward compatibility) when the two inputs have
        different lengths.
    """
    if len(pred_arr)!=len(original_arr):
        return False
    scores = _as_numpy(pred_arr)
    targets = _as_numpy(original_arr)

    n_samples = len(scores)
    if n_samples == 0:
        # Nothing to score; avoids a division by zero.
        return 0.0

    # Index of the highest score in each row is the predicted class,
    # e.g. index 0 for [32.1680, 12.9350, -58.4877].
    predicted = scores.reshape(n_samples, -1).argmax(axis=1)
    n_correct = int(np.count_nonzero(predicted == targets.reshape(n_samples)))
    return n_correct / n_samples

In [None]:
# Accuracy on both splits; tensors are moved to the CPU first because
# get_accuracy_multiclass converts its inputs with .numpy().
train_acc = get_accuracy_multiclass(predictions_train.cpu(),y_train.cpu())
test_acc  = get_accuracy_multiclass(predictions_test.cpu(),y_test.cpu())

In [None]:
# Report both accuracies as percentages rounded to three decimals.
logger.info(f"Training Accuracy: {round(train_acc*100,3)}")
logger.info(f"Test Accuracy: {round(test_acc*100,3)}")

### Evolutionary algorithm