# FD Lid-Driven Cavity Flow - Animation

## General Infromation about Running the Code 


In [6]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import SymLogNorm # This will be used to get a better color scale for vorticity, since it has large boundary values
from matplotlib.animation import FuncAnimation, PillowWriter
import os
from tqdm import tqdm
from numba import njit


print_titles = True # Set to True for show titles in figures for standalone use, False for the report
tdqm_installed = True # Set to True if tqdm is installed, and uncomment the line above
libertine_installed = True # Only set this to true if the font Linux Libertine is installed

#******

# Create data and figures folders if they do not exist
folder = "data"
if not os.path.exists(folder):
    os.mkdir(folder)

folder = "gifs"
if not os.path.exists(folder):
    os.mkdir(folder)
    

## Visualization

In [3]:
plt.rcParams.update({
    "axes.prop_cycle": plt.cycler(color=["black"] 
                            + plt.rcParams['axes.prop_cycle'].by_key()['color']),
    "font.family": "serif",             # Serif fonts (e.g., Times-like)
    "font.size": 14,                    # Base font size
    "mathtext.fontset": "cm",           # Math Font to match a bit better
    "legend.fontsize": 12,
    "axes.labelsize": 14,
    "axes.titlesize": 14,
    "xtick.labelsize": 12,
    "ytick.labelsize": 12,
    "figure.dpi": 300,                  # High resolution
    "savefig.dpi": 300,
    "axes.linewidth": 0.8,              # Thin axis lines
    "xtick.direction": "in",            # Ticks pointing inward
    "ytick.direction": "in",
    "xtick.top": True,                  # Ticks on all sides
    "ytick.right": True,
    "xtick.minor.visible": True,
    "ytick.minor.visible": True,
    "grid.linewidth": 0.4
})

if libertine_installed: plt.rcParams.update({"font.serif": ["Linux Libertine"]})  # Same font as in LaTeX)

fsize_small = (4,2.75) # for a single colum in double colum
fsize_large = (6.5,4)

## Building the LDC Simulation Class

In [12]:
class LDC_anim:
    def __init__(self,h,dt,Re,grid_size, vx_top=1, vx_bottom=0):
        self.h = h
        self.dt = dt 
        self.Re = Re
        self.grid_size = grid_size
        self.vx_top = vx_top
        self.vx_bottom = vx_bottom
        self.initialize()

    # =====
    def initialize(self): # Careful, for changing grid_size you have to reassign the whole class`
        self.N_y, self.N_x = self.grid_size # first y then x, so if we print it out it corresponds to the figure

    def update_stream_SOR(self, u, w, omega, max_iter, tol):
        '''
        Update stream function using Successive Over-Relaxation (SOR) method.

        INPUT
        -----
        u: (Ny,Nx)-ndarray 
            initial stream function
        w: (Ny,Nx)-ndarray
            vorticity
        omega: float
            relaxation factor
        max_iter: int
            maximum number of iterations
        tol: float
            tolerance for convergence

        OUTPUT
        ------
        u: (Ny,Nx)-ndarray
            updated stream function
        '''
        for it in range(max_iter):
            max_r = 0.0
            for j in range(1, self.N_y-1):
                for i in range(1, self.N_x-1):
                    u_new = 0.25 * (u[j+1,i] + u[j-1,i] + u[j,i+1] + u[j,i-1] + self.h*self.h * w[j,i])
                    r = u_new - u[j,i]
                    u[j,i] += omega * r
                    max_r = max(max_r, abs(r))

            if max_r < tol: break

        return u
    
    # =====
    def update_vorticity(self, w, u):
        ''' 
        Update vorticity with finite differences.

        INPUT
        -----
        w: (Ny,Nx)-ndarray
            initial vorticity
        u: (Ny,Nx)-ndarray
            stream function

        OUTPUT
        ------
        w_new: (Ny,Nx)-ndarray
            updated vorticity
        '''
        w_new = np.copy(w)

        termA = (u[1:-1, 2:] - u[1:-1, :-2]) * (w[2:, 1:-1] - w[:-2, 1:-1])
        termB= (u[2:, 1:-1] - u[:-2, 1:-1]) * (w[1:-1, 2:] - w[1:-1, :-2])
        termC = w[1:-1, 2:] + w[1:-1, :-2] + w[2:, 1:-1] + w[:-2, 1:-1] - 4 * w[1:-1, 1:-1]

        w_new[1:-1, 1:-1] = (w[1:-1, 1:-1] + self.dt / (self.h * self.h) * (0.25 * (termA - termB) + (1 / self.Re) * termC))

        return w_new
    
    #=====
    @staticmethod
    @njit
    def update_pressure_SOR(p, u, h, omega, max_iter, tol):
        ''' 
        IMPORTANT: this uses numba's njit for speed-up, which translates the code to non-Python code. This is WAYYY more efficient since 
        python kinda sucks for loops. (I also tried to vectorize it but it was still too slow since termA cannot be vectorized in SOR.)
        HOWEVER, numba cannot take class variables, so we have to take it out of the class by staticmethod and pass h as argument.
        We are also using red-black SOR which also improves performance and ram usage apparently (source: some guy on stackoverflow).

        I also tried a version with vectorized termB and termC, but it did not change much when using numba.

        INPUT
        -----
        p: (Ny,Nx)-ndarray 
            initial pressure
        u: (Ny,Nx)-ndarray
            stream function
        h: float
            grid spacing
        omega: float
            relaxation factor
        max_iter: int
            maximum number of iterations
        tol: float
            tolerance for convergence   

        OUTPUT
        ------
        p: (Ny,Nx)-ndarray
            updated pressure
        '''
        N_y, N_x = p.shape
        factorC = 1/(8*h**2)
        for it in range(max_iter):
            max_r = 0.0

            for color in [0, 1]:  # Red-black SOR
                for j in range(1, N_y-1):
                    for i in range((j+color) % 2 +1, N_x-1, 2): # This selects every second grid point, shifted depending on color value
                        termA = (p[j,i+1] + p[j,i-1] + p[j+1,i] + p[j-1,i])
                        termB = 2 * (u[j,i+1] - 2 * u[j,i] + u[j,i-1]) * (u[j+1,i] - 2 * u[j,i] + u[j-1,i]) / (h*h)
                        termC = factorC * (u[j+1,i+1] - u[j-1,i+1] - u[j+1,i-1] + u[j-1,i-1])**2
                        p_new = (0.25) * (termA - termB - termC)

                        r = p_new - p[j,i]
                        p[j,i] += omega * r
                        max_r = max(max_r, abs(r))

            if max_r < tol: break

        return p
    # =====
    def vorticity_boundary(self, w, u):
        '''
        Apply boundary conditions for vorticity.
        
        INPUT
        -----
        w: (Ny,Nx)-ndarray
            initial vorticity
        u: (Ny,Nx)-ndarray
            stream function
            
        OUTPUT
        ------
        w: (Ny,Nx)-ndarray
            updated vorticity with boundary conditions applied
        '''
        factor = 2/(self.h*self.h)
        w[-1, :] = - factor * u[-2,:] - (2/self.h) * self.vx_top    # Top wall (A)
        w[:, -1] = - factor * u[:,-2]                               # Right wall (B)
        w[0, :] = - factor * u[1, :] + (2/self.h) * self.vx_bottom  # Bottom wall (C)
        w[:, 0] = - factor * u[:, 1]                                # Left wall (D)

        return w
    
    # =====
    def pressure_boundary(self, p, w):
        ''' 
        Apply boundary conditions for pressure.
        INPUT
        -----
        p: (Ny,Nx)-ndarray
            initial pressure
        w: (Ny,Nx)-ndarray
            vorticity   

        OUTPUT
        ------
        p: (Ny,Nx)-ndarray
            updated pressure with boundary conditions applied
        '''  
        # (A) top wall
        for i in range(self.N_x-2):
            p[-1,i+1] = p[-1,i] - 1/self.Re * (w[-1,i] - w[-2,i])
        # (B) right wall
        for j in range(self.N_y-2):
            p[j+1,-1] = p[j,-1] + 1/self.Re * (w[j,-1] - w[j,-2])
        # (C) bottom wall
        for i in range(self.N_x-2):
            p[0,i+1] = p[0,i] - 1/self.Re * (w[1,i] - w[0,i])
            # (D) left wall
        for j in range(self.N_y-2):
            p[j+1,0] = p[j,0] + 1/self.Re * (w[j,1] - w[j,0])
            
        return p
    

    # =====
    def test_stability(self):
        '''
        Test stability condition for the simulation.
        '''
        stability = self.dt/(self.Re*self.h*self.h)
        if stability > 0.25:
            print(f'Warning: Stability condition not met! value: {stability:.3f}')
        else:
            print(f'Stability criterion value: {stability:.3f}')

        return None
    
    
    # =====
    def system_evolution(self, 
                    n_t,
                    sps = 30, # saves per second for each field
                    SOR_omega=1.9,  
                    SOR_max_iter=1000, 
                    SOR_tol=1e-5,
                    ):
        '''
        Simulate the time evolution of the lid-driven cavity flow.
        INPUT
        -----
        n_t: int
            number of time steps
        sps: int
            saves per second for each field
        SOR_omega: float
            relaxation factor for SOR
        SOR_max_iter: int
            maximum number of iterations for SOR
        SOR_tol: float
            tolerance for SOR

        OUTPUT
        ------
        u_history: (n_saves,Ny,Nx)-ndarray
            history of stream function
        w_history: (n_saves,Ny,Nx)-ndarray
            history of vorticity
        p_history: (n_saves,Ny,Nx)-ndarray
            history of pressure
        
        '''
        self.test_stability()

        k_save = int(1/self.dt/sps)
        n_saves = int(np.floor(n_t / k_save) + 1)

        # Initialize storage arrays and arrays at t=0
        u = np.zeros((self.N_y, self.N_x)) # u at t=0 is 0
        w = np.zeros((self.N_y, self.N_x))
        w[-1, :] = -2 * self.vx_top / self.h  # Top wall vorticity at t=0 (u=0 everywhere)
        w[0, :] = 2 * self.vx_bottom / self.h  # Bottom wall vorticity at t=0 

        p = np.zeros((self.N_y, self.N_x))  # Initialize pressure array

        u_history = np.zeros((n_saves, self.N_y, self.N_x)) 
        w_history = np.zeros((n_saves, self.N_y, self.N_x))
        p_history = np.zeros((n_saves, self.N_y, self.N_x)) 
        w_history[0] = w
        i=1


        if tdqm_installed:
            loop = tqdm(range(1,n_t))
        else:
            loop = range(1,n_t)
            
        for n in loop:
            # Update stream function
            u = self.update_stream_SOR(u, w, omega=SOR_omega, max_iter=SOR_max_iter, tol=SOR_tol)
            w = self.vorticity_boundary(w, u)
            w = self.update_vorticity(w, u)

            p = self.pressure_boundary(p, w)
            p = self.update_pressure_SOR(p, u, self.h, omega=SOR_omega, max_iter=SOR_max_iter, tol=SOR_tol)

            if n % k_save == 0:
                u_history[i] = u
                w_history[i] = w
                p_history[i] = p
                i += 1
            

        return u_history, w_history, p_history
       

In [5]:
def get_velocities(u, h):
    '''
    '''

    vx = np.zeros_like(u)
    vy = np.zeros_like(u)

    vx[1:-1, :] = (u[2:,:] - u[:-2,:]) / (2*h) # Central differences
    vy[:,1:-1] = - (u[:,2:] - u[:,:-2]) / (2*h) # Central differences

    # Boundaries
    vx[-1,:] = (u[-1,:] - u[-2,:]) / h  # Top wall need backwards difference
    vx[0,:] = (u[1,:] - u[0,:]) / h     # Bottom wall need forwards difference
    vy[:,0] = - (u[:,1] - u[:,0]) / h   # Left wall need forwards difference
    vy[:,-1] = - (u[:,-1] - u[:,-2]) / h # Right wall need backwards difference

    # For the plot, we will also return the mashgrid straight away
    N_y, N_x = u.shape
    x = np.linspace(0, 1, N_x)
    y = np.linspace(0, 1, N_y)
    X, Y = np.meshgrid(x, y)

    return vx, vy, X, Y

Simulate the data, for one Re, bottom lid moving in same direction.

In [13]:
grid_size = (50,50)
T = 15
SOR_tol = 1e-4


# *********
h = 1/grid_size[0] 
dt = 0.01*h*h*10 
nt = int(T/dt) 


LDC = LDC_anim(
    h=h,
    dt=0.001,
    Re=200,
    grid_size=grid_size,
    vx_top=1,
    vx_bottom=1
)

u, w, p = LDC.system_evolution(
    n_t=nt,
    sps=30,
    SOR_tol=SOR_tol)

Stability criterion value: 0.013


100%|██████████| 374998/374998 [27:43<00:00, 225.46it/s]


Creating the gif:

In [None]:

# u, w, p have shape (n_saves, Ny, Nx)
n_saves = u.shape[0]

# Shared color limits
u_min, u_max = u.min(), u.max()
w_abs = abs(w).max()
p_abs = abs(p).max()

# Create figure with 3 rows
fig, axes = plt.subplots(3, 1, figsize=(6, 10))

# Initial images for frame 0
im0 = axes[0].imshow(
    u[0], origin="lower", extent=[0, 1, 0, 1],
    cmap="Purples_r", vmin=u_min, vmax=u_max
)
axes[0].set_ylabel("y")
axes[0].set_title("Stream function")

im1 = axes[1].imshow(
    w[0], origin="lower", extent=[0, 1, 0, 1],
    cmap="PuOr_r", norm=SymLogNorm(linthresh=2, linscale=1.0, vmin=-w_abs, vmax=w_abs)
)
axes[1].set_ylabel("y")
axes[1].set_title("Vorticity")

im2 = axes[2].imshow(
    p[0], origin="lower", extent=[0, 1, 0, 1],
    cmap="PuOr_r", vmin=-p_abs, vmax=p_abs
)
axes[2].set_xlabel("x")
axes[2].set_ylabel("y")
axes[2].set_title("Pressure")

# Colorbars
fig.colorbar(im0, ax=axes[0], fraction=0.03).set_label("Stream function")
fig.colorbar(im1, ax=axes[1], fraction=0.03).set_label("Vorticity")
fig.colorbar(im2, ax=axes[2], fraction=0.03).set_label("Pressure")

plt.tight_layout()


# Update function for animation
def update(frame):
    im0.set_data(u[frame])
    im1.set_data(w[frame])
    im2.set_data(p[frame])
    axes[0].set_title(f"Stream function (frame {frame})")
    return im0, im1, im2


# Create animation
anim = FuncAnimation(
    fig, update, frames=n_saves,
    interval=80,  # milliseconds between frames
    blit=False
)

# Save as GIF (optional)
anim.save("gifs/Re200.gif", writer=PillowWriter(fps=12))

plt.show()
