In [None]:
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
from multiprocessing import cpu_count
import numpy as np
import os
from multiscale_complexity import calc_complexity
from moran_i import parallel_moran_i

In [None]:
class Oregonator():

    def __init__(self, u, v, f, q, epsilon, Du, Dv, dx, dt, n, timesteps, frames, d,  n_jobs, random_spots=False, nr_spots=10, spirals=False, spiral_locations=None, save=False, save_file_path='results'):
        """
        Initialize Oregonator simulation

        Parameters:
            u : float, initial concentration of u. Unused in final simulations
            v : float, initial concentration v. Unused in final simulations
            f : float, rate constant ratio of activator (u) reactions.
            q : float, rate constant ratio of inhibitor (v) reactions.
            epsilon : float, scaling parameter influencing relative u/v reaction rates
            Du : float, diffusion coefficient for u.
            Dv : float, diffusion coefficient for v.
            dx : float, spatial resolution or grid spacing.
            dt : float, time step size.
            n : int, size of the grid (number of grid points along one dimension).
            timesteps : int, number of time steps to simulate.
            frames : int, number of frames to save during the simulation.
            d : int, number of dimensions to simulate, must be 1 or 2.
            n_jobs : int, number of parallel jobs to use for simulation. In practice not useful for small grid sizes, so the recommended value is 1.

        Keyword Arguments:
            random_spots : bool, optional, flag indicating whether to generate random spots on the grid. Defaults to False.
            nr_spots : int, optional,  number of random spots to generate. Only applicable when random_spots is True. Defaults to 10.
            spirals : bool, optional, flag indicating whether to initiate spirals on the grid. Defaults to False.
            spiral_locations : ndarray, optional, list of (i, j, sign) tuples signifying the locations of the spiral sources. Only applicable when spirals is True.
            save : bool, optional, flag indicating whether to save simulation results. Defaults to False.
            save_file_path : str, optional, file path to save the simulation results, path is created if it doesnt exist. 
                                            Only applicable when save is True. Defaults to 'results'. 
                                                   
        """

        np.random.seed(42)
        if not (1 <= d <= 2):
            raise ValueError(f"{d} is out of bounds for d. d must be 1 or 2.")
        self.d = d
        self.u = u
        self.v = v
        self.f = f
        self.q = q
        self.epsilon = epsilon
        self.Du = Du
        self.Dv = Dv
        self.n = n
        self.save = save
        self.dx = dx
        self.dx_sq = self.dx**2
        self.timesteps = timesteps
        self.frames = frames
        self.frame_skips = self.timesteps // frames
        self.random_spots = random_spots
        self.nr_spots = nr_spots
        self.spiral = spirals

        self.save_file_path = save_file_path
        self.n_jobs = n_jobs
        self.dtau = dt
        
        # Initial conditionis for 1d simulation. Unused in final simulations
        if d == 1:
            self.uv_grid = np.zeros((timesteps, n, 2), dtype=np.float128)
            self.uv_grid[0,:,0] = u
            self.uv_grid[0,:,1] = v
            self.uv_grid[0,0,0] = 0.5        
        else:
            self.uv_grid = np.zeros((timesteps, n, n, 2))

            if not self.spiral and not random_spots:
                self.uv_grid[0,n//2,n//2,0] = 0.1
                self.uv_grid[0,n//2,n//2,1] = 0.1

            if random_spots:
                for _ in range(nr_spots):
                    row = np.random.randint(0, n)
                    col = np.random.randint(0, n)
                    self.uv_grid[0,row,col,0] = np.random.uniform(0.01, 0.1)
                    self.uv_grid[0,row,col,1] = np.random.uniform(0.01, 0.1)

        if spirals:
            self.spiral_locations = [(n//2, n//2, 1)]
            self.spiral_locations = spiral_locations
            self.barriers = self.place_spirals(self.spiral_locations)

        # Function to call differs for 1d or 2d situations    
        run_funcs = {1: self.run_1d, 2: self.run_2d}
        run_funcs[d]()

    def place_spirals(self, spiral_locations):
        """
        Place spiral centers by adding creating artificial barriers on the grid. For its overengineering not used in final presentation.

        Parameters:
            spiral_locations : list of (i, j, sign) tuples signifying spiral locations and directions
        
        Returns:
            barriers : list of grid coordinate tuples (i_min, j_min, i_max, j_max) representing a barrier rectangle
        """
        width = 40
        barriers = []

        # initiate spiral center concentrations and place barriers of fixed size slightly above or below spiral centers
        for i, j, direction_sign in spiral_locations:
            self.uv_grid[0,i,j,0] = 0.1
            self.uv_grid[0,i,j,1] = 0.1
            j_min, j_max = j - width//2, j + width//2 

            # Sign determines whether barrier is placed above r below spiral center
            i_bounds = [i + 3*direction_sign, i + direction_sign*(width+3)]
            i_min, i_max = sorted(i_bounds)
            barriers.append((i_min, j_min, i_max, j_max))
        return barriers

    def check_if_barrier(self, coordinate):
        """
        Check if coordinate is within one of the boundaries.

        Parameters:
            coordinate : tuple of ints
        
        Returns:
            Boolean signifying whether the coordinate is part of a boundary

        """
        i, j = coordinate
        for i_min, j_min, i_max, j_max in self.barriers:
            if i_min < i < i_max and j_min < j < j_max:
                return True
        return False


    def run_1d(self):
        """
        Function that runs a 1d oregonator simulation. Has been created for testing purposes. Function might be outdated, proper functioning is not guaranteed.
        """
        for t in range(self.timesteps-1):            
            for i in range(self.n):                
                uv = self.uv_grid
                u = uv[t, i, 0]
                v = uv[t, i, 1]
                u_grid = uv[t, :, 0]
                v_grid = uv[t, :, 1]
                
                # Compute diffusion terms and enforce no flux boundary conditions where diffusion must be 0
                if i == 0 or i == (self.n - 1):
                    Diff_u = 0
                    Diff_v = 0
                else:
                    # Explicit Euler finite difference approximation
                    Diff_u = (self.Du/self.dx_sq)*(u_grid[(i+1)%self.n] + u_grid[(i-1)%self.n] - 2*u)
                    Diff_v = (self.Dv/self.dx_sq)*(v_grid[(i+1)%self.n]+ v_grid[(i-1)%self.n] - 2*v)             

                uv[t+1, i, 0] = self.dtau*((1/self.epsilon)*(u*(1-u) - (self.f*v*(u-self.q)/(u+self.q))) + Diff_u) + u
                uv[t+1, i, 1] = self.dtau*(u - v + Diff_v) + v
        
    def compute_next_grid(self, t, prev_grid, index_range):
        """
        Fuction that computes concentrations at next point in time.

        Parameters:
            t : it, current time step being computed.
            prev_grid : ndarray of concentrations of shape (self.n, self.n, 2).
            index_range : tuple of column indices to co signifying the range of columns to compute by the worker process.
        
            Returns:
                subgrid : ndarray with concentrations of next point in time of subgrid
        """        
        min_j, max_j = index_range
        n_cols = max_j - min_j
        sub_grid = np.zeros((self.n, n_cols, 2))
        for i in range(self.n):
                for new_j in range(n_cols):
                    j = new_j + min_j

                    if self.spiral and t < 300 and self.check_if_barrier((i, j)):
                        sub_grid[i, new_j, 0] = 0
                        sub_grid[i, new_j, 1] = 0
                    else:
                        u = prev_grid[i, j, 0]
                        v = prev_grid[i, j, 1]

                        # Compute diffusion terms and enforce no flux boundary conditions where diffusion must be 0
                        if i == 0 or i == (self.n - 1) or j == 0 or j == (self.n - 1):
                            Diff_u = 0
                            Diff_v = 0
                        else:
                            # Explicit Euler finite difference approximation
                            Diff_u = (self.Du/self.dx_sq)*(prev_grid[(i+1)%self.n][j][0]+ prev_grid[(i-1)%self.n][j][0]+ prev_grid[i][(j+1)%self.n][0] + prev_grid[i][(j-1)%self.n][0] - 4*u)
                            Diff_v = (self.Dv/self.dx_sq)*(prev_grid[(i+1)%self.n][j][1]+ prev_grid[(i-1)%self.n][j][1]+ prev_grid[i][(j+1)%self.n][1] + prev_grid[i][(j-1)%self.n][1] - 4*v)
                        
                        # update grid
                        sub_grid[i, new_j, 0] = self.dtau*((1/self.epsilon)*(u*(1-u) - (self.f*v*(u-self.q)/(u+self.q))) + Diff_u) + u
                        sub_grid[i, new_j, 1] = self.dtau*(u - v + Diff_v) + v      
        return sub_grid


    def compute_columns_ranges(self, n, n_jobs):
        """
        Computes the range of columns per parallel job.

        Parameters:
            n : int, size of the grid (number of grid points along one dimension).
            n_columns : amount of jobs that is to be scheduled.

        Returns:
            column_range : tuple of (j_min, j_max) values.
        """ 
        column_ranges = []
        cols_per_subgrid = n // n_jobs
        
        # Assign (n_jobs - 1) jobs equal amount of columns and assign the final job the remaining ones in case of non-integer division.
        for i in range(n_jobs-1):
            column_ranges.append((i*cols_per_subgrid, (i+1)*cols_per_subgrid))
        column_ranges.append(((n_jobs-1)*cols_per_subgrid, n))
        return column_ranges

    def run_2d(self):
        """Function that runs a 2d simulation with parallel computing if model is initialized with n_jobs > 1."""

        # Schedule jobs that each compute a subgrid of the complete grid.
        column_ranges = self.compute_columns_ranges(self.n, self.n_jobs)
        for t in range(self.timesteps-1):            
            sub_grids = Parallel(n_jobs=self.n_jobs)(delayed(self.compute_next_grid)(t, self.uv_grid[t], column_range)
                                                                                     for column_range in column_ranges)
            # Combine sub grids
            self.uv_grid[t+1] = np.hstack(sub_grids)
            
        if self.save:
            self.save_results()

    def save_results(self):
        """
        Function that saves result. Saving involves computing certain complexity metrics and saving the number of frames as provided in initiation and saving the grid.
        For large grids the moran's I metric might be computationally costly.        
        """

        # Create file path non existent.
        output_dir = self.save_file_path
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        complexities = []
        moran_is = []

        # Save frames
        for t, grid in enumerate(self.uv_grid[::int(self.frame_skips),:,:,0]):
            fig = plt.figure(layout='constrained', figsize=(16, 8))
            fig.suptitle(rf'$\epsilon = {self.epsilon}$, $f={self.f}$, $q={self.q}$, $D_u={self.Du}$, $D_v={self.Dv}$', fontsize='xx-large')
            subfigs = fig.subfigures(1, 2, wspace=0.07)
            left_ax_1, left_ax_2 = subfigs[0].subplots(2, 1, sharex=True)
            right_ax = subfigs[1].subplots()

            # Compute complexities
            complexities.append(calc_complexity(grid))
            moran_is.append(parallel_moran_i(grid, n_jobs=3, weights="inverse_dist_128x128_r5.npz"))

            # Plot settings
            left_ax_1.plot(complexities)
            left_ax_1.set_xlim(0, self.frames)
            left_ax_1.set_ylabel("Multiscale Complexity",fontsize=20)
            left_ax_2.plot(moran_is)
            left_ax_2.set_xlim(0, self.frames)
            left_ax_2.set_xlabel(f"Iterations (N/{self.frame_skips})",fontsize=20)
            left_ax_2.set_ylabel('Moran\'s I Complexity',fontsize=20)
            right_ax.imshow(grid, cmap=plt.cm.jet, vmin=0, vmax=1)
            plt.savefig(f'{output_dir}/frame_{self.frame_skips*t}.png')

        # Save grid in passed or created folder
        self.save_grid(output_dir)
    
    def save_grid(self, path):
        """Save numpy array in folder passed as argument."""
        np.save(f'{path}/grid.npy', self.uv_grid)


In [None]:
# Settings for random spots
u = None
v = None
epsilon = 0.04
Du = 0.9
Dv = 0.5
f = 0.9
q = 0.01
dx = 1
dt = 1/100
n = 128
timesteps = 101
frames = 10
d = 2 # 1d or 2d
ogn = Oregonator(u, v, f, q, epsilon, Du, Dv, dx, dt, n, timesteps, frames, d, n_jobs=1, random_spots=True, nr_spots=10, save=True, save_file_path='results')

In [None]:
# settings for double attempted spiral
epsilon = 0.04
Du = 0.9
Dv = 0.5
f = 0.9
q = 0.01
dx = 1
dt = 1/100

n = 128
timesteps = 101
frames = 10
d = 2 # 1d or 2d

ogn = Oregonator(u, v, f, q, epsilon, Du, Dv, dx, dt, n, timesteps, frames, d, n_jobs=1, spirals=True, spiral_locations=[(n//4, 3*(n//4), 1), (3*(n//4), n//4, -1)], save=True, save_file_path='results')

In [None]:
# Settings for single wave center
epsilon = 0.04
Du = 0.9
Dv = 0.5
f = 0.9
q = 0.01
dx = 1
dt = 1/100

n = 128
timesteps = 101
frames = 10
d = 2 # 1d or 2d
ogn = Oregonator(u, v, f, q, epsilon, Du, Dv, dx, dt, n, timesteps, frames, d, n_jobs=1, save=True, save_file_path='results')

In [None]:
# Settings for a single attempted spiral
epsilon = 0.04
Du = 0.9
Dv = 0.5
f = 0.9
q = 0.01
dx = 1
dt = 1/100
n = 128
timesteps = 101
frames = 10
d = 2 # 1d or 2d
ogn = Oregonator(u, v, f, q, epsilon, Du, Dv, dx, dt, n, timesteps, frames, d, n_jobs=1, spirals=True, spiral_locations=[(n//2, n//2, 1)], save=True, save_file_path='results')