# Navier Stokes Equatoin

In [1]:
import numpy as np
import time
import scipy.sparse.linalg as sp_la

# --- Core imports ---
from pycutfem.core.mesh import Mesh
from pycutfem.core.dofhandler import DofHandler
from pycutfem.utils.meshgen import structured_quad

# --- UFL-like imports ---
from pycutfem.ufl.functionspace import FunctionSpace
from pycutfem.ufl.expressions import (
    TrialFunction, TestFunction, VectorTrialFunction, VectorTestFunction,
    Function, VectorFunction, Constant, grad, inner, dot, div
)
from pycutfem.ufl.measures import dx
from pycutfem.ufl.forms import BoundaryCondition, assemble_form
from pycutfem.fem.mixedelement import MixedElement

# 1. ============================================================================
#    SETUP (Meshes, DofHandler, BCs)
# ===============================================================================
L, H = 1.0, 1.0
NX, NY = 16, 6 # Increased resolution for better visualization
nodes_q2, elems_q2, _, corners_q2 = structured_quad(L, H, nx=NX, ny=NY, poly_order=2)
mesh_q2 = Mesh(nodes=nodes_q2, element_connectivity=elems_q2, elements_corner_nodes=corners_q2, element_type="quad", poly_order=2)
mixed_element = MixedElement(mesh_q2, field_specs={'ux': 2, 'uy': 2, 'p': 1})

dof_handler = DofHandler(mixed_element, method='cg')


# Tag boundaries for applying BCs
bc_tags = {
    'bottom_wall': lambda x,y: np.isclose(y,0),
    'left_wall':   lambda x,y: np.isclose(x,0),
    'right_wall':  lambda x,y: np.isclose(x,L),
    'top_lid':     lambda x,y: np.isclose(y,H)
}
mesh_q2.tag_boundary_edges(bc_tags)

# Tag a single node for pressure pinning
dof_handler.tag_dof_by_locator(
        tag='pressure_pin_point',
        field='p',
        locator=lambda x, y: np.isclose(x, 0) and np.isclose(y, 0),
        find_first=True
    )

class DataBC:
    Um = 1.5 # Lid velocity
    H = H
    T = 2.0 # Total time for simulation
    
# bcs = [
#     # No-slip on bottom, left, and right walls
#     BoundaryCondition('ux', 'dirichlet', 'bottom_wall', lambda x,y:0.0),
#     BoundaryCondition('uy', 'dirichlet', 'bottom_wall', lambda x,y:0.0),
#     BoundaryCondition('ux', 'dirichlet', 'left_wall',   lambda x,y: 4 * DataBC.Um * y * (DataBC.H-y) / (DataBC.H**2)),
#     BoundaryCondition('uy', 'dirichlet', 'left_wall',   lambda x,y:0.0),
#     BoundaryCondition('ux', 'dirichlet', 'right_wall',  lambda x,y:0.0),
#     BoundaryCondition('uy', 'dirichlet', 'right_wall',  lambda x,y:0.0),
#     # Moving lid on the top wall
#     BoundaryCondition('ux', 'dirichlet', 'top_lid',     lambda x,y:0.0),
#     BoundaryCondition('uy', 'dirichlet', 'top_lid',    lambda x,y:0.0),
#     # Pin pressure at one point to ensure a unique solution
#     BoundaryCondition('p', 'dirichlet', 'pressure_pin_point',lambda x,y: 0.0)
# ]
t=0.0

bcs = [
    # No-slip on bottom, left, and right walls
    BoundaryCondition('ux', 'dirichlet', 'bottom_wall', lambda x,y: 0.0),
    BoundaryCondition('uy', 'dirichlet', 'bottom_wall', lambda x,y: 0.0),
    BoundaryCondition('ux', 'dirichlet', 'left_wall',   lambda x,y: 0.0),
    BoundaryCondition('uy', 'dirichlet', 'left_wall',   lambda x,y: 0.0),
    BoundaryCondition('ux', 'dirichlet', 'right_wall',  lambda x,y: 0.0),
    BoundaryCondition('uy', 'dirichlet', 'right_wall',  lambda x,y: 0.0),
    # Moving lid on the top wall
    BoundaryCondition('ux', 'dirichlet', 'top_lid',     lambda x,y: DataBC.Um * np.sin(t * np.pi)), # Corrected: Was 0.0
    BoundaryCondition('uy', 'dirichlet', 'top_lid',     lambda x,y: 0.0),
    # Pin pressure at one point to ensure a unique solution
    BoundaryCondition('p', 'dirichlet', 'pressure_pin_point', lambda x,y: 0.0)
]

# Create the corresponding homogeneous BCs for the Newton update
bcs_homog = [
    BoundaryCondition(bc.field, bc.method, bc.domain_tag,lambda x,y: 0.0) for bc in bcs
]

print(f"Naviver info: {dof_handler.info()}")

=== DofHandler (CG) ===
        ux: 429 DOFs @ offset 0
        uy: 429 DOFs @ offset 9
         p: 119 DOFs @ offset 18
  total : 977
Naviver info: None


In [2]:
# 2. ============================================================================
#    UFL FORMULATION
# ===============================================================================

# --- Define Constants and Function Spaces ---
rho = Constant(1.0)
dt = Constant(0.1)
theta = Constant(0.5) # Crank-Nicolson
mu = Constant(1.0e-1)

velocity_space = FunctionSpace("velocity", ['ux', 'uy'])
pressure_space = FunctionSpace("pressure", ['p'])

# --- Define all required UFL Functions ---
# Trial functions for the Jacobian system (the corrections)
du = VectorTrialFunction(velocity_space, dof_handler=dof_handler)
dp = TrialFunction(pressure_space, dof_handler=dof_handler)

# Test functions
v = VectorTestFunction(velocity_space, dof_handler=dof_handler)
q = TestFunction(pressure_space, dof_handler=dof_handler)

# Functions to hold solutions at different time steps/iterations
# u_k/p_k are the current Newton iteration k at time t_n+1
# u_n/p_n are the converged solution from the previous time step t_n
u_k = VectorFunction(name="u_k", field_names=['ux', 'uy'], dof_handler=dof_handler)
p_k = Function(name="p_k", field_name='p', dof_handler=dof_handler)
u_n = VectorFunction(name="u_n", field_names=['ux', 'uy'], dof_handler=dof_handler)
p_n = Function(name="p_n", field_name='p', dof_handler=dof_handler)

# --- Define Residual R(u_k, p_k) and Jacobian J ---
# The residual is the transient Navier-Stokes equation F(U_k) = 0
# where U_k = (u_k, p_k)

In [3]:


jacobian = (
    # Time derivative term: d/dt(u) -> du/dt
    rho * dot(du, v) / dt  
    
    # Convection term: theta * [ ((du ⋅ ∇)u_k) ⋅ v + ((u_k ⋅ ∇)du) ⋅ v ]
    +theta * rho * dot(dot(grad(u_k),du ) , v)  
    +theta * rho * dot(dot(grad(du),u_k ), v)   

    # Diffusion term: theta * mu * (∇du : ∇v)
    +theta * mu * inner(grad(du), grad(v)) 
    
    # Pressure term: -theta * dp * (∇⋅v)
    -dp * div(v)  
    
    # Continuity term: q * (∇⋅du)
    +q * div(du) 
) * dx()

residual = (
    # Time derivative: -(u_k - u_n)/dt ⋅ v
    rho * dot(u_k - u_n, v) / dt  
    + (rho*theta) * dot(  dot(grad(u_k),u_k ) , v) 
    + (rho - (rho*theta)) * dot(   dot(grad(u_n),u_n   ) , v) 
    # Diffusion terms at theta and (1-theta)
    +(theta * mu * inner(grad(u_k), grad(v)) ) 
    +((1.0 - theta) * mu * inner(grad(u_n), grad(v)) ) 
    
    # Pressure terms
    - p_k * div(v) 
    # Continuity term
    +q * div(u_k)  
    
    ) * dx()



In [6]:
# 3. ============================================================================
#    SOLVER LOOP (Corrected Implementation)
# ===============================================================================
import matplotlib.pyplot as plt
import scipy.sparse.linalg as sp_la
from pycutfem.ufl.forms import assemble_form

# --- Simulation Parameters ---
T_end = DataBC.T  # Total simulation time
dt_val = dt.value
num_steps = int(T_end / dt_val)
newton_tol = 1e-6
max_newton_iter = 10

# --- Initialize Solution Functions ---
# Note: It's good practice to initialize to zero, but the first step of the
# time loop will set the correct state from u_n anyway.
u_k.nodal_values.fill(0.0)
p_k.nodal_values.fill(0.0)
u_n.nodal_values.fill(0.0)
p_n.nodal_values.fill(0.0)

# --- Apply Initial Conditions ---
# Use the new robust `apply_bcs` method. It correctly handles the mapping
# from global BCs to the specific DOFs within each function object.
print("Applying initial boundary conditions...")
dof_handler.apply_bcs(bcs, u_n, p_n)

# --- Main Time-Stepping Loop ---
for n in range(num_steps):
    t = (n + 1) * dt_val
    print(f"\n--- Solving Time Step {n+1}/{num_steps} | t = {t:.2f}s ---")

    # Set initial guess for Newton iteration: u_k = u_n, p_k = p_n
    u_k.nodal_values[:] = u_n.nodal_values[:]
    p_k.nodal_values[:] = p_n.nodal_values[:]
    
    # Apply non-homogeneous BCs to the initial guess for this time step.
    # This ensures the iteration starts from a state that respects the boundaries.
    dof_handler.apply_bcs(bcs, u_k, p_k)

    # --- Inner Newton Iteration Loop ---
    for k in range(max_newton_iter):
        print(f"  Newton iteration {k+1}")

        # Jacobian and Residual definitions (as provided by user)
        jacobian = (
            rho * dot(du, v) / dt
            + theta * rho * dot(dot(grad(u_k),du ), v)
            + theta * rho * dot(dot(grad(du),u_k ), v)
            + theta * mu * inner(grad(du), grad(v))
            - dp * div(v)
            + q * div(du)
        ) * dx()

        residual = (
                    # Time derivative: -(u_k - u_n)/dt ⋅ v
                    rho * dot(u_k - u_n, v) / dt  
                    + (rho*theta) * dot(  dot(grad(u_k),u_k, ) , v) 
                    + (rho - (rho*theta)) * dot(   dot(grad(u_n),u_n  ) , v) 
                    # Diffusion terms at theta and (1-theta)
                    +(theta * mu * inner(grad(u_k), grad(v)) ) 
                    +((1.0 - theta) * mu * inner(grad(u_n), grad(v)) ) 
                    
                    # Pressure terms
                    - p_k * div(v) 
                    # Continuity term
                    +q * div(u_k)  
                    
                    ) * dx()

        # Assemble Jacobian matrix A and residual vector R
        # We solve J*delta_U = -R, so we pass -residual to the assembler.
        A, R_vec = assemble_form(jacobian == -residual, dof_handler=dof_handler, bcs=bcs_homog, quad_order=6)
        
        norm_res = np.linalg.norm(R_vec)
        print(f"    Residual Norm: {norm_res:.3e}")

        if norm_res < newton_tol:
            print(f"    Newton converged in {k+1} iterations.")
            break
        
        # Solve the linear system for the correction dU = (du, dp)
        delta_U = sp_la.spsolve(A, R_vec)
        
        # --- Update the solution using the robust DofHandler methods ---
        # 1. Add the correction vector to the function objects.
        #    This handles the mapping from the global delta_U vector to the
        #    local data arrays of u_k and p_k.
        dof_handler.add_to_functions(delta_U, [u_k, p_k])

        # 2. Re-apply the non-homogeneous boundary conditions.
        #    The solver calculates a correction for ALL DOFs, including those
        #    on the boundary. We must "clamp" the boundary DOFs back to their
        #    prescribed values after the update.
        dof_handler.apply_bcs(bcs, u_k, p_k)

    else: # This runs if the for loop completes without breaking
        raise RuntimeError(f"Newton's method did not converge after {max_newton_iter} iterations at t={t:.2f}s.")

    # Update solution for the next time step
    u_n.nodal_values[:] = u_k.nodal_values[:]
    p_n.nodal_values[:] = p_k.nodal_values[:]

    # Optional: Plot solution at the end of the time step
    plt.figure()
    u_n.plot(kind='quiver', title=f"Velocity Magnitude at t={t:.2f}s")
    # u_n.plot(kind='contour', title=f"Velocity Magnitude at t={t:.2f}s")
    plt.figure()
    p_n.plot(title=f"Pressure at t={t:.2f}s")
    plt.show() # Make sure to show the plot
    
    # Optional: You can also plot pressure
    # plt.figure()
    # p_n.plot(title=f"Pressure at t={t:.2f}s")
    # plt.show()

print("\nSimulation finished successfully!")


Applying initial boundary conditions...

--- Solving Time Step 1/20 | t = 0.10s ---
  Newton iteration 1


  res_53 = div_52.T @ basis_reshaped_46
  res_62 = basis_reshaped_55.T @ div_61


NumbaTypeError: Failed in nopython mode pipeline (step: Literal propagation)
Failed in literal_propagation_subpipeline mode pipeline (step: performs propagation of literal values)
isinstance() cannot determine the type of variable "res_4" due to a branch.

File "../../../.cache/pycutfem_jit/_pycutfem_kernel_e90868d7a9cec1aaa78cc5a34f50ce5c3439fd5f52baef74f562d60912e601b0.py", line 52:
def kernel(
    <source elided>
                res_5 = res_4 * 10.0
            elif np.isscalar(10.0) and (np.isscalar(res_4) or isinstance(res_4,np.ndarray)):
            ^

During: Pass PropagateLiterals
During: Pass LiteralPropagation