<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Utility-Scripts" data-toc-modified-id="Utility-Scripts-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Utility Scripts</a></span></li><li><span><a href="#Modeling-rationale" data-toc-modified-id="Modeling-rationale-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Modeling rationale</a></span><ul class="toc-item"><li><span><a href="#The-internal-clock" data-toc-modified-id="The-internal-clock-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>The internal clock</a></span></li><li><span><a href="#Synchronization-process" data-toc-modified-id="Synchronization-process-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>Synchronization process</a></span></li></ul></li><li><span><a href="#Cellular-Automata-modeling" data-toc-modified-id="Cellular-Automata-modeling-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Cellular Automata modeling</a></span><ul class="toc-item"><li><span><a href="#Parameters-of-the-simulation" data-toc-modified-id="Parameters-of-the-simulation-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Parameters of the simulation</a></span></li><li><span><a href="#Simulation-script" data-toc-modified-id="Simulation-script-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>Simulation script</a></span></li></ul></li></ul></div>

In [1]:
# before everything, import these libraries (Shift-Enter)
import numpy as np
import xarray as xr
import plotly.graph_objects as go
import ipywidgets as widgets
from fractions import Fraction
from scipy.ndimage import generic_filter
from IPython.lib.display import YouTubeVideo
from dataclasses import dataclass

# II. Coding Projects > Fireflies synchronization


<hr>
&nbsp;

In [12]:
# YouTubeVideo expects the video id, not the full URL
YouTubeVideo('d77GdblhvEo', width=560, height=315)

## Utility Scripts

In [3]:
@dataclass
class Slider:
    """Represent a range of (linear) values as both:
    - an np.array 
    - an ipywidgets.FloatSlider
    """
    name: str
    start: float
    stop: float
    step: float
    val_ini: float = None

    def __post_init__(self):
        self.val = nparange(self.start, self.stop, self.step)
        if self.val_ini is None:  # 'not self.val_ini' would also discard a valid 0
            self.val_ini = np.random.choice(self.val, 1)[0]
        
        self.slider = widgets.FloatSlider(min=self.start,
                                        max=self.stop,
                                        step=self.step,
                                        value=self.val_ini,
                                        description=self.name,
                                        continuous_update=True)


@dataclass
class logSlider:
    """Represent a range of log values as both:
    - an np.array 
    - an ipywidgets.FloatLogSlider
    """
    name: str
    start: float
    stop: float
    num: int
    val_ini: float = None
    base: int = 10
    decimals: int = 1

    def __post_init__(self):
        # create numpy array of all values
        self.val = np.around(np.logspace(start=self.start,
                                        stop=self.stop,
                                        num=self.num,
                                        base=self.base,  # same base as the slider
                                        endpoint=True), self.decimals)
        # check each value is unique
        if self.val.size != np.unique(self.val, return_counts=False).size:
            print(f"WARNING: Repeated values in {self.name}.val"
                ", increase 'decimals' or reduce 'num'")

        # pick initial value if not provided
        if self.val_ini is None:
            self.val_ini = np.random.choice(self.val, 1)[0]

        # convert num into step for FloatLogSlider
        step = (self.stop - self.start)/(self.num-1)

        # create slider
        self.slider = widgets.FloatLogSlider(min=self.start,
                                    max=self.stop,
                                    step=step,
                                    value=self.val_ini,
                                    base=self.base,
                                    description=self.name,
                                    readout_format=f'.{self.decimals}f')


def nparange(start, stop, step):
    """Modified np.arange()
        - improve float precision (by use of fractions)
        - includes endpoint

    Args:
        start, stop, step: float (stop is included in array)

    Returns:
        ndarray
    """
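    delta, zoom = get_frac(step)

    # half-step margin: 'stop' is included without overshooting when
    # stop * zoom carries float round-off (e.g. 0.07 * 100 > 7)
    return np.arange(start * zoom, stop * zoom + delta / 2, delta) / zoom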


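def get_frac(step, readout_format='.16f', atol=1e-12):
    """Return 'step' as a (numerator, denominator) pair.

    Warns when the denominator exceeds 1/atol, i.e. when 'step' has no
    short decimal representation.
    """
    precision = "{:" + readout_format + "}"
    frac = Fraction(precision.format(step))
    if frac.denominator > 1/atol:
        print("WARNING: potential float inconsistencies due to 'step'"
            " not having a short decimal representation")
    return (frac.numerator, frac.denominator)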


gist_heat = [[0.0, "rgb(0,0,0)"],
                [0.1111111111111111, "rgb(42,0,0)"],
                [0.2222222222222222, "rgb(85,0,0)"],
                [0.3333333333333333, "rgb(128,0,0)"],
                [0.4444444444444444, "rgb(170,0,0)"],
                [0.5555555555555556, "rgb(212,28,0)"],
                [0.6666666666666666, "rgb(255,85,0)"],
                [0.7777777777777778, "rgb(255,142,28)"],
                [0.8888888888888888, "rgb(255,198,142)"],
                [1.0, "rgb(255,255,255)"]]
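
The endpoint-inclusive range built by `nparange` can be illustrated on its own. The sketch below is a simplified variant, not the notebook's helper: `inclusive_range` is a hypothetical name, it parses the step with `str(step)`, and it assumes that `start` and `stop` lie on the step grid. A half-step margin keeps the endpoint even when `stop * den` carries float round-off.

```python
from fractions import Fraction

import numpy as np


def inclusive_range(start, stop, step):
    """Hypothetical helper: evenly spaced floats from start to stop (included)."""
    frac = Fraction(str(step))                 # e.g. 0.01 -> 1/100
    num, den = frac.numerator, frac.denominator
    # build the range on a scaled grid where the step is an integer,
    # then scale back down; the half-step margin includes 'stop' exactly once
    grid = np.arange(start * den, stop * den + num / 2, num)
    return grid / den


values = inclusive_range(0, 0.07, 0.01)
print(values)
print(len(values))  # 8 values: 0.00, 0.01, ..., 0.07
```

Working on the scaled grid means every value is obtained by a single division, so errors do not accumulate from one step to the next.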

## Modeling rationale

### The internal clock

- Each firefly has its own individual internal clock (or phase) $\theta$
- $\theta$ varies between 0 and 1
- $\theta$ has a period T
- Every time the clock reaches 1 (every T time steps), the firefly flashes
- After flashing, the clock is reset to 0

$$\theta(t+1) = \theta(t) + \frac{1}{T} \text{ (mod 1)}$$
$$\Rightarrow  \theta(t) = \frac{t}{T} \text{ (mod 1)}$$
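
As a small worked example of the update rule above, the sketch below steps a single clock forward and counts its flashes. The function name `count_flashes` is hypothetical, and T = 4 is chosen only because 1/T is then exact in binary floating point, which keeps the comparison with 1 free of round-off.

```python
def count_flashes(T, n_steps, theta0=0.0):
    """Advance one internal clock for n_steps and count how often it flashes."""
    theta = theta0
    flashes = 0
    for _ in range(n_steps):
        theta += 1 / T          # theta(t+1) = theta(t) + 1/T
        if theta >= 1:          # the clock reached 1: flash, then reset
            flashes += 1
            theta -= 1
    return flashes


print(count_flashes(T=4, n_steps=12))  # 3 flashes, one every 4 steps
```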

In [4]:
# Let's visualize what we just said

# set the variable
t1 = np.linspace(0, 10, 1000)  # time array

# Set a slider to check the influence of the period T
T1 = Slider(name='period T', start=1, stop=10, step=1)

# Set an xarray for all (time, period) value combinations
f1 = lambda t, T: np.mod(t/T, 1)
tt1, TT = np.meshgrid(t1, T1.val)
y1 = xr.DataArray(f1(tt1, TT),
            dims=['T', 't'],
            coords={'T': T1.val, 't': t1})


# Set the graph
trace0 = go.Scatter(x=t1, 
                    y=y1.sel(T=T1.val_ini).values)
fig1 = go.FigureWidget([trace0])
fig1.update_layout(template='none',
                    width=800, height=500,
                    title="Flashes of a single firefly",
                    title_x=0.5,
                    xaxis_title="t",
                    yaxis_title='θ')


# Set the callback to update the graph 
def update1(change):
    fig1.data[0].y = y1.sel(T=change['new']).values


# Link the slider and the callback
T1.slider.observe(update1, names='value')

# Display
display(widgets.VBox([T1.slider, fig1]))

[Interactive widget output: a "period T" slider (1 to 10, step 1) above the figure "Flashes of a single firefly"]

### Synchronization process

- When a firefly flashes, it influences its neighbours
- The neighbours slow down or speed up so as to flash more nearly in
phase on the next cycle
- A simple model satisfying this hypothesis is:

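$$\theta(t+1) = \theta(t) + \frac{1}{T} + A \sin\big(2\pi(\Theta - \theta(t))\big) \text{ (mod 1)}$$

where $\Theta$ is the phase of the flashing firefly and $A$ is the resetting strength. The coupling term only applies when a neighbour flashes.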
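
To make the nudging idea concrete, here is a minimal sketch assuming the sinusoidal update used in the code cells of this notebook, i.e. θ ← θ + A·sin(2π(Θ − θ)) (mod 1), with Θ the phase of the flashing firefly. The name `nudged_phase` and the value A = 0.05 are illustrative choices only.

```python
import numpy as np


def nudged_phase(theta, flash_phase=1.0, amplitude=0.05):
    """Phase of a neighbour right after it sees a flash (illustrative sketch)."""
    shift = amplitude * np.sin(2 * np.pi * (flash_phase - theta))
    return np.mod(theta + shift, 1)


late = nudged_phase(0.9)    # neighbour that was about to flash
early = nudged_phase(0.1)   # neighbour that flashed shortly before

print(late > 0.9)    # True: it is pushed forward, closer to flashing
print(early < 0.1)   # True: it is pulled back, closer to its reset
```

In both cases the neighbour moves toward the phase of the flashing firefly (0 and 1 being the same point of the cycle), which is what drives the population toward synchrony.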

In [5]:
# set the variable
t2 = np.linspace(0,1,1000)  # time array
T2 = 200                    # the period

# Set a slider to check the influence of the resetting strength
A = Slider(name='Amplitude', start=0, stop=0.22, step=0.01)
A.slider.description_tooltip = "Amplitude of the resetting strength\n "\
                                "(it measures the firefly’s ability to "\
                                "modify its instantaneous frequency)"

# Set an xarray for all (time, amplitude) value combinations
# Option 1: using modulus
f2= lambda t, A: np.mod(t + 1/T2 + A*np.sin(2*np.pi*(1.005-t)), 1)
tt2, AA = np.meshgrid(t2, A.val)
y2 = xr.DataArray(f2(tt2, AA),
            dims=['Amplitude', 't'],
            coords={'Amplitude': A.val, 't': t2})

# Option 2: clipping the phase to [0, 1] (labelled 'band-pass' in the legend)
f2bis= lambda t, A: np.clip(t + 1/T2 + A*np.sin(2*np.pi*(1.005-t)), 0, 1)
y2bis = xr.DataArray(f2bis(tt2, AA),
            dims=['Amplitude', 't'],
            coords={'Amplitude': A.val, 't': t2})


# Set the graph
trace0 = go.Scatter(x=t2, 
                    y=y2.sel(Amplitude=A.val_ini).values,
                    name='with coupling')
trace1 = go.Scatter(x=t2,
                    y=np.mod(t2+(1/T2),1),
                    name='no coupling')
trace2 = go.Scatter(x=t2,
                    y=y2bis.sel(Amplitude=A.val_ini).values,
                    name='with coupling (band-pass)',
                    visible='legendonly')
fig2 = go.FigureWidget([trace0, trace1, trace2])
fig2.update_layout(template='none',
                    width=800, height=500,
                    title="Influence of the resetting strength",
                    title_x=0.5,
                    xaxis_title="θ<sub>t</sub>",
                    yaxis_title="θ<sub>t+1</sub>",
                    legend_title='Click to deselect', 
                    legend_title_font=dict(size=16),
                    legend_title_font_color='FireBrick')


# Set the callback to update the graph 
def update2(change):
    with fig2.batch_update():
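        # method='nearest' guards against float round-off in the slider value
        fig2.data[0].y = y2.sel(Amplitude=change['new'], method='nearest').values
        fig2.data[2].y = y2bis.sel(Amplitude=change['new'], method='nearest').values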


# Link the slider and the callback
A.slider.observe(update2, names='value')


# Display
display(widgets.VBox([A.slider, fig2]))

[Interactive widget output: an "Amplitude" slider above the figure "Influence of the resetting strength"]

## Cellular Automata modeling

### Parameters of the simulation

In [11]:
@dataclass
class Parameters:
    """This class contains the parameters of the simulation.
    """

    # Size of the grid
    M: int = 10            # number of cells in the x direction
    N: int = 10            # number of cells in the y direction
    # fraction of the grid that is populated; no type annotation, so this is
    # a class attribute and not a constructor argument
    fireflies_density = 1

    # Coupling parameters
    coupling_value: float = 0.1    # float (between 0 and +/- 0.3)
    neighbor_distance: float = 3  # Size of neighbourhood radius

    # Simulation settings
    nb_periods: int = 20
    time_step_per_period: int = 100  # dt = 1/time_step_per_period

### Simulation script

In [7]:
def fireflies_simulator(par):
    """Simulate the fireflies system on the grid.

    The order parameter is not computed here; the function only returns
    the phases.

    Args:
        par (Parameters): parameters of the simulation

    Returns:
        ndarray: phases of shape (time steps + 1, N, M), NaN for empty cells
    """
    time_steps = np.arange(start=0,
                            stop=par.nb_periods * par.time_step_per_period,
                            step=1)
    phases = np.zeros((time_steps.size+1, par.N, par.M))  # 1 grid (N x M) per time step
    phases[0] = np.random.random((par.N, par.M))          # random initial state

    # empty some cells to ensure the right fireflies density
    nb_empty_cell = np.around((1-par.fireflies_density)*par.N*par.M, 0).astype(int)
    ind = np.random.choice(phases[0].size, size=nb_empty_cell, replace=False)
    phases[0].ravel()[ind] = np.nan

    neighbors = compute_neighborhood(phases[0], par.N, par.M, par.neighbor_distance)
    phase_increment = 1 / par.time_step_per_period

    for t in time_steps:
        phases[t+1] = phases[t] + phase_increment
        glow_idx = np.array(np.nonzero(phases[t+1]>1)).T
        ids = [np.ravel_multi_index(tup, (par.N, par.M)) for tup in glow_idx]
        for ind in ids:
            i,j = np.unravel_index(ind, (par.N, par.M))
            phases[t+1][neighbors[ind]] = nudge(phases[t+1][neighbors[ind]], 
                                                phases[t+1][i,j],
                                                par.coupling_value)

    return phases


def compute_neighborhood(grid, N, M, r):
    """For every firefly, compute a mask array of neighboring fireflies.

    Args:
        grid (ndarray): phase state of each firefly
        N (int): number of cells in the y direction (rows)
        M (int): number of cells in the x direction (columns)
        r (float): radius of the neighbourhood

    Returns:
        dict: keys: index (ravel) of each firefly
                values: mask array with neighbour fireflies
    """
    neighbors = dict.fromkeys((range(N*M)), [])
    occupied_cells = ~np.isnan(grid)
    for i in range(N):
        for j in range(M):
            if np.isnan(grid[i,j]):
                continue
            y, x = np.ogrid[-i:N-i, -j:M-j]
            mask = (x**2 + y**2 <= r**2)*occupied_cells  # only keep occupied neighboring cells
            # mask[i,j] = False                            # don't include the cell itself
            neighbors[np.ravel_multi_index((i,j), (N,M))] = mask
    return neighbors


def nudge(neighbor_phases, flash_phase, amplitude):
    """Nudge the neighboring fireflies.

    Args:
        neighbor_phases (ndarray): phases of all the fireflies to nudge
        flash_phase (float): phase of the flashing firefly
        amplitude (float): resetting strength / coupling value

    Returns:
        ndarray: nudged phases, wrapped back into [0, 1)
    """
    phase_diff = flash_phase - neighbor_phases
    res = neighbor_phases + amplitude*np.sin(2*np.pi*(phase_diff))
    return np.mod(res, 1)  # use np.clip(res, 0, 1) to clip instead of wrapping
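
The circular neighbourhood built in `compute_neighborhood` relies on `np.ogrid` offsets centred on the current cell. The standalone sketch below shows that step in isolation on a small grid; `disk_mask` is a hypothetical name, and the occupancy filtering done in the notebook is left out.

```python
import numpy as np


def disk_mask(shape, center, radius):
    """Boolean mask of the cells within 'radius' of 'center' (illustrative)."""
    n_rows, n_cols = shape
    i, j = center
    # y and x hold the row and column offsets of every cell relative to (i, j)
    y, x = np.ogrid[-i:n_rows - i, -j:n_cols - j]
    return x**2 + y**2 <= radius**2


mask = disk_mask((5, 5), center=(2, 2), radius=1)
print(mask.astype(int))
# [[0 0 0 0 0]
#  [0 0 1 0 0]
#  [0 1 1 1 0]
#  [0 0 1 0 0]
#  [0 0 0 0 0]]
```

The centre cell is part of its own mask here, as it is in the notebook, where the line excluding it is commented out.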

In [8]:
# Let's set the parameters for a simulation (overriding some of the defaults)
settings = Parameters(N=20, M=20, coupling_value=0.01)

# and print it
settings

Parameters(M=20, N=20, coupling_value=0.01, neighbor_distance=3, nb_periods=20, time_step_per_period=100)

In [9]:
# And let's run the simulation
heatmaps = fireflies_simulator(settings)
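
One common way to summarize how synchronized such a phase array is at each time step is a Kuramoto-style order parameter, r = |mean(exp(2πiθ))|, which is close to 1 when all phases agree and close to 0 when they are spread around the cycle. The sketch below is an assumption about how this could be computed, not part of the notebook's simulator: `order_parameter` is a hypothetical name, and the input is assumed to have shape (time, N, M) with NaN marking empty cells and at least one firefly per time step.

```python
import numpy as np


def order_parameter(phases):
    """Kuramoto-style order parameter for each time step (illustrative sketch).

    phases: array of shape (time, N, M), values in [0, 1], NaN = empty cell.
    """
    valid = ~np.isnan(phases)
    # one unit vector per firefly; empty cells contribute nothing to the sum
    z = np.where(valid, np.exp(2j * np.pi * np.nan_to_num(phases)), 0)
    return np.abs(z.sum(axis=(1, 2)) / valid.sum(axis=(1, 2)))


synced = np.full((1, 2, 2), 0.3)          # all fireflies share one phase
opposed = np.array([[[0.0, 0.5]]])        # two fireflies in anti-phase
print(order_parameter(synced))            # close to 1
print(order_parameter(opposed))           # close to 0
```

Applied to the array returned by the simulation, this would give one value per time step, which could then be plotted against time.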

In [10]:
mymin, mymax, mystep = 0, heatmaps.shape[0]-1, 10

play = widgets.Play(
    value=0,
    min=mymin,
    max=mymax,
    step=mystep,
    interval=100,
    description="Press play",
)

# Set the figure
fig5 = go.FigureWidget(
    data=go.Heatmap(z=heatmaps[0], colorscale='Hot', reversescale=True),
    layout=go.Layout(title="Fireflies Simulator"))
fig5.data[0].update(zmin=0, zmax=1)


# Set the callback to update the graph 
def update5(change):
    fig5.data[0].z = heatmaps[change['new']]


# Link the slider and the callback
slider = widgets.IntSlider(min=mymin, max=mymax, step=mystep)
widgets.jslink((play, 'value'), (slider, 'value'))
slider.observe(update5, names='value')


# Display
controllers = widgets.HBox([play, slider])
display(widgets.VBox([controllers, fig5]))
# controllers

[Interactive widget output: a "Press play" button and a frame slider (0 to 2000, step 10) above the "Fireflies Simulator" heatmap]

## References

Ermentrout, G. B., & Rinzel, J. (1984). Beyond a pacemaker's entrainment limit: phase walk-through. American Journal of Physiology-Regulatory, Integrative and Comparative Physiology, 246(1), R102-R106.

Ermentrout, G. B., & Edelstein-Keshet, L. (1993). Cellular automata approaches to biological modeling. Journal of Theoretical Biology, 160(1), 97-133.

Strogatz, S. H. (2018). Nonlinear dynamics and chaos with student solutions manual: With applications to physics, biology, chemistry, and engineering. CRC Press. pp. 105-106.

Pikovsky, A., Rosenblum, M., & Kurths, J. (2003). Synchronization: a universal concept in nonlinear sciences (No. 12). Cambridge University Press. pp61-612