In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import os
import wfdb
import random
from sklearn.model_selection import KFold
import torch.optim as optim
from collections import Counter
from torchvision import transforms
from torch.utils.data import DataLoader
from sklearn.metrics import confusion_matrix
from sklearn.metrics import recall_score, precision_score, accuracy_score, f1_score

# Encoders

### EfficientNetB0

In [None]:
class ConvBlock(nn.Module):
    """Conv1d -> BatchNorm1d -> optional activation, stored as ``self.conv``.

    Args:
        in_channels: Input channels of the convolution.
        out_channels: Output channels of the convolution (and BatchNorm width).
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        groups: Convolution groups.
        bias: Whether the convolution carries a bias term.
        padding: Explicit padding; ``None`` selects ``kernel_size // 2``.
        activation: Callable that builds the activation module (a class such as
            ``nn.SiLU`` or a factory such as ``functools.partial``), or ``None``
            to omit the activation.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, groups=1, bias=False, padding=None, activation=nn.SiLU,):
        super(ConvBlock, self).__init__()
        if padding is None:
            padding = kernel_size // 2

        layers = [
            nn.Conv1d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                bias=bias,
                groups=groups,
            ),
            nn.BatchNorm1d(out_channels),
        ]
        if activation is not None:
            layers.append(self._build_activation(activation))

        self.conv = nn.Sequential(*layers)

    @staticmethod
    def _build_activation(activation):
        """Instantiate ``activation`` in-place when it accepts ``inplace``.

        Trying the keyword and falling back works for any callable, unlike
        inspecting ``__init__.__code__`` which fails for factories that have no
        Python-level ``__init__`` and also matches local variable names.
        """
        try:
            return activation(inplace=True)
        except TypeError:
            return activation()

    def forward(self, x):
        """Apply the conv / norm / activation stack to ``x``."""
        return self.conv(x)


class SEBlock1D(nn.Module):
    """Squeeze-and-excitation gate for ``(batch, channels, length)`` tensors.

    The input is average-pooled over its last axis, passed through a two-layer
    bottleneck ending in a sigmoid, and the resulting per-channel weights are
    multiplied back onto the input.

    Args:
        in_channels: Number of channels of the input tensor.
        reduction: Bottleneck divisor. It is overridden to 4 when
            ``in_channels == 32``.
    """

    def __init__(self, in_channels, reduction=24):
        super(SEBlock1D, self).__init__()
        if in_channels == 32:
            reduction = 4
        # Never let the bottleneck collapse to zero units: with a 0-wide hidden
        # layer the gate degenerates to a constant sigmoid(0) = 0.5.
        squeezed = max(1, in_channels // reduction)
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(in_channels, squeezed, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(squeezed, in_channels, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``x`` rescaled channel-wise by the learned gate."""
        b, c, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1)
        return x * y.expand_as(x)


class MBConv(nn.Module):
    """Inverted-bottleneck block: optional 1x1 expansion, depthwise conv, SE gate, 1x1 projection.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by the final projection.
        kernel_size: Kernel size of the depthwise convolution.
        stride: Stride of the depthwise convolution.
        expansion_factor: Multiplier giving the hidden width; the expansion
            conv is omitted when it equals 1.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, expansion_factor):
        super(MBConv, self).__init__()
        hidden_dim = in_channels * expansion_factor
        # The skip connection is only valid when the block keeps the tensor shape.
        self.use_residual = stride == 1 and in_channels == out_channels

        stack = []
        if expansion_factor != 1:
            # 1x1 expansion to the hidden width.
            stack.append(
                ConvBlock(in_channels=in_channels, out_channels=hidden_dim, kernel_size=1, stride=1, groups=1)
            )
        # Depthwise conv (one group per channel).
        stack.append(
            ConvBlock(in_channels=hidden_dim, out_channels=hidden_dim, kernel_size=kernel_size, stride=stride, groups=hidden_dim)
        )
        stack.append(SEBlock1D(in_channels=hidden_dim))
        # Linear 1x1 projection (no activation).
        stack.append(
            ConvBlock(in_channels=hidden_dim, out_channels=out_channels, kernel_size=1, stride=1, groups=1, activation=None)
        )
        self.conv = nn.Sequential(*stack)

    def forward(self, x):
        """Run the block, adding the input back when ``use_residual`` is set."""
        out = self.conv(x)
        if self.use_residual:
            return out + x
        return out

In [None]:
class EfficientNetB0(nn.Module):
    """1-D EfficientNet-B0 style feature extractor built from ``ConvBlock`` / ``MBConv`` stages.

    Args:
        input_channels: Number of channels of the input signal.
    """

    def __init__(self, input_channels):
        super(EfficientNetB0, self).__init__()

        self.stages = [
            # [Operator(f), Channels(c), Layers(l), Kernel(k), Stride(s), Expansion(exp)]
            [ConvBlock, 32, 1, 3, 2, 1],
            [MBConv, 16, 1, 3, 1, 1],
            [MBConv, 24, 2, 3, 2, 6],
            [MBConv, 40, 2, 5, 2, 6],
            [MBConv, 80, 3, 3, 2, 6],
            [MBConv, 112, 3, 5, 1, 6],
            [MBConv, 192, 4, 5, 2, 6],
            [MBConv, 320, 1, 3, 1, 6],
            [ConvBlock, 1280, 1, 1, 1, 0]
        ]

        modules = []
        last_channel = input_channels
        for block, channel, num_layers, kernel, stride, expansion in self.stages:
            if block == ConvBlock:
                modules.append(
                    block(in_channels=last_channel, out_channels=channel, kernel_size=kernel, stride=stride)
                )
                last_channel = channel
            elif block == MBConv:
                for j in range(num_layers):
                    # Only the first layer of a stage applies the stage stride.
                    layer_stride = stride if j == 0 else 1
                    modules.append(
                        block(
                            in_channels=last_channel,
                            out_channels=channel,
                            kernel_size=kernel,
                            stride=layer_stride,
                            expansion_factor=expansion,
                        )
                    )
                    last_channel = channel

        self.conv = nn.Sequential(*modules)

    def forward(self, x, return_layers=True):
        """Return the final feature map and three intermediate maps.

        ``return_layers`` is accepted but not used. The skip list holds the
        outputs of layers 5, 3 and 0 of ``self.conv`` (in that order), which by
        the stage table are the second 40-channel, second 24-channel and the
        32-channel stem outputs.
        """
        outputs = []
        for layer in self.conv:
            x = layer(x)
            outputs.append(x)

        skips = [outputs[idx] for idx in (5, 3, 0)]
        return x, skips

### ResNet18

In [None]:
class BasicBlock(nn.Module):
    """Two-convolution residual block (kernel 3) for 1-D feature maps.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input to build the shortcut
            when the main path changes shape.
    """

    # Ratio between a block's output channels and its ``out_channels`` argument.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += shortcut
        return self.relu(out)

In [None]:
class ResNet18(nn.Module):
    """1-D ResNet-style encoder that also returns three shallow feature maps.

    Args:
        block: Residual block class; must expose an ``expansion`` attribute and
            accept ``(in_channels, out_channels, stride, downsample)``.
        layers: Four integers giving the number of blocks in each stage.
        in_channels: Number of channels of the input signal.
    """

    def __init__(self, block, layers, in_channels):
        super(ResNet18, self).__init__()
        # Running channel count, updated by ``_make_layer`` as stages are built.
        self.in_channels = 64
        self.conv1 = nn.Conv1d(in_channels=in_channels, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks; only the first one strides."""
        stage_width = out_channels * block.expansion

        # A 1x1 conv shortcut is needed whenever the first block changes shape.
        downsample = None
        if stride != 1 or self.in_channels != stage_width:
            downsample = nn.Sequential(
                nn.Conv1d(self.in_channels, stage_width, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(stage_width),
            )

        stage = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = stage_width
        for _ in range(1, blocks):
            stage.append(block(self.in_channels, out_channels))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the ``layer4`` output and ``[layer2, layer1, stem]`` feature maps."""
        stem = self.relu(self.bn1(self.conv1(x)))
        shallow = self.layer1(self.maxpool(stem))
        middle = self.layer2(shallow)
        deep = self.layer4(self.layer3(middle))

        return deep, [middle, shallow, stem]

### MobileNetV2

In [None]:
import torch
import torch.nn as nn

class ConvBlock(nn.Module):
    """Conv1d -> BatchNorm1d -> optional activation, stored as ``self.conv``.

    NOTE: this re-declares the ``ConvBlock`` already defined in the
    EfficientNetB0 section and shadows it once this cell runs; the two
    definitions should be kept in sync or consolidated into one.

    Args:
        in_channels: Input channels of the convolution.
        out_channels: Output channels of the convolution (and BatchNorm width).
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        groups: Convolution groups.
        bias: Whether the convolution carries a bias term.
        padding: Explicit padding; ``None`` selects ``kernel_size // 2``.
        activation: Callable that builds the activation module (a class such as
            ``nn.SiLU`` or a factory such as ``functools.partial``), or ``None``
            to omit the activation.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, groups=1, bias=False, padding=None, activation=nn.SiLU,):
        super(ConvBlock, self).__init__()
        if padding is None:
            padding = kernel_size // 2

        layers = [
            nn.Conv1d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                bias=bias,
                groups=groups,
            ),
            nn.BatchNorm1d(out_channels),
        ]
        if activation is not None:
            layers.append(self._build_activation(activation))

        self.conv = nn.Sequential(*layers)

    @staticmethod
    def _build_activation(activation):
        """Instantiate ``activation`` in-place when it accepts ``inplace``.

        Trying the keyword and falling back works for any callable, unlike
        inspecting ``__init__.__code__`` which fails for factories that have no
        Python-level ``__init__`` and also matches local variable names.
        """
        try:
            return activation(inplace=True)
        except TypeError:
            return activation()

    def forward(self, x):
        """Apply the conv / norm / activation stack to ``x``."""
        return self.conv(x)
    

class InvertedResidual(nn.Module):
    """Inverted residual block: optional 1x1 expansion, depthwise conv (k=3), linear 1x1 projection.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by the projection.
        stride: Stride of the depthwise convolution; must be 1 or 2.
        expansion_factor: Hidden-width multiplier; the expansion conv is
            omitted when it equals 1.
    """

    def __init__(self, in_channels, out_channels, stride, expansion_factor):
        super(InvertedResidual, self).__init__()
        self.stride = stride
        assert stride in [1, 2]

        hidden_dim = int(in_channels * expansion_factor)
        # Skip connection only when the block preserves the tensor shape.
        self.use_res_connect = self.stride == 1 and in_channels == out_channels

        stack = []
        if expansion_factor != 1:
            # pw: pointwise expansion
            stack += [
                nn.Conv1d(in_channels, hidden_dim, 1, 1, 0, bias=False),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU6(inplace=True),
            ]
        stack += [
            # dw: depthwise convolution
            nn.Conv1d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU6(inplace=True),
            # pw-linear: projection without activation
            nn.Conv1d(hidden_dim, out_channels, 1, 1, 0, bias=False),
            nn.BatchNorm1d(out_channels),
        ]
        self.conv = nn.Sequential(*stack)

    def forward(self, x):
        """Run the block, adding the input back when ``use_res_connect`` is set."""
        out = self.conv(x)
        if self.use_res_connect:
            return x + out
        return out


class MobileNet1D(nn.Module):
    """1-D MobileNetV2-style encoder that also returns three shallow feature maps.

    Args:
        input_channels: Number of channels of the input signal.
        width_mult: Multiplier applied to the output width of each inverted
            residual stage (the stem and final layer widths are not scaled).
    """

    def __init__(self, input_channels, width_mult=1.0):
        super(MobileNet1D, self).__init__()
        # NOTE: ``first_channel`` doubles as the running input width while the
        # network is assembled, so it ends up equal to the last stage's width.
        self.first_channel = 32
        self.last_channel = 1280

        self.interverted_residual_setting = [
            # t, c, n, s
            [1, 16, 1, 1],
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]

        block = InvertedResidual

        # Stem: strided 3-tap convolution.
        modules = [
            nn.Sequential(
                nn.Conv1d(input_channels, self.first_channel, 3, 2, 1, bias=False),
                nn.BatchNorm1d(self.first_channel),
                nn.ReLU6(inplace=True),
            )
        ]

        # Inverted residual stages; only the first block of a stage strides.
        for t, c, n, s in self.interverted_residual_setting:
            output_channel = int(c * width_mult)
            for i in range(n):
                block_stride = s if i == 0 else 1
                modules.append(
                    block(self.first_channel, output_channel, block_stride, expansion_factor=t)
                )
                self.first_channel = output_channel

        # Head: pointwise convolution up to ``last_channel``.
        modules.append(
            nn.Sequential(
                nn.Conv1d(self.first_channel, self.last_channel, 1, 1, 0, bias=False),
                nn.BatchNorm1d(self.last_channel),
                nn.ReLU6(inplace=True),
            )
        )

        self.features = nn.ModuleList(modules)

    def forward(self, x, return_layers=True):
        """Return the final feature map plus the outputs of layers 6, 3 and 0.

        ``return_layers`` is accepted but not used.
        """
        outputs = []
        for layer in self.features:
            x = layer(x)
            outputs.append(x)
        return x, [outputs[idx] for idx in (6, 3, 0)]

# TransUNet

In [None]:
class PositionalEncoding1D(nn.Module):
    """Fixed sinusoidal positional encoding added to ``(batch, seq, embed_dim)`` inputs.

    Even feature columns hold ``sin`` terms and odd columns hold ``cos`` terms.
    The table is stored in the non-trainable buffer ``pe`` of shape
    ``(1, max_len, embed_dim)``.

    Args:
        embed_dim: Size of the last axis of the inputs (odd values supported).
        max_len: Number of positions pre-computed in the table.
    """

    def __init__(self, embed_dim, max_len=5000):
        super(PositionalEncoding1D, self).__init__()

        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2) * (-torch.log(torch.tensor(10000.0)) / embed_dim))
        angles = position * div_term

        pe = torch.zeros(max_len, embed_dim)
        pe[:, 0::2] = torch.sin(angles)
        # For odd embed_dim there is one fewer cosine column than sine columns,
        # so trim the frequency axis to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : embed_dim // 2])
        pe = pe.unsqueeze(0)

        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the first ``x.size(1)`` rows of the table to ``x``."""
        return x + self.pe[:, :x.size(1), :]

In [None]:
class PatchEmbedding1D(nn.Module):
    """Split a ``(batch, feature_dim, length)`` map into patches and embed each one.

    Every patch gathers ``patch_size`` consecutive positions across all
    feature channels, is flattened, linearly projected to ``embed_dim`` and
    offset by a learnable positional embedding.

    Args:
        feature_dim: Channel count of the incoming feature map.
        patch_size: Number of positions per patch.
        embed_dim: Output size of the linear projection.
        num_patches: Number of learnable positional embeddings allocated.
    """

    def __init__(self, feature_dim, patch_size, embed_dim, num_patches):
        super(PatchEmbedding1D, self).__init__()
        self.patch_size = patch_size
        self.projection = nn.Linear(feature_dim * patch_size, embed_dim)

        # One learnable positional vector per patch slot.
        self.positional_embeddings = nn.Parameter(torch.randn(1, num_patches, embed_dim))

    def forward(self, x):
        """Return patch tokens of shape ``(batch, length // patch_size, embed_dim)``."""
        batch_size, feature_dim, reduced_length = x.shape
        num_patches, leftover = divmod(reduced_length, self.patch_size)

        assert leftover == 0, "Reduced length must be divisible by patch size"

        # (B, C, L) -> (B, C, P, S) -> (B, P, C, S) -> (B, P, C * S)
        patches = x.reshape(batch_size, feature_dim, num_patches, self.patch_size)
        patches = patches.permute(0, 2, 1, 3).reshape(batch_size, num_patches, -1)

        tokens = self.projection(patches)
        return tokens + self.positional_embeddings[:, :num_patches, :]

In [None]:
class TransformerEncoder1D(nn.Module):
    """Sinusoidal positional encoding followed by a stack of Transformer encoder layers.

    Args:
        embed_dim: Token width (``d_model``).
        num_heads: Number of attention heads per layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, embed_dim, num_heads, num_layers):
        super(TransformerEncoder1D, self).__init__()
        self.positional_encoding = PositionalEncoding1D(embed_dim=embed_dim)
        layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=2048)
        self.transformer_encoder = nn.TransformerEncoder(layer, num_layers=num_layers)

    def forward(self, x):
        """Encode ``x``; the first two axes are swapped before and after the encoder stack."""
        tokens = self.positional_encoding(x)

        # The encoder layer is built without ``batch_first``, hence the axis swap.
        encoded = self.transformer_encoder(tokens.permute(1, 0, 2))
        return encoded.permute(1, 0, 2)

In [None]:
class ChannelAttention(nn.Module):
    """Channel attention: a shared bottleneck over average- and max-pooled descriptors.

    Args:
        in_planes: Number of input channels.
        ratio: Reduction ratio of the bottleneck; the hidden width is
            ``max(1, in_planes // ratio)``.
    """

    def __init__(self, in_planes, ratio=16):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.max_pool = nn.AdaptiveMaxPool1d(1)

        # Honour ``ratio`` (previously ignored in favour of a hard-coded 16) and
        # keep at least one hidden channel so narrow inputs still get a gate.
        hidden_planes = max(1, in_planes // ratio)
        self.fc = nn.Sequential(nn.Conv1d(in_planes, hidden_planes, 1, bias=False),
                               nn.ReLU(),
                               nn.Conv1d(hidden_planes, in_planes, 1, bias=False))
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return per-channel weights in ``[0, 1]`` with the last axis reduced to 1."""
        avg_out = self.fc(self.avg_pool(x))
        max_out = self.fc(self.max_pool(x))
        return self.sigmoid(avg_out + max_out)

class SpatialAttention(nn.Module):
    """Position-wise attention computed from channel-mean and channel-max maps.

    Args:
        kernel_size: Kernel size of the convolution that mixes the two pooled
            maps; padding is ``kernel_size // 2``.
    """

    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()

        self.conv1 = nn.Conv1d(2, 1, kernel_size, padding=kernel_size//2, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return a single-channel sigmoid map computed over the last axis of ``x``."""
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True)[0]
        pooled = torch.cat([channel_mean, channel_max], dim=1)
        return self.sigmoid(self.conv1(pooled))


class CBAM(nn.Module):
    """Channel then spatial attention applied in an 8x-widened space.

    The input is expanded to ``in_channels * 8`` channels with a 1x1 conv,
    gated by ``ChannelAttention`` and ``SpatialAttention`` in turn, then
    projected back to ``in_channels`` with a second 1x1 conv.

    Args:
        in_channels: Channel count of the input (and of the output).
    """

    def __init__(self, in_channels):
        super(CBAM, self).__init__()
        widened = in_channels * 8
        self.conv1 = nn.Conv1d(in_channels=in_channels, out_channels=widened, kernel_size=1)
        self.ca = ChannelAttention(widened)
        self.sa = SpatialAttention()
        self.conv2 = nn.Conv1d(in_channels=widened, out_channels=in_channels, kernel_size=1)

    def forward(self, x):
        """Return the attention-refined tensor with the same channel count as ``x``."""
        expanded = self.conv1(x)
        expanded = self.ca(expanded) * expanded
        expanded = self.sa(expanded) * expanded
        return self.conv2(expanded)

In [None]:
class TransUNet(nn.Module):
    """CNN encoder + Transformer bottleneck + CBAM-gated upsampling decoder for 1-D signals.

    Pipeline: the encoder yields a deep feature map and three skip maps; the
    deep map is split into patches, embedded and passed through the
    Transformer. The Transformer output ``(batch, num_patches, embed_dim)`` is
    fed to ``conv1`` with the patch axis acting as the channel axis. Three
    decoder stages follow, each being 1x1 conv -> linear upsample -> concat
    with the CBAM-refined skip map, and a final 1x1 conv + upsample produces
    ``num_classes`` channels at length ``input_signal``.

    Args:
        in_channels: Channels of the input signal.
        patch_size: Positions per patch taken from the encoder output.
        num_patches: Number of patches; also the input channel count of ``conv1``.
        embed_dim: Transformer token width.
        num_heads: Attention heads per Transformer layer.
        num_layers: Number of Transformer layers.
        num_classes: Number of output channels (per-sample class scores).
        encoder_name: One of ``"EfficientNet"``, ``"ResNet"``, ``"MobileNet"``.
        input_signal: Length of the input signal; upsample targets are
            ``ceil(input_signal / 8)``, ``/ 4``, ``/ 2`` and ``input_signal``.

    Raises:
        ValueError: If ``encoder_name`` is not one of the supported names.
    """

    # encoder name -> (encoder output channels, channels of the three skip maps
    # in the order the encoder returns them).
    _ENCODER_SPECS = {
        "EfficientNet": (1280, (40, 24, 32)),
        "ResNet": (512, (128, 64, 64)),
        "MobileNet": (1280, (32, 24, 32)),
    }

    def __init__(self, in_channels, patch_size, num_patches, embed_dim, num_heads, num_layers, num_classes, encoder_name="ResNet", input_signal=1800):
        super(TransUNet, self).__init__()
        if encoder_name not in self._ENCODER_SPECS:
            # Previously an unknown name surfaced later as an UnboundLocalError.
            raise ValueError(
                f"Unknown encoder_name {encoder_name!r}; expected one of {sorted(self._ENCODER_SPECS)}"
            )
        self.encoder_name = encoder_name
        feature_dim, (skip1, skip2, skip3) = self._ENCODER_SPECS[encoder_name]

        if encoder_name == "EfficientNet":
            self.encoder = EfficientNetB0(in_channels)
        elif encoder_name == "ResNet":
            self.encoder = ResNet18(block=BasicBlock, layers=[2, 2, 2, 2], in_channels=in_channels)
        else:
            self.encoder = MobileNet1D(input_channels=in_channels)

        self.patch_embedding = PatchEmbedding1D(feature_dim=feature_dim, patch_size=patch_size, embed_dim=embed_dim, num_patches=num_patches)

        self.transformer_encoder = TransformerEncoder1D(embed_dim=embed_dim, num_heads=num_heads, num_layers=num_layers)

        # Each decoder conv after the first consumes the previous stage output
        # concatenated with a skip map of equal width, hence the factor 2.
        self.conv1 = nn.Conv1d(in_channels=num_patches, out_channels=skip1, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=2 * skip1, out_channels=skip2, kernel_size=1)
        self.conv3 = nn.Conv1d(in_channels=2 * skip2, out_channels=skip3, kernel_size=1)
        self.conv4 = nn.Conv1d(in_channels=2 * skip3, out_channels=num_classes, kernel_size=1)

        self.up1 = nn.Upsample(size=math.ceil(input_signal/8), mode='linear', align_corners=True)
        self.up2 = nn.Upsample(size=math.ceil(input_signal/4), mode='linear', align_corners=True)
        self.up3 = nn.Upsample(size=math.ceil(input_signal/2), mode='linear', align_corners=True)
        self.up4 = nn.Upsample(size=math.ceil(input_signal), mode='linear', align_corners=True)

        # CBAM gates applied to the skip maps before concatenation.
        self.cbam1 = CBAM(in_channels=skip1)
        self.cbam2 = CBAM(in_channels=skip2)
        self.cbam3 = CBAM(in_channels=skip3)

    def forward(self, x):
        """Return per-position class scores of shape ``(batch, num_classes, input_signal)``."""
        x, features = self.encoder(x)
        x = self.patch_embedding(x)
        x = self.transformer_encoder(x)

        decoder_stages = (
            (self.conv1, self.up1, self.cbam1),
            (self.conv2, self.up2, self.cbam2),
            (self.conv3, self.up3, self.cbam3),
        )
        for (conv, upsample, gate), skip in zip(decoder_stages, features):
            x = upsample(conv(x))
            x = torch.cat([x, gate(skip)], dim=1)

        x = self.conv4(x)
        x = self.up4(x)
        return x

# Dataset

In [None]:
class Dataset:
    """Reads the WFDB records listed in ``<path>/RECORDS`` and cuts them into fixed-length windows.

    For every record, an R-peak indicator track and a per-sample class track
    are built from the ``atr`` annotations, the record is trimmed to the span
    between the first and last kept annotation, and the result is segmented
    into windows of ``length`` samples.

    Args:
        path: Directory holding the ``RECORDS`` index and the WFDB files.
        length: Window length in samples.
    """

    # Annotation symbols that are discarded before peaks/labels are built.
    _IGNORED_SYMBOLS = ("+", "~", "x", "|")

    # Annotation symbol -> class index written from that annotation up to the
    # next one. Symbols missing here leave the label track untouched (0).
    # NOTE(review): the grouping looks like the AAMI N/S/V/F/Q classes — confirm.
    _SYMBOL_TO_CLASS = {
        "N": 0, "L": 0, "R": 0, "e": 0, "j": 0,
        "A": 1, "a": 1, "J": 1, "S": 1,
        "V": 2, "E": 2,
        "F": 3,
        "/": 4, "f": 4, "Q": 4,
    }

    def __init__(self, path, length=1800):
        self.path = path
        self.length = length

    def __list_records(self):
        """Return the record names listed in ``<path>/RECORDS`` (whitespace separated)."""
        # Context manager so the handle is closed even if reading fails.
        with open(f"{self.path}/RECORDS", "r") as records_file:
            return records_file.read().replace("\n", " ").split()

    def __segmentate_data(self, signal, peaks, labels):
        """Cut the three aligned tracks into windows of ``self.length`` samples.

        A window that contains a peak at a non-zero offset makes the next
        window start at that window's last peak; otherwise the next window
        starts ``self.length`` samples later.

        Returns:
            ``(signals, peaks, labels, peaks_idx)`` where the first three are
            arrays stacked over windows and ``peaks_idx`` lists, per window, the
            offset of its last peak (0 when there is none).
        """
        total = signal.shape[0]
        start, end = 0, self.length
        signals_tab, peaks_tab, labels_tab, peaks_idx = [], [], [], []
        # NOTE(review): ``end < total`` skips a final window that ends exactly at
        # the last sample; kept as-is so existing splits stay unchanged.
        while end < total and start < total:
            window_peaks = peaks[start:end]
            signals_tab.append(signal[start:end])
            peaks_tab.append(window_peaks)
            labels_tab.append(labels[start:end])

            peak_positions = np.where(window_peaks == 1)[0]
            last_peak = peak_positions[-1] if len(peak_positions) else 0
            peaks_idx.append(last_peak)

            # No usable peak to anchor on -> jump a full window ahead.
            start += last_peak if last_peak != 0 else self.length
            end = start + self.length
        return (np.array(signals_tab), np.array(peaks_tab), np.array(labels_tab), peaks_idx,)

    def __read_file(self, file):
        """Load one record and return its segmented signals, peaks, labels and peak offsets.

        Raises:
            ValueError: If the record has no annotation left after filtering.
        """
        file_path = os.path.join(self.path, file)

        record = wfdb.rdrecord(file_path)
        annotation = wfdb.rdann(file_path, "atr")

        size = record.p_signal.shape[0]

        file_peaks = np.zeros(size)  # 1 at every kept annotation sample
        file_labels = np.zeros(size)  # class index of each time step

        # Drop non-informative annotations.
        beats = [
            (sample, symbol)
            for sample, symbol in zip(annotation.sample, annotation.symbol)
            if symbol not in self._IGNORED_SYMBOLS
        ]
        if not beats:
            raise ValueError(f"Record {file!r} has no usable beat annotations")

        for sample, _ in beats:
            file_peaks[sample] = 1

        # Each beat labels the samples from itself up to the following beat.
        for (sample, symbol), (next_sample, _) in zip(beats, beats[1:]):
            class_index = self._SYMBOL_TO_CLASS.get(symbol)
            if class_index is not None:
                file_labels[sample:next_sample] = class_index

        # Keep only the span between the first and the last kept annotation.
        first_sample, last_sample = beats[0][0], beats[-1][0]
        file_signals = record.p_signal[first_sample:last_sample]
        file_labels = file_labels[first_sample:last_sample]
        file_peaks = file_peaks[first_sample:last_sample]

        return self.__segmentate_data(file_signals, file_peaks, file_labels)

    def read_dataset(self):
        """Read every listed record and return ``(inputs, targets, peaks, peaks_idx)`` arrays."""
        peaks, targets, inputs, peaks_idx = [], [], [], []

        for file in self.__list_records():
            segmented_signals, segmented_peaks, segmented_outputs, peak_idx = self.__read_file(file)

            peaks.extend(segmented_peaks)
            inputs.extend(segmented_signals)
            targets.extend(segmented_outputs)
            peaks_idx.extend(peak_idx)

        return np.array(inputs), np.array(targets), np.array(peaks), np.array(peaks_idx)

    def read_dataset_patients(self, num_fold):
        """Split records into 5 folds and return train/test arrays for fold ``num_fold``.

        The record list is shuffled after ``random.seed(42)`` (this reseeds the
        global ``random`` generator), split into 5 folds with
        ``np.array_split``, and fold ``num_fold`` becomes the test set.

        Returns:
            ``(X_train, X_test, peaks_train, peaks_test, y_train, y_test,
            peaks_idx_train, peaks_idx_test)``; ``X_*`` are transposed with
            ``transpose(0, 2, 1)``.
        """
        random.seed(42)
        files = self.__list_records()
        random.shuffle(files)
        folds = np.array_split(files, 5)

        train_files, test_files = set(), set()
        for fold_index, fold in enumerate(folds):
            destination = test_files if fold_index == num_fold else train_files
            destination.update(str(name) for name in fold)

        peaks_train, targets_train, inputs_train, peaks_idx_train = [], [], [], []
        peaks_test, targets_test, inputs_test, peaks_idx_test = [], [], [], []

        for file in files:
            segmented_signals, segmented_peaks, segmented_outputs, peak_idx = self.__read_file(file)

            if file in train_files:
                peaks_train.extend(segmented_peaks)
                inputs_train.extend(segmented_signals)
                targets_train.extend(segmented_outputs)
                peaks_idx_train.extend(peak_idx)
            elif file in test_files:
                peaks_test.extend(segmented_peaks)
                inputs_test.extend(segmented_signals)
                targets_test.extend(segmented_outputs)
                peaks_idx_test.extend(peak_idx)

        X_train = np.array(inputs_train)
        X_test = np.array(inputs_test)
        y_train = np.array(targets_train)
        y_test = np.array(targets_test)
        peaks_train = np.array(peaks_train)
        peaks_test = np.array(peaks_test)
        peaks_idx_train = np.array(peaks_idx_train)
        peaks_idx_test = np.array(peaks_idx_test)

        return (X_train.transpose(0, 2, 1), X_test.transpose(0, 2, 1), peaks_train, peaks_test, y_train, y_test, peaks_idx_train, peaks_idx_test,)
            

In [None]:
def categorize_data(targets, preds, peaks):
    """Collapse per-sample predictions/targets into one majority value per peak.

    For each row, every position where ``peaks == 1`` yields one entry holding
    the most common value over a span of samples:

    * first peak that is not at offset 0: the span *before* that peak;
    * otherwise, last peak of the row: from that peak to the end of the row;
    * otherwise: from the peak up to the next peak.

    Args:
        targets, preds, peaks: Tensors exposing ``.cpu()``, indexed as
            ``[row][sample]``.

    Returns:
        ``(categorized_preds, categorized_targets)`` — two lists with one list
        of majority values per row.
    """
    targets = np.array(targets.cpu())
    preds = np.array(preds.cpu())
    peaks = np.array(peaks.cpu())

    def majority(values):
        # Most frequent element; ties resolve to the first one encountered.
        return Counter(values).most_common(1)[0][0]

    categorized_preds_most, categorized_targets = [], []
    for row, row_peaks in enumerate(peaks):
        peak_positions = np.where(row_peaks == 1)[0]
        final_peak = len(peak_positions) - 1

        row_preds, row_targets = [], []
        for j, position in enumerate(peak_positions):
            if j == 0 and position != 0:
                lo, hi = 0, position
            elif j == final_peak:
                lo, hi = position, len(row_peaks)
            else:
                lo, hi = position, peak_positions[j + 1]

            row_preds.append(majority(preds[row][lo:hi]))
            row_targets.append(majority(targets[row][lo:hi]))

        categorized_preds_most.append(row_preds)
        categorized_targets.append(row_targets)
    return categorized_preds_most, categorized_targets

In [None]:
class ECGDataset(Dataset):
    """Indexable dataset yielding ``(signal, target, peak, peak_idx)`` tensors.

    NOTE(review): with the imports visible in this notebook, ``Dataset``
    resolves to the record-reader class defined above rather than
    ``torch.utils.data.Dataset`` — confirm this is intended.

    Args:
        inputs: Indexable collection of input signals.
        targets: Indexable collection of per-sample labels.
        peaks: Indexable collection of per-sample peak indicators.
        peaks_idx: Indexable collection of last-peak offsets.
    """

    def __init__(self, inputs, targets, peaks, peaks_idx):
        self.inputs = inputs
        self.targets = targets
        self.peaks = peaks
        self.peaks_idx = peaks_idx
        # ToTensor followed by (x - 0.5) / 0.5 normalisation.
        self.transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
        )

    def __len__(self):
        """Number of samples, taken from ``inputs``."""
        return len(self.inputs)

    def __getitem__(self, index):
        """Return the transformed signal (first slice only) and long-typed label tensors."""
        raw_signal = self.inputs[index]
        raw_target = self.targets[index]
        raw_peak = self.peaks[index]
        raw_peak_idx = self.peaks_idx[index]

        signal = self.transform(raw_signal)

        # Index 0 of the transform output is returned.
        # presumably this strips a leading axis added by ToTensor — TODO confirm
        return (
            signal[0],
            torch.tensor(raw_target, dtype=torch.long),
            torch.tensor(raw_peak, dtype=torch.long),
            torch.tensor(raw_peak_idx, dtype=torch.long),
        )


In [None]:
def calculate_metrices(preds, targets, num_classes, end_idxs=None):
    """Average per-row classification metrics and accumulate a confusion matrix.

    Args:
        preds: Sequence of per-row predicted labels.
        targets: Sequence of per-row true labels.
        num_classes: Number of classes; fixes the confusion-matrix size.
        end_idxs: Optional per-row values exposing ``.item()``. When given,
            rows whose value is 0 are skipped and the last element of every
            remaining row is dropped before scoring.

    Returns:
        ``(accuracy, precision, recall, f1, cm)`` where the first four are
        ``np.nanmean`` over rows (weighted averaging, ``zero_division=0``) and
        ``cm`` is the summed confusion matrix.
    """
    accuracy, precision, recall, f1 = [], [], [], []
    cm = np.zeros((num_classes, num_classes), dtype=int)
    weighted = {"zero_division": 0, "average": "weighted"}

    for row in range(len(targets)):
        if end_idxs is None:
            row_true, row_pred = targets[row], preds[row]
        elif end_idxs[row].item() == 0:
            continue
        else:
            row_true, row_pred = targets[row][:-1], preds[row][:-1]

        accuracy.append(accuracy_score(row_true, row_pred))
        precision.append(precision_score(row_true, row_pred, **weighted))
        recall.append(recall_score(row_true, row_pred, **weighted))
        f1.append(f1_score(row_true, row_pred, **weighted))
        cm += confusion_matrix(row_true, row_pred, labels=np.arange(num_classes))

    return (np.nanmean(accuracy), np.nanmean(precision), np.nanmean(recall), np.nanmean(f1), cm,)


def iou(pred, target, num_classes, epsilon=1e-6):
    """Mean intersection-over-union over classes ``0 .. num_classes - 1``.

    Both tensors are flattened. A class absent from both ``pred`` and
    ``target`` contributes NaN and is ignored by the final ``np.nanmean``.

    Args:
        pred: Tensor of predicted class indices.
        target: Tensor of true class indices.
        num_classes: Number of classes to score.
        epsilon: Added to the union in the denominator.

    Returns:
        The NaN-ignoring mean of the per-class IoU values.
    """
    flat_pred = pred.view(-1)
    flat_target = target.view(-1)

    per_class = []
    for cls in range(num_classes):
        in_pred = flat_pred == cls
        in_target = flat_target == cls
        intersection = (in_pred & in_target).sum().float()
        union = (in_pred | in_target).sum().float()
        if union == 0:
            # Class missing from both tensors: excluded from the mean.
            per_class.append(np.nan)
            continue
        per_class.append((intersection / (union + epsilon)).item())

    return np.nanmean(per_class)

In [None]:
class FocalLoss(nn.Module):
    """Focal loss built on top of per-element cross-entropy.

    Each element's loss is ``alpha * (1 - pt) ** gamma * ce`` where ``ce`` is the
    unreduced cross-entropy and ``pt = exp(-ce)``; the result is the mean over
    all elements.

    Args:
        alpha: Scalar factor applied to every element's loss.
        gamma: Exponent of the ``(1 - pt)`` modulating term.
    """

    def __init__(self, alpha=0.25, gamma=2):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        """Return the mean focal loss of logits ``inputs`` against class ids ``targets``."""
        per_element_ce = F.cross_entropy(inputs, targets, reduction='none')
        # exp(-CE) recovers the probability the model gave to the true class.
        true_class_prob = torch.exp(-per_element_ce)
        scale = self.alpha * (1 - true_class_prob) ** self.gamma
        return torch.mean(scale * per_element_ce)

In [None]:
def test_model(model, criterion, data_loader, device, num_classes, num_channels=2):
    """Evaluate ``model`` on every batch of ``data_loader`` without gradients.

    Args:
        model: Network called as ``model(inputs)``. Its output is expected to be
            ``(batch, num_classes, length)``: it is argmax-ed over dim 1 and
            permuted / flattened to ``(-1, num_classes)`` for the loss.
        criterion: Loss called as ``criterion(flat_logits, flat_targets)``.
        data_loader: Iterable yielding ``(inputs, targets, peaks, end_idxs)``.
        device: Device every batch tensor is moved to.
        num_classes: Number of label classes; used for the loss reshape, the IoU
            and the confusion-matrix size.
        num_channels: Unused. Kept only so existing callers keep working.

    Returns:
        List ``[mean loss, mean IoU, mean accuracy, mean precision, mean recall,
        mean F1, confusion matrix]``. Means are taken over batches; the
        confusion matrix is summed over batches.
    """
    model.eval()

    iou_score = []
    acc_tab, prec_tab, rec_tab, f1_tab = [], [], [], []
    cm_tab = np.zeros((num_classes, num_classes), dtype=int)

    running_loss = 0

    with torch.no_grad():
        for inputs, targets, peaks, end_idxs in data_loader:
            inputs = inputs.to(torch.float32).to(device)
            targets = targets.to(device)
            peaks = peaks.to(device)
            end_idxs = end_idxs.to(device)

            batch_outputs = model(inputs)

            # Put the class dimension last so each row holds one position's logits.
            outputs = batch_outputs.permute(0, 2, 1).contiguous()
            outputs = outputs.view(-1, num_classes)
            target = targets.view(-1)

            loss = criterion(outputs, target)
            running_loss += loss.item()

            preds = torch.argmax(batch_outputs, dim=1)

            # Position-wise IoU averaged over every label class. num_channels
            # (presumably the input channel count) is not a class count, so
            # num_classes is the value iou() needs here.
            iou_score.append(iou(preds, targets, num_classes))

            # categorize_data is defined elsewhere in the notebook; presumably it
            # reduces the position-wise labels to per-beat labels -- verify.
            categorized_preds, categorized_targets = categorize_data(targets, preds, peaks)
            acc, prec, rec, f1, cm = calculate_metrices(preds=categorized_preds, targets=categorized_targets, num_classes=num_classes, end_idxs=end_idxs)
            acc_tab.append(acc)
            prec_tab.append(prec)
            rec_tab.append(rec)
            f1_tab.append(f1)
            cm_tab += cm

    mean_loss = running_loss / len(data_loader)

    print("Testing!")
    print(f"Loss value: {mean_loss}, Accuracy: {np.nanmean(acc_tab)}")

    return_list = [mean_loss, np.nanmean(iou_score),
                   np.nanmean(acc_tab), np.nanmean(prec_tab), np.nanmean(rec_tab), np.nanmean(f1_tab), cm_tab]

    return return_list

In [None]:
def train_model(model, criterion, optimizer, data_loader, device, num_classes, num_channels=2):
    """Train ``model`` for one pass over ``data_loader`` and collect metrics.

    Args:
        model: Network called as ``model(inputs)``. Its output is expected to be
            ``(batch, num_classes, length)``: it is argmax-ed over dim 1 and
            permuted / flattened to ``(-1, num_classes)`` for the loss.
        criterion: Loss called as ``criterion(flat_logits, flat_targets)``.
        optimizer: Optimizer stepped once per batch.
        data_loader: Iterable yielding ``(inputs, targets, peaks, end_idxs)``.
        device: Device every batch tensor is moved to.
        num_classes: Number of label classes; used for the loss reshape, the IoU
            and the confusion-matrix size.
        num_channels: Unused. Kept only so existing callers keep working.

    Returns:
        List ``[mean loss, mean IoU, mean accuracy, mean precision, mean recall,
        mean F1, confusion matrix]``. Means are taken over batches; the
        confusion matrix is summed over batches.
    """
    model.train()

    iou_score = []
    acc_tab, prec_tab, rec_tab, f1_tab = [], [], [], []
    cm_tab = np.zeros((num_classes, num_classes), dtype=int)

    running_loss = 0

    for inputs, targets, peaks, end_idxs in data_loader:
        inputs = inputs.to(torch.float32).to(device)
        targets = targets.to(device)
        peaks = peaks.to(device)
        end_idxs = end_idxs.to(device)

        optimizer.zero_grad()
        batch_outputs = model(inputs)

        # Put the class dimension last so each row holds one position's logits.
        outputs = batch_outputs.permute(0, 2, 1).contiguous()
        outputs = outputs.view(-1, num_classes)
        target = targets.view(-1)

        loss = criterion(outputs, target)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()

        preds = torch.argmax(batch_outputs, dim=1)

        # Position-wise IoU averaged over every label class. num_channels
        # (presumably the input channel count) is not a class count, so
        # num_classes is the value iou() needs here.
        iou_score.append(iou(preds, targets, num_classes))

        # categorize_data is defined elsewhere in the notebook; presumably it
        # reduces the position-wise labels to per-beat labels -- verify.
        categorized_preds, categorized_targets = categorize_data(targets, preds, peaks)
        acc, prec, rec, f1, cm = calculate_metrices(preds=categorized_preds, targets=categorized_targets, num_classes=num_classes, end_idxs=end_idxs)
        acc_tab.append(acc)
        prec_tab.append(prec)
        rec_tab.append(rec)
        f1_tab.append(f1)
        cm_tab += cm

    mean_loss = running_loss / len(data_loader)

    print("Training!")
    print(f"Loss value: {mean_loss}, Accuracy: {np.nanmean(acc_tab)}")
    return_list = [mean_loss, np.nanmean(iou_score),
                   np.nanmean(acc_tab), np.nanmean(prec_tab), np.nanmean(rec_tab), np.nanmean(f1_tab), cm_tab]

    return return_list

# Testy

In [None]:
def make_results(train_results, test_results, epochs, encoder_name, evaluation_type, path_to_save, num_folds=5):
    """Average per-epoch results over folds and write them to disk.

    Args:
        train_results: ``train_results[fold][epoch]`` is a sequence
            ``[loss, iou, acc, prec, rec, f1, confusion_matrix]``.
        test_results: Same layout as ``train_results`` for the evaluation split.
        epochs: Number of epochs; sets the length of every averaged series.
        encoder_name: First part of the output directory name.
        evaluation_type: Second part of the output directory name.
        path_to_save: Directory under which ``<encoder_name>_<evaluation_type>``
            is created.
        num_folds: Divisor used to turn the per-fold sums into means.

    Side effects:
        Creates the output directory (an existing one is reused so results of a
        finished run are never lost to ``FileExistsError``). Each scalar metric
        is written to ``<split>_<metric>.txt`` as values each followed by ``;``.
        The confusion matrices are written to ``<split>_cm.npz`` with one array
        per epoch.
    """
    metric_names = ("loss", "iou", "acc", "prec", "rec", "f1")
    cm_position = len(metric_names)

    def _average_over_folds(fold_results):
        # Average one split's per-epoch scalars and confusion matrices over folds.
        # The confusion-matrix shape follows the data; (5, 5) is only the fallback
        # when there is no epoch result to inspect.
        cm_shape = (5, 5)
        for fold in fold_results:
            if len(fold) > 0:
                cm_shape = np.shape(fold[0][cm_position])
                break

        scalars = [np.zeros(epochs) for _ in metric_names]
        matrices = [np.zeros(cm_shape, dtype=np.float32) for _ in range(epochs)]
        for fold in fold_results:
            for epoch_idx, epoch_result in enumerate(fold):
                for position in range(cm_position):
                    scalars[position][epoch_idx] += epoch_result[position]
                matrices[epoch_idx] += epoch_result[cm_position]

        for series in scalars:
            series /= num_folds
        for matrix in matrices:
            matrix /= num_folds
        return scalars, matrices

    result_list = {}
    for split, fold_results in (("train", train_results), ("test", test_results)):
        scalars, matrices = _average_over_folds(fold_results)
        for name, series in zip(metric_names, scalars):
            result_list[f"{split}_{name}"] = series
        result_list[f"{split}_cm"] = matrices

    tmp_path = os.path.join(path_to_save, f"{encoder_name}_{evaluation_type}")
    os.makedirs(tmp_path, exist_ok=True)

    for key, value in result_list.items():
        if key in ["train_cm", "test_cm"]:
            file_path = os.path.join(tmp_path, f"{key}.npz")
            np.savez(file_path, *value)
        else:
            file_path = os.path.join(tmp_path, f"{key}.txt")
            with open(file_path, "w") as file:
                file.write("".join(f"{item};" for item in value))

In [None]:
def mixed_patients(data_path, encoder_name, device, path_to_save, length=1800, batch_size=64, num_classes=5, learning_rate=1e-4, epochs=100):
    """Run 5-fold cross-validation over the samples from ``read_dataset``.

    The arrays returned by ``Dataset.read_dataset()`` are split with a shuffled
    ``KFold`` (``random_state=42``). For every fold a fresh ``TransUNet`` is
    trained for ``epochs`` epochs and evaluated after each epoch. The per-fold
    histories are passed to ``make_results`` with ``evaluation_type="Mix"``.

    Args:
        data_path: Path handed to ``Dataset``.
        encoder_name: Encoder identifier forwarded to ``TransUNet``; also used in
            the output directory name.
        device: Device the model and batches are placed on.
        path_to_save: Directory handed to ``make_results``.
        length: Signal length forwarded to ``Dataset`` and ``TransUNet``.
        batch_size: Batch size of both data loaders.
        num_classes: Number of label classes for the model and the metrics.
        learning_rate: Adam learning rate.
        epochs: Number of training epochs per fold.
    """
    num_folds = 5

    dataset = Dataset(path=data_path, length=length)
    inputs, targets, peaks, peaks_idx = dataset.read_dataset()

    kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)
    folds_train, folds_test = [], []
    for train_index, test_index in kf.split(inputs):
        X_train, X_test = inputs[train_index], inputs[test_index]
        y_train, y_test = targets[train_index], targets[test_index]
        peaks_train, peaks_test = peaks[train_index], peaks[test_index]
        peaks_idx_train, peaks_idx_test = peaks_idx[train_index], peaks_idx[test_index]

        # The last two axes of the input array are swapped before it is wrapped.
        train_dataset = ECGDataset(X_train.transpose(0, 2, 1), y_train, peaks_train, peaks_idx_train)
        test_dataset = ECGDataset(X_test.transpose(0, 2, 1), y_test, peaks_test, peaks_idx_test)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=False)
        # Evaluation order does not need to be random; a fixed order keeps the
        # batch-averaged metrics reproducible for a given model state.
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

        # num_classes is forwarded so the model head matches the reshape done in
        # train_model / test_model.
        model = TransUNet(in_channels=2, patch_size=1, num_patches=57, embed_dim=256, num_heads=8,
                          num_layers=4, num_classes=num_classes, encoder_name=encoder_name, input_signal=length).to(device)

        criterion = FocalLoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        train_tab, test_tab = [], []
        for epoch in range(epochs):
            print(f"Epoch {epoch+1}")
            train_list = train_model(model=model, criterion=criterion, optimizer=optimizer, data_loader=train_loader, device=device, num_classes=num_classes)
            test_list = test_model(model=model, criterion=criterion, data_loader=test_loader, device=device, num_classes=num_classes)
            train_tab.append(train_list)
            test_tab.append(test_list)

        folds_train.append(train_tab)
        folds_test.append(test_tab)

    make_results(train_results=folds_train, test_results=folds_test, epochs=epochs, encoder_name=encoder_name,
                 evaluation_type="Mix", path_to_save=path_to_save, num_folds=num_folds)
    


In [None]:
def divided_patients(data_path, encoder_name, device, path_to_save, length=1800, batch_size=64, num_classes=5, learning_rate=1e-4, epochs=100):
    """Run 5 folds using the splits from ``Dataset.read_dataset_patients``.

    For each fold index the train / test arrays come from
    ``read_dataset_patients(num_fold=i)`` (presumably a patient-wise split --
    verify against ``Dataset``). A fresh ``TransUNet`` is trained for ``epochs``
    epochs and evaluated after each epoch. The per-fold histories are passed to
    ``make_results`` with ``evaluation_type="Divided"``.

    Args:
        data_path: Path handed to ``Dataset``.
        encoder_name: Encoder identifier forwarded to ``TransUNet``; also used in
            the output directory name.
        device: Device the model and batches are placed on.
        path_to_save: Directory handed to ``make_results``.
        length: Signal length forwarded to ``Dataset`` and ``TransUNet``.
        batch_size: Batch size of both data loaders.
        num_classes: Number of label classes for the model and the metrics.
        learning_rate: Adam learning rate.
        epochs: Number of training epochs per fold.
    """
    num_folds = 5

    folds_train, folds_test = [], []
    for fold_idx in range(num_folds):
        dataset = Dataset(path=data_path, length=length)
        X_train, X_test, peaks_train, peaks_test, y_train, y_test, peaks_idx_train, peaks_idx_test = dataset.read_dataset_patients(num_fold=fold_idx)

        train_dataset = ECGDataset(X_train, y_train, peaks_train, peaks_idx_train)
        test_dataset = ECGDataset(X_test, y_test, peaks_test, peaks_idx_test)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=False)
        # Evaluation order does not need to be random; a fixed order keeps the
        # batch-averaged metrics reproducible for a given model state.
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

        # num_classes is forwarded so the model head matches the reshape done in
        # train_model / test_model.
        model = TransUNet(in_channels=2, patch_size=1, num_patches=57, embed_dim=256, num_heads=8,
                          num_layers=4, num_classes=num_classes, encoder_name=encoder_name, input_signal=length).to(device)

        criterion = FocalLoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        train_tab, test_tab = [], []
        for epoch in range(epochs):
            print(f"Epoch {epoch+1}")
            train_list = train_model(model=model, criterion=criterion, optimizer=optimizer, data_loader=train_loader, device=device, num_classes=num_classes)
            test_list = test_model(model=model, criterion=criterion, data_loader=test_loader, device=device, num_classes=num_classes)
            train_tab.append(train_list)
            test_tab.append(test_list)

        folds_train.append(train_tab)
        folds_test.append(test_tab)

    make_results(train_results=folds_train, test_results=folds_test, epochs=epochs, encoder_name=encoder_name,
                 evaluation_type="Divided", path_to_save=path_to_save, num_folds=num_folds)
    

In [None]:
# Experiment configuration: every (encoder, method) pair below is trained and evaluated.
encoders = ["EfficientNet", "ResNet", "MobileNet"]
methods = ["Divided", "Mix"]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
path = os.path.join(os.getcwd(), "mit-bih-arrhythmia-database-1.0.0")
path_to_save = os.path.join(os.getcwd(), "Results")
num_classes = 5
# in_channels is not consumed below: mixed_patients / divided_patients build the
# model with in_channels=2 themselves.
in_channels = 2
in_length = 1800
batch_size = 64
learning_rate = 1e-4
epochs = 200

# Reuse an existing results directory so the cell survives Restart & Run All.
os.makedirs(path_to_save, exist_ok=True)

# Forward the settings above explicitly so that editing them actually changes the
# run instead of silently falling back to the functions' defaults.
run_kwargs = dict(device=device, path_to_save=path_to_save, length=in_length, batch_size=batch_size,
                  num_classes=num_classes, learning_rate=learning_rate, epochs=epochs)

for encoder in encoders:
    for method in methods:
        # make_results() writes each run to "<encoder>_<method>" and creates that
        # directory only after training has finished, so an existing directory
        # marks a completed run that does not need to be repeated.
        if os.path.isdir(os.path.join(path_to_save, f"{encoder}_{method}")):
            print(f"Skipping {encoder} / {method}: results already exist")
            continue

        if method == "Mix":
            mixed_patients(data_path=path, encoder_name=encoder, **run_kwargs)
        elif method == "Divided":
            divided_patients(data_path=path, encoder_name=encoder, **run_kwargs)