In [1]:
%%capture
!pip install torchmetrics biosppy

In [None]:
!gdown 1f0vsok44_ZwKnLK2AGRqYyw0FmYcxrCT # test.zip
!gdown 1zXWuODLYE3UFgdUqn4HCnF93Ubg48sAm # sample_submissions к открытому тесту

!gdown 1-ix_pq7EIODv3DKEjmLCIxSpvyBHYu_n # папка с моделями

!unzip test.zip
!unzip resnets18.zip

In [3]:
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import os

import torch
import torch.nn as nn
from torchvision import models
from torchmetrics import F1Score

from scipy.signal import savgol_filter, resample
import biosppy

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [4]:
class ECGResNet(nn.Module):
    def __init__(self, model_path='best.pt', device='cpu'):
        super().__init__()
        self.device = device

        self.classifier = models.resnet18(pretrained=True)
        self.classifier.conv1 = torch.nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.classifier.fc = torch.nn.Linear(in_features=self.classifier.fc.in_features, out_features=2, bias=True)

        self.classifier.load_state_dict(torch.load(model_path, map_location=torch.device(self.device)))
        self.classifier.eval().to(self.device)

    def forward(self, x):
        x = self.classifier(x)
        return x

In [5]:
class ECGClassifier(nn.Module):
    def __init__(self, models_path, device='cpu'):
        super().__init__()
        self.device = device

        models_names = [f'{models_path}/{name}' for name in os.listdir(models_path) if name[-3:] == 'pth']

        self.pipeline = []

        for name in models_names:
            self.pipeline.append(ECGResNet(model_path=name, device=self.device))

    def forward(self, x):
        logits = torch.zeros(x.shape[0], 2).to(self.device)

        for model in self.pipeline:
            logits += nn.functional.softmax(model(x), dim=1)

        logits /= len(self.pipeline)

        return logits

In [6]:
%%capture
classifier = ECGClassifier('/content/resnets18', device=device)

In [11]:
def get_predicts(data_path, sample_submission=None):
    """
    data_path: путь к директроии с сигналами
    sample_submission: путь датафрейму с колонками record_name myocard который будет заполняться, если None создается на основе data_path
    """

    predict = pd.DataFrame(columns=['record_name', 'myocard'])

    # создание датафрейма для предсказаний
    if sample_submission is None:
        predict.record_name = [record[:-4] for record in os.listdir(data_path) if record[-4:] == '.npy']
    else:
        test = pd.read_csv(sample_submission)
        predict.record_name = test.record_name

    for name in tqdm(predict.record_name):
        data = np.load(f'{data_path}/{name}.npy')

        # Удаление шума
        data = np.apply_along_axis(lambda x: savgol_filter(x, 30, 2), axis=1, arr=data)

        # Получение R-иков, если в одном канале, не найден ищутся в остальных
        peaks = biosppy.signals.ecg.christov_segmenter(signal=data[10], sampling_rate = 500)[0]
        for channel in range(12):
            if len(peaks) > 3:
                break
            peaks = biosppy.signals.ecg.christov_segmenter(signal=data[channel], sampling_rate = 500)[0]

        signals = []
        count = 1

        for i in peaks[1:-1]:
            # Вокруг R пика берется интервал равный половине растояния до следующего R пика
            diff1 = abs(peaks[count - 1] - i)
            diff2 = abs(peaks[count + 1]- i)
            x = peaks[count - 1] + diff1 // 2
            y = peaks[count + 1] - diff2 // 2

            signal = torch.tensor(resample(data[:, x:y], 500, axis=1)).to(device)

            signals.append(signal)
            count += 1

        else:
            signals = torch.stack(signals)[:, None, :, :]

            peaks_logits = classifier(signals)
            peaks_logits = torch.argmax(peaks_logits, dim=1)

            ans = int(sum(peaks_logits) / len(peaks_logits) > 0.5)


        predict.loc[predict.record_name == name, 'myocard'] = ans

    return predict

In [12]:
predict = get_predicts(data_path='test', sample_submission='test.csv')

  0%|          | 0/449 [00:00<?, ?it/s]

In [None]:
predict.to_csv('predict.csv', index=False)