In [3]:
import numpy as np
import torch

# Загрузка исходных данных [320, 10, 246] — 320 наборов, 10 временных шагов, 246 признаков
data = np.load('ihb.npy')

# Заменим NaN на нули (или можно использовать средние значения)
mean_vals = np.nanmean(data, axis=0)
data = np.where(np.isnan(data), mean_vals, data)

# Разделим данные на группы по 16 для каждого пациента
num_patients = 20  # 20 пациентов по 16 наборов
grouped_data = data.reshape(num_patients, 16, 10, 246)  # [20, 16, 10, 246]

# Для каждой пары из 320 объектов нужно сформировать набор данных для обучения
pairs = []
labels = []

for i in range(num_patients):
    for j in range(16):
        for k in range(j + 1, 16):
            # Пара внутри одного пациента
            pairs.append((grouped_data[i, j], grouped_data[i, k]))
            labels.append(1)  # Метка 1 для пар одного пациента

        for other_patient in range(i + 1, num_patients):
            for l in range(16):
                # Пара между разными пациентами
                pairs.append((grouped_data[i, j], grouped_data[other_patient, l]))
                labels.append(0)  # Метка 0 для пар разных пациентов

# Преобразуем пары и метки в тензоры
pairs = np.array(pairs)  # [num_pairs, 2, 10, 246]
labels = np.array(labels)

train_x_tensor = torch.tensor(pairs, dtype=torch.float32)
train_y_tensor = torch.tensor(labels, dtype=torch.float32)

In [16]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Изменяем размер входа полносвязного слоя на 2 * hidden_size, т.к. мы объединяем два скрытых состояния
        self.fc = nn.Linear(2 * hidden_size, 1)  # Один выход для бинарной классификации
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_1, input_2):
        # Пропускаем оба входных вектора через LSTM
        _, (hn_1, _) = self.lstm(input_1)  # hn_1: [num_layers, batch_size, hidden_size]
        _, (hn_2, _) = self.lstm(input_2)

        # Соединяем скрытые состояния двух наборов временных шагов
        combined = torch.cat((hn_1[-1], hn_2[-1]), dim=1)  # Соединяем последний временной шаг по скрытому состоянию

        # Пропускаем через полносвязный слой и сигмоиду
        output = self.fc(combined)
        return self.sigmoid(output)

# Параметры модели
input_size = 246  # Количество признаков
hidden_size = 128  # Размер скрытого слоя
num_layers = 2  # Количество слоев LSTM

model = LSTMModel(input_size, hidden_size, num_layers)

In [34]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size * 2, 1)  # Умножаем на 2, т.к. будем соединять скрытые состояния двух LSTM
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_1, input_2):
        # Прогоняем первый набор признаков через LSTM
        out_1, (hn_1, cn_1) = self.lstm(input_1)
        # Прогоняем второй набор признаков через LSTM
        out_2, (hn_2, cn_2) = self.lstm(input_2)
        
        # Выводим формы скрытых состояний для отладки
        #print(f"Форма hn_1: {hn_1.shape}, Форма hn_2: {hn_2.shape}")
        
        # Соединяем последние скрытые состояния (последний слой LSTM)
        # Убедимся, что используем правильные оси для объединения
        combined = torch.cat((hn_1[-1], hn_2[-1]), dim=-1)  # Соединяем вдоль последней оси
        
        # Пропускаем через полносвязный слой и сигмоиду
        output = self.fc(combined)
        output = self.sigmoid(output)
        return output
    

# Параметры модели
input_size = 246  # Количество признаков
hidden_size = 128  # Размер скрытого слоя
num_layers = 2  # Количество слоев LSTM

model = LSTMModel(input_size, hidden_size, num_layers)

In [47]:
import torch.optim as optim
import torch.nn as nn

# Определим функцию потерь и оптимизатор
criterion = nn.BCELoss()  # Бинарная кросс-энтропия
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Датасет и загрузчики данных
from torch.utils.data import DataLoader, TensorDataset, random_split

# Соединяем данные в тензор-датасет
dataset = TensorDataset(train_x_tensor, train_y_tensor)
train_size = int(0.8 * len(dataset))  # 80% на обучение
test_size = len(dataset) - train_size  # 20% на тестирование
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32,shuffle=False)

# Функция для обучения модели
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for pairs, labels in train_loader:
            input_1, input_2 = pairs[:, 0], pairs[:, 1]

            # Прямой проход
            outputs = model(input_1, input_2).squeeze()

            # Вычисляем потери
            loss = criterion(outputs, labels)

            # Обратное распространение и шаг оптимизации
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Эпоха [{epoch+1}/{num_epochs}], Потери: {running_loss / len(train_loader)}")

# Обучаем модель
train_model(model, train_loader, criterion, optimizer)

KeyboardInterrupt: 

In [51]:
def manual_test(model, data_tensor):
    """
    Функция для ручной проверки модели.
    Позволяет выбрать два объекта и временной промежуток, чтобы проверить предсказание модели.
    """
    model.eval()  # Переводим модель в режим оценки (без обновления градиентов)
    
    # Ввод данных от пользователя
    subject_1 = int(input("Введите индекс первого субъекта (0-319): "))
    subject_2 = int(input("Введите индекс второго субъекта (0-319): "))
    timestep_1 = int(input("Введите индекс временного шага для первого субъекта (0-9): "))
    timestep_2 = int(input("Введите индекс временного шага для второго субъекта (0-9): "))

    # Извлекаем два набора признаков для выбранных субъектов и временных шагов
    input_1 = dataset[subject_1][0][0, timestep_1, :]  # Признаки первого субъекта на временном шаге
    input_2 = dataset[subject_2][0][1, timestep_2, :]  # Признаки второго субъекта на временном шаге

    # Объединяем два набора векторов и готовим для модели
    input_1 = input_1.unsqueeze(0)  # Добавляем batch dimension
    input_2 = input_2.unsqueeze(0)
    # Прогон через модель
    with torch.no_grad():
        prediction = model(input_1, input_2)
        print(f"Предсказанная вероятность: {prediction.item():.4f}")
        print(f"Предсказанная метка (0 - разные субъекты, 1 - один и тот же субъект): {(prediction > 0.5).item()}")

manual_test(model,test_dataset)


Предсказанная вероятность: 1.0000
Предсказанная метка (0 - разные субъекты, 1 - один и тот же субъект): True


In [48]:
def automatic_test(model, test_loader):
    """
    Функция для автоматической проверки модели на тестовом наборе данных.
    """
    model.eval()  # Переводим модель в режим оценки
    correct = 0
    total = 0
    
    with torch.no_grad():  # Отключаем градиенты, т.к. обучение не требуется
        for input_1, input_2, labels in test_loader:
            # Прогоняем пары данных через модель
            outputs = model(input_1, input_2).squeeze()  # Получаем предсказания от модели
            predictions = (outputs > 0.5).float()  # Преобразуем вероятности в бинарные метки (0 или 1)
            
            # Считаем количество правильных предсказаний
            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total  # Вычисляем точность
    print(f'Точность модели: {accuracy * 100:.2f}%')
    return accuracy

# Запуск теста:
automatic_test(model, test_loader)

ValueError: not enough values to unpack (expected 3, got 2)