# Simple fluid simulator

The goal is here to define a simple and efficient fluid simulator reusable for other projects.

The code below is a copy from : https://github.com/takah29/2d-fluid-simulator
Taking only some features, commenting and adapted to a notebook.

The used methods are the following :
- Pressure with Jacobi Iterations
- Pressure with Red-Black SOR Method
- Vorticity confinement 
- Advection with the CIP method

## Initialisation of the Kernel

In [1]:
import os
import taichi as ti
import numpy as np
import matplotlib.pyplot as plt

from abc import ABCMeta, abstractmethod
from pathlib import Path

ti.init(arch=ti.gpu, kernel_profiler = True)

VELOCITY_LIMIT = 10

[Taichi] version 1.7.3, llvm 15.0.4, commit 5ec301be, linux, python 3.12.2


[I 07/16/25 19:36:31.237 959] [shell.py:_shell_pop_print@23] Graphical python shell detected, using wrapped sys.stdout


[Taichi] Starting on arch=cuda


## Utilitary functions for the kernels

Advection equation : $$ v.\phi = v_x * \dfrac{\partial \phi}{\partial x} + v_y * \dfrac{\partial \phi}{\partial y} $$

In [2]:
# Give the value of a cell from a position
@ti.func
def sample(field, i, j):
    i = ti.max(0, ti.min(field.shape[0] - 1, i))
    j = ti.max(0, ti.min(field.shape[1] - 1, j))
    idx = ti.Vector([int(i), int(j)])
    return field[idx]

@ti.func
def sign(x):
    return -1.0 if x < 0.0 else 1.0

@ti.func
def diff_x(field, i, j, dx):
    """Central Difference x"""
    return 0.5 * (sample(field, i + 1, j) - sample(field, i - 1, j)) / dx

@ti.func
def diff_y(field, i, j, dx):
    """Central Difference y"""
    return 0.5 * (sample(field, i, j + 1) - sample(field, i, j - 1)) / dx

@ti.func
def diff2_x(field, i, j, dx):
    return (sample(field, i + 1, j) - 2.0 * sample(field, i, j) + sample(field, i - 1, j)) / dx**2


@ti.func
def diff2_y(field, i, j, dx):
    return (sample(field, i, j + 1) - 2.0 * sample(field, i, j) + sample(field, i, j - 1)) / dx**2
    
@ti.func
def advect(vc, phi, i, j, dx):
    """Central Differencing"""
    return vc[i, j].x * diff_x(phi, i, j, dx) + vc[i, j].y * diff_y(phi, i, j, dx)

@ti.func
def advect_kk_scheme(vc, phi, i, j, dx):
    """Kawamura-Kuwabara Scheme

    http://www.slis.tsukuba.ac.jp/~fujisawa.makoto.fu/cgi-bin/wiki/index.php?%B0%DC%CE%AE%CB%A1#y2dbc484
    """
    coef = [-2, 10, -9, 2, -1]
    v = ti.Vector([0.0, 0.0, 0.0, 0.0, 0.0])

    if vc[i, j].x < 0:
        v = ti.Vector(coef)
    else:
        v = -ti.Vector(coef[::-1])

    mx = ti.Matrix.cols(
        [
            sample(phi, i + 2, j),
            sample(phi, i + 1, j),
            sample(phi, i, j),
            sample(phi, i - 1, j),
            sample(phi, i - 2, j),
        ]
    )
    a = mx @ v / (6 * dx)

    if vc[i, j].y < 0:
        v = ti.Vector(coef)
    else:
        v = -ti.Vector(coef[::-1])

    my = ti.Matrix.cols(
        [
            sample(phi, i, j + 2),
            sample(phi, i, j + 1),
            sample(phi, i, j),
            sample(phi, i, j - 1),
            sample(phi, i, j - 2),
        ]
    )
    b = my @ v / (6 * dx)

    return vc[i, j].x * a + vc[i, j].y * b

# Visualisation functions

@ti.func
def visualize_norm(vec):
    c = vec.norm()
    return ti.Vector([c, c, c])


@ti.func
def visualize_pressure(val):
    return ti.Vector([ti.max(val, 0.0), 0.0, ti.max(-val, 0.0)])


@ti.func
def visualize_vorticity(v, i, j, dx):
    val = diff_x(v, i, j, dx).y - diff_y(v, i, j, dx).x
    return ti.Vector([ti.max(val, 0.0), 0.0, ti.max(-val, 0.0)])


@ti.func
def visualize_hue(vec):
    h = ti.atan2(vec.y, vec.x)

    while h < 0:
        h += 2 * pi
    h /= 2 * pi

    m = ti.sqrt(vec.x**2 + vec.y**2)
    ranges = 0.0
    rangee = 10.0

    while m > rangee:
        ranges = rangee
        rangee *= e

    k = (m - ranges) / (rangee - ranges)
    s = k * 2 if k < 0.5 else 1 - (k - 0.5) * 2
    s = 1 - pow(1 - s, 3)
    s = 0.4 + s * 0.6

    v = k * 2 if k < 0.5 else 1 - (k - 0.5) * 2
    v = 1 - v
    v = 1 - pow(1 - v, 3)
    v = 0.6 + v * 0.4

    return _hsv_to_rgb(h, s, v)


@ti.func
def visualize_xy(vec):
    return ti.Vector([vec.y, 0.0, vec.x])


@ti.func
def _hsv_to_rgb(h: float, s: float, v: float):
    if h == 1:
        h = 0
    z = ti.floor(h * 6)
    i = int(z)
    f = float(h * 6 - z)
    p = v * (1 - s)
    q = v * (1 - s * f)
    t = v * (1 - s * (1 - f))

    r = g = b = 1.0

    if i == 0:
        r = v
        g = t
        b = p
    elif i == 1:
        r = q
        g = v
        b = p
    elif i == 2:
        r = p
        g = v
        b = t
    elif i == 3:
        r = p
        g = q
        b = v
    elif i == 4:
        r = t
        g = p
        b = v
    elif i == 5:
        r = v
        g = p
        b = q

    return ti.Vector([r, g, b])


In [1]:
# Methods for updating a field 
class DoubleBuffers:
    def __init__(self, resolution, n_channel):
        if n_channel == 1:
            self.current = ti.field(ti.f32, shape=resolution)
            self.next = ti.field(ti.f32, shape=resolution)
        else:
            self.current = ti.Vector.field(n_channel, ti.f32, shape=resolution)
            self.next = ti.Vector.field(n_channel, ti.f32, shape=resolution)

    def swap(self):
        self.current, self.next = self.next, self.current

    def reset(self):
        self.current.fill(0)
        self.next.fill(0)

NameError: name 'ti' is not defined

## Setup of the walls

- 1 Class for generating the walls field on gpu.
- Functions to implement the walls

In [4]:
@ti.data_oriented
class BoundaryCondition:
    def __init__(self, bc_const, bc_mask, inflow, outflow):
        # Initialize boundary condition by converting numpy arrays to Taichi fields
        self._bc_const, self._bc_mask = BoundaryCondition.to_field(bc_const, bc_mask)
        self.inflow = inflow
        self.outflow = outflow

    @ti.kernel
    def set_velocity_boundary_condition(self, vc: ti.template()):
        # Get boundary mask field (static to optimize access)
        bc_mask = ti.static(self._bc_mask)
        for i, j in vc:
            if (
                bc_mask[i, j] == 1  # Wall boundary
                and 1 <= i < bc_mask.shape[0] - 1  # Not at x-boundaries
                and 1 <= j < bc_mask.shape[1] - 1  # Not at y-boundaries
            ):
                # Handle wall boundary conditions using Karman-Kozeny (KK) scheme
                # Reflect velocity at walls by setting opposite velocities
                
                # Case 1: Wall cell with fluid to the left
                if bc_mask[i - 1, j] == 0 and bc_mask[i, j - 1] == 1 and bc_mask[i, j + 1] == 1:
                    vc[i + 1, j] = -sample(vc, i - 1, j)
                # Case 2: Wall cell with fluid to the right
                elif bc_mask[i + 1, j] == 0 and bc_mask[i, j - 1] == 1 and bc_mask[i, j + 1] == 1:
                    vc[i - 1, j] = -sample(vc, i + 1, j)
                # Case 3: Wall cell with fluid below
                elif bc_mask[i, j - 1] == 0 and bc_mask[i - 1, j] == 1 and bc_mask[i + 1, j] == 1:
                    vc[i, j + 1] = -sample(vc, i, j - 1)
                # Case 4: Wall cell with fluid above
                elif bc_mask[i, j + 1] == 0 and bc_mask[i - 1, j] == 1 and bc_mask[i + 1, j] == 1:
                    vc[i, j - 1] = -sample(vc, i, j + 1)
            elif bc_mask[i, j] == 2:  # Inflow boundary
                # Set velocity to predefined constant value
                vc[i, j] = self._bc_const[i, j]
                #vc[i, j].x = ti.max(sample(vc, i - 1, j).x, 10.05)
                
            elif bc_mask[i, j] == 3:  # Outflow boundary
                # Ensure flow is moving outward with minimum velocity of 0.05
                # This prevents backflow at the outlet
                vc[i, j].x = ti.max(sample(vc, i - 1, j).x, 0.05)
                #vc[i, j].x = ti.max(sample(vc, i - 1, j).x, self.outflow)

    @ti.kernel
    def set_pressure_boundary_condition(self, pc: ti.template()):
        # Get static reference to boundary condition mask for efficiency
        bc_mask = ti.static(self._bc_mask)
        for i, j in pc:
            if bc_mask[i, j] == 1:  # Wall cell
                # Handle special cases at wall boundaries based on neighboring cells
                # Case 1: Wall cell with fluid to the left, walls above and below
                if bc_mask[i - 1, j] == 0 and bc_mask[i, j - 1] == 1 and bc_mask[i, j + 1] == 1:
                    pc[i, j] = sample(pc, i - 1, j)
                # Case 2: Wall cell with fluid to the right, walls above and below
                elif bc_mask[i + 1, j] == 0 and bc_mask[i, j - 1] == 1 and bc_mask[i, j + 1] == 1:
                    pc[i, j] = sample(pc, i + 1, j)
                # Case 3: Wall cell with fluid below, walls to left and right
                elif bc_mask[i, j - 1] == 0 and bc_mask[i - 1, j] == 1 and bc_mask[i + 1, j] == 1:
                    pc[i, j] = sample(pc, i, j - 1)
                # Case 4: Wall cell with fluid above, walls to left and right
                elif bc_mask[i, j + 1] == 0 and bc_mask[i - 1, j] == 1 and bc_mask[i + 1, j] == 1:
                    pc[i, j] = sample(pc, i, j + 1)
                # Corner cases - average values from two adjacent fluid cells
                # Case 5: Wall cell with fluid to the left and above
                elif bc_mask[i - 1, j] == 0 and bc_mask[i, j + 1] == 0:
                    pc[i, j] = (sample(pc, i - 1, j) + sample(pc, i, j + 1)) / 2.0
                # Case 6: Wall cell with fluid to the right and above
                elif bc_mask[i + 1, j] == 0 and bc_mask[i, j + 1] == 0:
                    pc[i, j] = (sample(pc, i + 1, j) + sample(pc, i, j + 1)) / 2.0
                # Case 7: Wall cell with fluid to the left and below
                elif bc_mask[i - 1, j] == 0 and bc_mask[i, j - 1] == 0:
                    pc[i, j] = (sample(pc, i - 1, j) + sample(pc, i, j - 1)) / 2.0
                # Case 8: Wall cell with fluid to the right and below
                elif bc_mask[i + 1, j] == 0 and bc_mask[i, j - 1] == 0:
                    pc[i, j] = (sample(pc, i + 1, j) + sample(pc, i, j - 1)) / 2.0
            elif bc_mask[i, j] == 2:  # Inflow boundary
                # Sample from the cell to the right (inside the domain)
                pc[i, j] = sample(pc, i + 1, j)
            elif bc_mask[i, j] == 3:  # Outflow boundary
                # Set pressure to zero at outflow
                pc[i, j] = 0.0

    @ti.func
    def is_wall(self, i, j):
        """Check if the cell at position (i,j) is a wall cell"""
        return self._bc_mask[i, j] == 1

    @ti.func
    def is_fluid_domain(self, i, j):
        """Check if the cell at position (i,j) is part of the fluid domain"""
        return self._bc_mask[i, j] == 0

    def get_resolution(self):
        """Return the grid resolution"""
        return self._bc_const.shape[:2]

    @staticmethod
    def to_field(bc, bc_mask):
        """
        Convert NumPy arrays to Taichi fields
        
        Args:
            bc: NumPy array containing boundary condition values
            bc_mask: NumPy array containing boundary condition types
            
        Returns:
            bc_field: Taichi vector field for boundary conditions
            bc_mask_field: Taichi field for boundary condition types
        """
        bc_field = ti.Vector.field(2, ti.f32, shape=bc.shape[:2])
        bc_field.from_numpy(bc)
        bc_mask_field = ti.field(ti.u8, shape=bc_mask.shape[:2])
        bc_mask_field.from_numpy(bc_mask)

        return bc_field, bc_mask_field

def create_bc_array(x_resolution, y_resolution):
    """
    Create arrays for boundary conditions
    
    Args:
        x_resolution: Width of the grid
        y_resolution: Height of the grid
        
    Returns:
        bc: Array for velocity boundary conditions
        bc_mask: Array for boundary type masks
        bc_dye: Array for dye colors at boundaries
    """
    bc = np.zeros((x_resolution, y_resolution, 2), dtype=np.float32)
    bc_mask = np.zeros((x_resolution, y_resolution), dtype=np.uint8)
    bc_dye = np.zeros((x_resolution, y_resolution, 3), dtype=np.float32)

    return bc, bc_mask, bc_dye


def create_color_map(color_list, n_samples):
    """
    Create a smooth color gradient from a list of colors
    
    Args:
        color_list: List of RGB color values
        n_samples: Number of samples in the output color map
        
    Returns:
        Interpolated color map array of shape (n_samples, 3)
    """
    color_arr = np.vstack(color_list)
    x = np.linspace(0.0, 1.0, color_arr.shape[0], endpoint=True)

    # Interpolate each color channel separately
    x_ = np.linspace(0.0, 1.0, n_samples, endpoint=True)
    r_arr = np.interp(x_, x, color_arr[:, 0])
    g_arr = np.interp(x_, x, color_arr[:, 1])
    b_arr = np.interp(x_, x, color_arr[:, 2])

    return np.vstack((r_arr, g_arr, b_arr)).T

def set_plane(bc, bc_mask, bc_dye, lower_left, upper_right):
    """
    Sets a rectangular plane as a boundary condition.
    
    Args:
        bc: Boundary condition velocity array
        bc_mask: Boundary condition type mask
        bc_dye: Boundary condition dye color array
        lower_left: Tuple (x,y) of lower left corner of the plane
        upper_right: Tuple (x,y) of upper right corner of the plane
    """
    bc[lower_left[0] : upper_right[0], lower_left[1] : upper_right[1]] = np.array([0.0, 0.0])
    bc_mask[lower_left[0] : upper_right[0], lower_left[1] : upper_right[1]] = 1
    bc_dye[lower_left[0] : upper_right[0], lower_left[1] : upper_right[1]] = np.array(
        [0.0, 0.0, 0.0]
    )

def set_circle(bc, bc_mask, bc_dye, center, radius):
    """
    Sets a circular boundary condition.
    
    Args:
        bc: Boundary condition velocity array
        bc_mask: Boundary condition type mask
        bc_dye: Boundary condition dye color array
        center: Tuple (x,y) of circle center
        radius: Radius of the circle
    """
    center = np.asarray(center)
    # Calculate the lower bound, ensuring it's not negative
    l_ = np.round(np.maximum(center - radius, 0)).astype(np.int32)
    # Calculate the upper bounds, ensuring they don't exceed array dimensions
    u0 = round(min(center[0] + radius, bc.shape[0]))
    u1 = round(min(center[1] + radius, bc.shape[1]))
    # Iterate through the bounding box of the circle
    for i in range(l_[0], u0):
        for j in range(l_[1], u1):
            # Add 0.5 to get the center of the cell
            x = np.array([i, j]) + 0.5
            # Check if the point is inside the circle
            if np.linalg.norm(x - center) < radius:
                bc[i, j] = np.array([0.0, 0.0])
                bc_mask[i, j] = 1
                bc_dye[i, j] = np.array([0.0, 0.0, 0.0])



def create_boundary_condition(resolution, inflow, outflow, no_dye=True):
    """
    Creates boundary conditions for fluid simulation.
    
    Args:
        resolution: Grid resolution (square grid)
        no_dye: Flag to control dye behavior (unused in this function)
        
    Returns:
        boundary_condition: BoundaryCondition object with configured boundaries
        
    Mask values:
        1: Wall
        2: Inflow
        3: Outflow
    """
    # Set up grid dimensions
    x_res, y_res = resolution, resolution
    bc, bc_mask, bc_dye = create_bc_array(x_res, y_res)

    # Define inflow boundary condition
    def set_inflow():
        # Set horizontal flow at the left edge
        bc[:2, :] = np.array([1.0, 0.0])
        bc_mask[:2, :] = 2

        # Define colors for dye visualization
        y = np.array([1.1, 1.1, 0.2])  # Yellow
        b = np.array([0.2, 0.2, 1.1])  # Blue
        r = np.array([1.1, 0.2, 0.2])  # Red
        c = np.array([0.2, 1.1, 1.1])  # Cyan
        # Create repeating color pattern for inflow
        color_map = create_color_map([c, r, b, y] * 3, bc_dye.shape[1])
        bc_dye[:2, :] = np.stack((color_map, color_map), axis=0)

    # Define outflow boundary condition
    def set_outflow():
        # Set right edge as outflow
        bc[-1, :] = np.array([0.0, 0.0])
        bc_mask[-1, :] = 3

    # Define wall boundary conditions
    def set_wall():
        # Set top and bottom walls
        set_plane(bc, bc_mask, bc_dye, (0, 0), (x_res, 2))  # Bottom wall
        set_plane(bc, bc_mask, bc_dye, (0, y_res - 2), (x_res, y_res))  # Top wall

        # Add a circular obstacle (cylinder) in the flow
        r = y_res // 18  # Radius of cylinder
        c = (x_res // 4, y_res // 2)  # Position of cylinder (1/4 from left, middle height)
        set_circle(bc, bc_mask, bc_dye, c, r)

    # Apply all boundary conditions
    set_inflow()
    set_outflow()
    set_wall()

    # Create and return the boundary condition object
    boundary_condition = BoundaryCondition(bc, bc_mask, inflow, outflow)
    
    return boundary_condition

$$\vec v = \vec v + dt . (-f_{advect}-\vec\nabla P + \Delta \vec v /Re )$$

In [5]:
@ti.kernel
def update_velocities(vn: ti.template(), vc: ti.template(), pc: ti.template()):
    for i, j in vn:
        if is_fluid_domain(i, j):
            vn[i, j] = vc[i, j] + dt * (
                - advect(vc, vc, i, j, dx)
                - ti.Vector(
                    [
                        diff_x(pc, i, j, dx),
                        diff_y(pc, i, j, dx),
                    ]
                )
                + (diff2_x(vc, i, j, dx) + diff2_y(vc, i, j, dx)) / Re
            )

## Pressure updater

In [2]:
class PressureUpdater(metaclass=ABCMeta):
    """Abstract base class for pressure update algorithms in fluid simulations.
    
    This class defines the interface for pressure update strategies and provides
    common attributes needed for pressure calculations.
    """
    def __init__(self, boundary_condition, dt, dx):
        """Initialize the pressure updater.
        
        Args:
            boundary_condition: Object handling boundary conditions
            dt: Time step size
            dx: Grid cell size
        """
        self._bc = boundary_condition
        self.dt = dt
        self.dx = dx

    @abstractmethod
    def update(self, p, v_current):
        """Update pressure field based on current velocity.
        
        Args:
            p: Pressure field to update
            v_current: Current velocity field
        """
        pass


@ti.func
def predict_p(pc, vc, i, j, dt, dx):
    """Predict new pressure value for a cell using finite difference approximation.
    
    Implements a discretized form of the pressure Poisson equation that accounts for
    both the Laplacian of pressure and velocity divergence terms.
    
    Args:
        pc: Current pressure field
        vc: Current velocity field
        i, j: Grid cell indices
        dt: Time step size
        dx: Grid cell size
        
    Returns:
        Predicted pressure value for cell (i,j)
    """
    # Calculate velocity gradients using central differences
    sub_x = sample(vc, i + 1, j) - sample(vc, i - 1, j)  # ∂v/∂x components
    sub_y = sample(vc, i, j + 1) - sample(vc, i, j - 1)  # ∂v/∂y components

    # Calculate predicted pressure using discretized Poisson equation
    pred_p = (
        # Laplacian term (average of neighboring pressures)
        0.25
        * (
            sample(pc, i + 1, j)    # Right neighbor
            + sample(pc, i - 1, j)  # Left neighbor
            + sample(pc, i, j + 1)  # Top neighbor
            + sample(pc, i, j - 1)  # Bottom neighbor
        )
        # Quadratic velocity gradient terms (from non-linear advection)
        + (sub_x.x**2 + sub_y.y**2 + (sub_y.x * sub_x.y)) / 8.0
        # Divergence correction term
        - dx * (sub_x.x + sub_y.y) / (8 * dt)
    )

    return pred_p

@ti.data_oriented
class JacobiPressureUpdater(PressureUpdater):
    """Jacobi Method for iteratively solving the pressure Poisson equation"""

    def __init__(self, boundary_condition, dt, dx, n_iter):
        """Initialize the Jacobi pressure solver
        
        Args:
            boundary_condition: Object handling boundary conditions
            dt: Time step size
            dx: Grid cell size
            n_iter: Number of Jacobi iterations to perform
        """
        super().__init__(boundary_condition, dt, dx)

        self._n_iter = n_iter

    def update(self, p, v_current):
        """Update pressure field using Jacobi iteration
        
        Performs multiple iterations of the Jacobi method to solve
        the pressure Poisson equation.
        
        Args:
            p: Pressure field object with current and next states
            v_current: Current velocity field
        """
        for _ in range(self._n_iter):
            self._bc.set_pressure_boundary_condition(p.current)  # Apply boundary conditions
            self._update(p.next, p.current, v_current)  # Perform one Jacobi iteration
            p.swap()  # Swap current and next pressure fields for next iteration

    @ti.kernel
    def _update(self, p_next: ti.template(), p_current: ti.template(), v_current: ti.template()):
        """Single iteration of Jacobi method
        
        Updates each pressure cell based on its neighbors and velocity divergence.
        
        Args:
            p_next: Field to store updated pressure values
            p_current: Field containing current pressure values
            v_current: Current velocity field
        """
        for i, j in p_next:
            if not self._bc.is_wall(i, j):  # Skip wall cells
                p_next[i, j] = predict_p(p_current, v_current, i, j, self.dt, self.dx)  # Calculate new pressure value
                
@ti.data_oriented
class RedBlackSorPressureUpdater(PressureUpdater):
    """Red-Black SOR Method
    
    This class implements the Red-Black Successive Over-Relaxation (SOR) method
    for solving the pressure Poisson equation. The method alternates between updating
    "red" cells (where i+j is odd) and "black" cells (where i+j is even) to improve
    convergence speed compared to standard iterative methods.
    """

    def __init__(self, boundary_condition, dt, dx, relaxation_factor, n_iter):
        """Initialize the Red-Black SOR pressure updater
        
        Args:
            boundary_condition: Object handling boundary conditions
            dt: Time step size
            dx: Grid cell size
            relaxation_factor: SOR relaxation parameter (typically between 1.0 and 2.0)
            n_iter: Number of iterations to perform
        """
        super().__init__(boundary_condition, dt, dx)

        self._n_iter = n_iter
        self._relaxation_factor = relaxation_factor

    def update(self, p, v_current):
        """Update pressure field using Red-Black SOR method
        
        Args:
            p: Pressure field object with current and next states
            v_current: Current velocity field
        """
        for _ in range(self._n_iter):
            self._bc.set_pressure_boundary_condition(p.current)
            self._update(p.next, p.current, v_current)
            p.swap()

    def _update(self, p_next, p_current, v_current):
        """Perform one iteration of Red-Black SOR
        
        First updates all "red" cells (i+j is odd), then updates all "black" cells (i+j is even).
        Note that for black cells, we use the already updated values from red cells.
        
        Args:
            p_next: Field to store updated pressure values
            p_current: Field containing current pressure values
            v_current: Current velocity field
        """
        # The pressure field could use just one buffer, but we use two for interface consistency
        # If only one field is needed, pass the same field for both p_next and p_current
        self._update_pressures_odd(p_next, p_current, v_current)  # Update red cells
        self._update_pressures_even(p_next, p_next, v_current)    # Update black cells using updated red values

    @ti.kernel
    def _update_pressures_odd(self, pn: ti.template(), pc: ti.template(), vc: ti.template()):
        """Update pressure values for odd (red) cells
        
        Args:
            pn: Field to store updated pressure values
            pc: Field containing current pressure values
            vc: Current velocity field
        """
        for i, j in pn:
            if (i + j) % 2 == 1:  # Red cells (odd sum of indices)
                if self._bc.is_fluid_domain(i, j):
                    pn[i, j] = self._pn_ij(pc, vc, i, j)

    @ti.kernel
    def _update_pressures_even(self, pn: ti.template(), pc: ti.template(), vc: ti.template()):
        """Update pressure values for even (black) cells
        
        Args:
            pn: Field to store updated pressure values
            pc: Field containing current pressure values (typically already has updated red cells)
            vc: Current velocity field
        """
        for i, j in pn:
            if (i + j) % 2 == 0:  # Black cells (even sum of indices)
                if self._bc.is_fluid_domain(i, j):
                    pn[i, j] = self._pn_ij(pc, vc, i, j)

    @ti.func
    def _pn_ij(self, pc, vc, i, j):
        """Calculate new pressure value using SOR formula
        
        Combines the current pressure value with the predicted pressure using the
        relaxation factor to accelerate convergence.
        
        Args:
            pc: Field containing current pressure values
            vc: Current velocity field
            i, j: Grid cell indices
            
        Returns:
            Updated pressure value for cell (i,j)
        """
        return (1.0 - self._relaxation_factor) * pc[i, j] + self._relaxation_factor * predict_p(
            pc, vc, i, j, self.dt, self.dx
        )

NameError: name 'ABCMeta' is not defined

## Vorticity confinement

In [1]:
@ti.data_oriented
class VorticityConfinement:
    """
    Class implementing vorticity confinement technique to preserve fluid vortices
    that might be lost due to numerical dissipation in fluid simulations.
    """
    def __init__(self, boundary_condition, dt, dx, weight):
        """
        Initialize the vorticity confinement solver.
        
        Args:
            boundary_condition: Object defining the fluid domain boundaries
            dt: Time step size
            dx: Grid cell size
            weight: Strength of the vorticity confinement effect
        """
        self._bc = boundary_condition
        self.dt = dt
        self.dx = dx
        self.weight = weight

        self._resolution = boundary_condition.get_resolution()

        # Fields to store vorticity and its absolute value
        self.vorticity = ti.field(ti.f32, shape=self._resolution)
        self.vorticity_abs = ti.field(ti.f32, shape=self._resolution)

    @ti.kernel
    def _calc_vorticity(self, vc: ti.template()):
        """
        Calculate the vorticity field from the velocity field.
        
        Args:
            vc: Current velocity field
        """
        for i, j in self.vorticity:
            if self._bc.is_fluid_domain(i, j):
                # Vorticity is defined as the curl of velocity (∇ × v)
                # In 2D, this is just (∂v_y/∂x - ∂v_x/∂y)
                self.vorticity[i, j] = diff_x(vc, i, j, self.dx).y - diff_y(vc, i, j, self.dx).x
                self.vorticity_abs[i, j] = ti.abs(self.vorticity[i, j])

    @ti.kernel
    def _add_vorticity(
        self,
        vn: ti.template(),
        vc: ti.template(),
    ):
        """
        Apply vorticity confinement forces to the velocity field.
        
        Args:
            vn: Next velocity field (output)
            vc: Current velocity field (input)
        """
        for i, j in vn:
            if self._bc.is_fluid_domain(i, j):
                # Add vorticity confinement force to the current velocity
                vn[i, j] = vc[i, j] + self.dt * self.weight * self._vorticity_vec(i, j)

    @ti.func
    def _vorticity_vec(self, i, j):
        """
        Calculate the vorticity confinement force vector at position (i,j).
        
        Args:
            i, j: Grid position
            
        Returns:
            Vector representing the vorticity confinement force
        """
        # Calculate gradient of vorticity magnitude
        vorticity_grad_vec = ti.Vector(
            [diff_x(self.vorticity_abs, i, j, self.dx), diff_y(self.vorticity_abs, i, j, self.dx)]
        )
        # Normalize the gradient vector
        vorticity_grad_vec = vorticity_grad_vec / vorticity_grad_vec.norm()
        
        # Calculate force vector: N × ω where N is the normalized gradient and ω is vorticity
        # The cross product in 2D becomes (N.y, -N.x) * vorticity
        vorticity_vec = (
            ti.Vector([vorticity_grad_vec.y, -vorticity_grad_vec.x]) * self.vorticity[i, j]
        )

        # Clamp the force vector to prevent numerical instability
        return ti.max(ti.min(vorticity_vec, 0.1), -0.1)

    def apply(self, v):
        """
        Apply vorticity confinement to the velocity field.
        
        Args:
            v: Velocity field object with current and next states
        """
        self._calc_vorticity(v.current)
        self._add_vorticity(v.next, v.current)  # Only update v.next


NameError: name 'ti' is not defined

## Solver

In [8]:
@ti.data_oriented
class Solver(metaclass=ABCMeta):
    def __init__(self, boundary_condition):
        self._bc = boundary_condition
        self._resolution = boundary_condition.get_resolution()

    @abstractmethod
    def update(self):
        pass

    @abstractmethod
    def get_fields(self):
        pass

    @ti.func
    def is_wall(self, i, j):
        return self._bc.is_wall(i, j)

    @ti.func
    def is_fluid_domain(self, i, j):
        return self._bc.is_fluid_domain(i, j)


@ti.kernel
def limit_field(field: ti.template(), limit: ti.f32):
    for i, j in field:
        norm = field[i, j].norm()
        if norm > limit:
            field[i, j] = limit * (field[i, j] / norm)


@ti.kernel
def clamp_field(field: ti.template(), low: ti.f32, high: ti.f32):
    for i, j in field:
        field[i, j] = ti.min(ti.max(field[i, j], low), high)

@ti.data_oriented
class MacSolver(Solver):
    """Maker And Cell method"""

    def __init__(
        self,
        boundary_condition,
        pressure_updater,
        advect_function,
        dt,
        dx,
        Re,
        vorticity_confinement=None,
    ):
        super().__init__(boundary_condition)

        self._advect = advect_function
        self.dt = dt
        self.dx = dx
        self.Re = Re

        self.pressure_updater = pressure_updater
        self.vorticity_confinement = vorticity_confinement

        self.v = DoubleBuffers(self._resolution, 2)  # velocity
        self.p = DoubleBuffers(self._resolution, 1)  # pressure

    def update(self, mouse_data):

        self._bc.set_velocity_boundary_condition(self.v.current)
        self._update_velocities(self.v.next, self.v.current, self.p.current)
        
        self.v.swap()

        if self.vorticity_confinement is not None:
            self.vorticity_confinement.apply(self.v)
            self.v.swap()
        apply_impulse(self.v.current, mouse_data)
        self.pressure_updater.update(self.p, self.v.current)

        limit_field(self.v.current, VELOCITY_LIMIT)

    def get_fields(self):
        return self.v.current, self.p.current

    # Application of the Navier-Stoke Equation
    @ti.kernel
    def _update_velocities(self, vn: ti.template(), vc: ti.template(), pc: ti.template()):
        for i, j in vn:
            if self.is_fluid_domain(i, j):
                vn[i, j] = vc[i, j] + self.dt * (
                    -self._advect(vc, vc, i, j, self.dx)
                    - ti.Vector(
                        [
                            diff_x(pc, i, j, self.dx),
                            diff_y(pc, i, j, self.dx),
                        ]
                    )
                    + (diff2_x(vc, i, j, self.dx) + diff2_y(vc, i, j, self.dx)) / self.Re
                )

### CipMac Solver

We calculate for each point of the grid :
$$ \begin{align}
K_1 = F_{i,j} - F_{im,j} - F_{i,jm} + F_{im,jm} \\
K_2 = F_{im,j} - F_{i,j} \\
K_3 = F_{i,jm} - F_{i,j}
\end{align} $$

Main advection equations : 
$$ \begin{align}
a_{i,j} = (\partial v_{x[im,j]} + \partial v_{x[i,j]} . dx - 2.(- K_{2[i,j]}))/i_d \\
b_{i,j} = (\partial v_{y[i,jm]} + \partial v_{y[i,j]} . dx - 2.(- K_{3[i,j]}))/j_d \\
c_{i,j} = ((-K_{1[i,j]}) - i_s . (\partial v_{x[i,jm]} - \partial v_{x[i,j]}) . dx)/j_d \\
d_{i,j} = ((-K_{1[i,j]}) - j_s . (\partial v_{y[im,j]} - \partial v_{y[i,j]}) . dx)/i_d \\
e_{i,j} = (3.K_{2[i,j]}+ i_s.(\partial v_{x[im,j]} - 2.\partial v_{x[i,j]})) / dx^2 \\
f_{i,j} = (3.K_{2[i,j]}+ j_m.(\partial v_{y[i,jm]} - 2.\partial v_{y[i,j]})) / dx^2 \\
g_{i,j} = (-(\partial v_{y[i,jm]} - 2.\partial v_{y[i,j]})+c . dx^2)/(i_s.dx)
\end{align} $$


In [9]:
@ti.data_oriented
class CipMacSolver(Solver):
    """Maker And Cell method"""

    def __init__(
        self, boundary_condition, pressure_updater, dt, dx, Re, vorticity_confinement=None
    ):
        super().__init__(boundary_condition)
        self.dt = dt
        self.dx = dx
        self.Re = Re

        self.pressure_updater = pressure_updater
        self.vorticity_confinement = vorticity_confinement

        self.v = DoubleBuffers(self._resolution, 2)  # velocity
        self.vx = DoubleBuffers(self._resolution, 2)  # velocity gradient x
        self.vy = DoubleBuffers(self._resolution, 2)  # velocity gradient y
        self.p = DoubleBuffers(self._resolution, 1)  # pressure

        self._set_grad(self.vx.current, self.vy.current, self.v.current)

    def update(self, mouse_data):

        apply_impulse(self.v.current, mouse_data)
        self._bc.set_velocity_boundary_condition(self.v.current)
        self._update_velocities(self.v, self.vx, self.vy, self.p)

        if self.vorticity_confinement is not None:
            self.vorticity_confinement.apply(self.v)
            self.v.swap()

        self.pressure_updater.update(self.p, self.v.current)

        limit_field(self.v.current, VELOCITY_LIMIT)

    def get_fields(self):
        return self.v.current, self.p.current

    @ti.kernel
    def _set_grad(self, fx: ti.template(), fy: ti.template(), f: ti.template()):
        for i, j in fx:
            fx[i, j] = diff_x(f, i, j, self.dx)
            fy[i, j] = diff_y(f, i, j, self.dx)

    def _update_velocities(self, v, vx, vy, p):
        self._non_advection_phase(v.next, v.current, p.current)
        self._non_advection_phase_grad(vx.next, vy.next, vx.current, vy.current, v.current, v.next)
        v.swap()
        vx.swap()
        vy.swap()

        self._advection_phase(
            v.next, vx.next, vy.next, v.current, vx.current, vy.current, v.current
        )
        v.swap()
        vx.swap()
        vy.swap()

    @ti.kernel
    def _non_advection_phase(
        self,
        fn: ti.template(),
        fc: ti.template(),
        pc: ti.template(),
    ):
        """中間量の計算"""
        for i, j in fn:
            if not self.is_wall(i, j):
                G = -ti.Vector(
                    [
                        diff_x(pc, i, j, self.dx),
                        diff_y(pc, i, j, self.dx),
                    ]
                ) + self._calc_diffusion(fc, i, j)
                fn[i, j] = fc[i, j] + G * self.dt

    @ti.kernel
    def _non_advection_phase_grad(
        self,
        fxn: ti.template(),
        fyn: ti.template(),
        fxc: ti.template(),
        fyc: ti.template(),
        fc: ti.template(),
        fn: ti.template(),
    ):
        """中間量の勾配の計算"""
        for i, j in fn:
            if not self.is_wall(i, j):
                # 勾配の更新
                fxn[i, j] = fxc[i, j] + (
                    fn[i + 1, j] - fc[i + 1, j] - fn[i - 1, j] + fc[i - 1, j]
                ) / (2.0 * self.dx)
                fyn[i, j] = fyc[i, j] + (
                    fn[i, j + 1] - fc[i, j + 1] - fn[i, j - 1] + fc[i, j - 1]
                ) / (2.0 * self.dx)

    @ti.func
    def _calc_diffusion(self, fc, i, j):
        return (diff2_x(fc, i, j, self.dx) + diff2_y(fc, i, j, self.dx)) / self.Re

    @ti.kernel
    def _advection_phase(
        self,
        fn: ti.template(),  # Output field after advection
        fxn: ti.template(), # Output x-gradient field after advection
        fyn: ti.template(), # Output y-gradient field after advection
        fc: ti.template(),  # Current field values
        fxc: ti.template(), # Current x-gradient field values
        fyc: ti.template(), # Current y-gradient field values
        v: ti.template(),   # Velocity field
    ):
        # Iterate through all cells in the field
        for i, j in fn:
            # Only process fluid cells
            if self.is_fluid_domain(i, j):
                self._cip_advect(fn, fxn, fyn, fc, fxc, fyc, v, i, j)

    @ti.func
    def _cip_advect(self, fn, fxn, fyn, fc, fxc, fyc, v, i, j):
        # Determine direction of flow (positive or negative in x and y)
        i_s = int(sign(v[i, j].x))  # Sign of x velocity component
        j_s = int(sign(v[i, j].y))  # Sign of y velocity component
        i_m = i - i_s  # Upwind cell index in x direction
        j_m = j - j_s  # Upwind cell index in y direction

        # Calculate intermediate values for CIP coefficients
        tmp1 = fc[i, j] - fc[i, j_m] - fc[i_m, j] + fc[i_m, j_m]  # Mixed second derivative approximation
        tmp2 = fc[i_m, j] - fc[i, j]  # First derivative in x direction
        tmp3 = fc[i, j_m] - fc[i, j]  # First derivative in y direction

        # Denominators for coefficient calculations
        i_s_denom = i_s * self.dx**3
        j_s_denom = j_s * self.dx**3

        # Calculate CIP interpolation coefficients (a through g)
        # These coefficients define the cubic interpolation polynomial
        a = (i_s * (fxc[i_m, j] + fxc[i, j]) * self.dx - 2.0 * (-tmp2)) / i_s_denom
        b = (j_s * (fyc[i, j_m] + fyc[i, j]) * self.dx - 2.0 * (-tmp3)) / j_s_denom
        c = (-tmp1 - i_s * (fxc[i, j_m] - fxc[i, j]) * self.dx) / j_s_denom
        d = (-tmp1 - j_s * (fyc[i_m, j] - fyc[i, j]) * self.dx) / i_s_denom
        e = (3.0 * tmp2 + i_s * (fxc[i_m, j] + 2.0 * fxc[i, j]) * self.dx) / self.dx**2
        f = (3.0 * tmp3 + j_s * (fyc[i, j_m] + 2.0 * fyc[i, j]) * self.dx) / self.dx**2
        g = (-(fyc[i_m, j] - fyc[i, j]) + c * self.dx**2) / (i_s * self.dx)

        # Calculate displacement based on velocity and timestep
        X = -v[i, j].x * self.dt  # Displacement in x direction
        Y = -v[i, j].y * self.dt  # Displacement in y direction

        # Update field value using CIP interpolation
        # This evaluates the cubic polynomial at position (X,Y)
        fn[i, j] = (
            ((a * X + c * Y + e) * X + g * Y + fxc[i, j]) * X
            + ((b * Y + d * X + f) * Y + fyc[i, j]) * Y
            + fc[i, j]
        )

        # Update gradient values
        # Calculate derivatives of the interpolation polynomial at (X,Y)
        Fx = (3.0 * a * X + 2.0 * c * Y + 2.0 * e) * X + (d * Y + g) * Y + fxc[i, j]
        Fy = (3.0 * b * Y + 2.0 * d * X + 2.0 * f) * Y + (c * X + g) * X + fyc[i, j]

        # Calculate velocity gradients
        dx = diff_x(v, i, j, self.dx)  # Velocity gradient in x direction
        dy = diff_y(v, i, j, self.dx)  # Velocity gradient in y direction
        
        # Apply correction for gradient advection (accounts for deformation of the flow)
        fxn[i, j] = Fx - self.dt * (Fx * dx.x + Fy * dx.y) / 2.0
        fyn[i, j] = Fy - self.dt * (Fx * dy.x + Fy * dy.y) / 2.0

## External actions

In [10]:
class MouseDataGen:
    def __init__(self):
        self.prev_mouse = None
        self.prev_color = None

    def __call__(self, gui):
        # [0:2]: normalized delta direction
        # [2:4]: current mouse xy
        # [4:7]: color
        mouse_data = np.zeros(8, dtype=np.float32)
        if gui.is_pressed(ti.GUI.LMB):
            # print("touch", end=" ")
            mxy = np.array(gui.get_cursor_pos(), dtype=np.float32) * res
            if self.prev_mouse is None:
                self.prev_mouse = mxy
                # Set lower bound to 0.3 to prevent too dark colors
                self.prev_color = (np.random.rand(3) * 0.7) + 0.3
            else:
                mdir = mxy - self.prev_mouse
                mdir = mdir / (np.linalg.norm(mdir) + 1e-5)
                mouse_data[0], mouse_data[1] = mdir[0], mdir[1]
                mouse_data[2], mouse_data[3] = mxy[0], mxy[1]
                mouse_data[4:7] = self.prev_color
                self.prev_mouse = mxy
                # print("the mouse move")
        else:
            self.prev_mouse = None
            self.prev_color = None
        return mouse_data    

@ti.kernel
def apply_impulse(vf: ti.template(), imp_data: ti.types.ndarray()):
    
    g_dir = -ti.Vector([0, 9.8]) * 300

    force_radius = 20.0
    f_strength = 10.0
    
    for i, j in vf:
        omx, omy = imp_data[2], imp_data[3]
        mdir = ti.Vector([imp_data[0], imp_data[1]])
        dx, dy = (i + 0.5 - omx), (j + 0.5 - omy)
        d2 = dx * dx + dy * dy
        # dv = F * dt
        factor = ti.exp(-d2 / force_radius)

        #dc = dyef[i, j]
        #a = dc.norm()

        # momentum = (mdir * f_strength * factor + g_dir * a / (1 + a)) * dt
        momentum = (mdir * f_strength * factor ) * dt

        v = vf[i, j]
        vf[i, j] = v + momentum
        
        # add dye
        #if mdir.norm() > 0.5:
        #    dc += ti.exp(-d2 * (4 / (res / 15) ** 2)) * ti.Vector([imp_data[4], imp_data[5], imp_data[6]])

        #dyef[i, j] = dc


## Fluid Simulator

In [11]:
@ti.data_oriented
class FluidSimulator:
    def __init__(self, solver):
        self._solver = solver
        self.rgb_buf = ti.Vector.field(3, ti.f32, shape=solver._resolution)  # image buffer
        self._wall_color = ti.Vector([0.5, 0.7, 0.5])

    def step(self, mouse_data):
        self._solver.update(mouse_data)

    def get_norm_field(self):
        self._to_norm(self.rgb_buf, *self._solver.get_fields()[:2])
        return self.rgb_buf

    def get_pressure_field(self):
        self._to_pressure(self.rgb_buf, self._solver.get_fields()[1])
        return self.rgb_buf

    def get_vorticity_field(self):
        self._to_vorticity(self.rgb_buf, self._solver.get_fields()[0])
        return self.rgb_buf

    def field_to_numpy(self):
        fields = self._solver.get_fields()
        return {"v": fields[0].to_numpy(), "p": fields[1].to_numpy()}

    @ti.kernel
    def _to_norm(self, rgb_buf: ti.template(), vc: ti.template(), pc: ti.template()):
        for i, j in rgb_buf:
            rgb_buf[i, j] = 0.2 * visualize_norm(vc[i, j])
            rgb_buf[i, j] += 0.002 * visualize_pressure(pc[i, j])
            if self._solver.is_wall(i, j):
                rgb_buf[i, j] = self._wall_color

    @ti.kernel
    def _to_pressure(self, rgb_buf: ti.template(), pc: ti.template()):
        for i, j in rgb_buf:
            rgb_buf[i, j] = 0.04 * visualize_pressure(pc[i, j])
            if self._solver.is_wall(i, j):
                rgb_buf[i, j] = self._wall_color

    @ti.kernel
    def _to_vorticity(self, rgb_buf: ti.template(), vc: ti.template()):
        for i, j in rgb_buf:
            rgb_buf[i, j] = 0.005 * visualize_vorticity(vc, i, j, self._solver.dx)
            if self._solver.is_wall(i, j):
                rgb_buf[i, j] = self._wall_color

    @staticmethod
    def create(num, resolution, dt, dx, re, vor_eps, inflow, outflow):
        boundary_condition = create_boundary_condition(resolution, inflow, outflow)
        vorticity_confinement = (
            VorticityConfinement(boundary_condition, dt, dx, vor_eps)
            if vor_eps is not None
            else None
        )
        pressure_updater = RedBlackSorPressureUpdater(
            boundary_condition, dt, dx, relaxation_factor=1.3, n_iter=2
        )

        solver_1 = MacSolver(
            boundary_condition,
            pressure_updater,
            advect_kk_scheme,
            dt,
            dx,
            re,
            vorticity_confinement,
        )

        solver_2 = CipMacSolver(
            boundary_condition,
            pressure_updater,
            dt,
            dx,
            re,
            vorticity_confinement,
        )

        return FluidSimulator(solver_2)

## Main functions

In [None]:
# Definition of the main function

res = 400
dt = 0.0005
Re = 1000
dx = 1 / res

def main():

    n_bc = 1     # Boundary condition
    vor_eps = 50.0  # Weight of the vorticity
    n_vis = 0 
    vis_num = 2    # Visualisation parameter
    inflow = 10.0
    outflow = 10.0

    window = ti.GUI("Fluid Simulation", ( res, res))
    #canvas = window.get_canvas()

    print("Generation of the simulator")
    fluid_sim = FluidSimulator.create(n_bc, res, dt, dx, Re, vor_eps, inflow, outflow)

    output_path = Path(os.getcwd()).resolve() / "output"

    md_gen = MouseDataGen()

    # video_manager = ti.tools.VideoManager(output_dir=str(img_path), framerate=30, automatic_build=False)

    step = 0
    ss_count = 0
    paused = False
    print("Loop started")
    
    while window.running:

        ti.profiler.clear_kernel_profiler_info()
        
        if step % 5 == 0:
            if vis_num == 0:
                img = fluid_sim.get_norm_field()
            elif vis_num == 1:
                img = fluid_sim.get_pressure_field()
            elif vis_num == 2:
                img = fluid_sim.get_vorticity_field()
            else:
                raise NotImplementedError()

            window.set_image(img)
            window.show()

            # video_manager.write_frame(img)

        if not paused:
            mouse_data = md_gen(window)
            fluid_sim.step(mouse_data)

        if window.get_event(ti.GUI.PRESS):
            e = window.event
            if e.key == ti.GUI.ESCAPE:
                break
            elif e.key == "p":
                print("paused")
                paused = not paused
            elif e.key == "v":
                vis_num = (vis_num + 1) % n_vis
            elif e.key == "s":
                print("saved")
                output_path.mkdir(exist_ok=True)
                ti.tools.imwrite(img, str(output_path / f"{ss_count:04}.png"))
                ss_count += 1
            elif e.key == "d":
                output_path.mkdir(exist_ok=True)
                fields = fluid_sim.field_to_numpy()
                np.savez(str(output_path / f"step_{step:06}.npz"), **fields)

        step += 1

    window.close()

    ti.profiler.print_kernel_profiler_info('trace')

    # video_manager.make_video(mp4=True)


if __name__ == "__main__":

    main()


Generation of the simulator
Loop started
