In [1]:

import os
import math
import pandas as pd
import numpy as np
from pathlib import Path
import shutil
import argparse
from typing import List, Union
import matplotlib.pyplot as plt
import yaml
from datetime import datetime


# import torchonn as onn
# from torchonn.models import ONNBaseModel
# from torchonn.op.mzi_op import project_matrix_to_unitary


import torch
from torch import Tensor, nn
from torch.types import Device, _size
from torch.nn.parameter import Parameter, UninitializedParameter
from torch.nn import init
from torch.utils.data import Dataset
from torch.utils.data import ConcatDataset
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
# from torchonn.layers import MZILinear
# from torchonn.models import ONNBaseModel
from collections import OrderedDict

from sklearn.preprocessing import StandardScaler
# from sklearn.model_selection import train_test_split

from configs.config import configs


# ### Initialization


# Init logging
import logging

2024-04-30 12:37:48.753061: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-04-30 12:37:48.773968: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Notebook-wide logger; INFO and above are emitted through a stream handler.
logger = logging.getLogger(__name__)  # Use the current module's name
logger.setLevel(logging.INFO)
# getLogger() returns the same object on every call, so re-running this cell
# used to stack one more StreamHandler each time and every message was then
# printed once per execution. Reuse the handler that is already attached.
handler = next(
    (h for h in logger.handlers if isinstance(h, logging.StreamHandler)),
    None,
)
if handler is None:
    handler = logging.StreamHandler()
    # Optional timestamped format, left disabled as in the original cell:
    # formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    # handler.setFormatter(formatter)
    logger.addHandler(handler)
# Smoke test of the logging setup with a placeholder value.
acc_example = 0.95  # Replace with your actual accuracy calculation
logger.info(f"test accuracy: {acc_example}")  # Log as info

test accuracy: 0.95


In [3]:
# Load the experiment configuration.
# argparse is used so the cell mirrors a script's command line; inside the
# notebook the argument list is supplied explicitly instead of read from sys.argv.
parser = argparse.ArgumentParser()
parser.add_argument("config_file", metavar="FILE", help="config file")
# parser.add_argument('--run-dir', metavar='DIR', help='run directory')
# parser.add_argument('--pdb', action='store_true', help='pdb')
args = parser.parse_args(args=['configs/eeg_torch.yml'])
# Decode explicitly as UTF-8: without `encoding`, open() uses the platform's
# locale encoding, which can mis-decode non-ASCII text in the YAML file.
with open(args.config_file, 'r', encoding='utf-8') as file:
    # safe_load only builds plain Python containers/scalars from the YAML.
    Configs = yaml.safe_load(file)
    

In [4]:
class customDataset(Dataset):
    """Map-style dataset that serves one CSV file per item.

    Args:
        data_dir: Directory whose entries are the per-sample CSV files.
        label_dir: Path of the annotation CSV (despite the name, it is read
            with ``pd.read_csv``, so it must be a file).
        label_dict: Mapping from the raw label value found in the annotation
            CSV to something ``int()`` can convert to a class index.
        transform: Optional callable applied to the float32 tensor built from
            the sample CSV, before it is transposed.

    Each item is a tuple ``(data, label, file_name)`` where ``data`` is the
    transposed CSV content, ``label`` is a 0-dim integer tensor and
    ``file_name`` is the directory entry the sample was read from.
    """

    def __init__(self, data_dir:str, label_dir:str, label_dict:dict, transform=None):
        self.data_dir = data_dir   # e.g. './data/origin_csv/train'
        self.label_dir = label_dir
        self.transform = transform
        # NOTE(review): os.listdir returns entries in arbitrary order, yet
        # __getitem__ pairs files[index] with annotation row `index`.
        # Confirm that the annotation rows really follow this order.
        self.files = os.listdir(data_dir)
        self.annotations = pd.read_csv(label_dir)
        self.label_dict = label_dict

    def __len__(self):
        """Number of entries found in ``data_dir``."""
        return len(self.files)

    def __getitem__(self, index):
        """Load sample ``index`` and return ``(data.t(), label, file_name)``."""
        file_name = self.files[index]
        frame = pd.read_csv(os.path.join(self.data_dir, file_name))
        data = torch.tensor(frame.values, dtype=torch.float32)

        # Column 1 of the annotation table holds the key into label_dict.
        raw_label = self.annotations.iloc[index, 1]
        label = torch.tensor(int(self.label_dict[raw_label]))

        if self.transform:
            data = self.transform(data)

        # Transpose so the CSV columns become the leading dimension.
        return (data.t(), label, file_name)


In [5]:
### Define operation in auto-encoder
class Mat_mul(nn.Module):
    """Affine map ``y = input @ weight (+ bias)``.

    Unlike ``nn.Linear``, the weight is stored as ``(in_features, out_features)``
    so the forward pass is a plain right-multiplication without a transpose.

    Args:
        in_features: Size of the last dimension of the input.
        out_features: Size of the last dimension of the output.
        bias: When ``False`` no bias parameter is created and none is added.
        device: Device on which the parameters are allocated.
        dtype: Data type of the parameters.
    """
    __constants__ = ['in_features', 'out_features']
    in_features: int
    out_features: int
    weight: Tensor

    def __init__(self, in_features: int, out_features: int, bias: bool = True,
                 device=None, dtype=None) -> None:
        factory_kwargs = {'device': device, 'dtype': dtype}
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = Parameter(torch.empty((in_features, out_features), **factory_kwargs))
        if bias:
            self.bias = Parameter(torch.empty(out_features, **factory_kwargs))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()

    def reset_parameters(self) -> None:
        """Initialise weight and bias from U(-1/sqrt(in_features), 1/sqrt(in_features)).

        The previous code called ``kaiming_uniform_`` on the weight and derived
        the bias bound from the same tensor. Those helpers read the fan-in from
        ``weight.size(1)``, which is ``out_features`` for this layout, so both
        bounds were based on the wrong dimension. ``kaiming_uniform_`` with
        ``a=sqrt(5)`` is equivalent to a uniform draw bounded by
        ``1/sqrt(fan_in)``, so the bound is computed from ``in_features`` directly.
        """
        bound = 1 / math.sqrt(self.in_features) if self.in_features > 0 else 0
        init.uniform_(self.weight, -bound, bound)
        if self.bias is not None:
            init.uniform_(self.bias, -bound, bound)

    def forward(self, input: Tensor) -> Tensor:
        """Return ``input @ weight``, plus the bias when the layer has one."""
        output = input @ self.weight
        # With bias=False the attribute is None; adding it would raise TypeError.
        if self.bias is not None:
            output = output + self.bias
        return output
    
### Define auto-encoder    
class AutoEncoder(nn.Module):
    """Single-layer auto-encoder whose forward pass returns only the latent code.

    Args:
        input_size: Size of the last dimension of the input.
        hidden_size: Size of the latent code produced by ``encoder``.

    ``decoder``, ``encoder_2`` and ``decoder_2`` are constructed but not used by
    ``forward``. They are kept, in the original order, so the module's
    ``state_dict`` keys and the order of random parameter initialisation stay
    the same.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.encoder = nn.Sequential(
            Mat_mul(input_size, hidden_size),
            nn.ReLU()
        )
        self.encoder_2 = nn.Sequential(
            Mat_mul(int(input_size/2), hidden_size),
            nn.ReLU()
        )

        self.decoder = nn.Sequential(
            Mat_mul(hidden_size, input_size),
            nn.ReLU()
        )
        self.decoder_2 = nn.Sequential(
            Mat_mul(int(input_size/2), input_size),
            nn.ReLU()
        )

    def forward(self, x):
        """Encode ``x`` and return the latent representation.

        The reconstruction ``decoder(z)`` used to be computed here and then
        thrown away. That cost an extra matmul per call and never affected
        the returned value, so it is no longer evaluated.
        """
        return self.encoder(x)


### Define transformer_classifier
class transformer_classifier(nn.Module):
    """Transformer-encoder classifier over multi-channel signals.

    Args:
        input_size: Input size handed to the (currently bypassed) auto-encoder.
        n_channels: Number of positions per sample; the linear head expects
            ``d_model * n_channels`` features after flattening.
        model_hyp: Dict providing ``'d_model'``, ``'n_head'`` and ``'n_layer'``.
        classes: Number of output logits.

    ``forward`` feeds ``x`` straight into the transformer, so the last
    dimension of ``x`` must equal ``d_model``. ``Flatten`` keeps dim 0, so dim 0
    is the batch and the input is expected as ``(batch, n_channels, d_model)``.
    """

    def __init__(self, input_size:int, n_channels:int, model_hyp:dict, classes:int):
        super().__init__()
        # Built but not used by forward() (the call is disabled there); kept so
        # the parameter set and state_dict keys are unchanged.
        self.ae = AutoEncoder(input_size=input_size, hidden_size=model_hyp['d_model'])
        # batch_first=True: the flatten/linear head treats dim 0 as the batch.
        # With the default (sequence-first) layout the encoder would read dim 0
        # as the sequence axis and attend across the samples of a batch
        # instead of across the channels within each sample.
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=model_hyp['d_model'],
                                                        nhead=model_hyp['n_head'],
                                                        batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer,
                                                         num_layers=model_hyp['n_layer'])
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(model_hyp['d_model']*n_channels, classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, classes)`` for ``x``."""
        # Auto-encoder front end is intentionally disabled for now:
        # z = self.ae(x)
        z = self.transformer_encoder(x)
        # Lazy %-style arguments: nothing is formatted unless DEBUG is enabled.
        # The old f-strings also printed a stray '%' before the shape.
        logger.debug("transformer output size: %s", z.shape)
        z = self.flatten(z)
        logger.debug("flatten output size: %s", z.shape)
        y = self.linear(z)
        return y

In [6]:
# Dataset locations and the label -> class-index mapping all live under the
# 'dataset' section of the loaded configuration.
dataset_cfg = Configs['dataset']

train_data_dir = dataset_cfg['train_data_dir']
train_label_dir = dataset_cfg['train_label_dir']
val_data_dir = dataset_cfg['val_data_dir']
val_label_dir = dataset_cfg['val_label_dir']
label_dict = dataset_cfg['classes']

# One dataset per split; both share the same label mapping.
train_dataset = customDataset(
    data_dir=train_data_dir,
    label_dir=train_label_dir,
    label_dict=label_dict,
)
val_dataset = customDataset(
    data_dir=val_data_dir,
    label_dir=val_label_dir,
    label_dict=label_dict,
)

# Shuffled training batches, loaded by 16 worker processes into pinned memory.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=Configs['train']['batch_size'],
    shuffle=True,
    num_workers=16,
    pin_memory=True,
)

In [7]:
# Use the GPU when one is visible and fall back to the CPU otherwise, so the
# notebook no longer fails at this cell on a machine without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Build the classifier from the configuration; the number of output classes
# is the number of entries in the dataset's class mapping.
classifier = transformer_classifier(input_size=Configs['input_size'],
                                    n_channels=Configs['n_channels'],
                                    model_hyp=Configs['model'],
                                    classes=len(Configs['dataset']['classes'])).to(device)

In [8]:
# Adam over all classifier parameters; learning rate comes from the config.
optimizer = torch.optim.Adam(classifier.parameters(),betas=(0.9,0.9),lr=Configs['optimizer']['init_lr'])
criterion = nn.CrossEntropyLoss()
# TensorBoard writer (torch.utils.tensorboard).
# NOTE(review): the log directory is built by plain string concatenation, so
# 'runs_dir' must already end with a path separator — confirm in the config.
writer = SummaryWriter(Configs['tensorboard']['runs_dir']+'visulizer')

# Pull a single batch to use as the example input when tracing the model graph.
dataiter = iter(train_loader)
signal, labels,_ = next(dataiter)
# Follow the model's own device instead of repeating a hard-coded 'cuda', so
# the sample batch always ends up where the classifier's parameters are.
signal = signal.to(next(classifier.parameters()).device)

In [9]:
# Trace the classifier on the sample batch and export the graph to TensorBoard.
# add_graph re-raises tracing errors, so close the writer in a finally block
# to flush and release the event file even when tracing fails.
try:
    writer.add_graph(classifier, signal)
finally:
    writer.close()