# Poisson 2d
---

Consider the 2d Poisson Equation
\begin{equation}
\begin{aligned}
&\nabla \cdot (\sigma \nabla u) = f, \\
&\sigma \frac{\partial u}{\partial n}|_{\partial\Omega} = g, \\
\end{aligned}
\end{equation}
where the domain $\Omega = [0, 1]\times [0, 1]$, and the target field $\sigma(x, y) = 0.1x + 1.0$.
$f, g$ are chosen such that the exact solution is
\begin{equation}
u(x, y) = \exp(\pi y)\sin(\pi x).
\end{equation}

$f(x, y) = 0.1\pi \exp(\pi y)\cos(\pi x)$.

For the inverse problem, Dirichlet measurements are provided on the boundary.

We further assume the form of $\sigma$ is known, i.e.
\begin{equation}
\sigma(x, y) = a_0 + a_1 x,
\end{equation}
and only invert $a_0, a_1$.

In [1]:
import os, sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath("./"))))

import torch
import torch.nn as nn
import numpy as np

from pinn_lightning.pinn import PINN, InversePINN, Task
from pinn_lightning.data.dataset import PINNDataModule
from pinn_lightning.data.utils import tensor_grid, tensors_from_numpy
from pinn_lightning.utils.basic_nets import *
from pinn_lightning.utils.operators import grad, directional_grad

import pytorch_lightning as pl

%load_ext autoreload
%autoreload 2
%matplotlib notebook
%config Completer.use_jedi = False

import random
torch.manual_seed(1001)
np.random.seed(1001)
random.seed(1001)

torch.set_default_dtype(torch.float64)
device = torch.device("cpu")

# Step 1: Define Model

In [2]:
class Poisson2d(InversePINN):	
	@property
	def pi(self):
		return torch.tensor(np.pi).to(self.device)
	
	def _sigma(self, x):
		return 0.1*x + 1.0
	
	def get_output(self, batch_input):
		(x_domain, y_domain), (x_n, y_n, normal), (x_d, y_d) = batch_input  # x_n: neumann BC; x_d: dirichlet measurements
		x_domain.requires_grad = True
		y_domain.requires_grad = True
		u_domain = self.forward(torch.hstack([x_domain, y_domain]))
		sigma_domain = self.inverse_module(torch.hstack([x_domain, y_domain]))
		
		eq = grad(grad(u_domain, x_domain)*sigma_domain, x_domain) + \
			grad(grad(u_domain, y_domain)*sigma_domain, y_domain) - \
			0.1*self.pi*torch.exp(self.pi*y_domain)*torch.cos(self.pi*x_domain)
		
		x_n.requires_grad = True
		y_n.requires_grad = True
		u_n = self.forward(torch.hstack([x_n, y_n]))
		u_grad = directional_grad(u_n, x_n, y_n, direction=normal)
		sigma_n = self.inverse_module(torch.hstack([x_n, y_n]))
		sigma = self._sigma(x_n)
		
		u_d = self.forward(torch.hstack([x_d, y_d]))
		return eq, u_grad*sigma, sigma_n-sigma, u_d
	
class SigmaModule(nn.Module):
	def __init__(self, a0, a1):
		super().__init__()
		self.a0 = nn.Parameter(a0)
		self.a1 = nn.Parameter(a1)
		
	def forward(self, xy):
		return self.a1*xy[:, 0:1] + self.a0
	
class PrintCallback(pl.callbacks.Callback):
	def on_train_batch_end(self, trainer, pl_module, *args, **kwargs):
		pl_module.log("a0", pl_module.inverse_module.a0, on_step=False, on_epoch=True)
		pl_module.log("a1", pl_module.inverse_module.a1, on_step=False, on_epoch=True)
	
forward_module = DenseNet(
	[2] + [256]*4 + [1], Swish,
)
inverse_module = SigmaModule(
	a0 = torch.tensor(2.0),
	a1 = torch.tensor(0.2),
)

pinn = Poisson2d(forward_module, inverse_module)
pinn.configure_tasks(
	[
		Task(
			n_input = 2,
			n_output = 1,
			loss_fns = nn.MSELoss(),
			loss_weights = 1.0,
			names = "domain",
		),
		Task(
			n_input = 3,
			n_output = 2,
			loss_fns = nn.MSELoss(),
			loss_weights = 1.0,
			names = "neumann",
		),
		Task(
			n_input = 2,
			n_output = 1,
			loss_fns = nn.MSELoss(),
			loss_weights = 1.0,
			names = "dirichlet",
		),
	]
)
optimizer = torch.optim.Adam(
	[
		{
			"params": pinn.param_groups[0],
			"lr": 1e-2,
			"weight_decay": 0
		},
		{
			"params": pinn.param_groups[1],
			"lr": 1e-2,
			"weight_decay": 0,
		},
	]
)
pinn.configure_optimizers_and_schedulers(
	optimizer = optimizer,
	lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
		optimizer,
		factor = 0.8,
		patience = 100,
		threshold = 1e-6,
		min_lr = 1e-4,
	),
)
pinn

Poisson2d(
  (forward_module): DenseNet(
    (net): Sequential(
      (0): Linear(in_features=2, out_features=256, bias=True)
      (1): Swish()
      (2): Linear(in_features=256, out_features=256, bias=True)
      (3): Swish()
      (4): Linear(in_features=256, out_features=256, bias=True)
      (5): Swish()
      (6): Linear(in_features=256, out_features=256, bias=True)
      (7): Swish()
      (8): Linear(in_features=256, out_features=1, bias=True)
    )
  )
  (inverse_module): SigmaModule()
)

# Step 2: Generate Dataset

In [3]:
x_1d = np.linspace(0, 1, 101).reshape((-1, 1))
y_1d = np.linspace(0, 1, 99).reshape((-1, 1))
xy = tensor_grid([x_1d, y_1d])
x_domain, y_domain = xy[:, 0:1], xy[:, 1:2]
u_domain = np.zeros_like(x_domain)

x_bc_1d = np.linspace(0, 1, 1001).reshape((-1, 1))
y_bc_1d = np.linspace(0, 1, 999).reshape((-1, 1))
xy_bc = np.vstack(
	[
		np.hstack([x_bc_1d, 0*np.ones_like(x_bc_1d)]),
		np.hstack([np.ones_like(y_bc_1d), y_bc_1d]),
		np.hstack([x_bc_1d, np.ones_like(x_bc_1d)]),
		np.hstack([0*np.ones_like(y_bc_1d), y_bc_1d]),
	]
)
normal = np.vstack(
	[
		np.hstack([np.zeros_like(x_bc_1d), -1*np.ones_like(x_bc_1d)]),
		np.hstack([np.ones_like(y_bc_1d), np.zeros_like(y_bc_1d)]),
		np.hstack([np.zeros_like(x_bc_1d), np.ones_like(x_bc_1d)]),
		np.hstack([-np.ones_like(y_bc_1d), np.zeros_like(y_bc_1d)]),
	]
)
x_d, y_d = xy_bc[:, 0:1], xy_bc[:, 1:2]
u_fn = lambda x, y: np.exp(np.pi*y)*np.sin(np.pi*x)
u_d = u_fn(x_d, y_d)

x_n, y_n = x_d, y_d
du_dx_fn = lambda x, y: np.pi*np.exp(np.pi*y)*np.cos(np.pi*x)
du_dy_fn = lambda x, y: np.pi*np.exp(np.pi*y)*np.sin(np.pi*x)
sigma_fn = lambda x, y: 1.0 + 0.1*x
sigma_n = sigma_fn(x_n, y_n)
u_n = (du_dx_fn(x_n, y_n)*normal[:, 0:1] + du_dy_fn(x_n, y_n)*normal[:, 1:2])*sigma_n
sigma_n_target = np.zeros_like(sigma_n)

In [4]:
dataset_domain = torch.utils.data.TensorDataset(
	*tensors_from_numpy(x_domain, y_domain, u_domain),
)
dataset_n = torch.utils.data.TensorDataset(
	*tensors_from_numpy(x_n, y_n, normal, u_n, sigma_n_target),
)
dataset_d = torch.utils.data.TensorDataset(
	*tensors_from_numpy(x_d, y_d, u_d),
)

poisson_data_module = PINNDataModule(
	datasets = [dataset_domain, dataset_n, dataset_d],
	collate_fns = [None, None, None],
	valid_splits = [0.2, 0.2, 0.2],
	batch_sizes = [40000, 4000, 4000],
	shuffle = [True, True, True],
)

# Step 3: training

In [None]:
trainer = pl.Trainer(
	max_epochs=30000,
	gpus=1,
	logger=True,
	callbacks=[
		pl.callbacks.EarlyStopping(monitor="valid_loss", patience=10000),
        pl.callbacks.ModelCheckpoint(dirpath="models", save_last=True, save_top_k=1),
		pl.callbacks.LearningRateMonitor(logging_interval='step'),
		PrintCallback(),
	],
	log_every_n_steps=10,
)
trainer.fit(
	pinn,
	poisson_data_module,
)

In [6]:
checkpoint_callback = trainer.callbacks[-1]
checkpoint = torch.load(checkpoint_callback.best_model_path)
pinn.load_state_dict(checkpoint["state_dict"])

x_test = torch.from_numpy(x_domain).to(pinn.device)
y_test = torch.from_numpy(y_domain).to(pinn.device)
u_test = u_fn(x_domain, y_domain).reshape((len(x_1d), len(y_1d))).T
# sigma_test = np.ones_like(u_test)
sigma_test = 1.0 + 0.1*x_domain.reshape((len(x_1d), len(y_1d))).T

with torch.no_grad():
	u_pred = pinn.forward(torch.hstack([x_test, y_test])).cpu().numpy().reshape((len(x_1d), len(y_1d))).T
	sigma_pred = pinn.inverse_module(torch.hstack([x_test, y_test])).cpu().numpy().reshape((len(x_1d), len(y_1d))).T
	
u_err = np.sqrt(np.mean((u_test - u_pred)**2))
u_norm = np.sqrt(np.mean(u_test**2))
sigma_err = np.sqrt(np.mean((sigma_test - sigma_pred)**2))
sigma_norm = np.sqrt(np.mean(sigma_test**2))

X, Y = np.meshgrid(x_1d, y_1d)
import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib import ticker, cm
cmap = mpl.cm.viridis
	
fig, axes = plt.subplots(2, 3, figsize = (18, 10))

c0 = axes[0][0].contourf(X, Y, u_test, cmap = cmap, levels = 20)
c1 = axes[0][1].contourf(X, Y, u_pred, cmap = cmap, levels = 20)
c2 = axes[0][2].contourf(X, Y, np.abs(u_test - u_pred), cmap = cmap, norm = mpl.colors.LogNorm())
c3 = axes[1][0].contourf(X, Y, sigma_test, cmap = cmap, levels = 20)
c4 = axes[1][1].contourf(X, Y, sigma_pred, cmap = cmap, levels = 20)
c5 = axes[1][2].contourf(X, Y, np.abs(sigma_test - sigma_pred), cmap = cmap, norm = mpl.colors.LogNorm())
 
for c, ax in zip([c0, c1, c2], axes[0]):
    fig.colorbar(c, ax = ax)

for ax, name in zip(axes[0], ["u, true", "u, pred", "rel $l_2$ error: {:.2e}".format(u_err/u_norm)]):
    ax.set_title(name)
	
for c, ax in zip([c3, c4, c5], axes[1]):
    fig.colorbar(c, ax = ax)

for ax, name in zip(axes[1], ["$\sigma$, true", "$\sigma$, pred", "rel $l_2$ error: {:.2e}".format(sigma_err/sigma_norm)]):
    ax.set_title(name)

<IPython.core.display.Javascript object>