In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from functools import partial

# for matplotlib backends see: https://matplotlib.org/stable/users/explain/backends.html
# WHEN CHANGING MATPLOTLIB BACKEND, RESTART THE KERNEL

# use inline for basic inline graphs, but no animations
# %matplotlib inline

# use widget for inline animations
# %matplotlib widget

# use qt for interactive pop up windows of graphs --> must restart kernel
import matplotlib
matplotlib.use('QtAgg')
%matplotlib qt

### Define constants

In [2]:
grid_size_x = 15
grid_size_y = 10
grid_values_count = grid_size_x * grid_size_y
rho_0 = 1.
omega_i = np.array([4/9, 1/9, 1/9, 1/9, 1/9, 1/36, 1/36, 1/36, 1/36])
omega = 0.5

### Intialize arrays

In [3]:
# arrays defined in format y,x which is row, column if not stated differently

# Velocity directions c (defined by [x,y] components)
c_directions = np.array([
                [ 0, 0], # 0 ()
                [ 1, 0], # 1 (right)
                [ 0, 1], # 2 (up)
                [-1, 0], # 3 (left)
                [ 0,-1], # 4 (down)
                [ 1, 1], # 5 (right-up)
                [-1, 1], # 6 (left-up)
                [-1,-1], # 7 (left-down)
                [ 1,-1]  # 8 (right-down)
             ])

# invert the row values for streaming operation because up is represented by -1 and down by 1
c_directions_for_roll = c_directions * np.array([1, -1])

initial_density_per_position = rho_0 / float(grid_values_count) 
# indices: i = discretized velocity index (0-8), y = row, x = column
f_values_iyx = np.zeros((9, grid_size_y, grid_size_x))

#### Different density initializations

In [4]:
# evenly distribute density in all directions for all grid points
f_values_iyx = np.full((9, grid_size_y, grid_size_x), initial_density_per_position / 9.)

# add one impurity
f_values_iyx[:,6,8] = initial_density_per_position / 8.5

In [5]:
# calculate a "border", which is a vertical line, that seperates two directions
# this enables an initialization which can be used for a quick visual check of the streaming function
# inital_border_value = np.rint(grid_size_x/2).astype(np.uint8)
# f_values_iyx[5, :, inital_border_value:] = initial_density_per_position * 1/4
# f_values_iyx[1, :, 0:inital_border_value] = initial_density_per_position * 3/4

In [6]:
# density per position is zero except one specified non zero direction (the value refers to the index in c)
# f_values[1] = initial_density_per_position

### Functions

##### Density function

In [7]:
def compute_density_field(f_values_iyx):
    return np.sum(f_values_iyx, axis=0)

#### Velocity field function

In [8]:
def calc_density_and_average_velocity(f_values_iyx):

    # calculate current density
    density_values_yx = compute_density_field(f_values_iyx)

    # do: [:,np.newaxis] so that broadcasting works
    # multiply every element of f_values_iyx with it respective direction given by index i and then sum over all directions for a position jk
    # the resulting array has shape (y,x,2) and contains a velocity direction for every position and is divided by the respective density
    # indices: y = row, x = column, c = cartesian coordinates of respective velocity at position given in format [x,y]
    average_velocity_field_directions_yxc = np.divide(np.einsum("ijk, il->jkl", f_values_iyx, c_directions), density_values_yx[..., np.newaxis])
    
    # DISCRETIZATION HERE NOT NEEDED (i think so, because only when moving points on lattice the discretization is relevant)
    # numpy rounds 0.5 cases to nearest even value
    # average_velocity_field_directions_yxc = np.rint(average_velocity_field_directions_yxc).astype(np.uint8)
    # normalize the directions
    # average_velocity_field_directions_yxc = np.sign(average_velocity_field_directions_yxc)
    
    # new_direction = np.array([x_component, y_component])
    # # find index of direction in c that resembles the new direction
    # index = np.argwhere((c == new_direction).all(axis=1))[0][0]
    return density_values_yx, average_velocity_field_directions_yxc


#### Collision term

In [9]:
# u is the average velocity
# yx describes row, column
# c is cartesian coordinates [xy]
# i is the velocity discretization[0..8] <-- 9 values
def calc_collision_term(f_values_iyx, density_values_yx, u_yxc):
    # precompute terms
    u_norm_squared_yx = 1.5 * np.einsum("yxc, yxc -> yx", u_yxc, u_yxc)
    uc_yx = np.einsum("yxc, ic -> iyx", u_yxc, c_directions)
    f_eq_iyx = np.einsum('i,jk->ijk', omega_i, density_values_yx) * (1 + 3 * uc_yx + 4.5 * uc_yx**2 - u_norm_squared_yx)
    # return updated f values after collision
    return  (1-omega) * f_values_iyx + omega * f_eq_iyx


#### Streaming Function

In [10]:
def streaming(f_values_iyx):      
    # start at index 1 because direction 0 (0,0) does not change anything with roll call, axis=(1,0) means first in x direction, then in y direction because f is defined by rrow, col (=y,x) indices 
    for i in range(1, len(c_directions)): f_values_iyx[i] = np.roll(f_values_iyx[i], shift=c_directions_for_roll[i], axis=(1,0))
    return f_values_iyx


#### Update function that handles going to next time step

In [11]:
def update(f_values_iyx):
    f_values_iyx = streaming(f_values_iyx)
    density_values_yx, average_velocity_field_directions_yxc = calc_density_and_average_velocity(f_values_iyx)
    f_values_iyx = calc_collision_term(f_values_iyx, density_values_yx, average_velocity_field_directions_yxc)

    return f_values_iyx, density_values_yx, average_velocity_field_directions_yxc

### Visualization

#### Density plot

In [12]:
def plot_density_field(density_values_yx ,timestep = None):
    # Create a 3D figure
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')

    x_dim = density_values_yx.shape[1]
    y_dim = density_values_yx.shape[0]
    x,y = np.meshgrid(np.arange(x_dim), np.arange(y_dim))

    # Plot the surface
    ax.plot_surface(x, y, density_values_yx)

    ax.set_xlabel('x')
    ax.set_ylabel('y')
    title = "Density field"
    if timestep is not None:
        title += " for t={}".format(timestep)
    ax.set_title(title)

    plt.show()

#### Velocity field plot

In [13]:
# def plot_velocity_field(velocity_field_yxc, timestep : int = 0, animation_timesteps = 0, paused = False):
#     ani = None

#     # define grid of velocity field
#     x = np.arange(grid_size_x)
#     y = np.arange(grid_size_y)

#     X, Y = np.meshgrid(x, y)
#     fig, ax = plt.subplots()

#     # velocity x direction
#     U = velocity_field_yxc[:,:,0]
#     # velocity y direction
#     V = velocity_field_yxc[:,:,1]

#     stream = ax.streamplot(X, Y, U, V)
#     # ax.quiver(X, Y, c[velocity_values[:,:], 0], c[velocity_values[:,:]) is also a good option
#     ax.set_xticks(np.arange(0, grid_size_x, 1))
#     ax.set_yticks(np.arange(0, grid_size_y, 1))
#     ax.grid()

#     ax.set_xlabel('x')
#     ax.set_ylabel('y')

#     title = "Velocity field for t={}".format(timestep)
#     ax.set_title(title)

#     if animation_timesteps > 0:

#         def animate(i, f_values_iyx , ax):
#             ax.cla()
#             f_values_iyx, _, velocity_field_xy = update(f_values_iyx)
#             U = velocity_field_xy[:,:,0]
#             V = velocity_field_xy[:,:,1]
#             stream = ax.streamplot(X, Y, U, V)
#             title = "Velocity field for t={}".format(timestep + i)
#             ax.set_title(title)
#             return stream,
    
#         ani = animation.FuncAnimation(fig, func=partial(animate, f_values_iyx = f_values_iyx, ax = ax), frames=animation_timesteps, interval=200, blit=False, save_count=50)
#         def toggle_pause(self, *args, **kwargs):
#             if paused:
#                 ani.resume()
#             else:
#                 ani.pause()
#             paused = not paused
#         fig.canvas.mpl_connect('button_press_event', toggle_pause)
#     return ani

In [17]:
class Velocity_field_animation:
    def __init__(self, velocity_field_yxc, grid_size, timestep = 0):

        # Start with a normal distribution
        self.timestep = timestep
        # define grid of velocity field
        x = np.arange(grid_size[0])
        y = np.arange(grid_size[1])
        self.x_lim = grid_size[0]
        self.y_lim = grid_size[1]

        self.X, self.Y = np.meshgrid(x, y)
        self.fig, self.ax = plt.subplots()

        # velocity x direction
        U = velocity_field_yxc[:,:,0]
        # velocity y direction
        V = velocity_field_yxc[:,:,1]

        self.streamplot = self.ax.streamplot(self.X, self.Y, U, V)
        # ax.quiver(X, Y, c[velocity_values[:,:], 0], c[velocity_values[:,:]) is also a good option
        self.ax.set_xlim(0, self.x_lim)
        self.ax.set_ylim(0, self.y_lim)
        self.ax.set_xticks(np.arange(0, self.x_lim, 1))
        self.ax.set_yticks(np.arange(0, self.y_lim, 1))
        self.ax.grid()

        self.ax.set_xlabel('x')
        self.ax.set_ylabel('y')

        title = "Velocity field for t={}".format(timestep)
        self.ax.set_title(title)

        self.animation = None
        self.paused = False
        self.animation_timesteps = 20

        self.fig.canvas.mpl_connect('button_press_event', self.toggle_pause)

    def toggle_pause(self, *args, **kwargs):
        if self.paused:
            self.animation.resume()
        else:
            self.animation.pause()
        self.paused = not self.paused

    def update_animation(self, i, f_values_iyx):
        self.ax.cla()
        f_values_iyx, _, velocity_field_xy = update(f_values_iyx)
        U = velocity_field_xy[:,:,0]
        V = velocity_field_xy[:,:,1]
        self.streamplot = self.ax.streamplot(self.X, self.Y, U, V)
        self.timestep += 1
        # if self.timestep > self.animation_timesteps:
        #     print("TIMESTEP RESETTED")
        #     self.timestep = 0
        title = "Velocity field for t={}".format(self.timestep)
        self.ax.set_title(title)
        self.ax.set_xlim(0, self.x_lim)
        self.ax.set_ylim(0, self.y_lim)
        self.ax.set_xticks(np.arange(0, self.x_lim, 1))
        self.ax.set_yticks(np.arange(0, self.y_lim, 1))
        self.ax.grid()
        return self.streamplot,

    def start_animation(self, f_values_iyx, animation_timesteps = 20, delay_interval = 200):
        self.animation_timesteps = animation_timesteps
        self.animation = animation.FuncAnimation(self.fig, func=partial(self.update_animation, f_values_iyx = f_values_iyx), frames=animation_timesteps, interval=delay_interval, blit=False, save_count=50, repeat = False)


### Test

In [18]:
# testing velocity field animation
density_values_yx, velocity_field_yxc = calc_density_and_average_velocity(f_values_iyx)
# TODO: CHECK IF ANIMATION IS CORRECT (ARROW DIRECTION AND VELOCITY FIELD INITAL VALUES)
animation_plot = Velocity_field_animation(velocity_field_yxc, grid_size=[grid_size_x, grid_size_y], timestep=0)
animation_plot.start_animation(f_values_iyx, animation_timesteps=20, delay_interval = 1000)
# anim = plot_velocity_field(velocity_field_yxc, timestep=0, animation_timesteps=20)
plt.show()

  self.animation = animation.FuncAnimation(self.fig, func=partial(self.update_animation, f_values_iyx = f_values_iyx), frames=animation_timesteps, interval=delay_interval, blit=False, save_count=50, repeat = False)


In [33]:
# mass = density * volume
full_volume = grid_values_count

# initial computation of velocity field for visualization
density_values_yx, velocity_field_yxc = calc_density_and_average_velocity(f_values_iyx)
plot_density_field(density_values_yx, 0)
plot_velocity_field(velocity_field_yxc, 0)

full_density = np.sum(density_values_yx)
mass_before_update = full_density * full_volume

# do a timestep update and plot
f_values_iyx, density_values_yx, velocity_field_yxc = update(f_values_iyx)
plot_density_field(density_values_yx, 1)
plot_velocity_field(velocity_field_yxc, 1)

full_density = np.sum(density_values_yx)
mass_after_update = full_density * full_volume

print("Mass before update = {}".format(mass_before_update))
print("Mass after update = {}".format(mass_after_update))

NameError: name 'plot_velocity_field' is not defined