In [None]:
!pip install einops



In [None]:
import torch
import torch.nn.functional as F
from torch import nn, einsum
from functools import partial
import time
from einops import rearrange, repeat
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch.utils.data as data
import numpy as np
from sklearn.model_selection import train_test_split
from collections import OrderedDict
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

In [None]:
# Colab-only: mount Google Drive at /content/drive so files stored there can be read
# through the regular filesystem by the cells below.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Use the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
def create_dataset(dataset, windowlen):
    """Standardize a multichannel recording and cut it into non-overlapping windows.

    Args:
        dataset: 2-D array-like (e.g. a DataFrame or numpy array); the first
            dimension is the time steps, the second the features.
        windowlen: Number of consecutive time steps per window. Must be a
            positive integer.

    Returns:
        A float32 tensor of shape ``(n_windows, windowlen, n_features)`` with
        ``n_windows = len(dataset) // windowlen``. Trailing samples that do not
        fill a complete window are dropped.

    Raises:
        ValueError: If ``windowlen`` is not positive.
    """
    if windowlen <= 0:
        raise ValueError(f"windowlen must be a positive integer, got {windowlen}")

    # Feature scaling: the scaler is fitted on this array only, column by column.
    signal = StandardScaler().fit_transform(dataset)

    n_windows = len(signal) // windowlen
    # Drop the incomplete tail, then view the remaining rows as consecutive windows.
    # Reshaping (instead of appending slices in a loop) also keeps a well-formed
    # (0, windowlen, n_features) shape when the recording is shorter than one window.
    windows = signal[:n_windows * windowlen].reshape(n_windows, windowlen, signal.shape[1])
    return torch.tensor(windows).float()

def data_generator( batch_size, windowlen, data_dir='/content/drive/MyDrive/data'):
    """Load the two class CSV files and build train/test DataLoaders.

    Each CSV is standardized and windowed independently by ``create_dataset``,
    labelled, concatenated, and split 75/25 with a fixed random seed.

    Args:
        batch_size: Mini-batch size used by both loaders.
        windowlen: Number of time steps per window.
        data_dir: Directory containing ``ictal_data.csv`` and
            ``preictal_data.csv``. Defaults to the original Drive location.

    Returns:
        Tuple ``(train_loader, test_loader)``. Each batch is ``(X, y)`` with
        ``X`` of shape (batch, windowlen, n_features) and ``y`` of shape (batch, 1).
    """
    print('Loading CHB-MIT Interical and preictal dataset...')
    # NOTE(review): the original code loaded 'ictal_data.csv' into a variable named
    # `preictal_data` and 'preictal_data.csv' into `ictal_data`. The file -> label
    # mapping is kept exactly as it was (ictal_data.csv -> 0, preictal_data.csv -> 1);
    # please confirm that this is the intended encoding.
    label0_frame = pd.read_csv(f'{data_dir}/ictal_data.csv')
    label1_frame = pd.read_csv(f'{data_dir}/preictal_data.csv')

    class1 = create_dataset(label0_frame, windowlen=windowlen)
    y_1 = torch.zeros(class1.shape[0], 1)

    class2 = create_dataset(label1_frame, windowlen=windowlen)
    y_2 = torch.ones(class2.shape[0], 1)

    datasets = torch.cat((class1, class2), 0)
    labels = torch.cat((y_1, y_2), 0)

    X_train, X_test, y_train, y_test = train_test_split(datasets, labels, test_size=0.25, shuffle=True, random_state=42)
    # The stray trailing comma inside the original placeholder printed a 1-tuple.
    print(f" Shape of the Training data is {X_train.shape}, and Testing data is {X_test.shape}" )

    train_loader = data.DataLoader(data.TensorDataset(X_train, y_train),  batch_size=batch_size, shuffle=True)
    # Evaluation does not benefit from shuffling; a fixed order keeps runs comparable.
    test_loader = data.DataLoader(data.TensorDataset(X_test, y_test),  batch_size=batch_size, shuffle=False)

    return train_loader, test_loader

In [None]:
# Number of windows per mini-batch (passed to data_generator and used by train()).
batch_size=50
# Number of time steps per window (passed to data_generator).
window_length=512

In [None]:
# Read the CSV files, window them and build the train/test DataLoaders.
train_loader, test_loader = data_generator(batch_size, window_length)

Loading CHB-MIT Interical and preictal dataset...
 Shape of the Training data is (torch.Size([3070, 512, 23]),), and Testing data is torch.Size([1024, 512, 23])


In [None]:
# Initial learning rate handed to Adam; the training loop divides it by 10 every 10 epochs.
lr=0.001
# Input feature count per time step — presumably matches the 23 columns of the loaded windows.
d_dim=23
# Number of attention heads.
heads=2
# Number of passes over the training set.
epochs=30
# Width of the embedding fed to the transformer.
embed_dim=40
# Number of transformer layers.
num_layers=1
# Dropout probability.
dropout=0.1
# train() prints a progress line every `log_interval` batches.
log_interval=10


n_classes = 1  ## Single output unit for binary classification
# Sequence length seen by the model equals the window length.
sequence_length = window_length

In [None]:
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension.

    The scale (``gamma``) is a learnable parameter initialised to ones. The
    shift (``beta``) is a buffer of zeros, so it is stored with the module but
    never updated by the optimizer.
    """

    def __init__(self, dim):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(dim))
        self.register_buffer("beta", torch.zeros(dim))

    def forward(self, x):
        # Normalize only across the trailing (feature) axis of x.
        normalized_shape = x.shape[-1:]
        return F.layer_norm(x, normalized_shape, weight=self.gamma, bias=self.beta)


class Residual(nn.Module):
    """
    Adds a layer-normalized, dropout-regularized copy of the input to the input itself.
    """
    def __init__(self, dim, dropout):
        super().__init__()
        self.norm = LayerNorm(dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        "Return x + dropout(norm(x)); the result has the same size as x."
        normalized = self.norm(x)
        regularized = self.dropout(normalized)
        return x + regularized

class MLP(nn.Module):
    """Two-layer feed-forward block: Linear -> GELU -> Dropout -> Linear -> Dropout.

    The output has the same last dimension (``dim``) as the input.
    """

    def __init__(self, dim, hidden_dim, dropout):
        super().__init__()
        # Layers are created in execution order so parameter initialisation is unchanged.
        layers = [
            nn.Linear(in_features=dim, out_features=hidden_dim),
            nn.GELU(),
            nn.Dropout(p=dropout),
            nn.Linear(in_features=hidden_dim, out_features=dim),
            nn.Dropout(p=dropout),
        ]
        self.mlp = nn.Sequential(*layers)

    def forward(self, x):
        out = self.mlp(x)
        return out

class StochasticDepth(nn.Module):
    """Randomly zeroes whole samples of a batch during training.

    In training mode each sample (index along dim 0) is kept with probability
    ``1 - drop_prob`` and rescaled by ``1 / (1 - drop_prob)``; otherwise it is
    replaced by zeros. In eval mode the input is returned unchanged.

    Args:
        drop_prob: Probability of dropping a sample, expected in [0, 1].
    """

    def __init__(self, drop_prob):
        super(StochasticDepth, self).__init__()
        self.drop_prob = torch.Tensor([drop_prob])
        # Buffer name (including the space) is kept so existing state_dicts still load.
        self.register_buffer('Drop out', self.drop_prob)

    def forward(self, x, training=None):
        # `training` is accepted for backward compatibility only; the module's own
        # train/eval flag decides the behaviour.
        if not self.training:
            return x

        keep_prob = 1.0 - float(self.drop_prob)
        if keep_prob <= 0.0:
            # Everything is dropped; dividing by keep_prob here would produce NaNs.
            return torch.zeros_like(x)

        # One Bernoulli draw per sample, broadcast over all remaining dimensions.
        shape = (x.size(0),) + (1,) * (x.dim() - 1)
        # Sample directly on x's device instead of on the CPU followed by a copy.
        random_tensor = (keep_prob + torch.rand(shape, device=x.device)).floor()
        return (x / keep_prob) * random_tensor

class Embedding(nn.Module):
    """Linear projection of the last input dimension from ``d_dim`` to ``embed_dim``."""

    def __init__(self, d_dim, embed_dim):
        super().__init__()
        self.d_dim, self.embed_dim = d_dim, embed_dim
        self.embed = nn.Linear(in_features=self.d_dim, out_features=self.embed_dim)

    def forward(self, x):
        projected = self.embed(x)
        return projected

In [None]:
class SequencePooling(nn.Module):
    """Attention-weighted pooling over dimension 1 of a 3-D input.

    A single linear unit scores every position; the scores are soft-maxed along
    dimension 1 and used to take a weighted sum of the inputs.
    """

    def __init__(self, in_features):
        super().__init__()
        self.attention = nn.Linear(in_features, out_features=1)
        self.apply(self.init_weight)

    def forward(self, x):
        # assumes x is (batch, seq, features): softmax(dim=1) and transpose(1, 2) need 3 dims
        scores = self.attention(x)
        weights = F.softmax(scores, dim=1).transpose(1, 2)
        pooled = torch.matmul(weights, x)
        # Remove the singleton axis left by the (1 x seq) @ (seq x features) product.
        return pooled.squeeze(dim=-2)

    @staticmethod
    def init_weight(m):
        """Truncated-normal weights (std 0.1) and zero bias for Linear layers."""
        if not isinstance(m, nn.Linear):
            return
        nn.init.trunc_normal_(m.weight, std=0.1)
        nn.init.constant_(m.bias, 0)


class Conv_Tokenizer(nn.Module):
    """Two conv -> ReLU -> max-pool stages operating on a single-channel 2-D map.

    Every convolution and pooling layer uses kernel 3, stride 1 and padding 1,
    so the spatial size of the input is preserved. Channels go 1 -> 3 -> 1.
    """

    def __init__(self):
        super().__init__()
        stages = OrderedDict()
        stages['conv1'] = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=1)
        stages['relu1'] = nn.ReLU()
        stages['max1'] = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        stages['conv2'] = nn.Conv2d(in_channels=3, out_channels=1, kernel_size=3, stride=1, padding=1)
        stages['relu2'] = nn.ReLU()
        stages['max2'] = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.conv = nn.Sequential(stages)
        self.apply(self.initialize_weight)

    def forward(self, x):
        tokens = self.conv(x)
        return tokens

    @staticmethod
    def initialize_weight(m):
        """Kaiming-normal initialisation for convolution weights; other modules are untouched."""
        if not isinstance(m, nn.Conv2d):
            return
        nn.init.kaiming_normal_(m.weight)

class PositionEmbedding(nn.Module):
    """Learned absolute positional embedding added to a sequence of tokens.

    Args:
        sequence_length: Maximum number of positions that can be embedded.
        dim: Embedding width; must equal the last dimension of the input.

    The previous implementation used ``nn.EmbeddingBag`` indexed by
    ``arange(feature_dim)`` repeated for every time step. That reduces each bag
    to the same averaged vector, so every position received an identical offset
    and no positional information was injected. A plain ``nn.Embedding`` looked
    up by time-step index fixes this while keeping the same ``embedding.weight``
    shape in the state dict.
    """

    def __init__(self, sequence_length, dim):
        super(PositionEmbedding, self).__init__()
        self.embedding = nn.Embedding(sequence_length, dim)

    def forward(self, x):
        # x is (..., seq, dim); one index per time step, created on x's own device
        # rather than on a module-level global device.
        positions = torch.arange(x.size(-2), device=x.device)
        # (seq, dim) broadcasts over any leading batch dimensions.
        return x + self.embedding(positions)


class MHA(nn.Module):
    """Multi-head scaled dot-product self-attention without masking.

    Args:
        dim: Size of the last input dimension; must be divisible by ``heads``.
        heads: Number of attention heads.
        dropout: Dropout probability applied after the output projection.

    Raises:
        AssertionError: If ``dim`` is not divisible by ``heads``.
    """

    def __init__(self, dim, heads = 8, dropout = 0.1):
        super().__init__()

        self.heads = heads
        self.dim = dim

        assert self.dim % self.heads == 0, 'dim must be divisible by heads'
        # Integer division: head_dim is a tensor size, not a float.
        self.head_dim = self.dim // self.heads

        inner_dim = self.head_dim * self.heads
        self.scale = self.head_dim ** -0.5

        # A single projection produces queries, keys and values side by side.
        self.to_qkv = nn.Linear(self.dim, inner_dim * 3, bias = False)

        self.to_out = nn.Sequential(nn.Linear(inner_dim, self.dim, bias= False),
            nn.Dropout(dropout)
        )
        self.apply(self.initialize_weight)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape (batch, seq, dim); output has the same shape."""
        b, n, _ = x.shape
        h, d = self.heads, self.head_dim

        # (b, n, h*d) -> (b, h, n, d) for each of q, k, v.
        q, k, v = (
            t.reshape(b, n, h, d).transpose(1, 2)
            for t in self.to_qkv(x).chunk(3, dim = -1)
        )

        dots = torch.matmul(q, k.transpose(-1, -2)) * self.scale

        # Attention Score
        attn = dots.softmax(dim=-1)

        out = torch.matmul(attn, v)
        # (b, h, n, d) -> (b, n, h*d): concatenate the heads again.
        out = out.transpose(1, 2).reshape(b, n, h * d)
        return self.to_out(out)

    @staticmethod
    def initialize_weight(m):
        """Truncated-normal weights (std 0.02) and zero bias (when present) for Linear layers."""
        if isinstance(m, nn.Linear):
            nn.init.trunc_normal_(m.weight, std=.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

class LCT(nn.Module):
    """Convolution-tokenized transformer classifier for windowed multichannel signals.

    Pipeline: linear embedding -> convolutional tokenizer -> optional learned
    positional embedding -> transformer layers -> sequence pooling -> linear head.

    Args:
        input_shape: Shape of one input batch, ``(batch, seq_len, n_features)``.
            Only ``seq_len`` and ``n_features`` are used.
        embed_dim: Width of the embedding / transformer.
        num_heads: Number of attention heads; must divide ``embed_dim``.
        positional_emb: Whether to add a learned positional embedding.
        stochastic_depth_rate: Maximum stochastic-depth drop probability; it
            increases linearly from 0 across the layers.
        dropout: Dropout probability used in attention, MLP and Residual modules.
        transformer_layers: Number of transformer layers.
        num_classes: Size of the output layer.
        mlp_hidden_dim: Hidden width of the per-layer MLP (default 20, the
            value that used to be hard-coded).
    """

    def __init__(
        self,
        input_shape,
        embed_dim,
        num_heads,
        positional_emb=True,
        stochastic_depth_rate=0.1,
        dropout = 0.1,
        transformer_layers=6,
        num_classes=5,
        mlp_hidden_dim=20,
    ):
        super(LCT, self).__init__()

        self.embedding = Embedding(input_shape[-1],embed_dim)  # Embedding of input tensor
        # After the embedding the feature axis has width embed_dim.
        input_shape = (input_shape[0],input_shape[1], embed_dim)

        self.conv_tokenizer = Conv_Tokenizer()

        self.positional_emb = positional_emb
        if positional_emb:
            self.position_embedding = PositionEmbedding(sequence_length=input_shape[-2], dim = input_shape[-1])

        self.transformer_layers = transformer_layers
        self.stochastic_depth_rate = stochastic_depth_rate
        self.dropout = dropout

        # Calculate Stochastic Depth probabilities.
        dpr = [x.item() for x in torch.linspace(0, self.stochastic_depth_rate, self.transformer_layers)]

        # NOTE(review): the modules below are chained strictly sequentially, and
        # Residual computes x + dropout(norm(x)) on its own input. The outputs of
        # MHA and MLP are therefore never added back to their inputs, i.e. there is
        # no skip connection around attention or the MLP. Left as-is to keep the
        # trained architecture unchanged; consider a dedicated block module.
        self.transformer_blocks = nn.Sequential()
        for i in range(transformer_layers):
            # Layer normalization 1.
            self.transformer_blocks.append(nn.LayerNorm(input_shape[-1]))

            # Create a multi-head attention layer (honours the configured dropout
            # instead of a hard-coded 0.1).
            self.transformer_blocks.append(
                MHA(dim=input_shape[-1], heads=num_heads, dropout=self.dropout)
            )

            # Stochastic depth on the attention output.
            self.transformer_blocks.append(StochasticDepth(dpr[i]))

            # Layer normalization 2.
            self.transformer_blocks.append(Residual(input_shape[-1],self.dropout))

            # MLP.
            self.transformer_blocks.append(
                MLP(dim=input_shape[-1], hidden_dim=mlp_hidden_dim, dropout=self.dropout)
            )

            # Layer normalization 3.
            self.transformer_blocks.append(Residual(input_shape[-1],self.dropout))

        self.sequence_pooling = SequencePooling(input_shape[-1])

        self.classifier = nn.Linear(input_shape[-1], num_classes)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, n_features) to (batch, num_classes) scores."""
        # Embed data.
        data_embed = self.embedding(x)

        # Convolution Tokenizer. Only the channel axis added for Conv2d is removed;
        # a bare squeeze() would also drop the batch axis when batch size is 1.
        data_conv = self.conv_tokenizer(data_embed.unsqueeze(1)).squeeze(1)

        # Apply positional embedding.
        if self.positional_emb:
            data_conv = self.position_embedding(data_conv)

        data = self.transformer_blocks(data_conv)

        # Apply sequence pooling.
        weighted_representation = self.sequence_pooling(data)

        # Classify outputs.
        out = self.classifier(weighted_representation)

        return out


In [None]:
# Pull a single mini-batch only to read off the tensor shape the model will receive.
data1, leb = next(iter(train_loader))
input_shape = data1.shape
print('Input shape', data1.shape)

class LCTmodel(nn.Module):
    """LCT backbone followed by a sigmoid, so the forward pass returns values in (0, 1)."""

    def __init__(self, input_shape, embed_dim, num_heads, positional_emb, stochastic_depth_rate,
            dropout, transformer_layers, num_classes):
        super().__init__()

        self.Trans = LCT(
            input_shape=input_shape,
            embed_dim=embed_dim,
            num_heads=num_heads,
            positional_emb=positional_emb,
            stochastic_depth_rate=stochastic_depth_rate,
            dropout=dropout,
            transformer_layers=transformer_layers,
            num_classes=num_classes,
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Backbone scores squashed through the sigmoid.
        return self.sigmoid(self.Trans(x))

Input shape torch.Size([50, 512, 23])


In [None]:

# Build the classifier from the hyperparameters declared in the configuration cell,
# so that the file names generated below always describe the model that is trained.
# Previously the sizes were repeated here as literals and num_heads=4 disagreed with
# the heads=2 value that was written into model_name.
model = LCTmodel(input_shape,
        embed_dim=embed_dim,
        num_heads=heads,
        positional_emb=True,
        stochastic_depth_rate=0.1,
        dropout=dropout,
        transformer_layers=num_layers,
        num_classes=n_classes).to(device)


# Run identifier shared by the log file and the checkpoint file name.
model_name = "Model_{}_dim_{}_heads_{}_lr_{}_dropout_{}".format(
            'CCT',embed_dim, heads, lr, dropout)

message_filename =  'r_' + model_name + '.txt'
model_filename =  'm_' + model_name + '.pt'
# Start a fresh log file for this run (any previous content is truncated).
with open(message_filename, 'w') as out:
    out.write('start\n')

In [None]:
def output_s(message, save_filename):
    """Print ``message`` and append it, followed by a newline, to ``save_filename``."""
    print(message)
    with open(save_filename, mode='a') as log_file:
        log_file.writelines([message + '\n'])

# Binary cross-entropy on probabilities: the model's forward pass already ends in a sigmoid.
criterion = nn.BCELoss()
# Adam over every model parameter, starting from the configured learning rate.
optimizer = torch.optim.Adam(model.parameters(), lr= lr)


def train(ep):
    """Run one training epoch over ``train_loader``.

    Args:
        ep: Epoch number; used only in the progress messages.

    Returns:
        Tuple ``(accuracy_percent, mean_loss)``: the accuracy over all samples
        seen in this epoch and the per-sample average of ``criterion``.
    """
    model.train()

    # Plain Python numbers: accumulating the loss tensor itself would keep every
    # batch's autograd graph alive for the whole epoch.
    loss_sum = 0.0
    correct = 0
    seen = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)

        loss.backward()
        optimizer.step()

        # criterion returns the batch mean; weight it by the real batch size so
        # a smaller final batch does not skew the averages.
        n_batch = target.size(0)
        loss_sum += loss.item() * n_batch
        seen += n_batch

        # Outputs are probabilities; rounding thresholds them at 0.5.
        pred = output.round()
        correct += (pred== target).sum().item()

        if batch_idx > 0 and batch_idx % log_interval == 0:
            print("Train Epoch: {} [{}/{} ({:.2f}%)]\tLoss: {:.2f} \t Acc: {:.2f}".format(
                ep, seen, len(train_loader.dataset),
                100. * (batch_idx + 1) / len(train_loader), loss_sum / seen, 100. * correct / seen))

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    denom = max(seen, 1)
    return 100. * correct / denom, loss_sum / denom


## Learnable parameter counts ###
def test():
    """Evaluate the model on ``test_loader`` without gradient tracking.

    Returns:
        Tuple ``(targets, preds, accuracy_percent, mean_loss)``. ``targets`` and
        ``preds`` are lists with one numpy array per sample, ``accuracy_percent``
        is over the whole test set, and ``mean_loss`` is the per-sample average
        of ``criterion`` as a Python float.
    """
    model.eval()

    targets = list()
    preds = list()
    loss_sum = 0.0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # criterion yields a batch mean; convert it back to a sum so that the
            # final division by the number of samples gives a true average.
            loss_sum += criterion(output, target).item() * target.size(0)
            pred = output.round()
            correct += (pred== target).sum().item()
            targets += list(target.detach().cpu().numpy())
            preds += list(pred.detach().cpu().numpy())

    # max(..., 1) keeps an empty dataset from raising ZeroDivisionError.
    n_samples = max(len(test_loader.dataset), 1)
    Acc = 100. * correct / n_samples
    test_loss = loss_sum / n_samples
    print('\nTest set: Average loss: {:.3f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), Acc))
    return targets, preds, Acc, test_loss


In [None]:
# Training driver: train and evaluate once per epoch, log a summary line, decay
# the learning rate every 10 epochs, and report detailed metrics after the last epoch.
if __name__ == "__main__":
    exec_time = 0
    for epoch in range(1, epochs+1):
        start = time.time()
        train_acc, train_loss = train(epoch)
        t = time.time() - start
        exec_time += t

        # test() returns (targets, preds, ...) in that order. Unpacking them the
        # other way round fed the predictions to sklearn as ground truth.
        targets, preds, test_acc, test_loss = test()
        message = ('Train Epoch: {}, Train loss: {:.4f}, Time taken: {:.4f}, Train Accuracy: {:.4f}, Test loss: {:.4f}, Test Accuracy: {:.4f}' .format(
                epoch, train_loss, t, train_acc, test_loss, test_acc))
        output_s(message, message_filename)

        # Step decay: divide the learning rate by 10 every 10 epochs.
        if epoch % 10 == 0:
            lr /= 10
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr

        # Final report after the last epoch only.
        if epoch == epochs:
            print('Total Execution time for training:',exec_time)
            # Flatten the per-sample (1,) arrays into 1-D label vectors for sklearn.
            preds = np.array(preds).ravel()
            targets = np.array(targets).ravel()
            conf_mat= confusion_matrix(targets, preds)
            disp = ConfusionMatrixDisplay(confusion_matrix= conf_mat)
            disp.plot()
            print(classification_report(targets, preds, digits=4))


Test set: Average loss: 0.002, Accuracy: 992/1024 (96.88%)

Train Epoch: 1, Train loss: 0.0231, Time taken: 2.3300, Train Accuracy: 99.5765, Test loss: 0.0021, Test Accuracy: 96.8750


KeyboardInterrupt: 