2D smoothed particle hydrodynamics (SPH) physics engine testbench, adapted from https://github.com/AlexandreSajus/Python-Fluid-Simulation

In [21]:
from math import sqrt, cos, sin, pi
import numpy as np
from matplotlib import animation
from matplotlib.colors import ListedColormap
import matplotlib.pyplot as plt
import random

%matplotlib qt

In [22]:
"""Configuration file defining the simulation parameters."""

# Simulation parameters
N = 250  # Number of particles
SIM_W = 0.8  # Simulation space width//2 (actually the radius lol)
SIM_H = 0.9  # Simulation height
BOTTOM = 0  # Simulation space ground
TOP = SIM_H # Simulation ceiling ground

# Physics parameters
G_MAG = 0.02 * 0.25  # Acceleration of gravity (Magnitude)
G_ANG = -1/2*pi # Acceleration of gravity (Direction)
G_X = cos(G_ANG) * G_MAG
G_Y = sin(G_ANG) * G_MAG


SPACING = 0.12  # Spacing between particles, used to calculate pressure
K = SPACING / 1000.0  # Pressure factor
K_NEAR = K * 10  # Near pressure factor, pressure when particles are close to each other
# Default density, will be compared to local density to calculate pressure
REST_DENSITY = 1.0
# Neighbour radius, if the distance between two particles is less than R, they are neighbours
R = SPACING * 1.25
SIGMA = 0.2  # Viscosity factor
MAX_VEL = 2.0  # Maximum velocity of particles, used to avoid instability
# Wall constraints factor, how much the particle is pushed away from the simulation walls
WALL_DAMP = 1
VEL_DAMP = 0.5  # Velocity reduction factor when particles are going above MAX_VEL

CELL_SIZE = 0.1


In [23]:
class Particle:
    """
    A single particle of the simulated fluid

    Attributes:
        x_pos: x position of the particle
        y_pos: y position of the particle
        previous_x_pos: x position of the particle in the previous frame
        previous_y_pos: y position of the particle in the previous frame
        visual_x_pos: x position of the particle that is shown on the screen
        visual_y_pos: y position of the particle that is shown on the screen
        rho: density of the particle
        rho_near: near density of the particle, used to avoid collisions between particles
        press: pressure of the particle
        press_near: near pressure of the particle, used to avoid collisions between particles
        neighbors: list of the particle's neighbors
        x_vel: x velocity of the particle
        y_vel: y velocity of the particle
        x_force: x force applied to the particle
        y_force: y force applied to the particle
    """

    def __init__(self, x_pos: float, y_pos: float):
        self.x_pos = x_pos
        self.y_pos = y_pos
        self.previous_x_pos = x_pos
        self.previous_y_pos = y_pos
        self.visual_x_pos = x_pos
        self.visual_y_pos = y_pos
        self.rho = 0.0
        self.rho_near = 0.0
        self.press = 0.0
        self.press_near = 0.0
        self.neighbors = []
        self.x_vel = 0.0
        self.y_vel = 0.0
        self.x_force = G_X
        self.y_force = G_Y

    def update_state(self, g_mag, g_ang):
        """
        Updates the state of the particle, with a (polar) acceleration vector
        """
        
        # Reset previous position
        (self.previous_x_pos, self.previous_y_pos) = (self.x_pos, self.y_pos)

        # Apply force using Newton's second law and Euler integration with mass = 1 and dt = 1
        (self.x_vel, self.y_vel) = (
            self.x_vel + self.x_force,
            self.y_vel + self.y_force,
        )

        # Move particle according to its velocity using Euler integration with dt = 1
        (self.x_pos, self.y_pos) = (self.x_pos + self.x_vel, self.y_pos + self.y_vel)

        # Set visual position. Visual position is the one shown on the screen
        # It is used to avoid unstable particles to be shown
        (self.visual_x_pos, self.visual_y_pos) = (self.x_pos, self.y_pos)

        # Update force with ploar gravity vector
        (self.x_force, self.y_force) = (cos(g_ang) * g_mag, sin(g_ang) * g_mag)

        # Define velocity using Euler integration with dt = 1
        (self.x_vel, self.y_vel) = (
            self.x_pos - self.previous_x_pos,
            self.y_pos - self.previous_y_pos,
        )

        # Calculate velocity
        velocity = sqrt(self.x_vel**2 + self.y_vel**2)

        # Reduces the velocity if it is too high
        if velocity > MAX_VEL:
            self.x_vel *= VEL_DAMP
            self.y_vel *= VEL_DAMP

        # Wall constraints, if a particle is out of bounds, create a spring force to bring it back
        if self.x_pos < -SIM_W:
            self.x_force -= (self.x_pos - -SIM_W) * WALL_DAMP
            self.visual_x_pos = -SIM_W

        # Same thing for the right wall
        if self.x_pos > SIM_W:
            self.x_force -= (self.x_pos - SIM_W) * WALL_DAMP
            self.visual_x_pos = SIM_W

        # Same thing but for the floor
        if self.y_pos < BOTTOM:
            self.y_force -= (self.y_pos - BOTTOM) * WALL_DAMP
            self.visual_y_pos = BOTTOM
        
        # Same thing but for the ceiling
        if self.y_pos > TOP:
            self.y_force -= (self.y_pos - TOP) * WALL_DAMP
            self.visual_y_pos = TOP

        # Reset density
        self.rho = 0.0
        self.rho_near = 0.0

        # Reset neighbors
        self.neighbors = []

    def calculate_pressure(self):
        """
        Calculates the pressure of the particle
        """
        self.press = K * (self.rho - REST_DENSITY)
        self.press_near = K_NEAR * self.rho_near


In [24]:
"""Utilities and physics calculations"""

def start(
    xmin: float, xmax: float, ymin: float, ymax: float, space: float, count: int
) -> list[Particle]:
    """
    Creates a rectangle of particles within xmin, xmax, ymin, ymax
    We start by creating a particle at (xmin, ymin)
    and then add particles until we reach count particles
    Particles are represented by their position [x, y]

    Args:
        xmin (float): x min bound of the rectangle
        xmax (float): x max bound of the rectangle
        ymin (float): y min bound of the rectangle
        space (float): space between particles
        count (int): number of particles

    Returns:
        list: list of Particle objects
    """
    result = []
    for _ in range(count):
        x_pos = random.uniform(xmin, xmax)
        y_pos = random.uniform(ymin, ymax)
        result.append(Particle(x_pos, y_pos))
    return result


def calculate_density(particles: list[Particle]) -> None:
    """
    Calculates density of particles
        Density is calculated by summing the relative distance of neighboring particles
        We distinguish density and near density to avoid particles to collide with each other
        which creates instability

    Args:
        particles (list[Particle]): list of particles
    """
    for i, particle_1 in enumerate(particles):
        density = 0.0
        density_near = 0.0
        # Density is calculated by summing the relative distance of neighboring particles
        for particle_2 in particles[i + 1 :]:
            distance = sqrt(
                (particle_1.x_pos - particle_2.x_pos) ** 2
                + (particle_1.y_pos - particle_2.y_pos) ** 2
            )
            if distance < R:
                # normal distance is between 0 and 1
                normal_distance = 1 - distance / R
                density += normal_distance**2
                density_near += normal_distance**3
                particle_2.rho += normal_distance**2
                particle_2.rho_near += normal_distance**3
                particle_1.neighbors.append(particle_2)
        particle_1.rho += density
        particle_1.rho_near += density_near


def create_pressure(particles: list[Particle]) -> None:
    """
    Calculates pressure force of particles
        Neighbors list and pressure have already been calculated by calculate_density
        We calculate the pressure force by summing the pressure force of each neighbor
        and apply it in the direction of the neighbor

    Args:
        particles (list[Particle]): list of particles
    """
    for particle in particles:
        press_x = 0.0
        press_y = 0.0
        for neighbor in particle.neighbors:
            particle_to_neighbor = [
                neighbor.x_pos - particle.x_pos,
                neighbor.y_pos - particle.y_pos,
            ]
            distance = sqrt(particle_to_neighbor[0] ** 2 + particle_to_neighbor[1] ** 2)
            normal_distance = 1 - distance / R
            total_pressure = (
                particle.press + neighbor.press
            ) * normal_distance**2 + (
                particle.press_near + neighbor.press_near
            ) * normal_distance**3
            pressure_vector = [
                particle_to_neighbor[0] * total_pressure / distance,
                particle_to_neighbor[1] * total_pressure / distance,
            ]
            neighbor.x_force += pressure_vector[0]
            neighbor.y_force += pressure_vector[1]
            press_x += pressure_vector[0]
            press_y += pressure_vector[1]
        particle.x_force -= press_x
        particle.y_force -= press_y


def calculate_viscosity(particles: list[Particle]) -> None:
    """
    Calculates the viscosity force of particles
    Force = (relative distance of particles)*(viscosity weight)*(velocity difference of particles)
    Velocity difference is calculated on the vector between the particles

    Args:
        particles (list[Particle]): list of particles
    """

    for particle in particles:
        for neighbor in particle.neighbors:
            particle_to_neighbor = [
                neighbor.x_pos - particle.x_pos,
                neighbor.y_pos - particle.y_pos,
            ]
            distance = sqrt(particle_to_neighbor[0] ** 2 + particle_to_neighbor[1] ** 2)
            normal_p_to_n = [
                particle_to_neighbor[0] / distance,
                particle_to_neighbor[1] / distance,
            ]
            relative_distance = distance / R
            velocity_difference = (particle.x_vel - neighbor.x_vel) * normal_p_to_n[
                0
            ] + (particle.y_vel - neighbor.y_vel) * normal_p_to_n[1]
            if velocity_difference > 0:
                viscosity_force = [
                    (1 - relative_distance)
                    * SIGMA
                    * velocity_difference
                    * normal_p_to_n[0],
                    (1 - relative_distance)
                    * SIGMA
                    * velocity_difference
                    * normal_p_to_n[1],
                ]
                particle.x_vel -= viscosity_force[0] * 0.5
                particle.y_vel -= viscosity_force[1] * 0.5
                neighbor.x_vel += viscosity_force[0] * 0.5
                neighbor.y_vel += viscosity_force[1] * 0.5


In [25]:
def update(particles: list[Particle], g_mag=None, g_ang=None) -> list[Particle]:
    """
    Calculates a step of the simulation
    """
    g_mag = G_MAG if g_mag is None else g_mag
    g_ang = G_ANG if g_ang is None else g_ang
    # Update the state of the particles (apply forces, reset values, etc.)
    for particle in particles:
        particle.update_state(g_mag, g_ang)

    # Calculate density
    calculate_density(particles)

    # Calculate pressure
    for particle in particles:
        particle.calculate_pressure()

    # Apply pressure force
    create_pressure(particles)

    # Apply viscosity force
    calculate_viscosity(particles)

    return particles




In [26]:
# Animation function for direct visualization
def animate(i: int):
    """
    Animates the simulation in matplotlib

    Args:
        i: frame number

    Returns:
        points: the points to be plotted
    """
    global simulation_state, frame

    simulation_state = update(simulation_state, g_ang=G_ANG+frame*pi/200)
    # Create an array with the x and y coordinates of the particles
    visual = np.array(
        [
            [particle.visual_x_pos, particle.visual_y_pos]
            for particle in simulation_state
        ]
    )
    POINTS.set_data(visual[:, 0], visual[:, 1])  # Updates the position of the particles
    frame += 1
    return (POINTS,)



In [27]:
# # Animate the actual particles
# fig = plt.figure()
# axes = fig.add_subplot(xlim=(-SIM_W, SIM_W), ylim=(0, SIM_H))
# (POINTS,) = axes.plot([], [], "o", ms=20, color="black")

# simulation_state = start(-SIM_W, SIM_W, BOTTOM, TOP, 0.03, N)

# frame = 0

# ani = animation.FuncAnimation(fig, animate, interval=10, blit=True)
# plt.show()

In [28]:
def hash_grid(particles, sim_w, sim_h, cell_size=CELL_SIZE, binary=True):
    """
    Computes a grid based on particle positions.

    Parameters:
        particles (list): List of Particle objects.
        sim_w (float): Half the width of the simulation area. 
                       x ranges from -sim_w to sim_w.
        sim_h (float): Height of the simulation area. y ranges from 0 to sim_h.
        cell_size (float): The size of each cell (both width and height).
        binary (bool): If True, each cell is marked 1 if at least one particle 
                       is present. If False, each cell contains the count of particles.

    Returns:
        numpy.ndarray: 2D array with either binary values or particle counts per cell.
    """
    # Calculate grid dimensions
    nx = int((2 * sim_w) / cell_size)  # cells along x-axis (from -sim_w to sim_w)
    ny = int(sim_h / cell_size)        # cells along y-axis (from 0 to sim_h)

    # Initialize grid: zeros for both binary and count modes.
    grid = np.zeros((ny, nx), dtype=int)

    for p in particles:
        # Map particle coordinates to grid indices:
        # Shift x by sim_w and then divide by cell_size.
        # Directly divide y by cell_size.
        x_index = int((p.x_pos + sim_w) / cell_size)
        y_index = int(p.y_pos / cell_size)

        # Check that indices are within bounds.
        if 0 <= x_index < nx and 0 <= y_index < ny:
            if binary:
                grid[y_index, x_index] = 1  # Mark cell as occupied
            else:
                grid[y_index, x_index] += 1  # Increment count

    return grid


In [29]:
def animate_grid(i: int):
    """
    Animates the simulation grid by updating the binary hash grid
    derived from the particle positions.

    Args:
        i: frame number

    Returns:
        GRID_IM: the updated grid image
    """
    global simulation_state, frame, GRID_IM

    # Update simulation state using the same update function
    simulation_state = update(simulation_state, g_ang=G_ANG + frame * np.pi/100)
    
    # Create grid data from the updated simulation state.
    # Here we use binary mode (cells are 1 if a particle is present)
    grid_data = hash_grid(simulation_state, SIM_W, SIM_H, binary=True)
    
    # Update the grid image with the new grid data
    GRID_IM.set_data(grid_data)
    
    frame += 1
    return (GRID_IM,)

In [30]:
# Setup matplotlib for grid animation
fig_grid = plt.figure()
ax_grid = fig_grid.add_subplot(xlim=(-SIM_W, SIM_W), ylim=(0, SIM_H))

# Compute initial grid from the current simulation state
# (Assume simulation_state has been created by start or similar function)
init_grid = hash_grid(start(-SIM_W, SIM_W, BOTTOM, TOP, 0.03, N), 2*SIM_W, SIM_H,)

# Create an image for the grid
GRID_IM = ax_grid.imshow(
    init_grid,
    cmap="binary",
    origin='lower',
    extent=(-SIM_W, SIM_W, 0, SIM_H),
    interpolation='none'
)

ani = animation.FuncAnimation(fig_grid, animate_grid, interval=10, blit=True)
plt.show()

  ani = animation.FuncAnimation(fig_grid, animate_grid, interval=10, blit=True)
