In [1]:
import torch
import deepxde as dde
import numpy as np
from typing import cast, Any
import plotly.graph_objects as go
import plotly

%load_ext jupyter_black

Using backend: pytorch
Other supported backends: tensorflow.compat.v1, tensorflow, jax, paddle.
paddle supports more examples now and is recommended.


In [2]:
print(torch.cuda.is_available())
print(torch.cuda.get_device_name(0))

True
NVIDIA GeForce GTX 1650


In [324]:
class LaminarFlow2D:
    """Class modeling the laminar flow in 2+1 dimensions.


    Args:
        u_inf (float): Velocity of the fluid at the inlet.
        reynolds (float): Reynolds number.
        radius (float): Radius of the cynlinder.
    """

    __slots__ = ("u_inf", "reynolds", "nu", "radius")

    def __init__(self, u_inf: float, reynolds: float, radius: float):
        # PDE parameters
        self.u_inf = u_inf
        self.reynolds = reynolds
        self.radius = radius
        self.nu = (self.u_inf * 2 * self.radius) / self.reynolds
        print(f"Viscosity: {self.nu:.4f}")

    def equation(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
        """Defines the ODE.

        Args:
            x (torch.Tensor): Input of the neural network.
            y (torch.Tensor): Output of the neural network.

        Returns:
            (torch.Tensor): Residual of the differential equation.

        """
        ux, uy, p = y[:, 0:1], y[:, 1:2], y[:, 2:]

        ux_x = cast(torch.Tensor, dde.grad.jacobian(y, x, i=0, j=0))
        ux_y = cast(torch.Tensor, dde.grad.jacobian(y, x, i=0, j=1))
        ux_t = cast(torch.Tensor, dde.grad.jacobian(y, x, i=0, j=2))
        uy_x = cast(torch.Tensor, dde.grad.jacobian(y, x, i=1, j=0))
        uy_y = cast(torch.Tensor, dde.grad.jacobian(y, x, i=1, j=1))
        uy_t = cast(torch.Tensor, dde.grad.jacobian(y, x, i=1, j=2))
        p_x = cast(torch.Tensor, dde.grad.jacobian(y, x, i=2, j=0))
        p_y = cast(torch.Tensor, dde.grad.jacobian(y, x, i=2, j=1))

        ux_xx = cast(torch.Tensor, dde.grad.hessian(y, x, i=0, j=0, component=0))
        ux_xy = cast(torch.Tensor, dde.grad.hessian(y, x, i=0, j=1, component=0))
        ux_yy = cast(torch.Tensor, dde.grad.hessian(y, x, i=1, j=1, component=0))
        uy_xx = cast(torch.Tensor, dde.grad.hessian(y, x, i=0, j=0, component=1))
        uy_xy = cast(torch.Tensor, dde.grad.hessian(y, x, i=0, j=1, component=1))
        uy_yy = cast(torch.Tensor, dde.grad.hessian(y, x, i=1, j=1, component=1))

        pde = [
            ux_t + ux * ux_x + uy * ux_y + p_x - self.nu * (ux_xx + ux_yy),
            uy_t + ux * uy_x + uy * uy_y + p_y - self.nu * (uy_xx + uy_yy),
            ux_x + uy_y,
        ]

        return pde

In [481]:
# Equation parameters

u_inf = 1.5
reynolds = 20
radius = 0.05

# Neural network parameters
input_size = [3]
output_size = [3]
layers_sizes = input_size + [16, 32, 16] + output_size
activation = "tanh"
initializer = "Glorot normal"

# Training parameters
optimizer_kw = dict(
    lr=0.001,
    # metrics=["l2 relative error"],
    # loss_weights=[0.001, 1, 1],
)
# n_iterations = 10_000
n_iterations = 6_000

# Mesh parameters
n_training_inside = 2000
n_training_bdy = 500
n_training_initial = 250
# n_test = 5000

# Ranges
L = 2.2
D = 0.41
x_begin = 0
x_end = L
y_begin = 0
y_end = D
t_begin = 0
t_end = 5

x_disk = 0.2
y_disk = 0.2


def solve_problem(
    reynolds: float, t_end: float, radius: float
) -> tuple[LaminarFlow2D, dde.Model, dde.model.LossHistory, dde.model.TrainState]:
    """Given the parameters, defines the model and trains the model."""
    # System
    umean = 2 * u_inf / 3
    laminar_eq = LaminarFlow2D(u_inf=umean, reynolds=reynolds, radius=radius)

    # Geometry description
    outer = dde.geometry.Rectangle(xmin=[x_begin, y_begin], xmax=[x_end, y_end])
    inner = dde.geometry.Disk([x_disk, y_disk], radius)
    # geom_space = outer
    geom_space = outer - inner
    geom_time = dde.geometry.TimeDomain(t_begin, t_end)
    geom_full = dde.geometry.GeometryXTime(geom_space, geom_time)

    def parabolic_x(x):
        profile = 4 * x[:, 1] * (D - x[:, 1]) / D**2
        profile = profile.reshape((len(x), 1))
        return profile

    def val_array_cst(val: float):
        def func(x: np.ndarray):
            return val + np.zeros((len(x), 1))

        return func

    def boundary_disk(x, on_boundary):
        return dde.utils.isclose(
            np.power(x[0] - x_disk, 2) + np.power(x[1] - y_disk, 2), np.power(radius, 2)
        )

    def boundary_edges(x, on_boundary):
        return on_boundary and (
            dde.utils.isclose(x[1], y_end) or dde.utils.isclose(x[1], y_begin)
        )

    def boundary_outflow(x, on_boundary):
        return on_boundary and dde.utils.isclose(x[0], x_end)

    def boundary_inflow(x, on_boundary):
        return on_boundary and dde.utils.isclose(x[0], x_begin)

    def boundary_inner(x, on_boundary):
        return on_boundary and inner.on_boundary(x[:2])

    def normal_vel(inputs, outputs, X):
        ux = outputs[:, 0]
        uy = outputs[:, 1]
        res = torch.sqrt(torch.pow(ux, 2) + torch.pow(uy, 2))

        return res

    # BCs
    # No slip on the sphere
    bc_noslip = [
        # dde.icbc.OperatorBC(geom_full, normal_vel, boundary_inner),
        # dde.icbc.OperatorBC(geom_full, normal_vel, boundary_inner),
        dde.icbc.DirichletBC(geom_full, val_array_cst(0), boundary_inner, component=0),
        dde.icbc.DirichletBC(geom_full, val_array_cst(0), boundary_inner, component=1),
    ]
    # Inlet
    bc_in = [
        dde.icbc.DirichletBC(geom_full, parabolic_x, boundary_inflow, component=0),
        dde.icbc.DirichletBC(geom_full, val_array_cst(0), boundary_inflow, component=1),
        # dde.icbc.NeumannBC(
        #     geom_full,
        #     # lambda x, y, _: dde.grad.jacobian(y, x, i=2, j=0),
        #     val_array_cst(0),
        #     boundary_outflow,
        #     component=2,
        # ),
    ]
    # Outlet
    bc_out = [
        # dde.icbc.NeumannBC(
        #     geom_full,
        #     # lambda x, y, _: dde.grad.jacobian(y, x, i=0, j=0),
        #     val_array_cst(0),
        #     boundary_outflow,
        #     component=0,
        # ),
        # dde.icbc.NeumannBC(
        #     geom_full,
        #     # lambda x, y, _: dde.grad.jacobian(y, x, i=1, j=0),
        #     val_array_cst(0),
        #     boundary_outflow,
        #     component=1,
        # ),
        # dde.icbc.DirichletBC(
        #     geom_full, val_array_cst(0), boundary_outflow, component=2
        # ),
        dde.icbc.RobinBC(
            geom_full,
            lambda x, y: -y[:, 2] / laminar_eq.nu,
            boundary_outflow,
            component=0,
        ),
        dde.icbc.RobinBC(geom_full, lambda x, y: 0, boundary_outflow, component=1),
        dde.icbc.DirichletBC(
            geom_full, val_array_cst(0), boundary_outflow, component=2
        ),
    ]
    # Edges
    bc_edges = [
        dde.icbc.DirichletBC(geom_full, val_array_cst(0), boundary_edges, component=0),
        dde.icbc.DirichletBC(geom_full, val_array_cst(0), boundary_edges, component=1),
        # dde.icbc.NeumannBC(
        #     geom_full,
        #     # lambda x, y, _: dde.grad.jacobian(y, x, i=2, j=1),
        #     val_array_cst(0),
        #     boundary_edges,
        #     component=2,
        # ),
    ]

    # Initial value
    ic = [
        dde.icbc.IC(
            geom_full,
            parabolic_x,
            lambda _, on_initial: on_initial,
            component=0,
        ),
        dde.icbc.IC(
            geom_full, val_array_cst(0), lambda _, on_initial: on_initial, component=1
        ),
        # dde.icbc.IC(
        #     geom_full, lambda x: 0, lambda _, on_initial: on_initial, component=2
        # ),
    ]

    # Load data
    all_bcs = [
        *bc_noslip,
        *bc_in,
        *bc_out,
        *bc_edges,
        *ic,
    ]
    data = dde.data.TimePDE(
        geom_full,
        laminar_eq.equation,
        all_bcs,
        num_domain=n_training_inside,
        num_boundary=n_training_bdy,
        num_initial=n_training_initial,
        # num_test=n_training_inside,
    )

    neural_net = dde.nn.FNN(layers_sizes, activation, initializer)
    # neural_net.apply_output_transform(lambda x, y: abs(y))
    model = dde.Model(data, neural_net)
    model.compile("adam", **optimizer_kw)
    losshistory, train_state = model.train(0)
    initial_losses = losshistory.loss_train[0]
    loss_weights = (len(all_bcs) + 3) / (initial_losses + 1e-5)
    # loss_weights = 1.0 / (initial_losses + 1e-5)

    model.compile("adam", loss_weights=loss_weights, **optimizer_kw)
    # model.train(iterations=n_iterations)
    # model.compile("L-BFGS")
    losshistory, train_state = model.train(iterations=n_iterations)
    model.compile("L-BFGS-B")
    losshistory, train_state = model.train(iterations=n_iterations)

    return laminar_eq, model, losshistory, train_state

In [482]:
def get_predictions(
    laminar_eq: LaminarFlow2D, model: dde.Model, n_dim: int, n_time: int
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    x = np.linspace(x_begin, x_end, n_dim)
    y = np.linspace(y_begin, y_end, n_dim)
    t = np.linspace(t_begin, t_end, n_time)
    X, Y, T = np.meshgrid(x, y, t)
    grid = np.stack([X.flatten(), Y.flatten(), T.flatten()], axis=1)

    y_predict = model.predict(grid)
    y_predict = y_predict.reshape((n_dim, n_dim, n_time, 3))
    return (x, y, t), y_predict

In [483]:
def mae(y_true: np.ndarray, y_predict: np.ndarray) -> float:
    return np.mean(np.abs(y_true - y_predict))


def plot_result(coords: np.ndarray, y_predict: np.ndarray, radius: float) -> go.Figure:
    width = 900
    height = 400
    scale = 3

    vabs = np.sqrt(
        np.power(y_predict[:, :, :, 0], 2) + np.power(y_predict[:, :, :, 1], 2)
    )

    frames_data = [
        go.Heatmap(
            z=vabs[:, :, k],
            # z=y_predict[:, :, k, 2],
            x=coords[0],
            y=coords[1],
            colorscale="AgSunset",
            zmin=0,
            zmax=1.5,
            colorbar=dict(
                title=dict(text="Absolute velocity", side="bottom"),
                orientation="h",
                x=0.5,
                y=-0.2,
                xanchor="center",
                yanchor="top",
            ),
        )
        for k in range(y_predict.shape[2])
    ]

    def get_title_timestep(k: int):
        return f"Time Step: {coords[2][k]:.2f}"

    fig = go.Figure(
        data=frames_data[0],
        layout=dict(
            autosize=False,
            width=width,
            height=height,
            title=dict(
                text=get_title_timestep(0),
                y=0.91,
                x=0.5,
                xanchor="center",
                yanchor="bottom",
            ),
            xaxis=dict(title=dict(text="x")),
            yaxis=dict(title=dict(text="y")),
            font=dict(size=14),
            margin=dict(l=40, r=40, t=60, b=60),
            updatemenus=[
                dict(
                    type="buttons",
                    buttons=[
                        dict(
                            label="Play",
                            method="animate",
                            args=[
                                None,
                                {
                                    "frame": {
                                        "duration": 500,
                                        "redraw": True,
                                        "mode": "next",
                                    },
                                    "fromcurrent": True,
                                },
                            ],
                        )
                    ],
                )
            ],
        ),
        frames=[
            go.Frame(
                data=frames_data[k],
                layout=go.Layout(title={"text": get_title_timestep(k)}),
            )
            for k in range(len(frames_data))
        ],
    )
    fig.add_shape(
        type="circle",
        xref="x",
        yref="y",
        fillcolor="Black",
        x0=x_disk - radius,
        y0=y_disk - radius,
        x1=x_disk + radius,
        y1=y_disk + radius,
        line_color="Black",
    )
    # fig.write_image(
    #     f"../figs/laminar_flow.png",
    #     width=width,
    #     height=height,
    #     scale=scale,
    # )
    return fig

In [None]:
reynolds = 30
t_end = 5
radius = 0.05
# Training parameters
optimizer_kw = dict(
    lr=0.001,
)
heat_eq, model, loss_history, train_state = solve_problem(
    reynolds=reynolds, t_end=t_end, radius=radius
)
dde.saveplot(loss_history, train_state, issave=False, isplot=True)
# plot_result(train_state, name_case="underdamped")

Viscosity: 0.0033
Compiling model...
'compile' took 0.000334 s

Training model...

0         [1.41e-01, 2.13e-02, 1.60e-01, 2.15e-01, 3.60e-04, 1.03e+00, 3.22e-03, 3.60e+04, 6.58e-03, 4.01e-01, 6.44e-01, 2.63e-02, 2.32e+00, 4.44e-02]    [1.41e-01, 2.13e-02, 1.60e-01, 2.15e-01, 3.60e-04, 1.03e+00, 3.22e-03, 3.60e+04, 6.58e-03, 4.01e-01, 6.44e-01, 2.63e-02, 2.32e+00, 4.44e-02]    []  

Best model at step 0:
  train loss: 3.60e+04
  test loss: 3.60e+04
  test metric: []

'train' took 0.029333 s

Compiling model...
'compile' took 0.000216 s

Training model...

Step      Train loss                                                                                                                                      Test loss                                                                                                                                       Test metric
0         [1.40e+01, 1.40e+01, 1.40e+01, 1.40e+01, 1.36e+01, 1.40e+01, 1.40e+01, 1.40e+01, 1.40e+01, 1.40e+01, 1.40e+01, 1.40e+

In [None]:
with torch.no_grad():
    torch.cuda.empty_cache()

In [None]:
coords, y_predict = get_predictions(heat_eq, model, 100, 100)
# plot_result(coords, y_predict, n=1, kappa=0.1)

In [None]:
plot_result(coords, y_predict, radius=radius)