In [None]:
import os
# Restrict OpenMP to a single thread. This is set before numpy is imported,
# presumably so the numerical backend picks it up at load time — confirm.
os.environ['OMP_NUM_THREADS'] = '1'
n_proc = 1 # define number of processes for multiprocessing
import sys
# Make the parent directory importable (presumably where the local
# `minflux` and `trajectories` modules live — verify).
sys.path.append('../')
import numpy as np
from matplotlib import pyplot as plt
import tqdm
import noctiluca as nl
from multiprocessing import Pool
from itertools import product
import pickle
import warnings
import minflux
import trajectories
import bayesmsd
import pandas as pd

# Suppress all warnings for the rest of the session
warnings.filterwarnings('ignore')

# Simulate MINFLUX on fBms
### Define MINFLUX parameters

In [None]:
# Build the 2D MINFLUX simulator used throughout the notebook. The calls are
# kept in their original order because later calls may depend on earlier ones.
minflux_l150 = minflux.Minflux2D()
minflux_l150.beam_offsets_cartesian = minflux_l150.calc_evenly_spaced_points(6)
minflux_l150.add_minflux_L(150)


def donut_beam_pattern(r, z, center):
    """Evaluate the simulator's donut beam at (r, z) for the given center.

    The constants 360 and 350 are forwarded unchanged to ``donut_beam``
    (presumably beam-shape parameters — confirm against the minflux module).
    """
    return minflux_l150.donut_beam(r, z, 360, 350, center)


minflux_l150.beam_pattern = donut_beam_pattern
minflux_l150.define_multiplex_cycle(20.0, 5.0, 5.0)

In [None]:
# Keyword arguments forwarded to `_track_particle` for every simulated track.
tracking_params = dict(
    dt=1,
    background_emission_rate=0.015,
    minimum_photon_threshold=10,
    background_threshold=80000,
    maximum_dark_time=300000,
    stickiness=4,
    beta=np.array([1, 1]),
)

### Simulate a few trajectories to check

In [None]:
n_trajectories = 2
alphas = [0.1, 0.15, 0.2, 0.25, 0.3]

# Length of each preview trajectory: 10000 times the integer multiplex cycle time.
preview_steps = 10000 * int(minflux_l150.multiplex_cycle_time)

# One list of simulated trajectories per scaling exponent.
output_trajectories = {exponent: [] for exponent in alphas}

for alpha in alphas:
    print(alpha)
    for i in tqdm.tqdm(range(n_trajectories)):
        fbm_generator = trajectories.FractionalBrownianMotion(1, alpha / 2)
        increments = fbm_generator.computation_method(
            fbm_generator.covariance_sequence,
            preview_steps,
            (3,),
        )
        # Rescale the increments in place, then integrate them into positions.
        increments *= np.sqrt(fbm_generator.calculate_variance(1))
        fbm = np.cumsum(increments, axis=0)
        output_trajectories[alpha].append(fbm)


### Examine msds

In [None]:
# NOTE(review): fbm_scaling is not referenced in this cell; it is kept in case
# another cell relies on it.
fbm_scaling = 4

colors = ['orangered', 'orchid', 'orange', 'gray', 'royalblue',
          'turquoise', 'limegreen', 'crimson', 'cyan']
fig, ax = plt.subplots(figsize=(9, 6))

# One ensemble MSD curve per alpha, computed from the first simulated
# trajectory (first 10000 steps). The colour is indexed by the position of
# alpha in `alphas`; the previous code indexed with a stale loop variable left
# over from the simulation cell, so every curve received the same colour.
for idx, alpha in enumerate(alphas):
    curve_color = colors[idx % len(colors)]
    artists = nl.plot.msd_overview(nl.make_TaggedSet(output_trajectories[alpha][0][:10000].T),
                                   dt=1, label=f"{alpha}")
    # Keep only the last artist returned by msd_overview; remove the others.
    for artist in artists[:-1]:
        artist.remove()
    artists[-1].set_color(curve_color)

# Power-law reference lines x**alpha, dashed and colour-matched to the curves.
x = np.logspace(0, 3)
for idx, alpha in enumerate(alphas):
    plt.plot(x, x**alpha, color=colors[idx % len(colors)], linestyle="--")

plt.ylim([1, 100])
plt.xlabel("frame")
plt.ylabel("displacement")
plt.legend()
plt.show()

# Simulate tracking on a bunch of trajectories

In [None]:
def parfun(curr_todo):
    """Track one scaled trajectory for a single parameter combination.

    Parameters
    ----------
    curr_todo : tuple
        ``(fbm, alpha, scale, L_val, per)``: the trajectory, its exponent, the
        multiplicative scale applied to the trajectory, the L value and the
        photon emission rate.

    Returns
    -------
    tuple
        ``((alpha, scale, L_val, per), result)`` where ``result`` is whatever
        ``minflux_l150._track_particle`` returns.
    """
    fbm, alpha, scale, L_val, per = curr_todo
    tracked = minflux_l150._track_particle(
        scale * fbm,
        L_value=L_val,
        photon_emission_rate=per,
        **tracking_params,
    )
    return (alpha, scale, L_val, per), tracked

In [None]:
n_trajectories = 150
alphas = [0.1, 0.15, 0.2, 0.25, 0.3]
scales = np.sqrt(np.logspace(1, 2.6, 10))
L_values = [25, 50, 100, 150, 200]
pers = [0.5, 1, 2]

output_path = "" # provide output path

# Number of simulation steps that are averaged into one localization.
steps_per_localization = int(minflux_l150.multiplex_cycle_time)

# The worker pool is created once and reused for every trajectory, instead of
# starting and tearing down a new pool per trajectory.
# NOTE(review): with n_proc > 1, worker processes may inherit identical random
# state from the parent — confirm how the minflux module seeds its randomness.
with Pool(processes=n_proc) as mypool:
    for alpha in alphas:
        print(alpha)
        # The dictionaries are keyed by every parameter combination although
        # only the current alpha is filled; the reading code skips empty lists.
        tracking_outputs = {a: [] for a in product(alphas, scales, L_values, pers)}
        error_outputs = {a: [] for a in product(alphas, scales, L_values, pers)}
        photon_emission_rates_outputs = {a: [] for a in product(alphas, scales, L_values, pers)}

        ground_truth_motion_blurred = {a: [] for a in alphas}
        ground_truth_subsampled = {a: [] for a in alphas}

        fbm_generator = trajectories.FractionalBrownianMotion(1, alpha / 2)
        for i in tqdm.tqdm(range(n_trajectories)):
            fbm = fbm_generator.computation_method(fbm_generator.covariance_sequence,
                                                   11000 * steps_per_localization,
                                                   (3,))
            fbm *= np.sqrt(fbm_generator.calculate_variance(1))
            fbm = np.cumsum(fbm, axis=0)
            fbm -= fbm[0, :]  # start every trajectory at the origin

            # Motion-blurred ground truth: mean position within each
            # localization window, stored as one flat array per trajectory.
            # The section count is passed as an integer rather than a float.
            localization_chunks = np.split(
                fbm,
                fbm.shape[0] // steps_per_localization,
                axis=0,
            )
            ground_truth_motion_blurred[alpha].append(
                np.concatenate([np.mean(chunk, axis=0) for chunk in localization_chunks]))
            # Subsampled ground truth: one raw position per localization window.
            ground_truth_subsampled[alpha].append(
                fbm[::int(minflux_l150.multiplex_cycle_time / tracking_params["dt"]), :])

            todo = product([fbm], [alpha], scales, L_values, pers)
            minflux_list = list(mypool.imap(parfun, todo))
            for key, val in minflux_list:
                tracking_outputs[key].append(val[0])
                error_outputs[key].append(val[1])
                photon_emission_rates_outputs[key].append(val[2])

        # Write one pickle per output type for this alpha.
        for filename, payload in (
            (f'position_estimates_alpha_{alpha}.pickle', tracking_outputs),
            (f'position_errors_alpha_{alpha}.pickle', error_outputs),
            (f'emission_rates_alpha_{alpha}.pickle', photon_emission_rates_outputs),
            (f'ground_truth_motion_blurred_{alpha}.pickle', ground_truth_motion_blurred),
            (f'ground_truth_subsampled_{alpha}.pickle', ground_truth_subsampled),
        ):
            with open(os.path.join(output_path, filename), 'wb') as handle:
                pickle.dump(payload, handle, protocol=pickle.HIGHEST_PROTOCOL)

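The following block reads the pickled simulation outputs back from disk; the blocks after it use the lists it fills. If the processed `.h5` files have already been written, you can instead skip ahead to the "Analyze simulated trajectories" section.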

In [None]:
alphas = [0.1, 0.15, 0.2, 0.25, 0.3]


def _load_pickle(filename):
    """Unpickle ``filename`` from ``output_path`` and return its contents."""
    with open(os.path.join(output_path, filename), 'rb') as handle:
        return pickle.load(handle)


# One entry per alpha in each list, in the order of `alphas`.
# `output_path` must already be defined (it is assigned in the simulation cell).
position_estimates = []
position_errors = []
emission_rates = []
gt_motion_blurred = []
gt_subsampled = []

for alpha in alphas:
    position_estimates.append(_load_pickle(f'position_estimates_alpha_{alpha}.pickle'))
    position_errors.append(_load_pickle(f'position_errors_alpha_{alpha}.pickle'))
    emission_rates.append(_load_pickle(f'emission_rates_alpha_{alpha}.pickle'))
    gt_motion_blurred.append(_load_pickle(f'ground_truth_motion_blurred_{alpha}.pickle'))
    gt_subsampled.append(_load_pickle(f'ground_truth_subsampled_{alpha}.pickle'))

In [None]:
# Merge the per-alpha dictionaries into single dictionaries, keeping only the
# keys whose list of results is non-empty.
all_position_estimates, all_position_errors, all_emission_rates = {}, {}, {}
all_motion_blurred, all_subsampled = {}, {}

for pos_estimate, pos_error, emission_rate in zip(position_estimates, position_errors, emission_rates):
    filled_keys = [k for k, tracks in pos_estimate.items() if len(tracks) > 0]
    for k in filled_keys:
        all_position_estimates[k] = pos_estimate[k]
        all_position_errors[k] = pos_error[k]
        all_emission_rates[k] = emission_rate[k]

for motion_blurred, subsampled in zip(gt_motion_blurred, gt_subsampled):
    filled_keys = [k for k, tracks in motion_blurred.items() if len(tracks) > 0]
    for k in filled_keys:
        all_motion_blurred[k] = motion_blurred[k]
        all_subsampled[k] = subsampled[k]

In [None]:
# make noctiluca sets
# Every tracked trajectory is tagged with its four simulation parameters,
# formatted to two decimals (e.g. "alpha=0.10", "L=25.00").
minflux_set = nl.TaggedSet()
key_labels = ["alpha=", "scale=", "L=", "per="]
for key, position_estimate in all_position_estimates.items():
    tagset = [a + f"{v:.2f}" for a, v in zip(key_labels, key)]
    for curr_position_estimate in position_estimate:
        minflux_set.add(nl.Trajectory(curr_position_estimate.T), tags=tagset)

# Ground truth: for each alpha, first all subsampled trajectories, then all
# motion-blurred ones. Only the first two coordinates are kept.
gt_set = nl.TaggedSet()
for key, subsampled in all_subsampled.items():
    alpha_tag = f"alpha={key:.2f}"
    for curr_subsampled in subsampled:
        gt_set.add(nl.Trajectory(curr_subsampled[:, :2]),
                   tags=[alpha_tag, "blurred=False"])
    # The blurred trajectories are stored flat. Each one is reshaped to the
    # shape of its own subsampled counterpart rather than to whatever array was
    # left over from the loop above. The two lists are appended to in lockstep
    # by the simulation cell, so pairing them with zip is safe.
    for curr_motion_blurred, curr_subsampled in zip(all_motion_blurred[key], subsampled):
        gt_set.add(nl.Trajectory(curr_motion_blurred.reshape(curr_subsampled.shape)[:, :2]),
                   tags=[alpha_tag, "blurred=True"])

In [None]:
# Compute MSDs for the ground-truth dataset and write the set to disk.
# makeSelection() is called without arguments first (presumably to select the
# whole set — confirm against the noctiluca documentation).
gt_set.makeSelection()
gt_progress = tqdm.tqdm(gt_set)
_ = nl.analysis.MSD(gt_progress)
gt_h5_file = os.path.join(output_path, "ground_truth_data.h5")
nl.io.write.hdf5(gt_set, gt_h5_file)

In [None]:
# calculate MSDs for MINFLUX dataset
def msd_parfun(args):
    """Worker: compute the MSD of one trajectory and return it with its index.

    The worker has its own name so that it does not overwrite the tracking
    worker `parfun` used by the simulation cell.

    Parameters
    ----------
    args : tuple
        ``(i, traj)``: position of the trajectory in the set and the
        trajectory itself.

    Returns
    -------
    tuple
        ``(i, traj.meta['MSD'])`` as present after ``nl.analysis.MSD(traj)``.
    """
    i, traj = args
    _ = nl.analysis.MSD(traj)
    return i, traj.meta['MSD']

todo = list(enumerate(minflux_set))
with Pool(processes=n_proc) as mypool:
    # Results arrive in arbitrary order; the returned index says where each
    # result belongs in the parent's copy of the set.
    msd_results = mypool.imap_unordered(msd_parfun, todo)
    for i, msd_meta in tqdm.tqdm(msd_results, total=len(todo)):
        minflux_set[i].meta['MSD'] = msd_meta
nl.io.write.hdf5(minflux_set, os.path.join(output_path, "simulated_minflux_data.h5"))

# Analyze simulated trajectories
Read outputs. 

In [None]:
output_path = ""

# Load the two datasets written by the simulation section.
gt_h5_file = os.path.join(output_path, "ground_truth_data.h5")
minflux_h5_file = os.path.join(output_path, "simulated_minflux_data.h5")

gt_set = nl.io.load.hdf5(gt_h5_file)
minflux_set = nl.io.load.hdf5(minflux_h5_file)

Examine available tags.

In [None]:
# Print the tags of each dataset: MINFLUX tracks first, then ground truth.
for tagged_set in (minflux_set, gt_set):
    print(tagged_set.tagset())

### Define some useful functions

In [None]:
def default_msd(t, cycles, dt, scale, alpha):
    """Evaluate the fBm MSD without motion blur or localization error.

    Computes ``2 * scale**2 * cycles**alpha * (t / dt)**alpha``.

    Parameters
    ----------
    t :
        Time lag(s) at which to evaluate the MSD
    cycles :
        The MINFLUX cycles
    dt :
        The timestep of the MINFLUX simulation
    scale : float
        The scaling factor for the fBm
    alpha : float
        The alpha for the fBm

    Returns
    -------
        The MSD value(s) at ``t``, without localization error or motion blur
    """
    prefactor = 2 * scale**2 * cycles**alpha
    lag_in_steps = t / dt
    return prefactor * lag_in_steps ** alpha

def params2msd(cycles, dt, scale, alpha, sigma2, motion_blur):
    """Build an MSD function with motion blur and localization error applied.

    The plain fBm MSD from ``default_msd`` is wrapped first with
    ``bayesmsd.deco.imaging`` and then with ``bayesmsd.deco.MSDfun``.

    Parameters
    ----------
    cycles :
        The MINFLUX cycles
    dt : float
        The timestep of the MINFLUX simulation
    scale : float
        The scaling factor for the fBm
    alpha : float
        The alpha for the fBm; also passed to ``imaging`` as ``alpha0``
    sigma2 : float
        Localization error, passed to ``imaging`` as ``noise2``
    motion_blur : float
        Motion blur to apply, passed to ``imaging`` as ``f``

    Returns
    -------
        A function for the MSD with motion blur and localization error
    """
    add_imaging_artifacts = bayesmsd.deco.imaging(
        noise2=sigma2,
        f=motion_blur,
        alpha0=alpha,
    )

    # The defaults bind this call's parameters; callers may still override
    # them by keyword.
    def mb_msd(t, cycles=cycles, dt=dt, scale=scale, alpha=alpha):
        return default_msd(t, cycles, dt, scale, alpha)

    return bayesmsd.deco.MSDfun(add_imaging_artifacts(mb_msd))

def msd_plot_only_ensemble(data, color="black", nl_kwargs=None):
    """Helper function to plot only the ensemble average of MSD.

    Calls ``nl.plot.msd_overview``, removes every returned artist except the
    last one, and recolours that last artist.

    Parameters
    ----------
    data :
        The data to be plotted
    color : str, optional
        The color of the ensemble line, by default "black"
    nl_kwargs : dict, optional
        kwargs to pass to noctiluca, by default None (no extra kwargs)

    Returns
    -------
       Artists objects from matplotlib.
    """
    # None replaces the former mutable `{}` default, so no dict object is
    # shared between calls.
    if nl_kwargs is None:
        nl_kwargs = {}
    artists = nl.plot.msd_overview(data, **nl_kwargs)
    for artist in artists[:-1]:
        artist.remove()
    artists[-1].set_color(color)
    return artists


### Produce motion blur SI figure

In [None]:
# Grid of MSD comparisons: one row per scale, one column per alpha.
fig, ax = plt.subplots(3, 5, figsize=(12, 9))

# Values that do not change between panels.
L_vals = [25, 50, 100, 150, 200]
dt = 2.1e-4
# `cycles` argument of the theoretical MSD. The previous calls passed
# `1, 180, dt, ...`, one positional argument more than default_msd and
# params2msd accept, which raises a TypeError.
# NOTE(review): 180 is presumably the multiplex cycle length in simulation
# steps — confirm against minflux_l150.multiplex_cycle_time.
n_cycle_steps = 180

for j, scale in enumerate([5.84, 8.80, 13.25]):
    for i, alpha in enumerate([0.1, 0.15, 0.2, 0.25, 0.3]):
        panel = ax[j, i]
        minflux_set.makeSelection()
        for L in L_vals:
            # Photon emission rate shown for each L: 2.0 for the two smallest
            # L values, 1.0 otherwise.
            per = 2.0 if L in (25, 50) else 1.0
            minflux_set.makeSelection(tags=[f"alpha={alpha:.2f}",
                                            f"scale={scale:.2f}",
                                            f"L={L:.2f}",
                                            f"per={per:.2f}"],
                                      logic=all)
            e_msd = nl.analysis.MSD(minflux_set)
            x_vals = dt * np.arange(1, e_msd.shape[0])
            panel.loglog(x_vals, e_msd[1:], label=f"Minflux L={L}")

        # Ground truth is stored unscaled, so its MSD is multiplied by scale**2.
        gt_set.makeSelection(tags=[f"alpha={alpha:.2f}", "blurred=False"],
                             logic=all)
        e_msd = nl.analysis.MSD(gt_set)
        x_vals = dt * np.arange(1, e_msd.shape[0])
        panel.loglog(x_vals, scale**2 * e_msd[1:], "r",
                     label="raw subsampled")

        gt_set.makeSelection(tags=[f"alpha={alpha:.2f}", "blurred=True"],
                             logic=all)
        e_msd = nl.analysis.MSD(gt_set)
        x_vals = dt * np.arange(1, e_msd.shape[0])
        # Dashed, to tell it apart from the raw subsampled curve and to mirror
        # the solid/dashed pairing of the theoretical curves below.
        panel.loglog(x_vals, scale**2 * e_msd[1:], "r--", label="motion blurred")

        x_vals = np.logspace(np.log10(x_vals[0]), np.log10(x_vals[-1]))
        panel.plot(x_vals,
                   default_msd(x_vals, cycles=n_cycle_steps, dt=dt,
                               scale=scale, alpha=alpha),
                   "k", label="theoretical")
        motion_blur_msd = params2msd(cycles=n_cycle_steps, dt=dt, scale=scale,
                                     alpha=alpha, sigma2=0, motion_blur=dt)
        panel.plot(x_vals, motion_blur_msd(x_vals), "k--",
                   label="theoretical+motion blur")
        panel.set_xlim([x_vals[0], x_vals[-1]])
        # NOTE(review): `per` here is the value of the last L in L_vals, while
        # the plotted MINFLUX curves use different rates for different L.
        panel.set_title(
            f"alpha={alpha:.2f}, scale={scale:.2f}, per={per:.2f}"
            )
        panel.set_xlabel("time (s)")
        panel.set_ylabel("displacement (nm^2)")
        panel.set_aspect(1./panel.get_data_ratio())
plt.tight_layout()
plt.savefig("motion_blur_sim_multiple_alpha_multiple_scale.svg")
plt.show()