In [8]:
import os
import sys

script_path = os.path.join(os.getcwd(), "../../Scripts/")
sys.path.append(script_path)
import data_generator as dgen

In [9]:
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

# Примерная функция для нормализации
def normalize(signal):
    return (signal - np.mean(signal)) / np.std(signal)

# Класс для подготовки датасета
class ECGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
        self.fixed_length = 5000  # Пример длины для padding

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Берем данные пациента
        ecg_signal = self.data[idx]
        
        # Применяем нормализацию к каждому каналу
        ecg_signal = np.array([normalize(ch) for ch in ecg_signal])
        
        # Padding/Truncation до фиксированной длины
        ecg_signal = self._fix_length(ecg_signal)
        
        # Преобразование в torch.tensor
        ecg_signal = torch.tensor(ecg_signal, dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        
        return ecg_signal, label
    
    def _fix_length(self, ecg_signal):
        # Применяем padding или обрезание
        if ecg_signal.shape[1] < self.fixed_length:
            pad_size = self.fixed_length - ecg_signal.shape[1]
            ecg_signal = np.pad(ecg_signal, ((0, 0), (0, pad_size)), 'constant')
        else:
            ecg_signal = ecg_signal[:, :self.fixed_length]
        return ecg_signal

# Пример использования
# data = список из N записей, каждая содержит 8 каналов
# labels = список меток заболеваний
dgen.init(filter=True)
amy, amyc, norm, amy_h, amyc_h, norm_h = dgen.read_data_with_meta()

X = norm + amyc + amy
y = np.concatenate([np.zeros(len(norm) + len(amyc)), np.ones(len(amy))])

import pickle
with open('../../Data/dumped/X_train.pkl', 'rb') as f:
    f.seek(0)
    X_train = pickle.load(f)
with open('../../Data/dumped/y_train.pkl', 'rb') as f:
    f.seek(0)
    y_train = pickle.load(f)
with open('../../Data/dumped/X_test.pkl', 'rb') as f:
    f.seek(0)
    X_test = pickle.load(f)
with open('../../Data/dumped/y_test.pkl', 'rb') as f:
    f.seek(0)
    y_test = pickle.load(f)

train_dataset = ECGDataset(data=X_train, labels=y_train[0])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = ECGDataset(data=X_test, labels=y_test[0])
test_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)



In [10]:
import torch.nn as nn

class ECGNet(nn.Module):
    def __init__(self):
        super(ECGNet, self).__init__()
        
        # Сверточные слои
        self.conv1 = nn.Conv1d(in_channels=8, out_channels=16, kernel_size=7, padding=3)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5, padding=2)
        self.pool = nn.MaxPool1d(kernel_size=2)
        
        # LSTM слой для захвата временных зависимостей
        self.lstm = nn.LSTM(input_size=32, hidden_size=64, num_layers=2, batch_first=True, bidirectional=True)
        
        # Полносвязные слои
        self.fc1 = nn.Linear(64*2, 128)
        self.fc2 = nn.Linear(128, 3)  # Предполагается 3 класса болезней
        
    def forward(self, x):
        # x shape: [batch_size, 8, seq_len]
        
        # Свертка
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        
        # Подготовка для LSTM
        # Меняем размер на [batch_size, seq_len, channels] для LSTM
        x = x.permute(0, 2, 1)
        
        # LSTM
        x, (hn, cn) = self.lstm(x)
        
        # Берем последнее скрытое состояние LSTM
        x = x[:, -1, :]  # [batch_size, 64*2]
        
        # Полносвязные слои
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # [batch_size, num_classes]
        
        return x

In [11]:
# Пример обучения
model = ECGNet()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def train_model(model, dataloader, criterion, optimizer, num_epochs=25):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        
        for inputs, labels in dataloader:
            # Обнуление градиентов
            optimizer.zero_grad()
            
            # Прямой проход
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            # Обратный проход и оптимизация
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item() * inputs.size(0)
        
        epoch_loss = running_loss / len(dataloader.dataset)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

# Запуск обучения
train_model(model, train_loader, criterion, optimizer, num_epochs=25)


Epoch 1/25, Loss: 1.0670
Epoch 2/25, Loss: 0.9333
Epoch 3/25, Loss: 0.6982
Epoch 4/25, Loss: 0.6352
Epoch 5/25, Loss: 0.5876
Epoch 6/25, Loss: 0.5879
Epoch 7/25, Loss: 0.5789
Epoch 8/25, Loss: 0.5828
Epoch 9/25, Loss: 0.5774
Epoch 10/25, Loss: 0.5644
Epoch 11/25, Loss: 0.5670
Epoch 12/25, Loss: 0.5508
Epoch 13/25, Loss: 0.5409
Epoch 14/25, Loss: 0.5299
Epoch 15/25, Loss: 0.5184
Epoch 16/25, Loss: 0.4956
Epoch 17/25, Loss: 0.4914
Epoch 18/25, Loss: 0.5199
Epoch 19/25, Loss: 0.5034
Epoch 20/25, Loss: 0.4986
Epoch 21/25, Loss: 0.4977
Epoch 22/25, Loss: 0.4771
Epoch 23/25, Loss: 0.4704
Epoch 24/25, Loss: 0.4515
Epoch 25/25, Loss: 0.4388


In [14]:
class MultiBranchECGNet(nn.Module):
    def __init__(self, num_channels=8, num_classes=3):
        super(MultiBranchECGNet, self).__init__()
        
        # Создаем отдельную ветвь для каждого канала
        self.branches = nn.ModuleList([self.create_branch() for _ in range(num_channels)])
        
        # Полносвязные слои после объединения всех ветвей
        self.fc1 = nn.Linear(320000, 128)
        self.fc2 = nn.Linear(128, num_classes)
    
    def create_branch(self):
        # Каждый канал проходит через свою сверточную ветвь
        branch = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=7, padding=3),  # Свертка с padding
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(16, 32, kernel_size=5, padding=2),  # Вторая с вертка
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        return branch
    
    def forward(self, x):
        # x shape: [batch_size, num_channels, seq_len]
        
        # Пропускаем каждый канал через свою ветвь
        branch_outputs = []
        for i in range(x.size(1)):  # num_channels
            branch_output = self.branches[i](x[:, i:i+1, :])  # Обрабатываем i-й канал
            branch_outputs.append(branch_output)
        
        # Объединяем выходы всех ветвей
        out = torch.cat(branch_outputs, dim=1)  # Соединяем по оси каналов
        
        # Преобразуем для подачи на полносвязные слои
        out = out.view(out.size(0), -1)  # Flatten
        
        # Полносвязные слои
        out = F.relu(self.fc1(out))
        out = self.fc2(out)
        
        return out

# Пример использования
model = MultiBranchECGNet(num_channels=8, num_classes=2)


In [15]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Обучение модели
def train_model(model, dataloader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        
        epoch_loss = running_loss / len(dataloader.dataset)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

# Запуск обучения
train_model(model, train_loader, criterion, optimizer, num_epochs=25)


Epoch 1/25, Loss: 9.6126
Epoch 2/25, Loss: 2.3094
Epoch 3/25, Loss: 1.6491
Epoch 4/25, Loss: 0.6087
Epoch 5/25, Loss: 0.4328
Epoch 6/25, Loss: 0.2854
Epoch 7/25, Loss: 0.2050
Epoch 8/25, Loss: 0.1287
Epoch 9/25, Loss: 0.0657
Epoch 10/25, Loss: 0.0266
Epoch 11/25, Loss: 0.0108
Epoch 12/25, Loss: 0.0054
Epoch 13/25, Loss: 0.0020
Epoch 14/25, Loss: 0.0009
Epoch 15/25, Loss: 0.0004
Epoch 16/25, Loss: 0.0002
Epoch 17/25, Loss: 0.0001
Epoch 18/25, Loss: 0.0001
Epoch 19/25, Loss: 0.0001
Epoch 20/25, Loss: 0.0001
Epoch 21/25, Loss: 0.0001
Epoch 22/25, Loss: 0.0000
Epoch 23/25, Loss: 0.0000
Epoch 24/25, Loss: 0.0000
Epoch 25/25, Loss: 0.0000


In [16]:
from sklearn.metrics import recall_score

def validate_model(model, dataloader):
    model.eval()  # Переводим модель в режим оценки
    all_preds = []
    all_labels = []
    
    with torch.no_grad():  # Отключаем градиенты для валидации
        for inputs, labels in dataloader:
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)  # Предсказания с максимальной вероятностью
            all_preds.append(preds.cpu().numpy())
            all_labels.append(labels.cpu().numpy())
    
    # Преобразуем в numpy массивы
    all_preds = np.concatenate(all_preds)
    all_labels = np.concatenate(all_labels)
    
    # Считаем accuracy
    recall = recall_score(all_labels, all_preds)
    print(f'Validation recall: {recall:.4f}')
    
    return recall

# Пример вызова валидации
test_accuracy = validate_model(model, test_loader)


Validation Accuracy: 1.0000
