In [1]:
import torch
import torch.nn            as nn
import torch.nn.functional as F
import torch.optim         as optim
import torch.autograd      as autograd

import numpy as np
import matplotlib.pyplot   as plt

from scipy.integrate import odeint

import abc

In [2]:
def diff(x, t, order):
    ones = torch.ones_like(t)
    der, = autograd.grad(x, t, create_graph=True, grad_outputs=ones)
    for i in range(1, order):
        der, = autograd.grad(der, t, create_graph=True, grad_outputs=ones)
    return der

In [3]:
class FCNN(nn.Module):
    """
    A fully connected neural network.
    """
    def __init__(self, n_hidden_units, n_hidden_layers, actv):
        """
        :param n_hidden_units: number of hidden units in each hidden layer
        :param n_hidden_layers: number of hidden layers
        :param actv: the activation layer used in each hidden layer
        """
        super(FCNN, self).__init__()
        
        layers = []
        layers.append(nn.Linear(1, n_hidden_units))
        layers.append(actv())
        for i in range(n_hidden_layers):
            layers.append(nn.Linear(n_hidden_units, n_hidden_units))
            layers.append(actv())
        layers.append(nn.Linear(n_hidden_units, 1))
        self.NN = torch.nn.Sequential(*layers)

    def forward(self, t):
        x = self.NN(t)
        return x

In [4]:
class Condition:
    def __init__(self):
        pass
    @abc.abstractmethod
    def enforce(self, t, x):
        pass
    
class InitialValueProblem(Condition):
    """
    A initial value problem: 
    x (t=t_0) = x_0
    x'(t=t_0) = x_0_prime
    """
    def __init__(self, t_0, x_0, x_0_prime=None):
        self.t_0, self.x_0, self.x_0_prime = t_0, x_0, x_0_prime
    def enforce(self, t, x):
        if self.x_0_prime: 
            return self.x_0 + (t-self.t_0)*self.x_0_prime + ( (1-torch.exp(-t+self.t_0))**2 )*x
        else:
            return self.x_0 + (1-torch.exp(-t+self.t_0))*x

class TwoPointDirichletBoundaryCondition(Condition):
    """
    A two point Dirichlet boundary condition: 
    x(t=t_0) = x_0
    x(t=t_0) = x_1
    """
    def __init__(self, t_0, x_0, t_1, x_1):
        self.t_0, self.x_0, self.t_1, self.x_1 = t_0, x_0, t_1, x_1
    def enforce(self, t, x):
        t_tilde = (t-self.t_0) / (self.t_1-self.t_0)
        return self.x_0*(1-t_tilde) + self.x_1*t_tilde + (1-torch.exp((1-t_tilde)*t_tilde))*x

In [5]:
class ExampleGenerator:
    def __init__(self, size, t_min=0.0, t_max=1.0, method='uniform'):
        self.size = size
        self.t_min, self.t_max = t_min, t_max
        if   method=='uniform':
            self.examples = torch.zeros(self.size, requires_grad=True)
            self.get_examples = lambda: self.examples + torch.rand(self.size)*(self.t_max-self.t_min) + self.t_min
        elif method=='equally-spaced':
            self.examples = torch.linspace(self.t_min, self.t_max, self.size, requires_grad=True)
            self.get_examples = lambda: self.examples
        elif method=='equally-spaced-noisy':
            self.examples = torch.linspace(self.t_min, self.t_max, self.size, requires_grad=True)
            self.noise_mean = torch.zeros(self.size)
            self.noise_std  = torch.ones(self.size) * ( (t_max-t_min)/size ) / 4.0
            self.get_examples = lambda: self.examples + torch.normal(mean=self.noise_mean, std=self.noise_std)
        else:
            raise ValueError(f'Unknown method: {method}')

In [6]:
class Monitor:
    def __init__(self, t_min, t_max, check_every=100):
        %matplotlib notebook
        self.check_every = check_every
        self.fig = plt.figure(figsize=(20, 8))
        self.ax1 = self.fig.add_subplot(121)
        self.ax2 = self.fig.add_subplot(122)
        # input for plotting
        self.ts_plt =    np.linspace(t_min, t_max, 100) 
        # input for neural network
        self.ts_ann = torch.linspace(t_min, t_max, 100, requires_grad=True).reshape((-1, 1, 1)) 
    def check(self, net, ode, condition, loss_history):
        xs = net(self.ts_ann)
        xs = condition.enforce(self.ts_ann, xs)
        Fxt = ode(xs, self.ts_ann)
        Fxt = Fxt.detach().numpy().flatten()
        xs = xs.detach().numpy().flatten()

        self.ax1.clear()
        self.ax1.plot(self.ts_plt, xs,  label='x(t) by NN')
        self.ax1.plot(self.ts_plt, Fxt, label='F(x,t) by NN')
        self.ax1.legend()
        self.ax1.set_title('solutions')

        self.ax2.clear()
        self.ax2.plot(loss_history)
        self.ax2.set_title('loss during training')
        self.ax2.set_ylabel('loss')
        self.ax2.set_xlabel('epochs')
        self.ax2.set_yscale('log')

        self.fig.canvas.draw()

In [7]:
def solve(ode, condition, t_min, t_max,
          net=None, example_generator=None, optimizer=None, criterion=None, batch_size=16, 
          max_epochs=100000, tol=1e-4,
          monitor=None):
    """
    Train a neural network to solve an ODE.
    
    :param ode: an ode to be solved, it should be a function (x, t) -> F(x, t)
    :param condition: the initial value/boundary condition, a Condition instance
    :param net: a pytorch neural network, a nn.Module instance
    :param t_min: lower bound of the domain on which the ode is solved
    :param t_max: upper bound of the domain on which the ode is solved
    :param example_generator: a ExampleGenerator instance
    :param optimizer: a optimizer from pytorch.optim
    :param criterion: a loss function from pytorch.nn
    :param batch_size: the size of the minibatch
    :param max_epochs: the maximum number of epochs
    :param tol: the training stops if the loss is lower than this value
    :param monitor: a Monitor instance
    """
    
    # default values
    if not net: 
        net = FCNN(n_hidden_units=32, n_hidden_layers=1, actv=nn.Sigmoid)
    if not example_generator: 
        example_generator = ExampleGenerator(32, t_min, t_max, method='equally-spaced-noisy')
    if not optimizer:
        optimizer = optim.Adam(net.parameters(), lr=0.001)
    if not criterion:
        criterion = nn.MSELoss()
    if not monitor:
        monitor = Monitor(t_min, t_max, check_every=100)
    
    n_examples = example_generator.size
    if n_examples%batch_size != 0:
        # todo: I know this is stupid, I'll fix this later
        raise RuntimeError('Please choose a batch_size such that it is a factor of the size of the training set.')
    n_batches = n_examples//batch_size
    zeros = torch.zeros(batch_size)
    
    loss_history = []
    
    for epoch in range(max_epochs):
        loss_epoch = 0.0

        examples = example_generator.get_examples()
        ts_batches = examples.reshape((n_batches, batch_size, 1))
        for ts in ts_batches:     

            xs = net(ts)
            xs = condition.enforce(ts, xs)
            
            Fxt = ode(xs, ts)
            loss = criterion(Fxt, zeros)
            loss_epoch += loss.item()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        loss_history.append(loss_epoch/n_batches)
        if loss_history[-1] < tol: break
        
        if epoch%monitor.check_every == 0:
            monitor.check(net, ode, condition, loss_history)
    
    def solution(ts):
        if not isinstance(ts, torch.Tensor): ts = torch.tensor([ts], dtype=torch.float32)
        ts = ts.reshape(-1, 1)
        xs = net(ts)
        xs = condition.enforce(ts, xs)
        return xs.detach().numpy()
    if loss_history[-1] > tol:
        print('The solution has not converged.')
        
    return solution, loss_history

A damped harmonic oscillator: 

$$
F(x, t) = \frac{d^2x}{dt^2} + 0.3\frac{dx}{dt} + x = 0
$$

Solve a initial value problem where

$$
x|_{t=0} = 0.4, \frac{dx}{dt}|_{t=0} = -1.6
$$

In [8]:
damped_harmonic_oscillator = lambda x, t: diff(x, t, order=2) + 0.3*diff(x, t, order=1) + x

initial_value = InitialValueProblem(t_0=0.0, x_0=0.4, x_0_prime=-1.6)
solution, _ = solve(ode=damped_harmonic_oscillator, condition=initial_value, t_min=0.0, t_max=4*np.pi)

<IPython.core.display.Javascript object>

The user will be returned a function through which they can query $x$ given $t$.

In [9]:
solution(3.52)

array([[0.08930445]], dtype=float32)

In [10]:
ts_plt = np.linspace(0, 12, 100)
xs_net = solution(ts_plt)

def deriv(u, t):
    x, x_prime = u
    return [x_prime, -0.3*x_prime - x]
xs_num = odeint(deriv, [0.4, -1.6], ts_plt)[:, 0]

plt.figure()
plt.plot(ts_plt, xs_net, label='x(t) by NN')
plt.plot(ts_plt, xs_num, label='x(t) by ODE solver')
plt.legend()
plt.title('solutions');

<IPython.core.display.Javascript object>

Several things can be specified by the user. We can change the initial value problem to a boundary value problem by using another `Condition` object.

A damped harmonic oscillator: 

$$
F(x, t) = \frac{d^2x}{dt^2} + 0.3\frac{dx}{dt} + x = 0
$$

Solve a boundary value problem where

$$
x|_{t=0} = 1.7, \frac{dx}{dt}|_{t=4\pi} = 0.0
$$

In [11]:
boundary_condition = TwoPointDirichletBoundaryCondition(t_0=0.0, x_0=1.7, t_1=4*np.pi, x_1=0.0)
solution, _ = solve(ode=damped_harmonic_oscillator, condition=boundary_condition, t_min=0.0, t_max=4*np.pi)

<IPython.core.display.Javascript object>

The solution has not converged.


We can change the network architecture.

In [12]:
fcnn = FCNN(n_hidden_units=32, n_hidden_layers=3, actv=nn.Sigmoid)
solution, _ = solve(ode=damped_harmonic_oscillator, condition=boundary_condition, t_min=0.0, t_max=4*np.pi,
                    net=fcnn)

<IPython.core.display.Javascript object>

The solution has not converged.


We can change the optimizer.

In [14]:
fcnn = FCNN(n_hidden_units=32, n_hidden_layers=3, actv=nn.Sigmoid)
rmsprop = optim.RMSprop(fcnn.parameters(), lr=0.01)
solution, _ = solve(ode=damped_harmonic_oscillator, condition=boundary_condition, t_min=0.0, t_max=4*np.pi,
                    net=fcnn, optimizer=rmsprop)

<IPython.core.display.Javascript object>

The solution has not converged.


We can change how the training examples are generated.

In [16]:
examples = ExampleGenerator(32, 0.0, 4*np.pi, method='uniform')
solution, _ = solve(ode=damped_harmonic_oscillator, condition=initial_value, t_min=0.0, t_max=4*np.pi,
                    example_generator=examples)

<IPython.core.display.Javascript object>