# Better automated diffusive system

In [1]:
%matplotlib widget

**Library Import**

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import scipy
import scipy.integrate
from tqdm.notebook import tqdm
import crank_nicolson_numba.generic as cn
import itertools
# For parallelization
from joblib import Parallel, delayed

import nekhoroshev_tools as nt
import poly_tools as pt
import expo_tools as et

In [3]:
def perform_experiment(engine, movement_list, reset=True, immovable=False):
    if reset:
        engine.reset()
    data_list = []
    absolute_time = 0.0
    for move in tqdm(movement_list):
        data = {}

        if not immovable:
            if move["movement"] == "forward":
                data["I_max_before"] = engine.I_max
                data["I_max_low"] = engine.I_max
                engine.move_barrier_forward(move["amount"])
                data["I_max_after"] = engine.I_max
                data["I_max_high"] = engine.I_max
            elif move["movement"] == "backward":
                data["I_max_before"] = engine.I_max
                data["I_max_high"] = engine.I_max
                engine.move_barrier_backward(move["amount"])
                data["I_max_after"] = engine.I_max
                data["I_max_low"] = engine.I_max
            else:
                data["I_max_before"] = engine.I_max
                data["I_max_high"] = engine.I_max
                data["I_max_after"] = engine.I_max
                data["I_max_low"] = engine.I_max
        else:
            data["I_max_before"] = engine.I_max
            data["I_max_high"] = engine.I_max
            data["I_max_after"] = engine.I_max
            data["I_max_low"] = engine.I_max

        time, current = engine.current(move["samples"], move["it_per_sample"])

        data["t_absolute"] = time
        data["t_relative"] = time - absolute_time
        absolute_time = time[-1]

        data["current"] = current

        data_list.append(data)
    return data_list

In [20]:
def rho_0(I, damping_position=np.nan, l=np.nan):
    if np.isnan(damping_position) or np.isnan(l):
        return np.exp(-I)
    else:
        return np.exp(-I) / (1 + np.exp((I - damping_position)/l))

In [21]:
I_damping = 2.95
I_max = 3.0
I_star = 8.0
k = 0.33
exponent = 1/(2*k)

c = nt.standard_c(0.0, I_max, I_star, exponent)

In [162]:
plt.figure()

I = np.linspace(2.5, 3.0, 100, endpoint=False)
plt.plot(I, [nt.afpt(i, 3.0, I_star, exponent, c) for i in I], label="3.0")

I = np.linspace(2.5, 3.5, 100, endpoint=False)
plt.plot(I, [nt.afpt(i, 3.5, I_star, exponent, c) for i in I], label="3.5")

#plt.yscale("log")

print(nt.afpt(3.8, 3.85, I_star, exponent, c))
print(nt.afpt(3.5, 3.55, I_star, exponent, c))
print(nt.afpt(3.3, 3.35, I_star, exponent, c))
print(nt.current_peak_time(I_damping, I_max, I_star, exponent, c))
print(nt.afpt(I_damping, I_max, I_star, exponent, c))

plt.xlabel("$I$")
plt.ylabel("Average First Passage Time")
plt.legend(title="$I_a$ position")

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

0.004806933965221642
0.009920566757656485
0.017764192218942033
0.0001784420294643284
0.06377387516741567


<matplotlib.legend.Legend at 0x7f8308456850>

In [40]:
step_size = 0.05

it_per_sample = 10
samples = 5000
iterations = 10

n_macro_iterations = 5

In [41]:
cn_sampling = 5000
I_list, dI = np.linspace(0.0, I_max, cn_sampling, retstep=True)

cn_time_steps = 100
dt = nt.afpt(I_damping + (step_size * iterations), I_max + (step_size * iterations), I_star, exponent, c) / cn_time_steps

In [42]:
macro_iteration = [
    {"label": i, "movement": "backward", "amount": step_size, "samples": samples, "it_per_sample": it_per_sample} for i in range(iterations)
] + [
    {"label": i, "movement": "forward", "amount": step_size, "samples": samples, "it_per_sample": it_per_sample} for i in range(iterations)
]

movement_list = [
    {"movement": "still", "samples": samples, "it_per_sample": it_per_sample},
    {"movement": "still", "samples": samples, "it_per_sample": it_per_sample}
] + macro_iteration * n_macro_iterations

In [43]:
engine = cn.cn_generic(0, I_max, rho_0(I_list, I_damping, dI*5), dt, lambda x: nt.D(x, I_star, exponent, c, True))
data_0 = engine.get_data_with_x()

  return c * np.exp(-2*np.power(I_star/I, exponent)) * (0.5 if halved else 1.0)


In [44]:
immovable_data = perform_experiment(engine, movement_list, reset=True, immovable=True)

  0%|          | 0/102 [00:00<?, ?it/s]

In [45]:
experiment_data = perform_experiment(engine, movement_list, reset=True, immovable=False)

  0%|          | 0/102 [00:00<?, ?it/s]

In [46]:
for i, d in enumerate(experiment_data):
    d["normalized_current"] = d["current"]/immovable_data[i]["current"]

In [114]:
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

for i, d in enumerate(experiment_data):
    if i != 0:
        ax2.plot(
            [d["t_absolute"][0], d["t_absolute"][0], d["t_absolute"][-1]],
            [d["I_max_before"], d["I_max_after"], d["I_max_after"]],
            c="grey"
        )
        ax1.plot(d["t_absolute"], d["current"], c="C1")
    else:
        ax2.plot(
        [d["t_absolute"][0], d["t_absolute"][0], d["t_absolute"][-1]],
        [d["I_max_before"], d["I_max_after"], d["I_max_after"]],
        c="grey", label="Barrier position"
        )
        ax1.plot(d["t_absolute"], d["current"], c="C1", label="Current with moving barrier")

    
for i, d in enumerate(immovable_data):
    if i == 0:
        ax1.plot(d["t_absolute"], d["current"], "--", c="C0", label="Regular current")
    else:    
        ax1.plot(d["t_absolute"], d["current"], "--", c="C0")

ax1.set_yscale("log")

ax1.set_xlabel("$t$")
ax1.set_ylabel("Current")
ax2.set_ylabel("Barrier position $ I\\ [\sigma]$")
ax1_legend = ax1.legend()
ax2.legend()
ax1_legend.remove()
ax2.add_artist(ax1_legend)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<matplotlib.legend.Legend at 0x7f830322f7f0>

In [115]:
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

for i, d in enumerate(experiment_data):
    if i != 0:
        ax2.plot(
            [d["t_absolute"][0], d["t_absolute"][0], d["t_absolute"][-1]],
            [d["I_max_before"], d["I_max_after"], d["I_max_after"]],
            c="grey"
        )
        ax1.plot(d["t_absolute"], d["current"]/immovable_data[i]["current"], c="C0")
    else:
        ax2.plot(
            [d["t_absolute"][0], d["t_absolute"][0], d["t_absolute"][-1]],
            [d["I_max_before"], d["I_max_after"], d["I_max_after"]],
            c="grey", label="Barrier position"
        )
        ax1.plot(d["t_absolute"], d["current"]/immovable_data[i]["current"], c="C0", label="Normalized current")
    
#for d in immovable_data:
#    ax1.plot(d["t_absolute"], d["current"], "--", c="C0")

#ax1.set_yscale("log")

ax1.set_xlabel("$t$")
ax1.set_ylabel("Normalized current")
ax2.set_ylabel("Barrier position $ I\\ [\sigma]$")
ax1_legend = ax1.legend()
ax2.legend()
ax1_legend.remove()
ax2.add_artist(ax1_legend)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<matplotlib.legend.Legend at 0x7f830125c400>

In [132]:
plt.figure()

for i, d in enumerate(experiment_data):
    if d["I_max_before"] > d["I_max_after"]:
        if i < iterations * 2 + 2:
            plt.plot(d["t_relative"], d["normalized_current"], c="C{}".format(movement_list[i]["label"]), label="{:.2f} - {:.2f}".format(d["I_max_before"], d["I_max_after"]))
        else:
            plt.plot(d["t_relative"], d["normalized_current"], c="C{}".format(movement_list[i]["label"]))
        
        plt.axvline(nt.afpt(d["I_max_low"], d["I_max_high"], I_star, exponent, c), c="C{}".format(movement_list[i]["label"]))

plt.axhline(1.03, c="grey")        
plt.axhline(1.035, c="grey")        
plt.axhline(1.04, c="grey")

plt.xscale("log")
plt.legend(ncol=2, fontsize="x-small", title="Collimator step (from - to)\nVertical lines are $\\langle t \\rangle$")
plt.xlabel("Relative time")
plt.ylabel("Relative current")
#plt.yscale("log")

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Text(0, 0.5, 'Relative current')

In [57]:
nt.afpt(d["I_max_low"], d["I_max_high"], I_star, exponent, c)

0.0643550662269322

In [133]:
plt.figure()

for i, d in enumerate(experiment_data):
    if d["I_max_before"] < d["I_max_after"] and not movement_list[i]["label"] == 0:
        if i < iterations * 2 + 2:
            plt.plot(d["t_relative"], d["normalized_current"], c="C{}".format(movement_list[i]["label"] - 1), label="{:.2f} - {:.2f}".format(d["I_max_before"], d["I_max_after"]))
        else:
            plt.plot(d["t_relative"], d["normalized_current"], c="C{}".format(movement_list[i]["label"] - 1))
        
        plt.axvline(nt.afpt(d["I_max_low"], d["I_max_high"], I_star, exponent, c), c="C{}".format(movement_list[i]["label"]-1))

plt.axhline(0.955, c="grey")        
plt.axhline(0.960, c="grey")        
plt.axhline(0.965, c="grey")        

plt.xscale("log")
#plt.yscale("log")

plt.xscale("log")
plt.legend(ncol=2, fontsize="x-small", title="Collimator step (from - to)\nVertical lines are $\\langle t \\rangle$")
plt.xlabel("Relative time")
plt.ylabel("Relative current")

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Text(0, 0.5, 'Relative current')

## Let's try this (violent) thing

In [137]:
idx = np.argmin(np.absolute(1.85-I_list))

rho_test = rho_0(I_list[idx:], I_damping, 0.02)

plt.figure()
    
plt.plot(
    [nt.afpt(i, I_max, I_star, exponent, c) for i in I_list[idx:]],
    rho_test * (0.5 * (nt.D(I_list[idx:], I_star, exponent, c)/I_list[idx:])),
    c="C1", label="$\\langle t \\rangle$ reconstruction"
)

for i, d in enumerate(immovable_data):
    if i == 0:
        plt.plot(d["t_absolute"], d["current"], "--", c="C0", label="Real current")
    else:
        plt.plot(d["t_absolute"], d["current"], "--", c="C0")
        
plt.yscale("log")
plt.title("Comparison between real current and `$\\langle t \\rangle$ based' current")
plt.xlabel("$t$")
plt.ylabel("$J$")
plt.legend()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<matplotlib.legend.Legend at 0x7f82fe0b0c40>

...it works, this can be VERY usefull.

## Another Violent Test

In [157]:
afpt_test = nt.afpt(I_damping, I_max, I_star, exponent, c)

t_test = np.linspace(0.0, afpt_test*2.0, 200000)
c_test = np.array([nt.current_point(t, I_damping, I_max, I_star, exponent, c) for t in tqdm(t_test)])

  0%|          | 0/200000 [00:00<?, ?it/s]

In [159]:
plt.figure()
plt.plot(t_test, c_test, label="Current lost from $\\delta$ distribution (Analytic estimate)")
plt.axvline(afpt_test, c="red", label="Average first passage time")
plt.xlabel("$t$")
plt.ylabel("Current")
plt.legend()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<matplotlib.legend.Legend at 0x7f82fe239bb0>

In [163]:
print("All current lost (up to our integration):", scipy.integrate.simps(c_test, t_test))

idx_test = np.argmin(np.absolute(t_test - afpt_test))
print("Current lost up to afpt:", scipy.integrate.simps(c_test[:idx_test], t_test[:idx_test]))

All current lost (up to our integration): 0.9849291930170353
Current lost up to afpt: 0.9674227942119674


The agreement is *almost* perfect, and the discrepancies can be perfectly related to the accuracy lost in the linearization on which the analytic estimation is based on.