# Estimation of Navier-Stokes equation
## Done by: Andreea-Ioana Florea

In [78]:
print("Hello, world!!!")

Hello, world!!!


In [79]:
import tensorflow as tf
import deepxde as dde
import numpy as np
import matplotlib.pyplot as plt

In [80]:
# Physical parameters
rho = 1  # Density
mu = 1   # Viscosity
u_in = 1
D = 1
L = 2

# Define geometry: A rectangle
#geom = dde.geometry.Rectangle(xmin=[-L / 2, -D / 2], xmax=[L / 2, D / 2])
# Main channel
channel = dde.geometry.Rectangle(xmin=[-L / 2, -D / 2], xmax=[L / 2, D / 2])

# Obstacle (Cylinder)
center = [0, 0]
radius = 0.1
cylinder = dde.geometry.Disk(center, radius)

# Combined geometry
geom = channel - cylinder

# 1. Define the Heart Shape coordinates
t = np.linspace(0, 2 * np.pi, 100)
# Parametric heart equations scaled down to size 0.1
heart_x = 0.1 * (16 * np.sin(t)**3) / 16
heart_y = 0.1 * (13 * np.cos(t) - 5 * np.cos(2*t) - 2 * np.cos(3*t) - np.cos(4*t)) / 16

# Shift the heart to the LEFT of the center (x = -0.5)
heart_points = np.vstack([heart_x - 0.5, heart_y]).T
heart_shape = dde.geometry.Polygon(heart_points)

# 2. Define the Cylinder and Channel
channel = dde.geometry.Rectangle(xmin=[-L / 2, -D / 2], xmax=[L / 2, D / 2])
cylinder = dde.geometry.Disk([0, 0], 0.1)

# 3. Combine: Subtract BOTH from the channel
# This creates a channel with two holes
geom = channel - cylinder - heart_shape

  self.normal = self.normal / np.linalg.norm(self.normal, axis=1).reshape(-1, 1)


In [81]:
# Define boundary conditions
def boundary_wall(X, on_boundary):
    on_wall = np.logical_and(
        np.logical_or(
            np.isclose(X[1], -D / 2, rtol=1e-05, atol=1e-08),
            np.isclose(X[1], D / 2, rtol=1e-05, atol=1e-08),
        ),
        on_boundary,
    )
    return on_wall

def boundary_inlet(X, on_boundary):
    on_inlet = np.logical_and(
        np.isclose(X[0], -L / 2, rtol=1e-05, atol=1e-08), on_boundary
    )
    return on_inlet

def boundary_outlet(X, on_boundary):
    on_outlet = np.logical_and(
        np.isclose(X[0], L / 2, rtol=1e-05, atol=1e-08), on_boundary
    )
    return on_outlet

def boundary_cylinder(X, on_boundary):
    # Check if the point is on the boundary and NOT on the outer channel walls
    return on_boundary and not (
        np.isclose(X[0], -L/2) or np.isclose(X[0], L/2) or
        np.isclose(X[1], -D/2) or np.isclose(X[1], D/2)
    )

def boundary_obstacles(X, on_boundary):
    # Logic: It's an obstacle if it's on a boundary but NOT the external box edges
    on_outer_wall = np.logical_or(
        np.logical_or(np.isclose(X[0], -L/2), np.isclose(X[0], L/2)),
        np.logical_or(np.isclose(X[1], -D/2), np.isclose(X[1], D/2))
    )
    return on_boundary and not on_outer_wall

# No-slip for all internal objects
bc_obs_u = dde.DirichletBC(geom, lambda X: 0.0, boundary_obstacles, component=0)
bc_obs_v = dde.DirichletBC(geom, lambda X: 0.0, boundary_obstacles, component=1)

bc_cylinder_u = dde.DirichletBC(geom, lambda X: 0.0, boundary_cylinder, component=0)
bc_cylinder_v = dde.DirichletBC(geom, lambda X: 0.0, boundary_cylinder, component=1)

bc_wall_u = dde.DirichletBC(geom, lambda X: 0.0, boundary_wall, component=0)
bc_wall_v = dde.DirichletBC(geom, lambda X: 0.0, boundary_wall, component=1)

bc_inlet_u = dde.DirichletBC(geom, lambda X: u_in, boundary_inlet, component=0)
bc_inlet_v = dde.DirichletBC(geom, lambda X: 0.0, boundary_inlet, component=1)

bc_outlet_p = dde.DirichletBC(geom, lambda X: 0.0, boundary_outlet, component=2)
bc_outlet_v = dde.DirichletBC(geom, lambda X: 0.0, boundary_outlet, component=1)

In [82]:
# Define PDE system
def pde(X, Y):
    """Args:
    Y: network output [u, v, p],
    X: input coordinates [x, y]"""
    du_x = dde.grad.jacobian(Y, X, i=0, j=0)
    du_y = dde.grad.jacobian(Y, X, i=0, j=1)
    dv_x = dde.grad.jacobian(Y, X, i=1, j=0)
    dv_y = dde.grad.jacobian(Y, X, i=1, j=1)
    dp_x = dde.grad.jacobian(Y, X, i=2, j=0)
    dp_y = dde.grad.jacobian(Y, X, i=2, j=1)

    du_xx = dde.grad.hessian(Y, X, component=0, i=0, j=0)
    du_yy = dde.grad.hessian(Y, X, component=0, i=1, j=1)
    dv_xx = dde.grad.hessian(Y, X, component=1, i=0, j=0)
    dv_yy = dde.grad.hessian(Y, X, component=1, i=1, j=1)

    pde_u = Y[:, 0:1] * du_x + Y[:, 1:2] * du_y + 1 / rho * dp_x - (mu / rho) * (du_xx + du_yy)
    pde_v = Y[:, 0:1] * dv_x + Y[:, 1:2] * dv_y + 1 / rho * dp_y - (mu / rho) * (dv_xx + dv_yy)
    pde_cont = du_x + dv_y


    return [pde_u, pde_v, pde_cont]

In [83]:
data = dde.data.PDE(
    geom,
    pde,
    [
        bc_wall_u, bc_wall_v,
        bc_inlet_u, bc_inlet_v,
        bc_outlet_p, bc_outlet_v,
        bc_obs_u, bc_obs_v  # Applies to both Heart and Cylinder
    ],
    num_domain=3000,    # Increase domain points for more detail
    num_boundary=600,   # Increase boundary points to define the heart edges
    num_test=200,
)

  v = (self.vertices[i + 1] - self.vertices[i]) / self.diagonals[i, i + 1]




In [84]:
# Define neural network
net = dde.maps.FNN([2] + [64] * 5 + [3], "tanh", "Glorot uniform")
model = dde.Model(data, net)

In [None]:
# Compile and train
model.compile("adam", lr=1e-3)
losshistory, train_state = model.train(epochs=20000)

Compiling model...
'compile' took 0.002389 s

Training model...

Cause: could not parse the source code of <function <lambda> at 0x000002F030401FC0>: no matching AST found among candidates:
# coding=utf-8
lambda x, on: np.array([on_boundary(x[i], on[i]) for i in range(len(x))])
# coding=utf-8
lambda x, on: np.array([on_boundary1(x[i], on[i]) for i in range(len(x))])
# coding=utf-8
lambda x, on: np.array([on_boundary2(x[i], on[i]) for i in range(len(x))])
Cause: could not parse the source code of <function <lambda> at 0x000002F030401FC0>: no matching AST found among candidates:
# coding=utf-8
lambda x, on: np.array([on_boundary(x[i], on[i]) for i in range(len(x))])
# coding=utf-8
lambda x, on: np.array([on_boundary1(x[i], on[i]) for i in range(len(x))])
# coding=utf-8
lambda x, on: np.array([on_boundary2(x[i], on[i]) for i in range(len(x))])
Cause: could not parse the source code of <function <lambda> at 0x000002F09A820790>: no matching AST found among candidates:
# coding=utf-8
lambda 

In [None]:
# Visualize training points
plt.figure(figsize=(10, 8))
plt.scatter(data.train_x_all[:, 0], data.train_x_all[:, 1], s=0.5)
plt.xlabel("x")
plt.ylabel("y")
plt.show()

# Predict and plot results
samples = geom.random_points(500000)
result = model.predict(samples)

color_legend = [[0, 1.5], [-0.3, 0.3], [0, 35]]
for idx in range(3):
    plt.figure(figsize=(20, 4))
    plt.scatter(samples[:, 0], samples[:, 1], c=result[:, idx], cmap="jet", s=2)
    plt.colorbar()
    plt.clim(color_legend[idx])
    plt.xlim((0 - L / 2, L - L / 2))
    plt.ylim((0 - D / 2, D - D / 2))
    plt.tight_layout()
    plt.show()

In [None]:
import os

num_frames = 150
os.makedirs("frames", exist_ok=True)

# Reuse the same samples
samples = geom.random_points(300000)

for i in range(num_frames):
    result = model.predict(samples)

    plt.figure(figsize=(20, 4))
    plt.scatter(
        samples[:, 0],
        samples[:, 1],
        c=result[:, 0],   # u-velocity (change to 1 or 2 if needed)
        cmap="jet",
        s=2
    )
    plt.colorbar()
    plt.clim(0, 1.5)
    plt.xlim((-L / 2, L / 2))
    plt.ylim((-D / 2, D / 2))
    plt.title(f"Frame {i+1}")
    plt.tight_layout()

    plt.savefig(f"frames/frame_{i:04d}.png", dpi=150)
    plt.close()
