In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.nn import Module
from typing import Literal
import numpy as np
from torch.utils.data import Dataset
import glob
import torch
import torch.nn.functional as F
from torch import optim

In [2]:
class SegmentationMetric:
    def __init__(self,
                 monitor: Literal['p', 'qrs', 't', 'all'] = 'all',
                 orientation_type: Literal['onset', 'offset', 'all'] = 'all',
                 return_type: Literal['precision', 'recall', 'f1', 'confusion_matrix'] = 'confusion_matrix',
                 samples=75):

        assert monitor in ['p', 'qrs', 't', 'all']
        assert orientation_type in ['onset', 'offset', 'all']
        assert return_type in ['precision', 'recall', 'f1', 'confusion_matrix']

        self.samples = samples
        self.monitor = monitor
        self.orientation_type = orientation_type
        self.return_type = return_type
        
        self.metric_to_func = {'precision': self.__precision,
                               'recall': self.__recall,
                               'f1': self.__f1}

    def __call__(self, y_pred, y_true):
        assert y_pred.shape == y_true.shape
        assert len(y_pred.shape) == 2
        
        matrix = np.zeros((2, 2), dtype=int)
        monitors = ['p', 'qrs', 't'] if self.monitor == 'all' else [self.monitor]
        orientations = ['onset', 'offset'] if self.orientation_type == 'all' else [self.orientation_type]
        for wave in monitors:
            for orientation in orientations:
                matrix += self.__handle(y_pred, y_true, wave, orientation)
        
        if self.return_type == 'confusion_matrix':
            return matrix

        return self.metric_to_func[self.return_type](matrix[0, 1], matrix[1, 0], matrix[1, 1])

    def __handle(self, y_pred, y_true, wave, orientation) -> tuple[int, int, int]:
        
        index = ['p', 'qrs', 't'].index(wave) + 1
        orientation = 2 * ['offset', 'onset'].index(orientation) - 1
        y_pred[y_true == 4] = 0

        y_true, y_pred = (y_true == index), (y_pred == index)

        wave_true = np.logical_and(np.roll(y_true, orientation) != 1, y_true == 1).astype(int)
        wave_pred = np.logical_and(np.roll(y_pred, orientation) != 1, y_pred == 1).astype(int)

        true_batch, true_indexes = np.where(wave_true == 1)
        
        tp = fn = 0
        
        for batch, x in zip(true_batch, true_indexes):
            wave = wave_pred[batch][x - self.samples // 2: x + self.samples // 2]
            if wave.sum():
                tp += 1
            else:
                fn += 1
            wave[:] = -1
        
        fp = (wave_pred[:, self.samples:-self.samples] == 1).sum()
        return np.array([[0, fp], [fn, tp]])
    
    @staticmethod
    def __precision(fp, fn, tp):
        if fp + tp == 0:
            return 1
        return tp / (tp + fp)
    
    @staticmethod
    def __recall(fp, fn, tp):
        if fn + tp == 0:
            return 1
        return tp / (tp + fn)
    
    @staticmethod
    def __f1(fp, fn, tp):
        precision = SegmentationMetric.__precision(fp, fn, tp)
        recall = SegmentationMetric.__recall(fp, fn, tp)
        if precision + recall == 0:
            return 1
        return 2 * (precision * recall) / (precision + recall)
    
    def __str__(self):
        return f'{self.monitor}_{self.orientation_type}'

In [34]:

class SignalDataset(Dataset):
    def __init__(self, data_folder, label_folder, max_length=5000):
        self.data_files = glob.glob(f'{data_folder}/*.npy')
        self.label_files = glob.glob(f'{label_folder}/*.npy')
        self.max_length = max_length

    def __len__(self):
        return len(self.data_files)

    def __getitem__(self, idx):
        data = np.load(self.data_files[idx])
        labels = np.load(self.label_files[idx])

        # Обрезка данных и меток, если длина превышает max_length
        if data.shape[1] > self.max_length:
            data = data[:, :self.max_length]
            labels = labels[:, :self.max_length]

        return torch.from_numpy(data).float(), torch.from_numpy(labels).long()

# Использование DataLoader
data_folder = '/home/meshalkin/Diplom/ludb/data/signals'
label_folder = '/home/meshalkin/Diplom/ludb/data/masks'
dataset = SignalDataset(data_folder, label_folder)
data_loader = DataLoader(dataset, batch_size=2, shuffle=True)
for i, (signal, label) in enumerate(data_loader):
    print(f"label.size = {label.shape}")
    print(f"signal.size = {signal.shape}")
    break

label.size = torch.Size([2, 12, 5000])
signal.size = torch.Size([2, 12, 5000])


In [36]:
class TimeSeriesSegmentationNet(nn.Module):
    def __init__(self, num_channels, num_classes, length):
        super(TimeSeriesSegmentationNet, self).__init__()
        self.conv1 = nn.Conv1d(num_channels, num_channels * 8, kernel_size=3, padding=1, groups=num_channels)
        self.conv2 = nn.Conv1d(num_channels * 8, num_channels * 16, kernel_size=3, padding=1, groups=num_channels)
        self.conv3 = nn.Conv1d(num_channels * 16, num_channels * num_classes, kernel_size=3, padding=1, groups=num_channels)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.conv3(x)
        x = x.view(x.size(0), -1, num_channels, num_classes)  # изменение формы для выравнивания по каналам и классам
        x = x.permute(0, 2, 3, 1)  # Перестановка для получения [batch_size, num_channels, length, num_classes]
        return x
        
# Параметры модели
num_channels = 12
num_classes = 4
length = 5000
batch_size = 2

# Создание и тестирование модели
model = TimeSeriesSegmentationNet(num_channels, num_classes, length)
x = torch.randn(batch_size, num_channels, length)
output = model(x)
print(output.shape)  # Должно быть torch.Size([batch_size, num_channels, length])
print(output.dtype)  

torch.Size([2, 12, 4, 5000])
torch.float32


In [38]:
def train_model(model, train_loader, num_epochs=10, learning_rate=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            
            outputs = model(inputs)  # [batch_size, num_channels, length, num_classes]
            outputs = outputs.permute(0, 3, 1, 2)  # [batch, num_classes, num_channels, length]
            
            # labels должны быть в формате [batch_size, num_channels, length]
            # Перестраиваем labels для соответствия ожидаемой размерности CrossEntropyLoss
            labels = labels.view(-1)  # Превращаем в одномерный массив

            # Так как CrossEntropyLoss ожидает вход в размерности [N, C, d1, d2, ...], где C - количество классов,
            # мы должны также изменить размер outputs
            outputs = outputs.view(-1, num_classes)  # [N * d1 * d2, C]

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item() * inputs.size(0)
        
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(train_loader.dataset)}')
    
    print('Training complete.')
train_model(model, data_loader)    

Epoch 1/10, Loss: 1.386100938788316
Epoch 2/10, Loss: 1.3858506067211513
Epoch 3/10, Loss: 1.3858189950218747
Epoch 4/10, Loss: 1.3858009195451773
Epoch 5/10, Loss: 1.3857999412229065
Epoch 6/10, Loss: 1.3858155991848176
Epoch 7/10, Loss: 1.3858340090365961
Epoch 8/10, Loss: 1.3858852857731099
Epoch 9/10, Loss: 1.3859277655151327
Epoch 10/10, Loss: 1.3861314238125388
Training complete.


In [None]:
# metric = SegmentationMetric()

# def validate_model(model, valid_loader):
#     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#     model.eval()  # Переключаемся в режим оценки
#     correct = 0
#     total = 0

#     with torch.no_grad():  # Отключаем вычисление градиентов
#         for inputs, labels in valid_loader:
#             inputs = inputs.to(device)
#             labels = labels.to(device)

#             outputs = model(inputs)
#             predicted = torch.argmax(outputs, dim=3)  # Получаем предсказанные классы

#             # Считаем количество правильных предсказаний
#             correct += (predicted == labels).sum().item()
#             total += inputs.size(0) * inputs.size(2) * inputs.size(1)  # Общее количество предсказаний

#     accuracy = correct / total
#     print(f'Validation Accuracy: {accuracy:.4f}')

# # Пример использования:
# # Создание DataLoader для валидационного набора данных
# # valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# # Проведение валидации
# validate_model(model, data_loader)