<a href="https://colab.research.google.com/github/costpetrides/Fluid-Dynamics-Navier-Stokes/blob/main/Navier-Stokes-Simulation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install cmasher

Collecting cmasher
  Downloading cmasher-1.9.2-py3-none-any.whl.metadata (7.9 kB)
Collecting colorspacious>=1.1.0 (from cmasher)
  Downloading colorspacious-1.1.2-py2.py3-none-any.whl.metadata (3.6 kB)
Downloading cmasher-1.9.2-py3-none-any.whl (506 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m506.5/506.5 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorspacious-1.1.2-py2.py3-none-any.whl (37 kB)
Installing collected packages: colorspacious, cmasher
Successfully installed cmasher-1.9.2 colorspacious-1.1.2


In [2]:
import numpy as np
import scipy.sparse.linalg as splinalg
from scipy import interpolate
import matplotlib.pyplot as plt
import cmasher as cmr
from tqdm import tqdm
import imageio  # For GIF/Video creation
import os

DOMAIN_SIZE = 1.0
N_POINTS = 41
N_TIME_STEPS = 100
TIME_STEP_LENGTH = 0.1

KINEMATIC_VISCOSITY = 0.0000000001

MAX_ITER_CG = None

# Output directory for frames
OUTPUT_DIR = "frames"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def forcing_function(time, point):
    x1, y1 = 0.4, 0.4
    x2, y2 = 0.6, 0.6
    magnitude = 3
    direction = np.array([1, -1.0])

    if x1 <= point[0] <= x2 and y1 <= point[1] <= y2:
        forced_value = magnitude * direction
    else:
        forced_value = np.array([0.0, 0.0])

    return forced_value

def main():
    element_length = DOMAIN_SIZE / (N_POINTS - 1)
    scalar_shape = (N_POINTS, N_POINTS)
    scalar_dof = N_POINTS ** 2
    vector_shape = (N_POINTS, N_POINTS, 2)
    vector_dof = N_POINTS ** 2 * 2

    x = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
    y = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)

    X, Y = np.meshgrid(x, y, indexing="ij")
    coordinates = np.concatenate(
        (X[..., np.newaxis], Y[..., np.newaxis]), axis=-1
    )

    forcing_function_vectorized = np.vectorize(
        pyfunc=forcing_function, signature="(),(d)->(d)"
    )

    def partial_derivative_x(field):
        diff = np.zeros_like(field)
        diff[1:-1, 1:-1] = (
            (field[2:, 1:-1] - field[0:-2, 1:-1]) / (2 * element_length)
        )
        return diff

    def partial_derivative_y(field):
        diff = np.zeros_like(field)
        diff[1:-1, 1:-1] = (
            (field[1:-1, 2:] - field[1:-1, 0:-2]) / (2 * element_length)
        )
        return diff

    def laplace(field):
        diff = np.zeros_like(field)
        diff[1:-1, 1:-1] = (
            (
                field[0:-2, 1:-1]
                + field[1:-1, 0:-2]
                - 4 * field[1:-1, 1:-1]
                + field[2:, 1:-1]
                + field[1:-1, 2:]
            )
            / (element_length ** 2)
        )
        return diff

    def divergence(vector_field):
        return (
            partial_derivative_x(vector_field[..., 0])
            + partial_derivative_y(vector_field[..., 1])
        )

    def gradient(field):
        return np.concatenate(
            (
                partial_derivative_x(field)[..., np.newaxis],
                partial_derivative_y(field)[..., np.newaxis],
            ),
            axis=-1,
        )

    def curl_2d(vector_field):
        return (
            partial_derivative_x(vector_field[..., 1])
            - partial_derivative_y(vector_field[..., 0])
        )

    def advect(field, vector_field):
        backtraced_positions = np.clip(
            (coordinates - TIME_STEP_LENGTH * vector_field),
            0.0,
            DOMAIN_SIZE,
        )
        return interpolate.interpn(
            points=(x, y), values=field, xi=backtraced_positions
        )

    def diffusion_operator(vector_field_flattened):
        vector_field = vector_field_flattened.reshape(vector_shape)
        return (
            vector_field
            - KINEMATIC_VISCOSITY * TIME_STEP_LENGTH * laplace(vector_field)
        ).flatten()

    def poisson_operator(field_flattened):
        field = field_flattened.reshape(scalar_shape)
        return laplace(field).flatten()

    velocities_prev = np.zeros(vector_shape)
    time_current = 0.0

    # Save frames for GIF/Video
    frame_files = []

    for i in tqdm(range(N_TIME_STEPS)):
        time_current += TIME_STEP_LENGTH

        forces = forcing_function_vectorized(time_current, coordinates)
        velocities_forces_applied = velocities_prev + TIME_STEP_LENGTH * forces
        velocities_advected = advect(
            field=velocities_forces_applied,
            vector_field=velocities_forces_applied,
        )

        velocities_diffused = splinalg.cg(
            A=splinalg.LinearOperator(
                shape=(vector_dof, vector_dof),
                matvec=diffusion_operator,
            ),
            b=velocities_advected.flatten(),
            maxiter=MAX_ITER_CG,
        )[0].reshape(vector_shape)

        pressure = splinalg.cg(
            A=splinalg.LinearOperator(
                shape=(scalar_dof, scalar_dof),
                matvec=poisson_operator,
            ),
            b=divergence(velocities_diffused).flatten(),
            maxiter=MAX_ITER_CG,
        )[0].reshape(scalar_shape)

        velocities_projected = velocities_diffused - gradient(pressure)
        velocities_prev = velocities_projected

        # Plot and save each frame
        plt.figure(figsize=(5, 5), dpi=160)
        curl = curl_2d(velocities_projected)
        plt.contourf(
            X, Y, curl, cmap=cmr.redshift, levels=100,
        )
        plt.quiver(
            X, Y, velocities_projected[..., 0], velocities_projected[..., 1], color="cyan",
        )
        frame_file = f"{OUTPUT_DIR}/frame_{i:04d}.png"
        plt.savefig(frame_file)
        frame_files.append(frame_file)
        plt.close()

    # Create GIF
    gif_path = "simulation.gif"
    with imageio.get_writer(gif_path, mode="I", duration=0.1) as writer:
        for frame_file in frame_files:
            image = imageio.imread(frame_file)
            writer.append_data(image)

    print(f"GIF saved at {gif_path}")

    # Optional: Create video
    video_path = "simulation.mp4"
    with imageio.get_writer(video_path, fps=10) as writer:
        for frame_file in frame_files:
            image = imageio.imread(frame_file)
            writer.append_data(image)

    print(f"Video saved at {video_path}")


if __name__ == "__main__":
    main()


100%|██████████| 100/100 [00:25<00:00,  4.00it/s]
  image = imageio.imread(frame_file)


GIF saved at simulation.gif


  image = imageio.imread(frame_file)


Video saved at simulation.mp4
