# Vorticity Streamfunction Equations (Cont.)
#### Hunter Lybbert
#### Student ID 2426454
#### 11-22-24

In [13]:
from typing import Callable, Optional

import numpy as np
from matplotlib import pyplot as plt
from scipy.integrate import solve_ivp
from scipy.sparse import spdiags, dia_matrix

### This section is from HW 04
Helpful references are in the textbook [Data-Driven Modeling & Scientific Computation](https://faculty.washington.edu/kutz/kutz_book_v2.pdf) by J. Nathan Kutz, on pages 209 and 221.

In [14]:
def partial_in_x(n: int) -> dia_matrix:
    """
    Assemble the un-scaled periodic central-difference stencil that couples
    entries ``n`` apart in the flattened (n*n,) vector.

    Entries are +1 / -1 only; the caller is expected to apply the
    ``1 / (2 * step)`` scaling.

    :param n: the dimension of the (n, n) grid; the result is (n*n, n*n)
    :returns: sparse matrix with +1 on offsets ``n`` and ``-n*(n - 1)`` and
        -1 on offsets ``-n`` and ``n*(n - 1)``
    """
    size = n * n
    plus_one = np.ones(size)
    minus_one = -1 * plus_one
    wrap = n * (n - 1)

    # (offset, band) pairs: the +/-n bands link neighbouring blocks, the
    # +/-wrap bands link the first and last blocks (periodic boundary).
    bands = [
        (n, plus_one),
        (-n, minus_one),
        (wrap, minus_one),
        (-wrap, plus_one),
    ]
    offsets = np.array([offset for offset, _ in bands])
    data = [band for _, band in bands]

    return spdiags(data, diags=offsets)


def partial_in_y(n: int) -> dia_matrix:
    """
    Assemble the un-scaled periodic central-difference stencil that couples
    adjacent entries inside each length-``n`` block of the flattened vector.

    Entries are +1 / -1 only; the caller is expected to apply the
    ``1 / (2 * step)`` scaling.

    :param n: the dimension of the (n, n) grid; the result is (n*n, n*n)
    :returns: sparse block-diagonal matrix whose blocks are the 1-D periodic
        central-difference stencil
    """
    # Position of every vector entry inside its own block of length n.
    position = np.arange(n * n) % n
    at_block_start = position == 0
    at_block_end = position == n - 1

    # The +/-1 bands are zeroed where they would leak into the next block.
    above = (~at_block_start).astype(int)
    below = -(~at_block_end).astype(int)

    # The +/-(n - 1) bands close each block periodically.
    wrap_above = -at_block_end.astype(int)
    wrap_below = at_block_start.astype(int)

    offsets = np.array([1, -1, n - 1, -n + 1])
    return spdiags([above, below, wrap_above, wrap_below], diags=offsets)


def second_gradient_x_y(n: int) -> dia_matrix:
    """
    Assemble the un-scaled periodic five-point Laplacian stencil for an
    (n, n) grid flattened to a vector of length n*n.

    The main diagonal is -4 and every neighbour (including the periodic
    wrap-around neighbours) is +1; the caller applies the ``1 / step**2``
    scaling.

    :param n: the dimension of the (n, n) grid; the result is (n*n, n*n)
    :returns: sparse matrix holding the nine bands of the stencil
    """
    size = n * n
    ones = np.ones(size)

    # Position of every vector entry inside its own block of length n.
    position = np.arange(size) % n
    at_block_start = position == 0
    at_block_end = position == n - 1

    # (offset, band) pairs, listed in the same order as the stencil is
    # usually described: centre, in-block neighbours, in-block wrap-around,
    # then the block-to-block wrap-around and neighbours.
    bands = [
        (0, -4 * ones),
        (1, (~at_block_start).astype(int)),
        (-1, (~at_block_end).astype(int)),
        (n - 1, at_block_end.astype(int)),
        (-n + 1, at_block_start.astype(int)),
        (n * (n - 1), ones),
        (-n * (n - 1), ones),
        (n, ones),
        (-n, ones),
    ]
    offsets = np.array([offset for offset, _ in bands])
    data = [band for _, band in bands]

    return spdiags(data, diags=offsets)


def build_matrices(
    n: int,
    delta_x: float,
    delta_y: Optional[float] = None,
    output_as_np: Optional[bool] = False
) -> tuple:
    """
    Build the scaled finite-difference operators A, B and C.

    ``A`` is ``second_gradient_x_y(n) / delta_x**2``; ``B`` and ``C`` are
    ``partial_in_x(n)`` and ``partial_in_y(n)`` divided by ``2 * delta_x``.

    :param n: the dimension of the (n, n) grid; each matrix is (n*n, n*n)
    :param delta_x: the step size in x
    :param delta_y: the step size in y; ``None`` (the default) means
        "same as ``delta_x``"
    :param output_as_np: when truthy, return dense ``numpy`` arrays instead
        of sparse matrices

    :returns: the 3-tuple ``(A, B, C)``
    :raises NotImplementedError: if ``delta_y`` is given and differs from
        ``delta_x``
    """
    # Compare against None explicitly so that an explicit (invalid) step of
    # 0 is not silently replaced by delta_x.
    if delta_y is None:
        delta_y = delta_x

    if delta_x != delta_y:
        raise NotImplementedError("Currently do not support different step sizes in x and y")

    A = (1/(delta_x**2))*second_gradient_x_y(n)
    B = (1/(2*delta_x))*partial_in_x(n)
    C = (1/(2*delta_x))*partial_in_y(n)

    if output_as_np:
        return A.toarray(), B.toarray(), C.toarray()

    return A, B, C

In [15]:
def prob1(L, n) -> tuple[np.array]:
    """
    Build the dense A, B, C operators for the square domain (-L, L).

    :param L: the upper bound on the range (-L, L) to solve the diff eq on
    :param n: the number of intervals each of the 2 spatial dimensions is
        divided into
    :returns: whatever ``build_matrices`` returns with ``output_as_np=True``
    """
    domain_width = 2 * L
    step = domain_width / n

    return build_matrices(n, delta_x=step, output_as_np=True)

# Dense A, B, C operators for the domain (-10, 10) with n = 8.
A1, A2, A3 = prob1(L=10, n=8)

In [16]:
# Show the three operators side by side so their band structure is visible.
fig, ax = plt.subplots(1,3)
fig.set_figwidth(22)
fig.set_figheight(10)

for panel, operator, label in zip(ax, (A1, A2, A3), ("A", "B", "C")):
    panel.matshow(operator, cmap='viridis')
    panel.set_title(label)

plt.show()

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

### Now we begin HW 5

#### Pseudocode and the update equations used below

Set $\omega^t = \omega^0$, then repeat the two steps below for every time step:

`For t in time:`

1) Solve $$A\psi^t = \omega^t$$
2) Time step update $$\omega^{t+1} = \omega^t + \Delta t \left[ \nu A\omega^t - \left( B\psi^t C\omega^t - C\psi^tB\omega^t \right) \right]$$

In [17]:
def omega_0(
    x: np.array,
    y: np.array,
    amplitude: float = 1,
    variance_in_x: float = 4,
    variance_in_y: float = 1,
) -> np.array:
    """
    Evaluate the Gaussian initial vorticity (omega) at the given points.

    Computes ``amplitude * exp(-x**2 / variance_in_x - y**2 / variance_in_y)``.

    :param x: x coordinates at which to evaluate the Gaussian
    :param y: y coordinates at which to evaluate the Gaussian
    :param amplitude: the height of the Gaussian at (x, y) = (0, 0)
    :param variance_in_x: divisor of ``x**2`` in the exponent; larger values
        spread the Gaussian further in x
    :param variance_in_y: divisor of ``y**2`` in the exponent; larger values
        spread the Gaussian further in y

    :returns: the Gaussian evaluated element-wise
    """
    x_term = x**2 / variance_in_x
    y_term = y**2 / variance_in_y

    gaussian = np.exp(-(x_term + y_term))
    return amplitude * gaussian
    

In [27]:
import os

from typing import Any
from enum import Enum

from scipy.fftpack import fft2, ifft2

import matplotlib.animation as animation
from IPython.display import HTML

%matplotlib notebook

In [None]:
def incriment_file(base_filename: str, directory: str) -> str:
    """
    Return a path inside ``directory`` that does not exist yet.

    The first candidate is ``directory/base_filename``. While the candidate
    exists, it is replaced by ``directory/<name>_<counter><ext>`` with the
    counter running 0, 1, 2, ...

    Nothing is created on disk, so the returned path is only known to be
    free at the moment of the check.

    :param base_filename: preferred file name, including its extension
    :param directory: directory the file is meant to live in
    :returns: the first candidate path for which ``os.path.exists`` is false
    """
    # Split once so the suffix can be inserted before the extension.
    name, ext = os.path.splitext(base_filename)
    path = os.path.join(directory, base_filename)

    counter = 0
    while os.path.exists(path):
        path = os.path.join(directory, f"{name}_{counter}{ext}")
        counter += 1

    return path

def fourier_solve(
    omega: np.array,
    kx: np.array,
    ky: np.array,
) -> np.array:
    """
    Solve for psi spectrally: transform ``omega``, divide by
    ``-(kx**2 + ky**2)`` and transform back.

    :param omega: 2-D array to transform (an (n, n) array in and out)
    :param kx: wavenumbers broadcastable against ``fft2(omega)``
    :param ky: wavenumbers broadcastable against ``fft2(omega)``
    :returns: the inverse transform; complex-valued, so callers that need a
        real field take the real part themselves
    """
    k_squared = kx**2 + ky**2
    omega_hat = fft2(omega)

    psi_hat = np.negative(omega_hat) / k_squared
    return ifft2(psi_hat)


class LinearSystemSolveMethods(Enum):
    """
    Names of the available strategies for solving the linear system.

    Only ``fourier`` is currently wired to an implementation (see
    ``get_linear_system_solve_callable``); requesting any other member
    there raises ``NotImplementedError``.
    """
    # FFT-based solve, implemented by ``fourier_solve``.
    fourier = "fourier"
    # NOTE(review): presumably a direct solve of the system — confirm.
    basic_ab = "basic_ab"
    # NOTE(review): presumably a solve via LU factorisation — confirm.
    lu_decomp = "lu_decomp"
    # NOTE(review): presumably the BiCGSTAB iterative solver — confirm.
    bicgstab = "BICGSTAB"
    # NOTE(review): presumably the GMRES iterative solver — confirm.
    gmres = "GMRES"


class VorticityStreamFunctionSolver:
    """
    Configure and integrate the vorticity-streamfunction equations.

    The grid covers [-L, L) in both directions with ``n`` points each. Every
    right-hand-side evaluation recovers the streamfunction from the
    vorticity with ``fourier_solve`` and then applies the finite-difference
    operators produced by ``build_matrices``.
    """

    def __init__(self, nu: float, L: float, n: int) -> None:
        """
        Store the parameters and precompute everything the ODE needs.

        :param nu: coefficient multiplying the diffusion term ``A @ omega``
        :param L: half-width of the domain; the grid spans [-L, L)
        :param n: number of grid points per spatial dimension; it is used as
            an array length and reshape size, so it must be an integer

        :returns: None
        """
        self.L: float = L
        self.n: int = n
        self.nu: float = nu

        self._setup_omega_0()
        self._setup_diff_matrices()
        self._setup_wavenumbers()

    def _setup_omega_0(self) -> None:
        """Build the grid (``self.X``, ``self.Y``) and the flattened initial vorticity."""
        # Take n + 1 points on [-L, L] and drop the last one, leaving n
        # equally spaced points on [-L, L).
        grid = np.linspace(-self.L, self.L, self.n + 1)[:self.n]

        self.X, self.Y = np.meshgrid(grid, grid)
        omega_0_mat: np.ndarray = omega_0(
            x=self.X,
            y=self.Y,
            variance_in_x=1,
            variance_in_y=20,  # this is provided from the assignment
        )
        # The ODE solver works on a flat state vector of length n*n.
        self.omega_0 = omega_0_mat.reshape(self.n*self.n)

    def _setup_diff_matrices(self) -> None:
        """Build the sparse operators ``self.A``, ``self.B`` and ``self.C``."""
        delta_x = (self.L - (-self.L))/self.n
        self.A, self.B, self.C = build_matrices(n=self.n, delta_x=delta_x)

    def _setup_wavenumbers(self) -> None:
        """
        Build the wavenumber grids ``self.kx`` and ``self.ky`` for the FFT solve.

        The modes are ordered ``0 .. n/2 - 1, -n/2 .. -1`` and rescaled by
        ``2 * pi / (2 * L)``.
        """
        scale_factor = 2 * np.pi / (self.L - (-self.L))
        modes = np.concatenate((np.arange(0, self.n/2), np.arange(-self.n/2, 0)))
        k = scale_factor * modes

        # avoid divide by zero with floating point precision error
        k[0] = 1e-6

        # meshgrid returns fresh arrays, so one 1-D vector serves both axes.
        self.kx, self.ky = np.meshgrid(k, k)

    def omega_ode_rhs(self, t: float, omega: np.ndarray) -> np.ndarray:
        """
        Evaluate the right-hand side of the vorticity ODE.

        :param t: current time (unused; required by ``solve_ivp``)
        :param omega: flattened vorticity of length n*n
        :returns: ``nu * A @ omega - ((B @ psi) * (C @ omega) - (C @ psi) * (B @ omega))``
        """
        omega_mat = omega.reshape((self.n, self.n))

        psi_mat = np.real(fourier_solve(omega_mat, kx=self.kx, ky=self.ky))  # expand this in the future for other linear solvers
        psi = psi_mat.reshape(self.n*self.n)

        # NOTE(review): omega is flattened in C order from an 'xy' meshgrid,
        # so the entries n apart that B differences are neighbours along
        # axis 0 of the grid. Confirm this is the orientation intended for B
        # (built by partial_in_x) and C (built by partial_in_y); swapping
        # them flips the sign of the advection term.

        # Parenthesised so the matrix-vector product happens first; without
        # the parentheses the whole sparse matrix is rescaled by nu on every
        # call before being applied.
        diffusion = self.nu * (self.A @ omega)
        advection = ((self.B @ psi) * (self.C @ omega)) - ((self.C @ psi) * (self.B @ omega))

        return diffusion - advection

    def solve(self, tspan: np.ndarray):
        """
        Integrate the ODE with ``solve_ivp`` (RK45).

        :param tspan: times at which to store the solution; the first and
            last entries bound the integration interval
        :returns: the ``solve_ivp`` result, also stored as ``self.omega_sol``
        """
        self.omega_sol = solve_ivp(
            self.omega_ode_rhs,
            t_span=(tspan[0], tspan[-1]),
            y0=self.omega_0,
            method="RK45",
            t_eval=tspan,
        )
        return self.omega_sol

    def plot(self, step: int) -> None:
        """
        Show the vorticity field stored at one output time.

        :param step: column index into ``self.omega_sol.y``
        """
        plt.pcolor(self.omega_sol.y[:,step].reshape((self.n,self.n)))
        plt.show()

    def create_animation(self, tspan) -> None:
        """
        Create an animation of the solution over time and save it to disk.

        Requires ``solve`` to have been called first. The animation is kept
        on ``self.ani``.

        :param tspan: the output times; one frame is drawn per entry
        """
        plt.ioff()
        fig = plt.figure()
        ax = fig.add_subplot(111)

        # Clearing the axes on every frame is important: otherwise each
        # frame is drawn on top of all the previous ones.
        def update(i):
            ax.clear()
            ax.pcolor(self.omega_sol.y[:,i].reshape((self.n, self.n)), cmap='viridis')
            ax.set_title("Solution to Vorticity Stream Function")
            return ax

        self.ani = animation.FuncAnimation(fig, update, frames=range(len(tspan)), interval=100)

        self._save_animation()

    def _save_animation(self) -> None:
        """Save ``self.ani`` as a GIF and as an MP4 under ``visuals/``."""
        output_dir = "visuals"
        # Saving fails if the target directory is missing, so create it.
        os.makedirs(output_dir, exist_ok=True)

        # Save gif
        file_name = incriment_file("vorticity_stream_function.gif", output_dir)
        self.ani.save(file_name, writer='pillow')

        # Save as MP4
        file_name_mp4 = incriment_file("vorticity_stream_function.mp4", output_dir)
        writer_cls = animation.writers['ffmpeg']
        writer = writer_cls(fps=10, metadata=dict(artist='Hunter Lybbert'), bitrate=1800)
        self.ani.save(file_name_mp4, writer=writer)

In [34]:
def solve_vorticity_streamfunction(
    plot: bool = True,
    animate: bool = True,
) -> "VorticityStreamFunctionSolver":
    """
    Set up the solver with the assignment's parameters, integrate, and
    optionally build the animation.

    :param plot: if true, the animation is created and saved
    :param animate: if true, the animation is created and saved; the two
        flags are currently equivalent, and the animation is skipped only
        when both are false

    :returns: the solver, holding the solution on ``omega_sol`` and, when an
        animation was created, the animation on ``ani``
    """
    # setup...
    L = 10
    n = 128
    nu = 0.001  # as given in the assignment
    tspan = np.arange(0, 10.25, .25)

    vsf_solver = VorticityStreamFunctionSolver(nu=nu, L=L, n=n)
    # The result is kept on the solver, so the return value is not needed.
    vsf_solver.solve(tspan=tspan)

    if plot or animate:
        vsf_solver.create_animation(tspan)

    return vsf_solver

# Run with the default flags and keep the returned solver for later cells.
solver = solve_vorticity_streamfunction()

In [21]:
# Display the solver's animation inline as an HTML5 video.
HTML(solver.ani.to_html5_video())

In [11]:
# Reset the names that held the HW 04 matrices.
# NOTE(review): presumably placeholders or memory clean-up — confirm.
A1 = None
A2 = None
A3 = None

In [12]:
def get_linear_system_solve_callable(
    solve_method: LinearSystemSolveMethods
) -> Callable[..., np.ndarray]:
    """
    Look up the function that implements a linear-system solve method.

    :param solve_method: the requested method; matched by its ``value``
    :returns: the solver function (currently only ``fourier_solve``)
    :raises NotImplementedError: if no implementation is registered for
        ``solve_method``
    """
    # Dispatch table keyed by enum value; register new solvers here.
    solvers = {
        LinearSystemSolveMethods.fourier.value: fourier_solve,
    }

    try:
        return solvers[solve_method.value]
    except KeyError:
        raise NotImplementedError(f"We have not yet implimented the {solve_method.value}.") from None