# Validation Examples
#### Frederik Kelbel, Imperial College London

## Dependencies

In [1]:
import torch
import plotly.graph_objects as go
import numpy as np
from operators import div, Δ


from DGM import DGMSolver
from pdes import PDE
from scipy.integrate import quad
from plotly.subplots import make_subplots
from configs import CONFIG_PARABOLIC_PDES as MODEL_CONFIG

## Plotting

In [2]:
def plot_losses(losses, avg_over=10):
    avgs = np.convolve(losses, np.ones(avg_over), 'valid') / avg_over
    fig = make_subplots(rows=1, cols=1)
    fig.add_trace(go.Scatter(x=np.arange(len(avgs)), y=avgs, mode='lines', name="Error at x=0.1"), row=1, col=1)
    fig.update_layout(
        title="Loss",
        xaxis_title="Iterations",
        yaxis_title="Loss",
        font=dict(
            family="Courier New, monospace",
            size=14
        )
    )
    fig.show()
    
def plot_1D_functions(solver, sol):
    fig = make_subplots(rows=1, cols=2, 
                   specs=[[{'type': 'surface'}, {'type': 'surface'}]])
    xs = np.linspace(0, 1, 100)
    ts = np.linspace(0.01, 1, 100)
    us_pred = np.array([[solver.u(x, t=t) for x in xs] for t in ts])
    us = np.array([[sol(x, t) for x in xs] for t in ts])
    x_mesh, t_mesh = np.meshgrid(xs, ts)
    fig.add_trace(go.Surface(z=us, showscale=False), row=1, col=1)
    fig.add_trace(go.Surface(z=us_pred), row=1, col=2)
    fig.update_layout(title='Solution | Approximation',
                  scene = dict(
                    xaxis_title="t",
                    yaxis_title="x",
                    zaxis_title="u(x, t)"),
                  scene2 = dict(
                    xaxis_title="t",
                    yaxis_title="x",
                    zaxis_title="u(x, t)"),
                  margin=dict(l=50, r=50, b=50, t=50))
    fig.show()

## Partial Differential Equations

### Nonhomogeneous Transport:
$$
\begin{cases}
u_t + b (\nabla \cdot u) = f \quad & \text{in } [0, 1]^\texttt{x_dim=1} \times [0, 1]\\
u = g \quad & \text{on } [0, 1]^\texttt{x_dim=1} \times {t=0}
\end{cases}
$$

In [3]:
class TRANSPORT(PDE):
    def __init__(self):
        super().__init__()
        self.b = 0.3
        self.f = lambda x, t: 2*x + x*t
        self.g = lambda x: x
        self.x_dim = 1
        self.equation = lambda u, x, t: div(u, t) + self.b * div(u, x) - self.f(x, t)
        self.domain_func = [lambda x: x]
        self.init_datum = [lambda u, x, t: u -self.g(x)]
        self.init_func = [lambda x: x]
        self.boundary_cond = [lambda u, x, t: u*0]
        self.boundary_func = [lambda x: x]
        

In [4]:
eq = TRANSPORT()
model = MODEL_CONFIG
solver = DGMSolver(model, eq)
losses = list(solver.train(200))
plot_losses(losses)

100%|██████████| 200/200 [00:00<00:00, 251.70 it/s]


In [5]:
if eq.x_dim == 1:
    f = lambda s, x, t, b: eq.f(x+(s-t)*b, s)
    Transport_sol =\
        lambda x, t: eq.g(x-eq.b*t) + quad(f, 0, t, args=(x, t, eq.b))[0]
    plot_1D_functions(solver, Transport_sol)

### Inviscid Burger's Equation:
$$
\begin{cases}
u_t + u (\nabla \cdot u) = 0 \quad & \text{in } [0, 1]^\texttt{x_dim=1} \times [0, 1]\\
u = g \quad & \text{on } [0, 1]^\texttt{x_dim=1} \times {t=0}
\end{cases}
$$

In [6]:
class INVISCID_BURGERS(PDE):
    def __init__(self):
        super().__init__()
        self.x_dim = 1
        self.equation = lambda u, x, t: div(u, t) + u * div(u, x)
        self.domain_func = [lambda x: x]
        self.init_datum = [lambda u, x, t: u - x]
        self.init_func = [lambda x: x]
        self.boundary_cond = [lambda u, x, t: u*0]
        self.boundary_func = [lambda x: x]

In [7]:
eq = INVISCID_BURGERS()
model = MODEL_CONFIG
solver = DGMSolver(model, eq)
losses = list(solver.train(500))
plot_losses(losses)

100%|██████████| 500/500 [00:02<00:00, 243.09 it/s]


In [8]:
if eq.x_dim==1:
    INVISCID_BURGERS_sol = lambda x, t: x / (1 + t)
    plot_1D_functions(solver, INVISCID_BURGERS_sol)

### Diffusion Advection Equation:
$$
\begin{cases}
u_t + a u_x - b \Delta u = 0 \quad & \text{in } [0, 1]^{\texttt{x_dim}=1} \times [0, 1]\\
u = \exp\Big( -\frac{(x-1)^2}{b}\Big) \quad & \text{on } [0, 1]^{\texttt{x_dim}=1} \times {t=0} \\
u = \frac{1}{\sqrt{4t+1}} \exp\Big(-\frac{(x-1-at)^2}{b(4t+1)} \Big) \quad & \text{on } \{0, 1\}^{\texttt{x_dim}=1}\times [0, 1]
\end{cases}
$$

In [9]:
class HEAT(PDE):
    def __init__(self):
        super().__init__()
        self.a = 0.01
        self.b = 10
        self.x_dim = 1
        self.equation = lambda u, x, t: div(u, t) + self.a * div(u, x) - self.b * Δ(u, x)
        self.domain_func = [lambda x: x]
        self.init_datum = [lambda u, x, t: u - torch.exp(-(x-1)**2/self.b)]
        self.init_func = [lambda x: x]
        self.boundary_cond = [lambda u, x, t: u - 1/torch.sqrt(4*t+1)*torch.exp(-(x-1-self.a*t)**2/(self.b*(4*t+1)))]
        self.boundary_func = [lambda x: torch.where(x > 0.5, 1.0, 0.0)]

In [10]:
MODEL_CONFIG = {
    "batch_size":128, # minimum batch size is two because of split
    "hidden_dim": 64,
    "learning_rate": 5e-3,
    "loss_weights": (1, 2, 2),
    "sampling_method": "uniform",
    "lr_decay": 0.99
}
eq = HEAT()
model = MODEL_CONFIG
solver = DGMSolver(model, eq)
losses = list(solver.train(400))
plot_losses(losses)

100%|██████████| 400/400 [00:03<00:00, 130.86 it/s]


In [11]:
if eq.x_dim == 1:
    HEAT_sol = lambda x, t: 1/np.sqrt(4*t+1)*np.exp(-(x-1-eq.a*t)**2/(eq.b*(4*t+1)))
    plot_1D_functions(solver, HEAT_sol)

### Diffusion Equation in higher dimensions:
$$
\begin{cases}
u_t - \kappa \Delta u = 0 \quad & \text{in } [0, 1]^{\texttt{x_dim}=2} \times [0, 1]\\
u = \cos(x_1) + \sin(x_2) \quad & \text{on } [0, 1]^{\texttt{x_dim}=1} \times {t=0} \\
\end{cases}
$$

In [26]:
class WAVE(PDE):
    def __init__(self):
        super().__init__()
        self.κ = 1
        self.y_0 = 0.5
        self.x_dim = 2
        self.δ = 0.2
        self.d = 0.5
        
        def init_function(x):
            x[:, 1] -= self.y_0
            x = x/(2*torch.sum(x**2, dim=-1))
            x[:, 1] += self.y_0
            return self.δ*x
            
        self.equation = lambda u, x, t: Δ(u, t) - self.κ * Δ(u, x)
        
        self.init_datum = lambda x: torch.cos(np.pi*torch.sqrt(x[:, 0]**2 + (x[:, 1]-self.y_0)**2)/(2*self.δ))
        self.domain_func = [lambda x: (x-0.5)*2]
        self.boundary_cond = lambda x, t: div(u, t)
        
        self.init_func = [lambda x: init_function(x)]
        self.boundary_cond = [lambda u, x, t: u - 1/torch.sqrt(4*t+1)*torch.exp(-(x-1-self.a*t)**2/(self.b*(4*t+1)))]
        self.boundary_func = [lambda x: torch.where(x > 0.5, 1.0, 0.0)]

MODEL_CONFIG = {
    "batch_size": 64,
    "hidden_dim": 256,
    "learning_rate": 5e-3,
    "loss_weights": (1, 0, 1),
    "lr_decay": 0.7,
    "sampling_method": "uniform"
}

We need multiple boundary conditions, boundary funcs as some functions with bounds specified, then an rf agent that learns to sample from these optimally

In [15]:
eq = WAVE()
model = MODEL_CONFIG
solver = DGMSolver(model, eq)
losses = list(solver.train(300))
plot_losses(losses)

TypeError: object of type 'function' has no len()

In [None]:
if eq.x_dim == 2:
    e_g = lambda y_1, y_2, x_1, x_2, t: np.exp(-((x_1-y_1)**2+(x_2-y_2)**2)/(4*eq.κ*t))*(np.cos(x_1) + np.sin(x_2))
    DIFF_sol =lambda x_1, x_2, t: 1/(4*np.pi*eq.κ*t)*quad(lambda y_2, x_1, x_2, t: quad(e_g, -np.inf, np.inf, args=(y_2, x_1, x_2, t))[0], -np.inf, np.inf, args=(x_1, x_2, t))[0]

In [None]:
#domain_error = np.mean([[[np.abs(DIFF_sol(x_1, x_2, t)-solver.u(x_1, x_2, t=t)) for x_1 in np.linspace(0, 1, 50)] for x_2 in np.linspace(0, 1, 50)] for t in np.linspace(0.01, 1,10)], axis=0)

In [None]:
#domain_error = np.array([[DIFF_sol(x_1, x_2, 0.1) for x_1 in np.linspace(0, 1, 50)] for x_2 in np.linspace(0, 1, 50)])

In [None]:
'''fig = make_subplots(rows=1, cols=1, specs=[[{'type': 'heatmap'}]])
fig.add_trace(go.Heatmap(z=domain_error, coloraxis = "coloraxis"), row=1, col=1)
fig.update_layout(title='Error density',
                  coloraxis = {'colorscale':'turbo'},
                  xaxis_title="x_1",
                  yaxis_title="x_2",
                  margin=dict(l=20, r=20, b=20, t=50))
fig.show()'''

In [None]:
domain_error = np.array([[solver.u(x_2, x_1, t=0.1) for x_1 in np.linspace(0, 1, 50)] for x_2 in np.linspace(0, 1, 50)])

In [None]:
fig = make_subplots(rows=1, cols=1, specs=[[{'type': 'heatmap'}]])
fig.add_trace(go.Heatmap(z=domain_error, coloraxis = "coloraxis"), row=1, col=1)
fig.update_layout(title='Error density',
                  coloraxis = {'colorscale':'turbo'},
                  xaxis_title="x_1",
                  yaxis_title="x_2",
                  margin=dict(l=20, r=20, b=20, t=50))
fig.show()

How about a network that utilizes a range of different activation functions and combines them