* If you're running this on Google Colab, please uncomment and run the cell below.

In [17]:
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm import trange

import time

import torch
import torch.nn as nn
import torch.optim as optim
from IPython import display
import numpy as np
import os

## 1. SPINN

In [114]:
class SPINN(nn.Module):
    def __init__(self, features, alpha=0.05):
        super().__init__()
        self.features = features
        self.alpha = alpha  # коэффициент диффузии
        
        # Создаем слои для трех координат (t, x, y)
        self.networks = nn.ModuleList([
            self._build_network() for _ in range(3)
        ])
        
        # Слои для объединения выходов
        self.combine_layer1 = nn.Linear(features[-1] * 2, features[-1])
        self.combine_layer2 = nn.Linear(features[-1] * 2, features[-1])
        self.final_layer = nn.Linear(features[-1], 1)
        self.activation = nn.Tanh()
    
    def _build_network(self):
        layers = []
        layers.append(nn.Linear(1, self.features[0]))
        layers.append(nn.Tanh())
        
        for i in range(len(self.features) - 2):
            layers.append(nn.Linear(self.features[i], self.features[i + 1]))
            layers.append(nn.Tanh())
            
        layers.append(nn.Linear(self.features[-2], self.features[-1]))
        layers.append(nn.Tanh())
        return nn.Sequential(*layers)
    
    def _ensure_2d(self, x):
        if x.dim() == 1:
            return x.unsqueeze(1)
        return x
    
    def forward(self, t, x, y):
        # Преобразуем входы в 2D тензоры [batch_size, 1]
        t = self._ensure_2d(t)
        x = self._ensure_2d(x)
        y = self._ensure_2d(y)
        
        # Пропускаем через отдельные сети
        t_features = self.networks[0](t)
        x_features = self.networks[1](x)
        y_features = self.networks[2](y)
        
        combined = torch.cat([t_features, x_features], dim=1)
        combined = self.activation(self.combine_layer1(combined))
        
        combined = torch.cat([combined, y_features], dim=1)
        combined = self.activation(self.combine_layer2(combined))
        
        # Финальный слой
        output = self.final_layer(combined)
        return output.squeeze(-1)


class DiffusionLoss:
    def __init__(self, model, alpha=None):
        self.model = model
        self.alpha = alpha if alpha is not None else model.alpha

    def residual_loss(self, t, x, y):
        t.requires_grad_(True)
        x.requires_grad_(True)
        y.requires_grad_(True)
        
        u = self.model(t, x, y)
        
        # Производная по времени
        ut = torch.autograd.grad(
            u.sum(), t, 
            create_graph=True,
            retain_graph=True
        )[0]
        
        # Производные по x
        ux = torch.autograd.grad(
            u.sum(), x,
            create_graph=True,
            retain_graph=True
        )[0]
        
        uxx = torch.autograd.grad(
            ux.sum(), x,
            create_graph=True,
            retain_graph=True
        )[0]
        
        # Производные по y
        uy = torch.autograd.grad(
            u.sum(), y,
            create_graph=True,
            retain_graph=True
        )[0]
        
        uyy = torch.autograd.grad(
            uy.sum(), y,
            create_graph=True,
            retain_graph=True
        )[0]
        
        # Уравнение диффузии: ut - alpha * (uxx + uyy) = 0
        residual = ut - self.alpha * (uxx + uyy)
        return torch.mean(residual**2)

    def initial_loss(self, t, x, y, u_true):
        """Вычисляет ошибку в начальных условиях."""
        u_pred = self.model(t, x, y)
        return torch.mean((u_pred - u_true)**2)

    def boundary_loss(self, tb_list, xb_list, yb_list):
        """
        Вычисляет ошибку на границе (нулевые граничные условия).
        
        Args:
            tb_list, xb_list, yb_list: списки тензоров для границ
        """
        loss = 0.0
        
        for i in range(len(tb_list)):
            tb = tb_list[i]
            xb = xb_list[i]
            yb = yb_list[i]
            
            # Проверяем размеры и корректируем их при необходимости
            if tb.shape[0] == 1 and xb.shape[0] > 1:
                # Растягиваем tb до размера xb
                tb = tb.expand(xb.shape)
            elif xb.shape[0] == 1 and tb.shape[0] > 1:
                # Растягиваем xb до размера tb
                xb = xb.expand(tb.shape)
                
            if tb.shape[0] == 1 and yb.shape[0] > 1:
                # Растягиваем tb до размера yb
                tb = tb.expand(yb.shape)
            elif yb.shape[0] == 1 and tb.shape[0] > 1:
                # Растягиваем yb до размера tb
                yb = yb.expand(tb.shape)
                
            if xb.shape[0] == 1 and yb.shape[0] > 1:
                # Растягиваем xb до размера yb
                xb = xb.expand(yb.shape)
            elif yb.shape[0] == 1 and xb.shape[0] > 1:
                # Растягиваем yb до размера xb
                yb = yb.expand(xb.shape)
            
            # Теперь все тензоры должны иметь одинаковый размер в первом измерении
            u_pred = self.model(tb, xb, yb)
            loss += torch.mean(u_pred**2)
            
        return loss / len(tb_list)


# Функция шага оптимизации
def update_model(model, optimizer, train_data):
    optimizer.zero_grad()
    loss = DiffusionLoss(model)(*train_data)
    loss.backward()
    optimizer.step()
    return loss.item()



## 2. Data generator

In [115]:
def spinn_train_generator_diffusion3d(nc, seed=None):
    # Setup random seed
    if seed is not None:
        torch.manual_seed(seed)
    
    # colocation points
    tc = torch.rand(nc, 1)
    xc = torch.rand(nc, 1) * 2 - 1  # uniform from -1 to 1
    yc = torch.rand(nc, 1) * 2 - 1  # uniform from -1 to 1
    
    # initial points
    ti = torch.zeros(nc, 1)
    xi = xc
    yi = yc
    
    # Create meshgrid for initial conditions
    xi_flat = xi.flatten()
    yi_flat = yi.flatten()
    xi_mesh, yi_mesh = torch.meshgrid(xi_flat, yi_flat, indexing='ij')
    
    # Compute initial conditions
    ui = 0.25 * torch.exp(-((xi_mesh - 0.3)**2 + (yi_mesh - 0.2)**2) / 0.1) + \
         0.4 * torch.exp(-((xi_mesh + 0.5)**2 + (yi_mesh + 0.1)**2) * 15) + \
         0.3 * torch.exp(-(xi_mesh**2 + (yi_mesh + 0.5)**2) * 20)
    
    # boundary points (hard-coded)
    tb = [tc, tc, tc, tc]
    xb = [torch.tensor([[-1.]], dtype=torch.float32),  # форма [1, 1]
        torch.tensor([[1.]], dtype=torch.float32),   # форма [1, 1]
        xc,                                          # форма [1000, 1]
        xc]                                          # форма [1000, 1]
    yb = [yc,                                          # форма [1000, 1]
        yc,                                          # форма [1000, 1]
        torch.tensor([[-1.]], dtype=torch.float32),  # форма [1, 1]
        torch.tensor([[1.]], dtype=torch.float32)]   # форма [1, 1]
    
    return tc, xc, yc, ti, xi, yi, ui, tb, xb, yb


def spinn_test_generator_diffusion3d(nc_test, data_dir="/home/user/SPINN_PyTorch/data/diffusion3d"):
    """
    Генерирует тестовые данные для трехмерной диффузии на основе предварительно 
    сохраненных файлов решений.
    
    Args:
        nc_test: Количество тестовых точек (для совместимости с интерфейсом)
        data_dir: Директория с сохраненными файлами решений
        
    Returns:
        Кортеж с данными (t, x, y, z, u_gt, tm, xm, ym, zm)
    """
    
    u_gt = []
    tt = 0.0
    
    # Загружаем сохраненные решения для разных моментов времени
    for _ in range(101):
        file_path = os.path.join(data_dir, f'heat_gaussian_{tt:.2f}.npy')
        u_gt.append(torch.from_numpy(np.load(file_path)))
        tt += 0.01
    
    u_gt = torch.stack(u_gt)
    
    # Создаем сетки для координат
    t = torch.linspace(0.0, 1.0, u_gt.shape[0])
    x = torch.linspace(-1.0, 1.0, u_gt.shape[1])
    y = torch.linspace(-1.0, 1.0, u_gt.shape[2])
    
    # Проверяем размерность данных
    if len(u_gt.shape) > 3:  # 3D case (t, x, y, z)
        z = torch.linspace(-1.0, 1.0, u_gt.shape[3])
    else:  # 2D case (t, x, y)
        z = torch.tensor([0.0])  # Одна точка для z
    
    # Отключаем отслеживание градиентов для координатных сеток
    t = t.detach()
    x = x.detach()
    y = y.detach()
    z = z.detach()
    
    # Создаем меш-сетки
    if len(u_gt.shape) > 3:  # 3D case (t, x, y, z)
        tm, xm, ym, zm = torch.meshgrid(t, x, y, z, indexing='ij')
    else:  # 2D case (t, x, y)
        tm, xm, ym = torch.meshgrid(t, x, y, indexing='ij')
        # Создаем фиктивную координату z для совместимости с 3D моделью
        zm = torch.zeros_like(ym)
        # Добавляем фиктивное измерение z к u_gt
        u_gt = u_gt.unsqueeze(-1)
    
    # Форматируем данные для входа в модель
    t_flat = t.reshape(-1)
    x_flat = x.reshape(-1)
    y_flat = y.reshape(-1)
    z_flat = z.reshape(-1) if len(z.shape) > 0 else z
    u_gt_flat = u_gt.reshape(-1)
    
    return t_flat, x_flat, y_flat, z_flat, u_gt_flat, tm, xm, ym, zm


## 3. Utils

In [116]:
def relative_l2(u_pred, u_true):
    return torch.sqrt(torch.sum((u_pred - u_true)**2) / torch.sum(u_true**2))


# Функция для визуализации результатов
def plot_diffusion3d(t, x, y, u_pred, u_gt=None):
    """
    Визуализирует решение уравнения диффузии
    
    Аргументы:
        t, x, y: координаты точек
        u_pred: предсказанное решение
        u_gt: точное решение (если доступно)
    """
    # Выбираем несколько временных срезов для визуализации
    time_slices = [0, len(t)//4, len(t)//2, 3*len(t)//4, -1]
    
    plt.figure(figsize=(15, 10))
    for i, t_idx in enumerate(time_slices):
        plt.subplot(2, 3, i+1)
        
        # Извлекаем данные для выбранного временного среза
        t_slice = t[t_idx].item()
        u_slice = u_pred[t_idx]
        
        # Создаем 2D тепловую карту для данного временного среза
        plt.pcolormesh(x[t_idx], y[t_idx], u_slice, cmap='viridis', shading='auto')
        plt.colorbar(label='u')
        plt.title(f't = {t_slice:.2f}')
        plt.xlabel('x')
        plt.ylabel('y')
    
    # Если доступно точное решение, показываем ошибку на последнем графике
    if u_gt is not None:
        plt.subplot(2, 3, 6)
        error = torch.abs(u_pred - u_gt)
        error_mean = error.mean().item()
        error_max = error.max().item()
        
        plt.pcolormesh(x[-1], y[-1], error[-1], cmap='hot', shading='auto')
        plt.colorbar(label='Error')
        plt.title(f'Error (mean: {error_mean:.2e}, max: {error_max:.2e})')
        plt.xlabel('x')
        plt.ylabel('y')
    
    plt.tight_layout()
    plt.show()

## 4. Main function

In [121]:
def main(NC, NC_TEST, SEED, LR, EPOCHS, N_LAYERS, FEATURES, LOG_ITER, ALPHA=0.05):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    torch.manual_seed(SEED)
    
    feat_sizes = [FEATURES] * N_LAYERS
    model = SPINN(feat_sizes, alpha=ALPHA).to(device)
    criterion = DiffusionLoss(model)
    optimizer = optim.Adam(model.parameters(), lr=LR)
    
    # Списки для хранения значений лоссов
    loss_history = []
    residual_loss_history = []
    initial_loss_history = []
    boundary_loss_history = []
    error_history = []
    
    # Используем существующие функции генерации данных
    train_data = spinn_train_generator_diffusion3d(NC, seed=SEED)
    
    # Обработка данных перед отправкой на устройство
    tc, xc, yc, ti, xi, yi, ui, tb, xb, yb = train_data
    
    # Перемещаем данные на устройство
    tc = tc.to(device)
    xc = xc.to(device)
    yc = yc.to(device)
    ti = ti.to(device)
    xi = xi.to(device)
    yi = yi.to(device)
    ui = ui.to(device)
    tb = [t.to(device) for t in tb]
    xb = [x.to(device) for x in xb]
    yb = [y.to(device) for y in yb]
    
    # Объединяем данные обратно

    train_data = (tc, xc, yc, ti, xi, yi, ui, tb, xb, yb)
    
    # Загрузка тестовых данных
    t_test, x_test, y_test, z_test, u_gt, tm, xm, ym, zm = spinn_test_generator_diffusion3d(NC_TEST)
    t_test = t_test.to(device)
    x_test = x_test.to(device)
    y_test = y_test.to(device)
    u_gt = u_gt.to(device)
    
    # Проверка размерности тестовых данных
    if t_test.shape[0] != x_test.shape[0] or t_test.shape[0] != y_test.shape[0]:
        # Если разные размеры, создаем сетку координат для оценки
        n_points = u_gt.shape[0]
        t_grid = t_test.reshape(-1, 1).repeat(1, n_points).reshape(-1)
        x_grid = x_test.repeat(n_points)
        y_grid = y_test.repeat(n_points)
        t_test, x_test, y_test = t_grid, x_grid, y_grid
    
    pbar = trange(1, EPOCHS + 1)
    best_error = float('inf')
    
    for e in pbar:
        if e % 100 == 0:
            # Обновляем обучающие данные каждые 100 эпох
            train_data = spinn_train_generator_diffusion3d(NC, seed=SEED+e)
            
            # Обработка новых данных
            tc, xc, yc, ti, xi, yi, ui, tb, xb, yb = train_data
            
            # Перемещаем данные на устройство
            tc = tc.to(device)
            xc = xc.to(device)
            yc = yc.to(device)
            ti = ti.to(device)
            xi = xi.to(device)
            yi = yi.to(device)
            ui = ui.to(device)
            tb = [t.to(device) for t in tb]
            xb = [x.to(device) for x in xb]
            yb = [y.to(device) for y in yb]
            
            # Объединяем данные обратно
            train_data = (tc, xc, yc, ti, xi, yi, ui, tb, xb, yb)
        
        optimizer.zero_grad()
        
        # Распаковываем данные
        tc, xc, yc, ti, xi, yi, ui, tb, xb, yb = train_data
        
        # Вычисляем компоненты функции потерь
        loss_residual = criterion.residual_loss(tc, xc, yc)
        loss_initial = criterion.initial_loss(ti, xi, yi, ui)
        loss_boundary = criterion.boundary_loss(tb, xb, yb)
        
        # Общие потери
        loss = loss_residual + loss_initial + loss_boundary
        
        loss.backward()
        optimizer.step()
        
        # Сохраняем значения лоссов
        loss_history.append(loss.item())
        residual_loss_history.append(loss_residual.item())
        initial_loss_history.append(loss_initial.item())
        boundary_loss_history.append(loss_boundary.item())
        
        if e % LOG_ITER == 0:
            with torch.no_grad():
                model.eval()
                # Вычисляем предсказание на тестовых данных
                u_pred = model(t_test, x_test, y_test)
                print("u_pred.shape", u_pred.shape)
                print("u_gt.shape", u_gt.shape)
                print("tm.shape", tm.shape)
                error = relative_l2(u_pred, u_gt)
                error_history.append(error.item())
                
                display.clear_output(wait=True)
                
                # Сохраняем лучший результат
                if error < best_error:
                    best_error = error
                    # Визуализация решения
                    t_slice_indices = [0, 25, 50, 75, 100]  # Индексы временных срезов
                    print("u_pred.shape", u_pred.shape)
                    print("u_gt.shape", u_gt.shape)
                    print("tm.shape", tm.shape)
                    plot_diffusion_solution(tm, xm, ym, u_pred.reshape(tm.shape), u_gt.reshape(tm.shape), 
                                           t_slice_indices)
                
                # Визуализация лоссов
                plt.figure(figsize=(15, 5))
                plt.subplot(121)
                plt.semilogy(loss_history, label='Total Loss')
                plt.semilogy(residual_loss_history, label='Residual Loss')
                plt.semilogy(initial_loss_history, label='Initial Loss')
                plt.semilogy(boundary_loss_history, label='Boundary Loss')
                plt.grid(True)
                plt.legend()
                plt.xlabel('Iteration')
                plt.ylabel('Loss (log scale)')
                plt.title('Training Losses')
                
                plt.subplot(122)
                plt.semilogy(range(0, len(error_history) * LOG_ITER, LOG_ITER), error_history, 'r-', label='Relative L2 Error')
                plt.grid(True)
                plt.legend()
                plt.xlabel('Iteration')
                plt.ylabel('Error (log scale)')
                plt.title('Relative L2 Error')
                plt.tight_layout()
                plt.show()
                
                pbar.set_description(
                    f'Loss: {loss.item():.2e} '
                    f'(R: {loss_residual.item():.2e}, '
                    f'I: {loss_initial.item():.2e}, '
                    f'B: {loss_boundary.item():.2e}), '
                    f'Error: {error.item():.2e}'
                )
                model.train()
    
    print(f'\nTraining completed! Best error: {best_error:.2e}')
    
    # Финальная визуализация всех лоссов
    plt.figure(figsize=(15, 5))
    plt.subplot(121)
    plt.semilogy(loss_history, label='Total Loss')
    plt.semilogy(residual_loss_history, label='Residual Loss')
    plt.semilogy(initial_loss_history, label='Initial Loss')
    plt.semilogy(boundary_loss_history, label='Boundary Loss')
    plt.grid(True)
    plt.legend()
    plt.xlabel('Iteration')
    plt.ylabel('Loss (log scale)')
    plt.title('Final Training Losses')
    
    plt.subplot(122)
    plt.semilogy(range(0, len(error_history) * LOG_ITER, LOG_ITER), error_history, 'r-', label='Relative L2 Error')
    plt.grid(True)
    plt.legend()
    plt.xlabel('Iteration')
    plt.ylabel('Error (log scale)')
    plt.title('Final Relative L2 Error')
    plt.tight_layout()
    plt.show()
    
    return model, best_error, error_history


## 5. Run!

In [122]:
PARAMS = {
    'NC': 1000,        # количество точек коллокации
    'NC_TEST': 1000,   # количество тестовых точек
    'SEED': 42,        # случайное зерно
    'LR': 1e-3,        # скорость обучения
    'EPOCHS': 10000,   # количество эпох
    'N_LAYERS': 4,     # количество слоев
    'FEATURES': 100,   # количество нейронов в слое
    'LOG_ITER': 1000,  # частота логирования
}


model, best_error, error_history = main(**PARAMS)

 10%|▉         | 999/10000 [00:37<05:38, 26.61it/s]

u_pred.shape torch.Size([101])
u_gt.shape torch.Size([1030301])
tm.shape torch.Size([101, 101, 101])





RuntimeError: The size of tensor a (101) must match the size of tensor b (1030301) at non-singleton dimension 0

In [8]:
error_history

[0.6878677606582642,
 0.45009270310401917,
 0.35234466195106506,
 0.29060640931129456,
 0.22788578271865845,
 0.1717708259820938,
 0.13801142573356628,
 0.11588618159294128,
 0.09456562250852585,
 0.08465206623077393]