In [1]:
import os
import sys
import time

import numpy as np
import torch

from torch import nn
from torch import autograd
from torch.utils.data import Dataset, DataLoader

import torch.nn.functional as F
from typing import List, Dict
from torch.nn import MSELoss
from tqdm import tqdm
from IPython.display import clear_output

import matplotlib.pyplot as plt

In [2]:
class CustomDataset(Dataset):
    """
    Custom dataset class. Approachable for problems with time-space input structure
    """

    def __init__(self,
                 domain_points : List[torch.tensor],
                 boundary_points : List[torch.tensor]):
        """
        :param domain_points: list of interior domain points coordinates
        :type domain points: List[torch.tensor]
        :param boundary_points: list of boundary domain points coordinates
        :type boundary_points: List[torch.tensor])
        """

        super(CustomDataset, self).__init__()

        assert len(domain_points) == len(boundary_points), 'Interior and boundary data \
                                                            must have the same size with boundary'

        self.domain_points = domain_points
        self.boundary_points = boundary_points
        self.length = len(self.domain_points)
        self.domain_points.requires_grad = True
        self.boundary_points.requires_grad = True



    def __len__(self):
        return self.length

    def __getitem__(self,
                    idx : int):
        """
        :param idx: datasets idx value
        :param idx: int
        """

        return {'domain_points' : self.domain_points[idx],
                'boundary_points' : self.boundary_points[idx]}

In [3]:
def compute_diff(f : torch.tensor,
                 x : torch.tensor,
                 axis : int = 0,
                 diff_order : int = 1) -> torch.tensor:
    """
    Function for evaluating differential operators for any integer order (diff_order)

    :param f: list of function values
    :type f: torch.tensor
    :param x: list of variable values
    :type x: torch.tensor
    :param axis: idx of dependent variable operator
    :type axis: int
    :param diff_order: order of differential operator
    :type diff_order: int

    :returns: list of derivative values
    :rtype: torch.tensor
    """
    
    jacobian = []
    for j in range(len(f)):
        dfdx_i = autograd.grad(f[j], x, create_graph = True, retain_graph = True)[0]
        dfdx_i = dfdx_i[j]

        if len(x.shape) != 1:
            jacobian.append(dfdx_i[axis])

        else:
            jacobian.append(dfdx_i)

    current_diff = jacobian
    for i in range(1, diff_order):
        for j in range(len(f)):
            dfdx_i = autograd.grad(current_diff[j], x, create_graph = True, retain_graph = True)[0]
            dfdx_i = dfdx_i[j]

            if len(x.shape) != 1:
                current_diff[j] = dfdx_i[axis]

            else:
                current_diff[j] = dfdx_i
        
    return torch.hstack(current_diff)

In [4]:
class VanillaSolver(nn.Module):
    """
    Class for vanilla ODE solver declared as neural network with MLP architechture
    """

    def __init__(self,
                 num_hidden_layers : int = 3):
        """
        :param num_hidden_layers: count of model hidden layers
        :type num_hidden_layers: int
        """

        self.num_hidden_layers = num_hidden_layers
        

        super(VanillaSolver, self).__init__()

        self.InputLayer = nn.Linear(in_features = 1, out_features = 32, bias = False)
        self.OutputLayer = nn.Linear(in_features = 32, out_features = 1, bias = False)

        self.Layers = nn.ModuleList([nn.Linear(in_features = 32, out_features = 32, bias = False) for i in range(self.num_hidden_layers)])


    def forward(self,
                x : torch.tensor) -> torch.tensor:
        """
        :param x: input tensor
        :type x: torch.tensor
        :returns: function value tensor
        :rtype: torch.tensor
        """

        x = self.InputLayer(x)
        
        for layer in self.Layers:
            x = layer(x)
            x = F.elu(x)
        
        x = self.OutputLayer(x)

        return x

In [5]:
class CustomTrainer():

    def __init__(self,
                 model : nn.Module,
                 train_dataset : Dataset,
                 val_dataset : Dataset,
                 optimizer : str,
                 batch_size : int = 64,
                 num_epochs : int = 10, 
                 optimizer_params : Dict = {}):

        """
        :param model: apprximator model
        :type model: nn.Module
        :param: train_dataset: instance of train dataset
        :type train_dataset: CustomDataset(Dataset)
        :param val_dataset: instance of val dataset
        :type val_dataset: CustomDataset(Dataset)
        :param optimizer: optimizer instance from torch.optim(model.parameters())
        :type optimizer: torch.optim
        :param: batch_size: batch size for SGD
        :type: int
        :param: num_epochs: epochs for training count
        :type num_epochs: int
        :param optimizer_params: optimizer config
        :type optimizer_params: Dict
        """

        self.model = model
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.optimizer_params = optimizer_params

        self.optimizer = getattr(torch.optim, optimizer)

        if len(self.optimizer_params.items()) != 0:
            self.optimizer = self.optimizer(self.model.parameters(), **self.optimizer_params)

        else:
            self.optimizer = self.optimizer(self.model.parameters())

        self.train_loader = DataLoader(self.train_dataset, 
                                       batch_size = 64,
                                       shuffle = False)
        
    def train(self) -> nn.Module:
        """
        Model training function

        :returns: trained model
        :rtype: nn.Module
        """
        torch.manual_seed(42)
        self.model.train()

        loss_history = np.array([])

        for i in tqdm(range(self.num_epochs)):
            mean_loss = 0
            for batch in self.train_loader:
                
                x_domain = batch['domain_points']
                x_boundary = batch['boundary_points']
                y_domain = self.model(x_domain)
                y_boundary = self.model(x_boundary)

                loss_value = loss_function(y_domain, 
                                           x_domain,
                                           y_boundary,
                                           x_boundary)

                mean_loss += loss_value.item()

                loss_value.backward()
                self.optimizer.step()
                self.optimizer.zero_grad()

            mean_loss /= len(self.train_loader)
            loss_history = np.append(loss_history, mean_loss)

            if i%10 == 0:
                plt.grid()
                plt.title('Loss history')
                plt.plot(loss_history)
                plt.savefig('Loss_history.png')
                plt.show()
                time.sleep(5)
                clear_output()
                
        return self.model

            

In [6]:
loss = MSELoss()

In [7]:
def loss_function(y_domain : torch.tensor,
                  x_domain : torch.tensor,
                  y_boundary : torch.tensor,
                  x_boundary : torch.tensor, 
                  lambda_value : float = 1) -> torch.tensor:
    """
    Physical loss function for ODE numerical solution residuals approximation 

    :param y_domain: domain function values
    :type y_domain: torch.tensor
    :param x_domain: domain variable values
    :type x_domain: torch.tensor
    :param y_boundary: boundary y values
    :type y_boundary: torch.tensor
    :param x_boundary: boundary  values
    :type x_boundary: torch.tensor
    :param lambda_value: constant in loss function
    :type lambda_value: float

    :returns: loss value
    :rtype: torch.tensor
    """
    
    dydx = compute_diff(y_domain,
                        x_domain)

    #dydx = dydx - y_domain

    #l0 = loss(y_domain, dydx)
    l0 = torch.abs((y_domain-dydx).mean()/y_domain).mean()
    #l0 = mape(y_domain.detach().numpy(), dydx.detach().numpy())
    #l0 = torch.abs(dydx - y_domain)
    #l1 = loss(torch.ones(y_boundary.shape), y_boundary)
    l1 = torch.abs((torch.ones(y_boundary.shape)-y_boundary)/torch.ones(y_boundary.shape)).mean()


    return l0 + lambda_value*l1

In [8]:
torch.manual_seed(42)

model = VanillaSolver(num_hidden_layers = 3)
domain_points = torch.linspace(0.01, 1, 1000).reshape(-1, 1)
boundary_points = torch.zeros(1000).reshape(-1, 1)
train_dataset = CustomDataset(domain_points, boundary_points)

trainer = CustomTrainer(
    model = model,
    train_dataset = train_dataset,
    val_dataset = train_dataset,
    optimizer = 'AdamW',
    batch_size = 64,
    num_epochs = 1000)

In [None]:
fitted = trainer.train()

In [None]:
mape(fitted(domain_points).reshape(1, -1)[0].detach(), compute_diff(fitted(domain_points).reshape(1, -1)[0], domain_points).detach())

In [None]:
plt.plot(fitted(domain_points).reshape(1, -1)[0].detach().numpy())
plt.plot(np.exp(domain_points.detach().numpy()), color = 'red')