In [1]:
# imports
import deepxde as dde
import numpy as np
import os
import sys
import tensorflow as tf
from matplotlib import pyplot as plt

from typing import Callable, Dict, List, Tuple

# config
tf.get_logger().setLevel('ERROR')

dde.config.set_random_seed(7913)
if dde.backend.backend_name != 'tensorflow':
    raise Exception("set backend tensorflow with: python -m deepxde.backend.set_default_backend tensorflow")

DTYPE: str = "float32"
dde.config.set_default_float(DTYPE)
tf.keras.backend.set_floatx(DTYPE)
if DTYPE == "float32":
    dde.config.real.set_float32()
    DTYPE = np.float32
elif DTYPE == "float64":
    dde.config.real.set_float64()
    DTYPE = np.float64
else:
    raise Exception("Choose DTYPE: float32 or float64")

def add_ones(X: np.ndarray) -> np.ndarray:
    t = np.ones((X.shape[0], X.shape[1] + 1))
    t[:,1:] = X
    return t

# helper functions
def get_grad(model: tf.keras.Sequential, X_r: tf.Tensor, r: int, d: int = 1) -> np.ndarray:
    """Calculate the dth derivative from the model at r.

    Args:
        model (tf.keras.Sequential): model
        X_r (tf.Tensor): Input Matrix
        r (int): index of feature
        d (int): which derivative, d > 0. Default is 1. 

    Returns:
        np.ndarray: gradient
    """
    with tf.GradientTape(persistent=True) as tape:
        cols = []
        for i in range(X_r.shape[1]):
            cols.append(X_r[:, i])
            if r == i:
                tape.watch(cols[i])

        X = tf.stack(cols, axis=1)
        u = model(X)

        G = np.empty((X_r.shape[0], u.shape[1]), dtype= DTYPE)
        for k in range(u.shape[1]):
            u_x = u[:,k]
            x = cols[r]
            for _ in range(d):
                u_x = tape.gradient(u_x, x)
            G[:, k] = u_x.numpy()
    del tape
    return G

# geometry classes
class Geometry:
    def __init__(self, num_dom: int, num_bnd: int, num_tst: int) -> None:
        self.num_dom: int = num_dom
        self.num_bnd: int = num_bnd
        self.num_tst: int = num_tst
        self.geom: dde.geometry.Geometry
        self.dirichlet_bcs: List[dde.icbc.boundary_conditions.BC]
        self.data_dom: dde.data.Data
        self.data_test: dde.data.Data
        self.dirichlet_bcs: List[dde.data.Data]

    def exact_sol(r: np.ndarray) -> np.ndarray:
        raise NotImplementedError

    def pde(r, T):
        raise NotImplementedError

    def phi(r):
        raise NotImplementedError

    def full_pde(r, T):
        raise NotImplementedError

    def get_loss(self, model: tf.keras.Sequential, X_r: tf.Tensor) -> np.ndarray:
        raise NotImplementedError


class RectHole(Geometry):
    a = 0.4
    b = 1.0

    def __init__(self, num_dom: int, num_bnd: int, num_tst: int) -> None:
        a = RectHole.a
        b = RectHole.b

        super().__init__(num_dom, num_bnd, num_tst)

        self.geom: dde.geometry.Geometry = dde.geometry.Rectangle([-b, -b], [b, b]) - dde.geometry.Disk([0, 0], a / 2)

        self.dirichlet_bcs: List[dde.icbc.boundary_conditions.DirichletBC] = []
        self.dirichlet_bcs.append(dde.DirichletBC(geom=self.geom, func=RectHole._exact_sol, on_boundary=RectHole._ver_boundary))
        self.dirichlet_bcs.append(dde.DirichletBC(geom=self.geom, func=RectHole._exact_sol, on_boundary=RectHole._int_boundary))

        self.neumann_bcs: List[dde.icbc.boundary_conditions.NeumannBC] = []
        self.neumann_bcs.append(dde.NeumannBC(geom=self.geom, func=RectHole._exact_grad_n, on_boundary=RectHole._hor_boundary))

        self.data_dom = dde.data.PDE(self.geom, RectHole.pde, [], num_domain=num_dom, num_boundary=0, num_test=num_tst, solution=None)

        self.data_dirichlet: List[dde.data.PDE] = []
        for bc in self.dirichlet_bcs:
            self.data_dirichlet.append(dde.data.PDE(self.geom, RectHole.pde, [bc], num_domain=0, num_boundary=num_bnd, num_test=num_tst, solution=__class__._exact_sol))

        self.data_neumann: List[dde.data.PDE] = []
        for bc in self.neumann_bcs:
            self.data_neumann.append(dde.data.PDE(self.geom, RectHole.pde, [bc], num_domain=0, num_boundary=num_bnd, num_test=num_tst, solution=__class__._exact_grad_n))

        self.data_test = dde.data.PDE(
            self.geom, 
            RectHole.full_pde, 
            self.dirichlet_bcs + self.neumann_bcs, 
            num_domain=num_dom, 
            num_boundary=num_bnd, 
            num_test=num_tst, 
            solution=RectHole._exact_sol)

    def _ver_boundary(r, on_boundary):
        x, _ = r
        return on_boundary and np.isclose(x**2, RectHole.b**2)

    def _int_boundary(r, on_boundary):
        x, y = r
        return on_boundary and np.isclose((x**2 + y**2), (RectHole.a / 2)**2)

    def _exact_grad_n(r):
        # Q&A: here just on horizontal boundary defined? @Sacchetti
        a = RectHole.a
        b = RectHole.b
        x = r[:, 0:1]
        return b * (2 - a / (x**2 + b**2)**0.5)

    def _hor_boundary(r, on_boundary):
        _, y = r
        return on_boundary and np.isclose(y**2, RectHole.b**2)

    def _exact_sol(r: np.ndarray) -> np.ndarray:
        x = r[:, 0:1]
        y = r[:, 1:]
        return ((x**2 + y**2)**0.5 - (RectHole.a / 2))**2

    def _phi(r):
        x = r[:, 0:1]
        y = r[:, 1:]
        return (RectHole.a / (x**2 + y**2)**0.5) - 4

    def exact_sol(self, r: np.ndarray) -> np.ndarray:
        return __class__._exact_sol(r)

    def pde(r, T):
        dT_xx = dde.grad.hessian(T, r, i=0, j=0)
        dT_yy = dde.grad.hessian(T, r, i=1, j=1)
        return dT_xx + dT_yy

    def phi(self, r):
        return __class__._phi(r)

    def full_pde(r, T):
        return RectHole._pde(r, T) + RectHole._phi(r)

    def get_loss(self, model: tf.keras.Sequential, X_r: tf.Tensor) -> np.ndarray:
        u_xx = get_grad(model, X_r, 0, 2)
        u_yy = get_grad(model, X_r, 1, 2)
        return u_xx + u_yy



class ELM:
    def __init__(self, layers: List[int], seed: Tuple[int, int] = None, rcond: float = 1e-10) -> None:
        for i in range(len(layers)):
            layers[i] -= 1
        self.layers: List[int] = [2] + layers + [1]
        self.activation_func: Callable = np.tanh
        self.L: int = 2

        rng = np.random.default_rng(123)
        self.weights_: List[np.ndarray] = []
        for i in range(1, len(self.layers)):
            self.weights_.append(rng.random((self.layers[i - 1] + 1, self.layers[i])) - 0.5)

    def forward_prop(self, X: np.ndarray) -> List[np.ndarray]:
        H = []
        for weights in self.weights_[:-1]:
            X = add_ones(X)
            X = np.dot(X, weights)
            X = self.activation_func(X)
            H.append(X)

        X = add_ones(X)
        H.append(np.dot(X, self.weights_[-1]))
        return H

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        a_l = self.forward_prop(X)
        H = a_l[-2]
        H = add_ones(H)
        H_inv = np.linalg.pinv(H, rcond=1e-10)
        self.weights_[-1] = np.dot(H_inv, y)

    def mse(self, X: np.ndarray, y: np.ndarray) -> float:
        y_hat = self.forward_prop(X)[-1]
        return ((y - y_hat)**2).mean()

    def rms(self, X: np.ndarray, y: np.ndarray) -> float:
        return self.mse(X, y)**0.5


Using backend: tensorflow



Enable just-in-time compilation with XLA.

Set the default float type to float32


In [2]:
# eval functions

def get_deviation(elm: ELM, geom: Geometry) -> Tuple[float, float]:
    """Get RMS of predicted and exact value.

    Args:
        elm (PdeELM): trained elm model
        geom (Geometry): geometry

    Returns:
        Tuple[float, float]: training, test
    """

    X, _, _ = geom.data_test.train_next_batch()
    y_pred_train = elm.predict(X)
    y_true_train = geom.exact_sol(X)
    deviation_train = rms(y_pred_train, y_true_train)

    Xt, _, _ = geom.data_test.test()
    y_pred_test = elm.predict(Xt)
    y_true_test = geom.exact_sol(Xt)
    deviation_test = rms(y_pred_test, y_true_test)
    return deviation_train, deviation_test

def rms(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """Calculate deviation from exact solution (root mean squared error).

    Args:
        y_pred (np.ndarray): predicted values
        y_true (np.ndarray): exact values

    Returns:
        float: rms
    """
    return ((y_pred - y_true)**2).mean()**0.5

In [7]:
def run(
    num_dom: int, 
    num_bnd: int, 
    num_tst: int, 
    layers: List[int], 
    rcond: float = 1e-10) -> Dict:
    """Make and train a elm model with given hyper-parameters.

    Args:
        num_dom (int): points in domain
        num_bnd (int): points in each boundary condition
        num_tst (int): points in the test set
        layers (List[int]): hidden layer sizes without bias node
        rcond (float): threshold value for numpy.linalg.pinv, Default is 1e-10.

    Returns:
        Dict: {'accuracy': (train, test), 'deviation': (train, test)}
        accuracy = RMS of loss
        deviation = RMS of deviation to exact solution
    """
    geom = RectHole(num_dom=num_dom, num_bnd=num_bnd, num_tst=num_tst)
    X, _, _ = geom.data_dom.train_next_batch()
    y = geom.exact_sol(X)

    for data_bc in geom.data_dirichlet:
        X_bc, b_bc, _ = data_bc.train_next_batch()
        X = np.concatenate([X, X_bc])
        y = np.concatenate([y, b_bc])

    for data_bc in geom.data_neumann:
        X_bc, b_bc, _ = data_bc.train_next_batch()
        X = np.concatenate([X, X_bc])
        y = np.concatenate([y, b_bc])
    
    exact_elm = ELM(layers, seed=(123, 456), rcond=rcond)
    exact_elm.fit(X, y)

    deviation = get_deviation(exact_elm, geom)
    X, _, _ = geom.data_test.train_next_batch()
    Xt, _, _ = geom.data_test.test()
    acc_train = exact_elm.get_accuracy(X, geom)
    acc_test = exact_elm.get_accuracy(Xt, geom)

    return {'accuracy' : (acc_train, acc_test), 'deviation' : deviation}

In [9]:
# example
num_dom = 2048
num_bnd = 1024
num_tst = 4444
layers = [32]
rcond = 1e-8

res = run(num_dom, num_bnd, num_tst, layers, rcond)
print(res)



TypeError: 'float' object is not callable

In [None]:
# find best layersize
num_dom = 250
num_bnd = 85
num_tst = 380
rcond = 1e-10
start = 4
stop = 500
points = 20
step = int((stop - start)/points)

res = []
metrics = ['deviation_train', 'deviation_test', 'accuracy_train', 'accuracy_test']
while step >= 1:
    layers = np.arange(start=start, stop=stop+1, step=step).reshape(-1, 1)
    res_step = {
        'layersizes' : layers,
        'deviation_train' : [],
        'deviation_test' : [],
        'accuracy_train' : [],
        'accuracy_test' : [],
        'min_l' : 0
        }
    for l in layers:
        r = run(num_dom, num_bnd, num_tst, l, rcond)
        res_step['deviation_train'].append(r['deviation'][0])
        res_step['deviation_test'].append(r['deviation'][1])
        res_step['accuracy_train'].append(r['accuracy'][0])
        res_step['accuracy_test'].append(r['accuracy'][1])
    res.append(res_step)
    min_l = layers[np.argmin(res_step['accuracy_test'])]
    res_step['min_l'] = min_l
    start = min_l - step + 1
    stop = min_l + step - 1
    step = int((stop - start)/points)


runs = len(res)
fig = plt.figure(figsize=(20, 10))
for r in range(runs):
    for idx, metric in enumerate(metrics):
        fig.add_subplot(runs, 4, r*4 + idx + 1)
        plt.xlabel("layersize")
        plt.ylabel(metric)
        min_l = res[r]["min_l"]
        layers = res[r]["layersizes"]
        min_l = layers[np.argmin(res[r][metric])]
        plt.title(f"min: {min_l[0]}")
        plt.scatter(res[r]["layersizes"], res[r][metric])

layers = res[-1]["layersizes"]
opt_layers: int = layers[np.argmin(res[-1]["accuracy_test"])]

In [None]:
# find best ratio num_bnd/num_dom

num_data = 2000
num_tst = 300
layers = [opt_layers]
# percent of num_bnd
step = 0.01
start = 0.1
stop = 0.5
rcond = 1e-10

percents = np.arange(start, stop, step=step)
res = {
    'deviation_train' : [],
    'deviation_test' : [],
    'accuracy_train' : [],
    'accuracy_test' : [],
    }
metrics = ['deviation_train', 'deviation_test', 'accuracy_train', 'accuracy_test']

for p in percents:
    num_bnd = int(p * num_data)
    num_dom = num_data - num_bnd
    r = run(num_dom, num_bnd, num_tst, layers, rcond)
    res['deviation_train'].append(r['deviation'][0])
    res['deviation_test'].append(r['deviation'][1])
    res['accuracy_train'].append(r['accuracy'][0])
    res['accuracy_test'].append(r['accuracy'][1])

runs = len(res)
fig = plt.figure(figsize=(20, 5))
for idx, metric in enumerate(metrics):
    fig.add_subplot(1, 4, idx + 1)
    plt.xlabel("percent_bnd")
    plt.title("min: " + "{:.2f}".format(percents[np.argmin(res[metric])]))
    plt.ylabel(metric)
    plt.scatter(percents, res[metric])

opt_bnd_percent: float = percents[np.argmin(res['accuracy_test'])]

In [None]:
# find number of datapoints needed
start = 300
stop = 20_000
step = 300
num_data = np.arange(start, stop, step)
num_tst = 300
layers = [opt_layers]
bnd_per = opt_bnd_percent

res = {
    'deviation_train' : [],
    'deviation_test' : [],
    'accuracy_train' : [],
    'accuracy_test' : [],
    }
metrics = ['deviation_train', 'deviation_test', 'accuracy_train', 'accuracy_test']

for data in num_data:
    num_bnd = int(bnd_per * data)
    num_dom = data - num_bnd
    r = run(num_dom, num_bnd, num_tst, layers, rcond)
    res['deviation_train'].append(r['deviation'][0])
    res['deviation_test'].append(r['deviation'][1])
    res['accuracy_train'].append(r['accuracy'][0])
    res['accuracy_test'].append(r['accuracy'][1])

runs = len(res)
fig = plt.figure(figsize=(24, 5))
for idx, metric in enumerate(metrics):
    fig.add_subplot(1, 4, idx + 1)
    plt.xlabel("num_dom + num_bnd")
    plt.title(f"min: {num_data[np.argmin(res[metric])]}")
    plt.ylabel(metric)
    plt.scatter(num_data, res[metric])

opt_num_data: int = num_data[np.argmin(res['accuracy_test'])]

In [None]:
num_bnd = int(opt_bnd_percent * opt_num_data)
num_dom = opt_num_data - num_bnd
layers = opt_layers
num_tst = 340
rconds = np.arange(start=1, stop=30, step=1, dtype=DTYPE)
scale = lambda x: 1/(10**(x))
# scale = 1e-x
rconds = scale(rconds)

res = {
    'deviation_train' : [],
    'deviation_test' : [],
    'accuracy_train' : [],
    'accuracy_test' : [],
    }
metrics = ['deviation_train', 'deviation_test', 'accuracy_train', 'accuracy_test']

for rcond in rconds:
    r = run(num_dom, num_bnd, num_tst, layers, rcond)
    res['deviation_train'].append(r['deviation'][0])
    res['deviation_test'].append(r['deviation'][1])
    res['accuracy_train'].append(r['accuracy'][0])
    res['accuracy_test'].append(r['accuracy'][1])

fig = plt.figure(figsize=(20, 5))
for idx, metric in enumerate(metrics):
    ax = fig.add_subplot(1, 4, idx + 1)
    ax.set_xscale("log")
    ax.set_yscale("log")
    plt.xlabel("rcond")
    plt.title(f"min: {rconds[np.argmin(res[metric])]}")
    plt.ylabel(metric)
    plt.scatter(rconds, res[metric])

opt_rcond: float = rconds[np.argmin(res['accuracy_test'])]

In [None]:
# print optimum
num_bnd = int(opt_bnd_percent * opt_num_data)
num_dom = opt_num_data - num_bnd
num_tst = 340
rcond = opt_rcond

res = run(num_dom=num_dom, num_bnd=num_bnd, num_tst=num_tst, layers=opt_layers, rcond=rcond)
print("\n best parameteres found:")
print(f"layers: {opt_layers}\tnum_dom: {num_dom}\tnum_bnd: {num_bnd}\tr_cond: {rcond}")
print(res)
# True number of points on the boundary is:
# ~ number of boundary conditions * num_bnd
# TO DO extract exact number of boundary-points from deepxde