In [1]:
from image_creation import *

def bin_signal(x, bin_size=10):
    # x shape: (7500,)
    return x.reshape(-1, bin_size).sum(axis=1)




In [7]:
19860000 / 7500

2648.0

In [8]:
import os 

path = "/scratch/Arya/Processed_MAYO/"

# iterate over i_data.npy files and bin them and create images then save them in the same folder under then name i_data_images.npy


for root, dirs, files in os.walk(path):
    # the files should have format 3_data.npy or 4_data.npy 
    for file in files:
        if file.endswith("_data.npy"):
            file_path = os.path.join(root, file)
            data = np.load(file_path)  # shape (1, a fuck ton)

            length = data.shape[-1] // 7500 * 7500

            data = data.reshape(-1, 7500)

            binned_data = np.array([bin_signal(x) for x in data])  # shape (N, 750)
            
            images = [recurrence_image(data) for data in binned_data]  # shape (N, 28, 28)
            save_path = os.path.join(root, file.replace("_data.npy", "_data_images.npy"))
            np.save(save_path, images)
            print(f"Processed and saved images to {save_path}")


Processed and saved images to /scratch/Arya/Processed_MAYO/0_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/1_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/2_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/3_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/4_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/5_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/6_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/7_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/14_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/8_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/9_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/16_data_images.npy
Processed and saved images to /scratch/Arya/Processed_MAYO/17_data_images.

In [13]:

import torch.nn as nn
import torch
import numpy as np


class HardSigmoid(nn.Module):
    """
    Hardware-friendly sigmoid approximation using ReLU + clip.
    Forward: y = clip(alpha * x + 0.5, 0, 1)
    """
    def __init__(self, alpha=1.0):
        super().__init__()
        self.alpha = alpha

    def forward(self, x):
        return torch.clamp(torch.relu(self.alpha * x + 0.5), 0.0, 1.0)

class SpatialAttention(nn.Module):

    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()
        assert kernel_size in (3, 7), 'kernel size must be 3 or 7'
        padding = 3 if kernel_size == 7 else 1
        self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
        # self.sigmoid = nn.Sigmoid()
        self.sigmoid = HardSigmoid(alpha=1.0)

    def forward(self, x):
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        x = torch.cat([avg_out, max_out], dim=1)
        x = self.conv1(x)
        return self.sigmoid(x)

class ChannelAttention(nn.Module):

    def __init__(self, in_planes, ratio=4):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        self.fc1 = nn.Conv2d(in_planes, in_planes // ratio, 1, bias=False)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Conv2d(in_planes // ratio, in_planes, 1, bias=False)
        # self.sigmoid = nn.Sigmoid()
        self.sigmoid = HardSigmoid(alpha=1.0)

    def forward(self, x):
        avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))
        max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))
        out = avg_out + max_out
        return self.sigmoid(out)

class cbam_block(nn.Module):

    def __init__(self, channel, ratio=4, kernel_size=3):
        super(cbam_block, self).__init__()
        self.channelattention = ChannelAttention(channel, ratio=ratio)
        self.spatialattention = SpatialAttention(kernel_size=kernel_size)

    def forward(self, x):
        x = x * self.channelattention(x)
        x = x * self.spatialattention(x)
        return x

def channel_shuffle(x, groups):

    # input shape: [batch_size, channels, H, W]
    batch, channels, height, width = x.size()
    channels_per_group = channels // groups
    x = x.view(batch, groups, channels_per_group, height, width)
    x = torch.transpose(x, 1, 2).contiguous()
    x = x.view(batch, channels, height, width)
    return x

class ChannelShuffle(nn.Module):

    def __init__(self, channels, groups):
        super(ChannelShuffle, self).__init__()
        if channels % groups != 0:
            raise ValueError("The number of channels must be divisible by the number of groups.")
        self.groups = groups

    def forward(self, x):
        return channel_shuffle(x, self.groups)

def Computing_mean(x, mask):

    mask = torch.count_nonzero(mask, dim=2)
    mask = torch.unsqueeze(mask, dim=2)
    x = x.sum(dim=2, keepdim=True)
    x = x / mask
    return x

class CNN(nn.Module):

    def __init__(self, F1: int, classes_num: int, D: int = 2):

        super(CNN, self).__init__()
        self.drop_out = 0.25

        self.att = cbam_block(D * F1)
        self.block_1 = nn.Sequential(
            nn.ZeroPad2d((7, 7, 0, 0)),
            nn.Conv2d(
                in_channels=1,
                out_channels=F1,
                kernel_size=(1, 16),
                stride=(1, 2),
                bias=False
            ),
            nn.BatchNorm2d(F1),
            nn.ReLU(inplace=True),
            nn.AvgPool2d((1, 8))
        )
        self.block_2 = nn.Sequential(
            nn.ZeroPad2d((7, 7, 0, 0)),
            nn.Conv2d(
                in_channels=F1,
                out_channels=F1,
                kernel_size=(1, 16),
                stride=(1, 2),
                bias=False,
                groups=F1
            ),
            nn.Conv2d(
                in_channels=F1,
                out_channels=D * F1,
                kernel_size=(1, 1),
                stride=(1, 1),
                bias=False
            ),
            nn.BatchNorm2d(D * F1),
            nn.ReLU(inplace=True)
        )
        self.block_3 = nn.Sequential(
            nn.Conv2d(
                in_channels=D * F1,
                out_channels=D * F1,
                kernel_size=(3, 1),
                stride=(1, 1),
                groups=D * F1,
                bias=False
            ),
            nn.Conv2d(
                in_channels=D * F1,
                out_channels=D * D * F1,
                kernel_size=(1, 1),
                stride=(1, 1),
                groups=4,
                bias=False
            ),
            nn.BatchNorm2d(D * D * F1),
            nn.ReLU(inplace=True),
            ChannelShuffle(D * D * F1, 4),
        )
        self.block_4 = nn.Sequential(
            nn.ZeroPad2d((4, 3, 0, 0)),
            nn.Conv2d(
                in_channels=D * D * F1,
                out_channels=D * D * F1,
                kernel_size=(1, 8),
                stride=(1, 1),
                groups=D * D * F1,
                bias=False
            ),
            nn.BatchNorm2d(D * D * F1),
            nn.Conv2d(
                in_channels=D * D * F1,
                out_channels=D * D * D * F1,
                kernel_size=(1, 1),
                stride=(1, 1),
                groups=4,
                bias=False
            ),
            nn.BatchNorm2d(D * D * D * F1),
            nn.ReLU(inplace=True),
            nn.AvgPool2d((1, 16))
        )

        self.classifier = nn.Sequential(
            nn.Dropout(self.drop_out),
            nn.LazyLinear(classes_num)
        )

    def forward(self, x, ssp = None):

        x = x.unsqueeze(1) # (Batch, 1, 128, 512)

        mask = torch.abs(x).sum(dim=3, keepdim=True)
        mask = (mask > 0).type(torch.float)

        print(x.shape)

        x = self.block_1(x)
        print(x.shape)
        x = self.block_2(x)
        print(x.shape)

        x = x * mask
        x1 = Computing_mean(x, mask)
        x2 = torch.norm(x, p=2, dim=2, keepdim=True)
        x3 = torch.norm(x, p=np.inf, dim=2, keepdim=True)

        x = torch.cat([x1, x2, x3], 2)
        x = self.att(x)
        x = self.block_3(x)
        x = self.block_4(x)
        x = x.view(x.shape[0], -1)

        x = self.classifier(x)

        return x
    

model = CNN(F1=16, classes_num=2, D=2)
A = torch.randn(256, 128, 512)
model(A).shape

torch.Size([256, 1, 128, 512])
torch.Size([256, 16, 128, 32])
torch.Size([256, 32, 128, 16])


torch.Size([256, 2])

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
from torchmetrics import Accuracy

In [34]:
fs= 256                  #sampling frequency
channel= 128              #number of electrode
num_input= 1             #number of channel picture (for EEG signal is always : 1)
num_class= 2             #number of classes 
signal_length = 512      #number of sample in each tarial

In [35]:
device= 'cpu'
kernel_size_1= (1,round(fs/2)) 
kernel_size_2= (channel, 1)
kernel_size_3= (1, round(fs/8))
kernel_size_4= (1, 1)

kernel_avgpool_1= (1,4)
kernel_avgpool_2= (1,8)
dropout_rate= 0.2

ks0= int(round((kernel_size_1[0]-1)/2))
ks1= int(round((kernel_size_1[1]-1)/2))
kernel_padding_1= (ks0, ks1-1)
ks0= int(round((kernel_size_3[0]-1)/2))
ks1= int(round((kernel_size_3[1]-1)/2))
kernel_padding_3= (ks0, ks1)

In [36]:
class EEGNet(nn.Module): 
    def __init__(self, F1, D):
        super().__init__()
        F2= F1*D
        # layer 1
        self.conv2d = nn.Conv2d(num_input, F1, kernel_size_1, padding=kernel_padding_1)
        self.Batch_normalization_1 = nn.BatchNorm2d(F1)
        # layer 2
        self.Depthwise_conv2D = nn.Conv2d(F1, D*F1, kernel_size_2, groups= F1)
        self.Batch_normalization_2 = nn.BatchNorm2d(D*F1)
        self.Elu = nn.ELU()
        self.Average_pooling2D_1 = nn.AvgPool2d(kernel_avgpool_1)
        self.Dropout = nn.Dropout2d(dropout_rate)
        # layer 3
        self.Separable_conv2D_depth = nn.Conv2d( D*F1, D*F1, kernel_size_3,
                                                padding=kernel_padding_3, groups= D*F1)
        self.Separable_conv2D_point = nn.Conv2d(D*F1, F2, kernel_size_4)
        self.Batch_normalization_3 = nn.BatchNorm2d(F2)
        self.Average_pooling2D_2 = nn.AvgPool2d(kernel_avgpool_2)
        # layer 4
        self.Flatten = nn.Flatten()
        self.Dense = nn.Linear(F2*round(signal_length/32), num_class)
        self.Softmax = nn.Softmax(dim= 1)
        
        
    def forward(self, x):

        y = self.Batch_normalization_1(self.conv2d(x)) 
        print(y.shape)
        y = self.Batch_normalization_2(self.Depthwise_conv2D(y))
        y = self.Elu(y)
        y = self.Dropout(self.Average_pooling2D_1(y))
        y = self.Separable_conv2D_depth(y)
        y = self.Batch_normalization_3(self.Separable_conv2D_point(y))
        y = self.Elu(y)
        y = self.Dropout(self.Average_pooling2D_2(y))
        y = self.Flatten(y)
        y = self.Dense(y)
        y = self.Softmax(y)
        
        return y
    

In [37]:
signal = torch.randn(256, 1, 128, 512)
model = EEGNet(8, 3)

model(signal).shape

# count the number of parameters in the model
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Number of parameters in the model: {count_parameters(model)}")


torch.Size([256, 8, 128, 511])
Number of parameters in the model: 6402
