In [None]:
import math
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import matplotlib.pyplot as plt

# 1. КОНСТАНТЫ, ИМПОРТЫ И ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ

# Параметры данных
N_POINTS = 2048   # Длина входного сигнала КПМГ
T_MAX = 12.282    # Максимальное время сигнала КПМГ в секундах(ограничение, при больших Т2)
NOISE_STD = 0.006 # Уровень шума

# диапазно значений Т2
T2_MIN = 0.1
T2_MAX = 5.0

# Массив времени
T_ECHOES = torch.linspace(0, T_MAX, N_POINTS)
CHECKPOINT_PATH = "best_nmr_param_cnn.pth" #загрузка модели, которая была обучена
NUM_WORKERS = 2

# Инициализация Весов
def init_weights(m):
    """Инициализирует веса для слоев Linear и Conv1d (Kaiming Uniform)."""
    if isinstance(m, (nn.Linear, nn.Conv1d)):
        nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)


# 2. КЛАСС ДАТАСЕТА (Версия для создания датасеты параллельно на ОЗУ во время обучения для разгрузки памяти)

class NMROnTheFlyDataset(Dataset):
    def __init__(self, num_samples: int):
        self.num_samples = num_samples
        self.W_MIN = 0.1

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx: int):
        rng = torch.Generator()
        rng.manual_seed(idx % (2**32))

        # Генерация 1-4 компонент
        probs = torch.tensor([0.25, 0.25, 0.25, 0.25])
        n_comp = torch.multinomial(probs, 1, generator=rng).item() + 1

        # Генерация T2 (Линейное пространство в диапазоне [0.1 с, 5.0 с])
        u = torch.rand(n_comp, generator=rng)
        T2 = u * (T2_MAX - T2_MIN) + T2_MIN
        T2, _ = torch.sort(T2)

        # Генерация весов w с минимальной долей 0.1
        W_RANGE = 1.0 - self.W_MIN
        w = torch.rand(n_comp, generator=rng) * W_RANGE + self.W_MIN
        w = w / w.sum() # Нормализация, чтобы сумма = 1

        # 1. ГЕНЕРАЦИЯ И НОРМАЛИЗАЦИЯ СИГНАЛА
        signal = torch.zeros(N_POINTS)
        for i in range(n_comp):
            signal += w[i] * torch.exp(-T_ECHOES / T2[i])
        signal += torch.normal(0.0, NOISE_STD, size=(N_POINTS,), generator=rng)
        signal = torch.clamp(signal, min=1e-10)

        # Нормализация сигнала
        signal_norm_data = signal
        mean = signal_norm_data.mean()
        std = signal_norm_data.std() + 1e-8
        signal_norm = (signal_norm_data - mean) / std

        # 2. Создание истинного значения
        y_param_true = torch.zeros(8)
        y_param_true[:n_comp] = T2
        y_param_true[4:4+n_comp] = w

        return signal_norm.float().unsqueeze(0), y_param_true.float(), n_comp


# 3. КЛАСС МОДЕЛИ
class NMRParamCNN(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv_layers = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=16, stride=1, padding='same'), nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(16, 32, kernel_size=16, stride=1, padding='same'), nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(32, 64, kernel_size=8, stride=1, padding='same'), nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(64, 128, kernel_size=8, stride=1, padding='same'), nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(128, 256, kernel_size=8, stride=1, padding='same'), nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)
        )

        CONV_OUTPUT_SIZE = 256 * 64

        self.fc_layers = nn.Sequential(
            nn.Linear(CONV_OUTPUT_SIZE, 4096), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(4096, 1024), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(1024, 256), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(256, 8)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.flatten(1)
        x = self.fc_layers(x)

        T2_out = x[:, :4]
        w_out = x[:, 4:]

        T2_out = torch.relu(T2_out)
        w_out = F.softmax(w_out, dim=1)

        return torch.cat([T2_out, w_out], dim=1)

# 4. СПЕЦИАЛИЗИРОВАННАЯ ФУНКЦИЯ ПОТЕРИ

def param_loss_fn(pred, target, n_comp, loss_func=nn.L1Loss(reduction='none')):
    """
    Вычисляет L1 Loss только для существующих компонент T2 и w.
    """
    batch_size = pred.size(0)
    total_loss = 0.0

    for i in range(batch_size):
        num = n_comp[i].item()

        if num > 0:
            loss_t2 = loss_func(pred[i, :num], target[i, :num]).mean()
            loss_w = loss_func(pred[i, 4:4+num], target[i, 4:4+num]).mean()

            sample_loss = loss_t2 + loss_w
            total_loss += sample_loss

    return total_loss / batch_size


# 5. ФУНКЦИЯ ДЛЯ ПРОВЕРКИ
def test_model(model: nn.Module, dataset: Dataset, device: torch.device):
    """Проверяет модель на одном тестовом сэмпле с фиксированным seed=42,
    отображая все 4 предсказанные компоненты."""

    model.eval()

    idx = 42
    x, y_params_true, n_comp_true = dataset[idx]
    x = x.unsqueeze(0).to(device)

    with torch.no_grad():
        pred_params_tensor = model(x).squeeze().cpu()

    # --- ИСТИННЫЕ ПАРАМЕТРЫ ---
    T2_t_list = [f"{y_params_true[j].item():.4f} с" for j in range(4) if y_params_true[j+4] > 1e-8]
    w_t_list = [f"{y_params_true[j+4].item():.3f}" for j in range(4) if y_params_true[j+4] > 1e-8]

    print("\n" + "="*70)
    print(f"Истина ({n_comp_true} компонент): T2: {', '.join(T2_t_list)} | Доли: {', '.join(w_t_list)}")

    # --- ПРЕДСКАЗАННЫЕ ПАРАМЕТРЫ ---
    pred_t2 = pred_params_tensor[:4].numpy()
    pred_w = pred_params_tensor[4:].numpy()

    pred_pairs = sorted([(pred_t2[i], pred_w[i]) for i in range(4)], key=lambda x: x[0])

    T2_p_list = [f"{t2:.4f} с" for t2, w in pred_pairs]
    w_p_list = [f"{w:.3f}" for t2, w in pred_pairs]
    total_pred_w_sum = np.sum([w for t2, w in pred_pairs])

    print(f"Предсказание (4 компонент): T2: {', '.join(T2_p_list)} | Доли: {', '.join(w_p_list)}")
    print(f"Сумма предсказанных долей: {total_pred_w_sum:.3f}")

    print("="*70)


# 6. ЦИКЛ ОБУЧЕНИЯ

def train_loop():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Устройство: {device}")

    train_ds = NMROnTheFlyDataset(2_000_000)
    val_ds   = NMROnTheFlyDataset(100_000)

    train_loader = DataLoader(train_ds, batch_size=128, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)
    val_loader   = DataLoader(val_ds, batch_size=128, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)

    model = NMRParamCNN().to(device)

    # Загрузка предыдущей сохраненной модели
    if os.path.exists(CHECKPOINT_PATH):
        model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=device))
        print("Загружена сохранённая CNN-модель.")
    else:
        model.apply(init_weights)
        print("Чекпоинт не найден. Обучение начинается с нуля.")

    optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=4)

    best_val = float('inf')
    EPOCHS = 200

    for epoch in range(1, EPOCHS + 1):
        model.train()
        train_loss = 0.0

        for x, y_param_true, n_comp in tqdm(train_loader, desc=f"Epoch {epoch:3d}"):
            x, y_param_true = x.to(device), y_param_true.to(device)

            pred = model(x)
            loss = param_loss_fn(pred, y_param_true, n_comp)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()

        train_loss /= len(train_loader)

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for x, y_param_true, n_comp in val_loader:
                x, y_param_true = x.to(device), y_param_true.to(device)
                val_loss += param_loss_fn(model(x), y_param_true, n_comp).item()
        val_loss /= len(val_loader)

        scheduler.step(val_loss)

        print(f"Epoch {epoch:3d} | Train {train_loss:.5f} | Val {val_loss:.5f}")

        if val_loss < best_val:
            best_val = val_loss
            torch.save(model.state_dict(), CHECKPOINT_PATH)
            print(f"  BEST! val = {val_loss:.5f} → сохранено")

        if epoch % 5 == 0 or val_loss == best_val:
            test_model(model, val_ds, device)

    print("Готово. Лучший val_loss:", best_val)


if __name__ == '__main__':
    train_loop()