In [43]:
import numpy as np

import csv
from tqdm import tqdm

import matplotlib
import matplotlib.pyplot as plt
import scienceplots
plt.style.use(['science'])
matplotlib.rcParams["font.size"] = "12"

import torch
import torch.nn as nn
from torch.autograd import grad

from scipy.integrate import solve_ivp

In [44]:
class FeedForwardNetwork(nn.Module):
    def __init__(self, hidden_layers, hidden_dim, input_dim=1, output_dim=1):
        super(FeedForwardNetwork, self).__init__()
        
        self.L = hidden_layers
        self.W = hidden_dim
        
        self.model = nn.Sequential()
        self.activation = nn.Tanh()
        
        inp_linear = nn.Linear(input_dim, hidden_dim)
        out_linear = nn.Linear(hidden_dim, output_dim)
        
        self.model.add_module('input', inp_linear)
        self.model.add_module('activ0', self.activation)
        for i in range(hidden_layers - 1):
            linear = nn.Linear(hidden_dim, hidden_dim)
            self.model.add_module(f'linear{i+1}', linear)
            self.model.add_module(f'activ{i+1}', self.activation)
        self.model.add_module('output', out_linear)
        
    def forward(self, x):
        return self.model(x)
    
    def weights_norm(self):
        norms = []
        with torch.no_grad():
            for name, parameter in self.model.named_parameters():
                norms.append(torch.linalg.norm(parameter))
        return np.array(norms)
                
def rmse(predicts, target):
    '''
    Note that for diffusion both arrays have shape (T, X).
    '''
    return np.sqrt(np.square(predicts - target).mean())

---
## Lotka-Volterra Problem

In [45]:
def plot_lotkavolterra(
    t, predictions, numerical, title=None, figsize=(5, 5), 
    show=True, save=False, path='./plot', dpi=300
    ):
    
    fig = plt.figure(figsize=figsize)
    plt.plot(t, predictions[0], label=r'$\mathcal{X}(t)$')
    plt.plot(t, predictions[1], label=r'$\mathcal{Y}(t)$')
    plt.plot(t, numerical[0], label=r'$x(t)$')
    plt.plot(t, numerical[1], label=r'$y(t)$')
    
    plt.xlabel('t')
    plt.ylabel('Population size')
    
    plt.legend()
    plt.title(title)
    
    if save:
        plt.savefig(f'{path}.png', dpi=dpi)
    
    if show:
        plt.show()
    else:
        plt.close()

In [46]:
class LotkaVolterraProblem:
    def __init__(self, T, params, initial_conditions):
        
        self.T = T
        self.alpha, self.beta, self.delta, self.gamma = params
        self.init_vals = torch.tensor(initial_conditions)

        self.numerical_solution = self._solve()
    
    def loss_initial(self, model):
        zero = torch.tensor([0.], requires_grad=True)
        x = model(zero)
        return torch.mean(torch.square(x - self.init_vals))
    
    def loss_physical(self, model, t):
        xy = model(t)
        x = xy[:,[0]]
        y = xy[:,[1]]

        dX = grad(x, t, grad_outputs=torch.ones_like(x), create_graph=True)[0]
        dY = grad(y, t, grad_outputs=torch.ones_like(y), create_graph=True)[0]
        
        loss_dX = torch.mean(torch.square(dX - self.alpha * x + self.beta * x * y))
        loss_dY = torch.mean(torch.square(dY - self.delta * x * y + self.gamma * y))
        
        return loss_dX, loss_dY
    
    def _solve(self):
        def lotka_volterra(t, y, alpha, beta, delta, gamma):
            x, y = y
            dx_dt = alpha * x - beta * x * y
            dy_dt = delta * x * y - gamma * y
            return [dx_dt, dy_dt]

        solution = solve_ivp(lotka_volterra, 
                             (0, self.T),
                             self.init_vals, 
                             method='RK45',
                             args=(self.alpha, self.beta, self.delta, self.gamma), 
                             t_eval=np.linspace(0, self.T, 128))
        
        return solution.y

In [47]:
def train_lotkavolterra(
    problem,
    model,
    w1, w2, w3, num_iters, N_D, lr,
    print_every=0, collect_every=0, save_every=0
):
    test_points = torch.linspace(0, problem.T, 128).reshape(-1, 1)
    t = torch.linspace(0, problem.T, N_D, requires_grad=True).reshape(-1, 1)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    losses, errors = [], []

    for i in range(num_iters + 1):

        optimizer.zero_grad()

        L_I = problem.loss_initial(model)
        L_X, L_Y = problem.loss_physical(model, t)

        L = w1 * L_I + w2 * L_X + w3 * L_Y

        L.backward()
        optimizer.step()
        
        if collect_every > 0 and i % collect_every == 0:
            preds = model(test_points).detach().numpy()
            x = preds[:,0].flatten()
            y = preds[:,1].flatten()

            error = (rmse(x, problem.numerical_solution[0]) + rmse(y, problem.numerical_solution[1])) / 2

            losses.append(np.array([L_I.item(), L_X.item(), L_Y.item(), L.  item()]))
            errors.append(error)
        
        if print_every > 0 and i % print_every == 0:
            print(f'Iteration {i} --- RMSE {error}')
                
        if save_every > 0 and i % save_every == 0:
            plot_lotkavolterra(
            test_points, [x, y], problem.numerical_solution, title=f'Iteration {i}', figsize=(6, 4), show=False, save=True, path=f'./training/lotkavolterra/Iteration {i}'
        )
    
    return np.array(losses), np.array(errors)

---
## Diffusion Problem

In [48]:
def plot_diffusion(x, t, predictions, size=(10, 5), show=True, save=False, path='./plot', dpi=300):
    X, T = np.meshgrid(x, t)
    fig = plt.figure(figsize=size)

    ax = fig.add_subplot(1, 2, 1, projection='3d')
    ax.plot_surface(X, T, predictions, cmap='viridis')
    ax.set_xlabel('x')
    ax.set_ylabel('t')

    ax = fig.add_subplot(1, 2, 2)
    ax.imshow(predictions, origin='lower', aspect='auto', cmap='viridis', 
              extent=[x.min(), x.max(), t.min(), t.max()])
    ax.set_xlabel('x')
    ax.set_ylabel('t')

    plt.tight_layout()
    
    if save:
        plt.savefig(f'{path}.png', dpi=dpi)
    
    if show:
        plt.show()
    else:
        plt.close()

In [49]:
class DiffusionProblem:
    def __init__(self, D, boundaries, resolution, b_values):
        self.D = D
        self.L, self.R, self.T = boundaries
        self.left_boundary, self.right_boundary, self.initial_values = b_values
        
        self.Nt, self.Nx = resolution
        self.numerical_solution = self._solve()
        self.t = torch.linspace(0, self.T, self.Nt)
        self.x = torch.linspace(self.L, self.R, self.Nx)
    
    def loss_boundary(self, model, t_left, t_right, x_init):
        
        def nearest_index(array, values):
            values = [np.abs(array - v).argmin() for v in values]
            return values
        
        left_values =  self.left_boundary[nearest_index(self.t, t_left)  ]
        right_values = self.right_boundary[nearest_index(self.t, t_right)]
        init_values =  self.initial_values[nearest_index(self.x, x_init) ]
        
        left =  model(torch.vstack([torch.ones_like(t_left)  * self.L, t_left]).T)
        right = model(torch.vstack([torch.ones_like(t_right) * self.R, t_right]).T)
        init =  model(torch.vstack([x_init, torch.zeros_like(x_init)]).T)
        
        left_error = torch.square(left_values - left.flatten()).mean()
        right_error = torch.square(right_values - right.flatten()).mean()
        init_error = torch.square(init_values - init.flatten()).mean()
        
        return left_error + right_error, init_error
    
    def loss_physical(self, model, x, t):
        u = model(torch.hstack([x, t]))
        
        ut =  grad(u,  t, grad_outputs=torch.ones_like(t), create_graph=True)[0]
        ux =  grad(u,  x, grad_outputs=torch.ones_like(x), create_graph=True)[0]
        uxx = grad(ux, x, grad_outputs=torch.ones_like(x), create_graph=True)[0]
    
        return torch.mean(torch.square(ut - self.D * uxx))
        
    def _solve(self):
        Nt, Nx = self.Nt, self.Nx
        
        dx = (self.R - self.L) / Nx     # Spatial step size
        dt = self.T / Nt                # Time step size
        alpha = self.D * dt / (2 * dx ** 2)

        # Crank-Nicholson method
        u = np.zeros((Nt, Nx))

        # Initial condition
        u[0, :] = self.initial_values

        # Boundary conditions
        u[:, 0], u[:, -1] = self.left_boundary, self.right_boundary

        A = np.diag((1 + 2*alpha) * np.ones(Nx)) + np.diag(-alpha * np.ones(Nx-1), 1) + np.diag(-alpha * np.ones(Nx-1), -1)
        B = np.diag((1 - 2*alpha) * np.ones(Nx)) + np.diag(alpha * np.ones(Nx-1), 1) + np.diag(alpha * np.ones(Nx-1), -1)
            
        A_reversed = np.linalg.inv(A)

        for n in range(0, Nt - 1):
            b = np.dot(B, u[n, :])
            u[n+1, :] = A_reversed @ b

        return u

In [50]:
def train_diffusion(
    problem,
    model,
    N_LB, N_RB, N_I, N_D,
    coef, lr, num_iters,
    print_every=0, collect_every=0, save_every=0
):

    x_sampled = torch.tensor(np.random.uniform(problem.L, problem.R, N_I), dtype=torch.float32)
    t_left_sampled =  torch.tensor(np.random.uniform(0, problem.T, N_LB), dtype=torch.float32)
    t_right_sampled = torch.tensor(np.random.uniform(0, problem.T, N_RB), dtype=torch.float32)

    x_pts = torch.tensor(np.random.uniform(problem.L, problem.R, N_D), requires_grad=True, dtype=torch.float32).reshape(-1, 1)
    t_pts = torch.tensor(np.random.uniform(0, problem.T, N_D), requires_grad=True, dtype=torch.float32).reshape(-1, 1)

    test_points = torch.cartesian_prod(problem.x, problem.t)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    losses, errors = [], []

    for i in range(num_iters + 1):

        optimizer.zero_grad()

        L_B, L_I = problem.loss_boundary(model, t_left_sampled, t_right_sampled, x_sampled)
        L_D = problem.loss_physical(model, x_pts, t_pts)

        total = coef * (L_B + L_I) + (1 - coef) * L_D

        total.backward()
        optimizer.step()
                
        if collect_every > 0 and i % collect_every == 0:
            preds = model(test_points).reshape(problem.Nx, problem.Nt).detach().numpy()
                
            error = rmse(preds.T, problem.numerical_solution)
                
            losses.append(np.array([L_I.item(), L_B.item(), L_D.item(), total.item()]))
            errors.append(error)
                
        if print_every > 0 and i % print_every == 0:
            print(f'Iteration {i} --- RMSE {error}')
                
        if save_every > 0 and i % save_every == 0:
            plot_diffusion(problem.x, problem.t, preds.T, show=False, save=True, path=f'./training/diffusion/Iteration {i}')
                
    return np.array(losses), np.array(errors)

---
## Different Tasks and Reference Solutions

### Lotka-Volterra

In [None]:
# Здесь мы определяем проблему.
T = 25
alpha, beta, delta, gamma = 0.4, 0.1, 0.1, 0.6
initial_conditions = [10., 5.]

lv = LotkaVolterraProblem(
    T=T, 
    params=(alpha, beta, delta, gamma), 
    initial_conditions=initial_conditions
    )

# Для данной задачи input_dim и output_dim фиксированы.
L, W = 2, 64
lv_model = FeedForwardNetwork(
    hidden_layers=L, hidden_dim=W,
    input_dim=1, output_dim=2
    )

# Можно задать правило инициализации. 
# Лучше всего работает метод zeros_
lv_model.init_weights(torch.nn.init.zeros_)

# Коэффициенты функции потерь 
w1, w2, w3 = 1, 2, 2

# Количество точек коллокации и гиперпараметры
N_D = 1024
lr, num_iters = 1e-3, 25000

losses, errors = train_lotkavolterra(
    problem=lv, model=lv_model,
    w1=w1, w2=w2, w3=w3, N_D=N_D,
    lr=lr, num_iters=num_iters,
    collect_every=num_iters # Это нужно чтобы выдало errors.
)

final_rmse = errors[-1]

In [None]:
hyperparameters_lv = [L, W, w1, w2, w3, N_D, lr]

### Diffusion

In [None]:
# Коэффициент диффузии
D = 0.5

# Левая, правая границы X, конечный момент времени T
L, R, T = 0, 1, 0.5

# Количество точек по времени и пространству. Их надо
# выбирать тщательно, т.к. может не работать численное решение,
# но можно не менять и всё будет нормально.
Nt, Nx = 1000, 250

# Граничные и начальные условия. Надо, чтобы в (L, 0) 
# и (R, 0) они совпадали, иначе всё сломается.
left_boundary = right_boundary = torch.zeros(Nt)
initial_conditions = torch.sin(2 * np.pi * torch.linspace(L, R, Nx)) ** 2

diff = DiffusionProblem(
    D, (L, R, T), (Nt, Nx),
    (left_boundary, right_boundary, initial_conditions)
)

# Для данной задачи input_dim и output_dim тоже фиксированы.
L, W = 2, 32
diff_model = FeedForwardNetwork(
    hidden_layers=L, hidden_dim=W,
    input_dim=2, output_dim=1
    )

# Количество supervised точек на границах и 
# количество точек коллокации внутри домена.
N_LB, N_RB, N_I, N_D = 64, 64, 128, 2048

# Коэффициент для функции потерь.
coef = 0.8

# Гиперпараметры оптимизатора.
lr, num_iters = 1e-3, 10000

losses, errors = train_diffusion(
    problem=diff, model=diff_model,
    N_LB=N_LB, N_RB=N_RB, N_I=N_I, N_D=N_D,
    coef=coef, lr=lr, num_iters=num_iters,
    collect_every=num_iters # Это нужно чтобы выдало errors.
)

final_rmse = errors[-1]

In [None]:
hyperparameters_diff = [L, W, N_LB, N_RB, N_I, N_D, coef, lr]

---
## Initialization Methods

In [53]:
init_methods = [
    (torch.nn.init.eye_, 'Eye'),
    (torch.nn.init.ones_, 'Ones'),
    (torch.nn.init.zeros_, 'Zeros'),
    (torch.nn.init.normal_, 'Normal'),
    (torch.nn.init.uniform_, 'Uniform'),
    (torch.nn.init.orthogonal_, 'Orthogonal'),
    (torch.nn.init.xavier_normal_, 'Xavier Normal'),
    (torch.nn.init.xavier_uniform_, 'Xavier Uniform'),
    (torch.nn.init.kaiming_normal_, 'Kaiming Normal'),
    (torch.nn.init.kaiming_uniform_, 'Kaiming Uniform'),
]

num_trainings = 10