# loading modules and data

In [2]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
import sys
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import copy
from copy import deepcopy
import torch.optim as optim
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, accuracy_score
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split
from torchinfo import summary


In [3]:

# Load the data
data = np.load('/Volumes/ruoyu_hd/Projects/DScapstone/Sleep/Data/mni_sEEG/train_data_noCoordinates.npz')
X = data['X']
y = data['y']
regions = data['region']

In [4]:
X_tensor = torch.tensor(X, dtype=torch.float32)  # Convert to float32 for input
y_tensor = torch.tensor(y, dtype=torch.long)  # Convert to long for class labels

# Check the shapes to ensure correctness
print("X shape:", X_tensor.shape)
print("y shape:", y_tensor.shape)

X shape: torch.Size([4576, 6800])
y shape: torch.Size([4576])


In [5]:
X = X[:,0:3000] #sampling rate 100Hz, slice to the first 30s

In [6]:
X.shape

(4576, 3000)

### define frequency ranges

In [7]:
freq_bands = {
    "delta_freq_range": [(1, 2)],
    "theta_freq_range": [(3, 7)],
    "alpha_freq_range": [(8, 12)],
    "low_beta_freq_range": [(13, 16)],
    "mid_beta_freq_range": [(17, 20)],
    "high_beta_freq_range": [(21, 29)],
    "gamma_freq_range": [(30, 100)]
}
# "full_band_range": [(0.25, 100)]

# building blocks

In [8]:
class SELayer(nn.Module):
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1)
        return x * y.expand_as(x)


class SEBasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None,
                 *, reduction=16):
        super(SEBasicBlock, self).__init__()
        self.conv1 = nn.Conv1d(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv1d(planes, planes, 1)
        self.bn2 = nn.BatchNorm1d(planes)
        self.se = SELayer(planes, reduction)
        self.downsample = downsample
        self.stride = stride


    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.se(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out

class GELU(nn.Module):
    # for older versions of PyTorch.  For new versions you can use nn.GELU() instead.
    def __init__(self):
        super(GELU, self).__init__()

    def forward(self, x):
        x = torch.nn.functional.gelu(x)
        return x


In [9]:
class extendedMRCNN(nn.Module):
    def __init__(self, afr_reduced_cnn_size):
        super(extendedMRCNN, self).__init__()
        drate = 0.5
        fs = 100

        kernel_size1 = int(fs / 0.25)  # delta range
        kernel_size2 = int(fs / 5)      # theta range
        kernel_size3 = int(fs / 10)     # alpha range
        kernel_size4 = int(fs / 20)     # beta range
        kernel_size5 = int(fs / 50)     # gamma range

        def create_feature_layer(kernel_size):
            stride = max(50, round(kernel_size / 8))  # Ensure at least 2
            padding = max(0, round(kernel_size / 2)) #

            return nn.Sequential(
                nn.Conv1d(1, 64, kernel_size=kernel_size, stride=stride, bias=False, padding=padding),
                nn.BatchNorm1d(64),
                nn.GELU(),
                nn.MaxPool1d(kernel_size=8, stride=2, padding=4),
                nn.Dropout(drate),
                nn.Conv1d(64, 128, kernel_size=8, stride=1, bias=False, padding=4),
                nn.BatchNorm1d(128),
                nn.GELU(),
                nn.Conv1d(128, 128, kernel_size=8, stride=1, bias=False, padding=4),
                nn.BatchNorm1d(128),
                nn.GELU(),
                
                nn.MaxPool1d(kernel_size=4, stride=4, padding=2)
            )

        self.features1 = create_feature_layer(kernel_size1)
        self.features2 = create_feature_layer(kernel_size2)
        self.features3 = create_feature_layer(kernel_size3)
        self.features4 = create_feature_layer(kernel_size4)
        self.features5 = create_feature_layer(kernel_size5)

        self.dropout = nn.Dropout(drate)
        self.inplanes = 640
        self.AFR = self._make_layer(SEBasicBlock, afr_reduced_cnn_size, 1)

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv1d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        x1 = self.features1(x)
        x2 = self.features2(x)
        x3 = self.features3(x)
        x4 = self.features4(x)
        x5 = self.features5(x)
        print(f"x1 shape: {x1.shape}")
        print(f"x2 shape: {x2.shape}")
        print(f"x3 shape: {x3.shape}")
        print(f"x4 shape: {x4.shape}")
        print(f"x5 shape: {x5.shape}")

        # Concatenate all feature outputs along the channel dimension
        x_concat = torch.cat((x1, x2, x3, x4, x5), dim=1)
        x_concat = self.dropout(x_concat)
        x_concat = self.AFR(x_concat)

        return x_concat

In [10]:
class TContext(nn.Module):
  def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(TContext, self).__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.fc = nn.Linear(hidden_size, num_classes)
  def forward(self, x):
      lstm_out, _ = self.lstm(x)
      out = self.fc(lstm_out[:, -1, :])
      return F.log_softmax(out, dim=1)

In [11]:
class DeepSleepSEEG(nn.Module):
  def __init__(self):
        super(DeepSleepSEEG, self).__init__()
        num_classes = 4
        afr_reduced_cnn_size = 640
        self.extendedMRCNN = extendedMRCNN(afr_reduced_cnn_size)
        lstm_input_size = afr_reduced_cnn_size # Output size from extendedMRCNN
        hidden_size = 128  # Number of LSTM units
        num_layers = 2     # Number of LSTM layers

        # Initialize TContext with appropriate parameters
        self.TContext = TContext(input_size=lstm_input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes)

  def forward(self, x):
      # Get features from extendedMRCNN
      x_feat = self.extendedMRCNN(x)

      # Pass features through TContext (LSTM)
      encoded_features = self.TContext(x_feat)

      return encoded_features

# testing (checking) model

### extendedMRCNN architecture

In [14]:
afr_reduced_cnn_size = 640  # Adjust based on your architecture requirements
model = extendedMRCNN(afr_reduced_cnn_size)
summary(model, input_size=(100,1,3000))

x1 shape: torch.Size([100, 128, 9])
x2 shape: torch.Size([100, 128, 9])
x3 shape: torch.Size([100, 128, 9])
x4 shape: torch.Size([100, 128, 9])
x5 shape: torch.Size([100, 128, 9])


Layer (type:depth-idx)                        Output Shape              Param #
extendedMRCNN                                 [100, 640, 9]             --
├─Sequential: 1-1                             [100, 128, 9]             --
│    └─Conv1d: 2-1                            [100, 64, 61]             25,600
│    └─BatchNorm1d: 2-2                       [100, 64, 61]             128
│    └─GELU: 2-3                              [100, 64, 61]             --
│    └─MaxPool1d: 2-4                         [100, 64, 31]             --
│    └─Dropout: 2-5                           [100, 64, 31]             --
│    └─Conv1d: 2-6                            [100, 128, 32]            65,536
│    └─BatchNorm1d: 2-7                       [100, 128, 32]            256
│    └─GELU: 2-8                              [100, 128, 32]            --
│    └─Conv1d: 2-9                            [100, 128, 33]            131,072
│    └─BatchNorm1d: 2-10                      [100, 128, 33]            256
│   

In [13]:
complete_model = DeepSleepSEEG()
summary(complete_model)

Layer (type:depth-idx)                             Param #
DeepSleepSEEG                                      --
├─extendedMRCNN: 1-1                               --
│    └─Sequential: 2-1                             --
│    │    └─Conv1d: 3-1                            25,600
│    │    └─BatchNorm1d: 3-2                       128
│    │    └─GELU: 3-3                              --
│    │    └─MaxPool1d: 3-4                         --
│    │    └─Dropout: 3-5                           --
│    │    └─Conv1d: 3-6                            65,536
│    │    └─BatchNorm1d: 3-7                       256
│    │    └─GELU: 3-8                              --
│    │    └─Conv1d: 3-9                            131,072
│    │    └─BatchNorm1d: 3-10                      256
│    │    └─GELU: 3-11                             --
│    │    └─MaxPool1d: 3-12                        --
│    └─Sequential: 2-2                             --
│    │    └─Conv1d: 3-13                           1,280
│   