In [24]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
from tqdm.notebook import tqdm

# train_folder = "../../../Datasets/dataset_pickle/test/train/"
# eval_folder = "../../../Datasets/dataset_pickle/test/eval/"

# train_folder = "../data/test/train/"
# eval_folder = "../data/test/eval/"


epochs = 10
batch_size = 2
hidden_size = 10
output_size = 1  # Binary classification
number_of_eeg_channels = 8
eval_batch = 1

In [36]:
# Basic Block for ResNet
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm3d(self.expansion * planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


# ResNet
class ResNet(nn.Module):
    def __init__(self, input_channels, block, num_blocks, num_classes=1000):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.max_pool = nn.AdaptiveMaxPool2d((1, 1))
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.max_pool(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


def ResNet18(input_channels):
    return ResNet(input_channels,BasicBlock, [2, 2, 2, 2])

In [51]:
class ResNetEEG18(nn.Module):
    def __init__(self, input_channels):
        super(ResNetEEG18, self).__init__()
        self.resnet18 = ResNet18(input_channels)

        '''
        '''

        
        self.linear = nn.Linear(1000, 1)

    def forward(self, src):
        src = self.resnet18(src)
        src = self.linear(src)
        src = torch.sigmoid(src)
        return src
        

In [55]:
demo_data = torch.rand(1, 3, 12, 60)

resneteegmodel = ResNetEEG18(3)

output = resneteegmodel(demo_data)
output

print(output.size())

torch.Size([1, 1])


In [None]:
# Previous implementation
# Create a simple 1D convolutional neural network model
class ConvEEGModel(nn.Module):
    def __init__(self, input_channels, input_features, output_size):
        super(ConvEEGModel, self).__init__()
        self.conv1 = nn.Sequential(
            nn.utils.weight_norm(nn.Conv1d(input_channels, 32, 3, padding = 2, dilation=1), dim = None),
            nn.ReLU(),
            nn.Dropout(0.2),
        )
        self.conv2 = nn.Sequential(
            nn.utils.weight_norm(nn.Conv1d(32, 16, 3, padding = 4, dilation=2), dim = None),
            nn.ReLU(),
            nn.Dropout(0.2),
        )
        self.residual1 = nn.Conv1d(input_channels, 16, kernel_size = 1, padding = 3)
        self.conv3 = nn.Sequential(
            nn.utils.weight_norm(nn.Conv1d(16, 8, 3, padding = 8, dilation=4), dim = None),
            nn.ReLU(),
            nn.Dropout(0.2),
        )
        self.conv4 = nn.Sequential(
            nn.utils.weight_norm(nn.Conv1d(8, 4, 3, padding = 16, dilation=8), dim = None),
            nn.ReLU(),
            nn.Dropout(0.2),
        )
        self.residual2 = nn.Conv1d(16, 4, kernel_size = 1, padding = 12)
        self.linear = nn.Linear(4, 1)

    def forward(self, src):

        residual = src.clone()
        
        src = self.conv1(src)
        src = self.conv2(src)
        src += self.residual1(residual)

        residual = src.clone()


        src = self.conv3(src)
        src = self.conv4(src)
        src += self.residual2(residual)

        src = src.squeeze(1)
        src = src.permute(0,2,1)
        src = torch.mean(src, dim=1)
        src = self.linear(src)
        src = torch.sigmoid(src)
        return src


        # # print(x.shape)
        # x = self.conv1d(x)
        # # print(x.shape)
        # x = self.relu(x)
        # # print(x.shape)
        # x = self.flatten(x)
        # # print(x.shape)
        # x = self.fc1(x)
        # # print(x.shape)
        # x = self.softmax(x)
        # # print(x.shape)
        # return x

input_channels = train_dataset.__getitem__(0)[0].shape[0]
input_features = train_dataset.__getitem__(0)[0].shape[1]
criterion = nn.CrossEntropyLoss()
loss_fn = torch.nn.BCELoss()
# optimizer = torch.optim.AdamW(params, lr=0.0003)
# optimizer = torch.optim.Adam(params, lr=0.0003, betas=(0.9, 0.98), eps=1e-9)

model = ConvEEGModel(input_channels, input_features, output_size)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0003)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


In [45]:
# Create a tensor with some values
x = torch.tensor([1.0, 2.0, 3.0])

# Apply sigmoid activation
sigmoid_output = torch.sigmoid(x)

print(sigmoid_output)

tensor([0.7311, 0.8808, 0.9526])


In [47]:
# Create a simple dummy dataset
demo_data = torch.randn((100, 10, 12, 60))  # 100 samples, 10 channels (RGB), 12x60 images
demo_labels = torch.randint(0, 10, (100,))  # 10 classes

demo_dataset = TensorDataset(demo_data, demo_labels)
demo_dataloader = DataLoader(demo_dataset, batch_size=8, shuffle=True)

# Instantiate the ResNet model
resnet_model = ResNetEEG18(10)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(resnet_model.parameters(), lr=0.01, momentum=0.9)

# Training loop on the demo dataset
num_epochs = 5

for epoch in range(num_epochs):
    for inputs, labels in demo_dataloader:
        optimizer.zero_grad()
        outputs = resnet_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

# Test the model on a small batch from the demo dataset
with torch.no_grad():
    for inputs, labels in demo_dataloader:
        outputs = resnet_model(inputs)
        print("Input Shape:", inputs.shape)
        print("Output Shape:", outputs.shape)
        break  # Break after processing one batch for demonstration

Input Shape: torch.Size([8, 10, 12, 60])
Output Shape: torch.Size([8, 1000])
