# Import

In [2]:
import numpy as np

import os
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import DataLoader
from torch.utils.data import random_split

from torch.utils.data import Dataset
!pip install torchmetrics
from torchmetrics.classification import BinaryF1Score

from torchmetrics.classification import BinaryF1Score
from torchmetrics.classification import BinaryRecall
from torchmetrics.classification import BinarySpecificity
from torchmetrics.classification import BinaryAUROC
from torchmetrics.classification import BinaryAccuracy

from tqdm import tqdm

Collecting torchmetrics
  Downloading torchmetrics-1.1.0-py3-none-any.whl (761 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m761.3/761.3 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.9.0-py3-none-any.whl (23 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.9.0 torchmetrics-1.1.0


# Dataset & DataLoader preparation


In [3]:
# Mount Google Drive into the Colab filesystem; the dataset directory used
# below lives under /content/drive. This import is only available inside a
# Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Dataset

In [4]:
class DatasetPTBXL(Dataset):
    """Map-style dataset over per-sample ``.npy`` files for one binary target.

    Sample ``idx`` is read from ``<path>/<idx + 1>.npy``. Each file stores a
    pickled dict with (at least) the keys ``'data'`` and ``'target'``.

    Args:
        path: Directory holding the sample files. The dataset length is the
            number of entries in this directory, so it is expected to contain
            only the files ``1.npy`` .. ``N.npy``.
        p_list: Ordered list of target names; the position of a name is its
            index into the stored ``'target'`` entry.
        p_name: Name of the target to return; must be an element of ``p_list``.

    Raises:
        ValueError: If ``p_name`` is not in ``p_list``.
    """

    def __init__(self, path,p_list,p_name):
        self.path = path
        self.len = len(os.listdir(self.path))
        self.target_list = p_list
        self.target_name = p_name
        # Resolve the target position once instead of on every __getitem__;
        # this also surfaces an unknown target name at construction time.
        self.target_idx = self.target_list.index(self.target_name)

    def __len__(self):
        """Return the number of entries found in ``path`` at construction."""
        return self.len

    def __getitem__(self, idx):
        """Load sample ``idx`` (0-based) and return ``(signal, target)`` as float32 tensors."""
        # Files are numbered from 1, dataset indices from 0.
        file_path = os.path.join(self.path, str(idx + 1) + '.npy')
        # NOTE(security): allow_pickle=True unpickles the file content, which
        # can execute arbitrary code -- only load files from a trusted source.
        record = np.load(file_path, allow_pickle=True).item()
        signal = torch.from_numpy(record['data']).float()
        # The selected target entry must itself be an ndarray (not a scalar),
        # otherwise torch.from_numpy raises.
        target = torch.from_numpy(record['target'][self.target_idx]).float()
        return signal, target

## Inputs

In [5]:
# Directory with the preprocessed samples (files named 1.npy, 2.npy, ...)
# that DatasetPTBXL reads. The commented line is an alternative local location.
# path = '/home/narmarinad_12/Desktop/ECG/files_processed/'
path = '/content/drive/MyDrive/files_processed/'
# Ordered target names; the position of a name selects the entry of each
# sample's stored 'target' that is returned as the label.
ptlg_list = ['SR', 'AFIB', 'STACH', 'SARRH', 'SBRAD', 'PACE', 'SVARR', 'BIGU', 'AFLT', 'SVTAC', 'PSVT', 'TRIGU']
# Target the binary classifier is trained on (must be one of ptlg_list).
ptlg_name = 'SR'


## Split and DataLoaders

In [6]:
# Build the full dataset and split it 80% / 10% / 10% into train / validation /
# test subsets (the test subset absorbs the rounding remainder).
full_dataset = DatasetPTBXL(path,ptlg_list,ptlg_name)

tmp_len = len(full_dataset)
train_size = int (0.8 * tmp_len)
valid_size = int (0.1 * tmp_len)
test_size = tmp_len - train_size - valid_size

# Seed the split so the same samples land in the same subset on every run;
# otherwise the held-out test set changes between runs.
SPLIT_SEED = 42
train_subset, valid_subset, test_subset = random_split(
    full_dataset,
    [train_size, valid_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

batch_size = 500
# NOTE: despite their names, train_dataset / valid_dataset / test_dataset are
# DataLoaders. The names are kept because fit() reads train_dataset and
# valid_dataset as notebook globals.
# Only the training loader reshuffles each epoch; evaluation order is fixed.
train_dataset = DataLoader(train_subset, batch_size, shuffle = True, num_workers = 1, pin_memory = True )
valid_dataset = DataLoader(valid_subset, batch_size, num_workers = 1, pin_memory = True )
test_dataset  = DataLoader(test_subset , batch_size, num_workers = 1, pin_memory = True )

# Model





![resnet-architecture-3-fotor-202308251314.jpg](attachment:resnet-architecture-3-fotor-202308251314.jpg)


In [7]:
class RhythmECGClassification(nn.Module):
    """1-D residual CNN (ResNet-34-style layout) with a sigmoid output head.

    Layout: a stem (``block0``), four stages of 3 / 4 / 6 / 3 residual blocks
    with 64 / 128 / 256 / 512 channels, then global average pooling and a
    two-layer classifier (``block8``). The first block of stages 2-4
    (``block2``, ``block4``, ``block6``) halves the temporal length and doubles
    the channels; its shortcut is a strided convolution (``ident1..3``).

    Every residual block has its own weights and BatchNorm statistics.
    ``block1``, ``block3``, ``block5`` and ``block7`` are ``nn.ModuleList``s of
    same-shape residual branches. Previously each was a single module applied
    repeatedly, which tied the weights of all blocks in a stage.

    Args:
        in_channels: Number of channels of the input signal.
        out_channels: Number of sigmoid outputs.

    Shapes:
        input  ``(batch, in_channels, length)``
        output ``(batch, out_channels)``, each value in ``[0, 1]``.
    """

    def __init__(self, in_channels, out_channels):

        super(RhythmECGClassification, self).__init__()

        self.in_channels = in_channels

        self.relu = nn.ReLU()
        # Built once here rather than on every training step.
        self.criterion = nn.BCELoss()

        # Stem: strided 7-wide convolution followed by max pooling.
        self.block0 = nn.Sequential(nn.Conv1d(in_channels, 64, kernel_size=7, stride=2, padding=3),
                                    nn.BatchNorm1d(64),
                                    nn.ReLU(), nn.MaxPool1d(kernel_size=3, stride=2, padding=1))

        # Stage 1: three blocks at 64 channels.
        self.block1 = self._make_stage(64, 3)

        # Stage 2: one down-sampling block, then three blocks at 128 channels.
        self.block2 = self._residual_branch(64, 128, stride=2)
        self.block3 = self._make_stage(128, 3)

        # Stage 3: one down-sampling block, then five blocks at 256 channels.
        self.block4 = self._residual_branch(128, 256, stride=2)
        self.block5 = self._make_stage(256, 5)

        # Stage 4: one down-sampling block, then two blocks at 512 channels.
        self.block6 = self._residual_branch(256, 512, stride=2)
        self.block7 = self._make_stage(512, 2)

        # Head: global average pooling and a two-layer classifier with sigmoid.
        self.block8 = nn.Sequential(nn.AdaptiveAvgPool1d(1), nn.Flatten(),
                                    nn.Linear(512, 1000), nn.ReLU(),
                                    nn.Linear(1000,out_channels), nn.Sigmoid())

        # Shortcuts for the down-sampling blocks: same stride and padding as the
        # first convolution of the matching branch, so the shapes agree.
        self.ident1 = nn.Sequential(nn.Conv1d(64, 128, kernel_size=3, stride=2, padding=1),
                                    nn.BatchNorm1d(128))

        self.ident2 = nn.Sequential(nn.Conv1d(128, 256, kernel_size=3, stride=2, padding=1),
                                    nn.BatchNorm1d(256))

        self.ident3 = nn.Sequential(nn.Conv1d(256, 512, kernel_size=3, stride=2, padding=1),
                                    nn.BatchNorm1d(512))

    @staticmethod
    def _residual_branch(in_ch, out_ch, stride=1):
        """Build the conv-BN-ReLU-conv-BN branch of one residual block."""
        return nn.Sequential(nn.Conv1d(in_ch, out_ch, kernel_size=3, stride=stride, padding=1),
                             nn.BatchNorm1d(out_ch),
                             nn.ReLU(),
                             nn.Conv1d(out_ch, out_ch, kernel_size=3, stride=1, padding=1),
                             nn.BatchNorm1d(out_ch))

    @classmethod
    def _make_stage(cls, channels, num_blocks):
        """Build ``num_blocks`` independent shape-preserving residual branches."""
        return nn.ModuleList([cls._residual_branch(channels, channels) for _ in range(num_blocks)])

    def _run_stage(self, stage, x):
        """Apply each branch of ``stage`` with an identity shortcut and ReLU."""
        for branch in stage:
            x = self.relu(branch(x) + x)
        return x

    def forward(self, x):
        """Map ``(batch, in_channels, length)`` signals to ``(batch, out_channels)`` sigmoid outputs."""
        x = self.block0(x)

        x = self._run_stage(self.block1, x)

        x = self.relu(self.block2(x) + self.ident1(x))
        x = self._run_stage(self.block3, x)

        x = self.relu(self.block4(x) + self.ident2(x))
        x = self._run_stage(self.block5, x)

        x = self.relu(self.block6(x) + self.ident3(x))
        x = self._run_stage(self.block7, x)

        x = self.block8(x)
        return x


    def training_step(self,batch):
        """Return the mean binary cross-entropy for one ``(signals, targets)`` batch.

        ``targets`` must have the same shape as the model output.
        """
        signals,targets = batch
        # Cast like validation_step does, so both paths accept the same inputs.
        preds = self(signals.float())
        return self.criterion(preds, targets.float())


    def validation_step(self,batch):
        """Run the model on one batch; return detached predictions and the given targets."""
        signals,targets = batch
        preds = self((signals.float()))
        return { 'batch_preds': preds.detach(), 'batch_targets': targets}

# Evaluation

    





In [8]:
# Compute the metrics for one pass over a loader and return them as a dict.
def evaluate(model, valid_dataset,device):
    """Evaluate ``model`` on every batch of ``valid_dataset``.

    Args:
        model: Module exposing ``validation_step(batch)`` that returns a dict
            with ``'batch_preds'`` (probabilities) and ``'batch_targets'``.
        valid_dataset: Iterable of ``(data, target)`` batches (a DataLoader).
        device: Device on which the batches and the metric states are placed.

    Returns:
        dict of Python floats with the keys ``'model_acc'``,
        ``'model_F1score'``, ``'model_Recall'``, ``'model_Specificity'`` and
        ``'model_ROCAUC'``, accumulated over all batches.

    Side effects:
        Puts ``model`` into eval mode; the caller must switch back to train mode.
    """
    model.eval()

    # Keyed by the name reported to the caller; insertion order is the
    # order of the returned dict.
    metrics = {
        'model_acc': BinaryAccuracy().to(device),
        'model_F1score': BinaryF1Score().to(device),
        'model_Recall': BinaryRecall().to(device),
        'model_Specificity': BinarySpecificity().to(device),
        'model_ROCAUC': BinaryAUROC(thresholds=None).to(device),
    }

    with torch.no_grad():
        for data, target in valid_dataset:
            step_out = model.validation_step((data.to(device), target.to(device)))
            batch_preds = step_out['batch_preds']
            # The dataset yields float targets, but torchmetrics expects integer
            # class labels (BinaryAUROC rejects floating-point targets).
            # Assumes the targets are exactly 0.0 / 1.0 -- TODO confirm.
            batch_targets = step_out['batch_targets'].long()

            # update() only accumulates state; the per-batch metric value that a
            # direct call would also compute is never used here.
            for metric in metrics.values():
                metric.update(batch_preds, batch_targets)

    return {name: metric.compute().item() for name, metric in metrics.items()}


In [9]:
import matplotlib.pyplot as plt

def visual_res(history, epochs_num ):
    """Draw the per-epoch metric curves on the current matplotlib figure.

    Args:
        history: Sequence of per-epoch dicts holding the keys
            'model_F1score', 'model_Recall', 'model_Specificity',
            'model_ROCAUC' and 'model_acc'. Other keys are ignored.
        epochs_num: Number of epochs; the x axis is ``0 .. epochs_num - 1``
            and must match ``len(history)``.
    """
    plt.title('Динамика обучения')
    plt.xlabel('Эпоха')
    plt.ylabel('Метрика')

    # (history key, legend label) pairs, in drawing order.
    curves = (
        ('model_F1score', 'F1'),
        ('model_Recall', 'Recall'),
        ('model_Specificity', 'Specificity'),
        ('model_ROCAUC', 'AUC'),
        ('model_acc', 'Acc'),
    )
    epoch_axis = list(range(epochs_num))
    for key, legend_label in curves:
        values = [epoch_result[key] for epoch_result in history]
        plt.plot(epoch_axis, values, label = legend_label)

    plt.grid(True)
    plt.legend()

# Training function



In [12]:
def fit(epochs_num, lr,model, opt = torch.optim.Adam, wd = True, gc = True):
    """Train ``model`` and evaluate it on the validation loader after each epoch.

    Reads the notebook globals ``train_dataset`` and ``valid_dataset`` (both
    DataLoaders) and calls ``evaluate`` once per epoch.

    Args:
        epochs_num: Number of epochs to run.
        lr: Learning rate handed to the optimizer.
        model: Module exposing ``training_step(batch)`` (returns the batch-mean
            loss) and ``validation_step(batch)``.
        opt: Optimizer class, called as ``opt(params, lr, weight_decay=...)``.
        wd: Weight decay. A number is used as given. ``True`` enables a default
            of 1e-4 and ``False`` disables decay. Previously ``True`` reached
            the optimizer unchanged and acted as a decay of 1.0.
        gc: Accepted for interface compatibility; currently unused (gradient
            clipping is not applied).

    Returns:
        list with one dict per epoch: the metrics from ``evaluate`` plus
        ``'train_loss'``, the mean per-sample training loss as a Python float.
    """
    # Decay used when the caller only passes wd=True. This is a conventional
    # small value, not something tuned for this model -- TODO tune.
    default_weight_decay = 1e-4

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    if isinstance(wd, bool):
        weight_decay = default_weight_decay if wd else 0.0
    else:
        weight_decay = float(wd)
    optimizer = opt(model.parameters(), lr, weight_decay = weight_decay)
    history = []


    for epoch in range(epochs_num):
        torch.cuda.empty_cache()
        # evaluate() leaves the model in eval mode, so re-enable training mode.
        model.train()
        train_loss = 0.0
        samples_num = 0

        for data, target in tqdm(train_dataset):
            data   = data.to(device)
            target = target.to(device)

            # Clear the previous batch's gradients; without this they accumulate
            # and every step uses the sum of all gradients seen so far.
            optimizer.zero_grad()
            loss = model.training_step((data,target))
            loss.backward()
            optimizer.step()

            # The loss is a batch mean, so weight it by the batch size to get a
            # true per-sample mean when dividing by samples_num below.
            batch_len = len(data)
            train_loss  += loss.item() * batch_len
            samples_num += batch_len

        result = evaluate(model, valid_dataset, device)
        # Guard against an empty training loader.
        epoch_loss = train_loss / samples_num if samples_num else float('nan')
        result['train_loss'] = epoch_loss
        history.append(result)

        print('Epoch ', epoch,end = "")
        print(': train_loss = %.4f'%(epoch_loss))

    return history


def train_model(epochs_num, lr, opt = torch.optim.Adam, wd = True, gc = True):
    """Create a fresh model, train it with ``fit`` and plot the metric history.

    Args:
        epochs_num: Number of training epochs.
        lr: Learning rate.
        opt: Optimizer class, forwarded to ``fit``.
        wd: Weight-decay setting, forwarded to ``fit``.
        gc: Gradient-clipping flag, forwarded to ``fit``.

    Returns:
        The trained ``RhythmECGClassification`` instance.
    """
    # 12 input channels and a single sigmoid output (one binary target).
    # Presumably 12 is the number of ECG leads per sample -- TODO confirm.
    model = RhythmECGClassification(12,1)
    # Forward the optimizer settings; they used to be accepted but ignored.
    history = fit(epochs_num, lr, model, opt = opt, wd = wd, gc = gc)
    visual_res(history,epochs_num)
    return model

# Model training


In [None]:
# Hyper-parameters for this run: learning rate and number of epochs.
lr = 0.0003
epochs_num = 10
# Builds a new model, trains it, plots the per-epoch validation metrics and
# returns the trained model.
model = train_model(epochs_num, lr)

 14%|█▍        | 5/35 [18:18<1:48:59, 218.00s/it]

In [None]:
# res = train_model(res, lr,epochs,device, pat)
# eval = evaluate(res, test_dataset,device)
# for keys in eval:
#     print(keys, '=' ,eval[keys])