# 4. Simulating Dynamic State Exchange

Using the PIE-smFRET simulations, blinking is added by simulating a fast exchange to E = 0 / S = 1 (donor-only) and S = 0 (acceptor-only states).

...

In [1]:
%matplotlib inline
import numpy as np
from numba import jit, njit
import tables
from typing import List
import matplotlib.pyplot as plt
import seaborn as sns
import pybromo as pbm
import pandas as pd
from scipy.stats import multinomial, expon
import traceback
print('Numpy version:', np.__version__)
print('PyTables version:', tables.__version__)
print('PyBroMo version:', pbm.__version__)

Numpy version: 1.26.4
PyTables version: 3.9.2
PyBroMo version: 0.8.1


In [2]:
# Lifetime functions from Harris (lifetime divisor paper), source:
# Setting up lifetime simulat ions
irf_read = pd.read_csv('../../simulations/harris_divisor_example/532nm_IRF_19-10-2021.csv', header = 5).to_numpy().reshape(-1)
irf_read -= 20
irf_read[irf_read < 0] = 0
irf_cum = irf_read.cumsum()
IRF_CDF = irf_cum/irf_cum[-1]

def rand_lt(tau, size, rs):
    irf_num = rs.rand(size)
    irf = np.array([np.argwhere(irf_n < IRF_CDF)[0, 0] for irf_n in irf_num]).astype(np.uint16)
    lt = rs.exponential(scale = tau*4096/25, size = size).astype(np.uint16)
    lt_irf = irf + lt
    mask = lt_irf >= 4096
    while np.any(mask):
        lt_irf[mask] -= 4096
        mask = lt_irf >= 4096
    return lt_irf

def rand_lt_nirf(tau, size, rs):
    lt = rs.exponential(scale=tau*4096/25, size=size).astype(np.uint16) + 270 # + 270 to match IRF
    mask = lt >= 4096
    while np.any(mask):
        lt[mask] -= 4096
        mask = lt >= 4096
    return lt

def rand_2lt(taus, size, rs):
    irf_num = rs.rand(size)
    irf = np.array([np.argwhere(irf_n < IRF_CDF)[0,0] for irf_n in irf_num]).astype(np.uint16)
    lt_sel = rs.rand(size) < taus[2]
    n_sel = lt_sel.sum()
    lt_0 = rs.exponential(scale=taus[0]*4096/25, size=n_sel).astype(np.uint16)
    lt_1 = rs.exponential(scale=taus[1]*4096/25, size=size-n_sel).astype(np.uint16)
    lt = np.empty(size, dtype=np.uint16)
    lt[lt_sel] = lt_0
    lt[~lt_sel] = lt_1
    lt_irf = irf + lt
    mask = lt_irf >= 4096
    while np.any(mask):
        lt_irf[mask] -= 4096
        mask = lt_irf >= 4096
    return lt_irf


The following sections deals with simulating state switching. This is achieved by first simulating a state switching trajectory in time steps of the original simulation from a Markov sequence. Each state gets a dwell time which is drawn on a per state basis from an exponential distribution. The corresponding times per state are then used to stitch the simulation files generated in the first part of the document. This is much like originally developed by Paul Harris, but extended to the $N$ state case.

The supplied single photon counting data is simulating pulsed interleaved excitation (PIE), so three photon streams are encoded: $D_{ex}D_{em}$, $D_{ex}A_{em}$, and $A_{ex}A_{em}$.

- $D_{ex}D_{em}$
  - Encoded with detector ID 0 (the donor channel)
- $D_{ex}A_{em}$
  - Encoded with detector ID 1, and nantime bin 5 (for now)
- $A_{ex}A_{em}$
  - Encoded with detector ID 1, and nantime bin 1 (for now)
 
The simulation code retains the sense for the PIE channels and assigned a sensible nanotime bin depending on the nature of the simulated photon stream.

In [3]:
def _markov_sequence(p_init: np.array, p_trans: np.array, sequence_length: int) -> List:
    """
    Generate a Markov sequence
    ---
    p_init - np.array: Initial state vector to start the Markov sequence
    p_trans - np.array: Transition matrix of the Markov sequence
    sequence_length: Length of the Markov sequence

    TODO: Check the user input before making the sequence to rule out faulty behaviour.
    """

    # Get initial state from normal distribution given initial state probabilities
    initial_state = int(list(multinomial.rvs(1, p_init)).index(1))

    # Make array to hold state sequence and assign initial state
    states = np.zeros((int(sequence_length)), dtype = np.int8)
    states[0] = initial_state

    # Build Markov sequence using p_trans
    for i in range(1, sequence_length - 1):
        state_trans = p_trans[states[i - 1]] # Get transition matrix of the last state
        new_state = int(list(multinomial.rvs(1, state_trans)).index(1)) # Draw new state using normal dist.
        states[i] = new_state

    return np.array(states)


@njit
def _markov_sequence_numba(p_init, p_trans, sequence_length):
    
    states = np.zeros(sequence_length, dtype=np.int8)
    states[0] = np.where(np.random.multinomial(1, pvals = p_init) == 1)[0][0]
    
    for i in range(1, sequence_length):
        state_i = np.random.multinomial(1, pvals = p_trans[states[i - 1]])
        states[i] =  np.where(state_i == 1)[0][0]

    return states

def _single_partcle_state_switching(times_s, dwell_times, times_unit, markov_sequence):
    """
    CAVE: times_s is a memoryview() of the simulated photon records and
    is looped over as such. Each iterations is thus a 32 bit = 4 byte
    chunk of the simulated photon records!
    """
    # Get indices for each state dwell and write that to slices_list.
    # The index list will be used to reconstruct the simulation
    # array. The iteration increments (and thus the slices) are
    # expressed in units of 32 bit photon records. This is because
    # times_s is a memoryview().
    state_sequence = []
    slices_list = []
    index_s = np.zeros(shape = len(dwell_times), dtype = np.int64)
    index_start_s = np.zeros(shape = len(dwell_times), dtype = np.int64)
    current_state_index = 0
    current_state = markov_sequence[0]
    t_start = 0
    no_macrotime_entries_left = [False for i in times_s]

    while not all(no_macrotime_entries_left):
        # Check if macrotime entries are remaining for any times array
        for i in range(len(times_s)):
            if index_s[i] >= len(times_s[i]) - 1:
                no_macrotime_entries_left[i] = True


        # Check current state
        #if current_state_index > len(markov_sequence) - 1:
        #    current_state_index = 0
        current_state = markov_sequence[current_state_index]

        # Draw dwell time of current state in units of f
        # macrotime clock ticks (aka. macrotimes indices)
        tau = expon.rvs(scale = dwell_times[current_state] / times_unit)

        # Progress the index counters of ALL simulation
        # files to avoid index progression mismatches
        for state_index in range(len(dwell_times)):
            times = times_s[state_index]
            delta_t = times[index_s[state_index]] - t_start
            while delta_t < tau and index_s[state_index] < len(times) - 1:
                index_s[state_index] += 1
                delta_t = times[index_s[state_index]] - t_start

        # Append times slices for the state sequence
        slices_list.append((index_start_s[current_state], index_s[current_state]))

        # Increment counters
        index_start_s = index_s.copy()
        t_start += tau
        current_state_index += 1

    return slices_list

def sim_state_switching(
    num_particles: int,
    times_states: list,
    det_states: list,
    par_states: list,
    nanotimes_states: list,
    times_unit: float,
    p_trans: np.array,
    p_init: np.array,
    dwell_times: np.array,
    lts: list,
    lt_rs,
    seed = 1
):
    """
    CAVE: This function is slightly modified compared to the sim_state_switching()
    function in 3_Dynamic_State_Exchange! The assignment of a new nanotime had to
    be adjusted to retain the nanotimes of the original state switching
    simulation, while also addign donor only and acceptor only nanotimes. This is
    achieved by adjusting the if-else statement close to the end of the function.
    The function will always assime that state 1 (in mseq_s) refers to the FRET
    simulation, whereas 0 and/or 2 refer to dye blinkign states.

    Simulate N state switching using Markov sequence for each
    particle in a PyBromo simulation. Adapted and expanded
    from Paul Harris's sim_two_states() code.
    ---
    num_particles (int): Number of simulated particles in the 
    times_states (list): 
    det_states (list): N tuple of detector arrays, one for each state
    par_states (list): N tuple of particle ID arrays, one for each states,
    nanotimes_states (list):  N tuple of nanotime arrays for each state; used to differentiate between PIE photon streams
    detector ID 0 is DexDem, detector ID 1 is the accetptor, DexAem was given a temporary nanotime of 5, the AexAem of 2
    times_unit: float,
    lts: list,
    lt_rs: function
    seed = 1
    """
    
    np.random.seed(seed)
    times_p = [] # macrotime entries per particle
    det_p = [] # detector entires per particle
    nanotimes_p = [] # nanotimes per particle


    for particle in range(num_particles):
        print("\nSimulating particle %d: " % particle, end = '', flush = True)

        # Get timestamps and detectors for current particle in each state
        print("timestamps", end = "..", flush = True)
        masks_states = [par == particle for par in par_states]
        times_s = [memoryview(t_par[mask_par]) for t_par, mask_par in zip(times_states, masks_states)]
        det_s = [memoryview(det_par[mask_par]) for det_par, mask_par in zip(det_states, masks_states)]
        nanotimes_s = [memoryview(nanotimes_par[mask_par]) for nanotimes_par, mask_par in zip(nanotimes_states, masks_states)]
        print("[done]", end = "", flush = True)


        # Make Markov sequence and slice list
        # Exhaustive simulation of state switching using the average state
        # dwell times per particle for the ENTIRE duration of the simulation
        # The states slice list is then cut to only be looped through while
        # the particle is actually emiting.
        print('residence..', end='', flush=True)
        mseq_s = _markov_sequence_numba(p_init=p_init, p_trans=p_trans, sequence_length=15_000_000) # There has to be a smarter solution to estimate the max number of state switched: I'm tired though...
        slices = _single_partcle_state_switching(times_s=times_s, dwell_times=np.array(dwell_times), times_unit=times_unit, markov_sequence=mseq_s)

        # Make new timestamps and detector arrays
        print("merge..", end="", flush = True)
        records_size = sum([s[1] - s[0] for s in slices]) # Getting array size of one of the simulation arrays to know how much space to allocate
        times = np.zeros(records_size, dtype = np.int64)
        det = np.zeros(records_size, dtype = np.uint8)
        par = np.ones(records_size, dtype = np.uint8) * particle # Keep info on the particle ID
        nanotimes = np.zeros(records_size, dtype = np.uint16)
        times_m = memoryview(times)
        det_m = memoryview(det)
        nanotimes_m = memoryview(nanotimes)

        # Gather photon data from particle i
        # and build per particle state switching arrays
        istart = 0
        for i in range(len(slices)):
            try:
                start, stop = int(slices[i][0]), int(slices[i][1])
                istop = istart + stop - start
                times_m[istart:istop] = times_s[mseq_s[i]][start:stop]
                det_m[istart:istop] = det_s[mseq_s[i]][start:stop]
                nanotimes_m[istart:istop] = nanotimes_s[mseq_s[i]][start:stop]
                
                # Simulate and assign nanotimes
                # Assign appropriate nanotimes for FRETing particles
                """
                The loop needs to consider the following:
                The current state (FRETing, donor-only, or acceptor-only)
                determines which behavior is chosen
                - For FRETing: Leave all entries as they are because those 
                    have been simulated in the previous notebook
                - For donor or acceptor-only: Check from which photon stream
                    the j-th photon originates AND which state the is currently
                    active (donor-only or acceptor-only)
                    DexDem:
                        - Donor-only: a E = 0 nanotime
                        - Acceptor-only: a uniform/noise photon
                    DexAem:
                        - Always a noise photon
                    AexAem:
                        - Donor-only: A uniform/noise photon
                        - Acceptor-only: a E = 0 photon + 2049 bins shifted

                """
                for j in range(int(stop - start)):
                    if particle is not int(num_particles - 1):
                        # Current state is FRET state (= not dye blinking)
                        # then just use the already simulated nanotimes from the 
                        # previous notebook
                        if mseq_s[i] == int(1):
                            nanotimes_m[istart:istop][j] = nanotimes_s[1][start:stop][j]
                        elif mseq_s[i] == int(0): # for Donor-only / acceptor dark
                            # DexDem photon
                            if det_m[istart:istop][j] == int(0):
                                nanotimes_m[istart:istop][j] = rand_lt(lts[0], 1, lt_rs)[0]
                            # DexAem or AexAem photon
                            else:
                                nanotimes_m[istart:istop][j] = int(np.random.uniform(0, 4095, 1)[0]) 
                        elif mseq_s[i] == int(2):
                            # for acceptor-only / donor dark
                            # AexAem
                            if det_m[istart:istop][j] == int(1):
                                nanotimes_m[istart:istop][j] = rand_lt(lts[0], 1, lt_rs)[0]+ int(4095 // 2)
                            else:
                                nanotimes_m[istart:istop][j] = int(np.random.uniform(0, 4095, 1)[0])
                        else:
                            raise Exception("Could not assign nanotime...")
                    else:
                        nanotimes_m[istart:istop][j] = int(np.random.uniform(0, 4095, 1)[0])
            except Exception as error:
                print("An error occured during array stitching: ", error)
                print(traceback.format_exc())
            istart = istop # Start next iteration where the last ended
        print("[done]", flush = True)
        times_p.append(times)
        det_p.append(det)
        nanotimes_p.append(nanotimes)

    print("\n[Finished]", flush = True)

    return times_p, det_p, nanotimes_p

In [4]:
# Continue with code from two state dynamics tutorial
# see https://github.com/OpenSMFS/PyBroMo/blob/master/notebooks/PyBroMo%20-%205.%20Two-state%20dynamics%20-%20Dynamic%20smFRET%20simulation.ipynb
from pathlib import Path
from textwrap import dedent, indent
from scipy.stats import expon
import phconvert as phc
import pandas as pd

In [5]:
SIM_PATH = 'Two_State_Sims/'
filelist = list(Path(SIM_PATH).glob("smFRET*"))
[f.name for f in filelist]

['smFRET_aa3ff9_E30E50_2states_taus_1000.0ms1000.0ms0.0ms_blink_5.0ms50.0ms10.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_1000.0ms1000.0ms0.0ms_blink_1.0ms5.0ms2.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_0.5ms3.0ms1.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_1000.0ms1000.0ms0.0ms_blink_1.0ms20.0ms1.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_5.0ms50.0ms10.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_1000.0ms1000.0ms0.0ms_blink_2.0ms5.0ms1.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_0.7ms0.78ms0.83ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_1.0ms20.0ms1.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_0.25ms2.0ms0.5ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_1000.0ms1000.0ms0.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_1.0ms5.0ms2.0ms.h5',
 'smFRET_aa3ff9_E30E50_2states_taus_1000.0ms1000.0ms0.0ms

In [6]:
# Opening the photon emission simulations

filename_a = str("smFRET_aa3ff9_P_15_s0_D_1.0e-11_E_0_EmTot_350k_BgD1500_BgA1500_t_max_500s_d_only.hdf5")
filename_b = str("Two_State_Sims/smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms.h5")
filename_c = str("smFRET_aa3ff9_P_15_s0_D_1.0e-11_E_0_EmTot_0k_BgD1500_BgA1500_t_max_500s_a_only.hdf5")

da = tables.open_file(filename_a)
db = tables.open_file(filename_b)
dc = tables.open_file(filename_c)

# Timestamps
times_a = da.root.photon_data.timestamps.read()
times_b = db.root.photon_data.timestamps.read()
times_c = dc.root.photon_data.timestamps.read()


# Detectors
det_a = da.root.photon_data.detectors.read()
det_b = db.root.photon_data.detectors.read()
det_c = dc.root.photon_data.detectors.read()


# Particle number for each timestamp
par_a = da.root.photon_data.particles.read()
par_b = db.root.photon_data.particles.read()
par_c = dc.root.photon_data.particles.read()

# Nanotimes
nano_a = da.root.photon_data.nanotimes.read()
nano_b = db.root.photon_data.nanotimes.read()
nano_c = dc.root.photon_data.nanotimes.read()


times_unit = (da.root.photon_data.timestamps_specs.timestamps_unit.read())*10


## Define the state exchange scheme

In [13]:
seed = 562154  # random number generator seed

# Parameters for state switching kinetics and simualted parameters
E1, E2, E3 = 0.0, 0.0, 0.0
base_liftime = 3.0
lts = (-base_liftime*(E1 - 1.0), -base_liftime*(E2 - 1.0), -base_liftime*(E3 - 1.0)) # Get lifetimes for each of the FRET efficiencies

lt_rs = np.random.RandomState()#(seed = seed)
n_particles = int(np.max(par_a) + 1) # Be aware that PyBroMo adds one particle to achieve background count simulation, thus do n_particles + 1

t_step = 0.5E-6 # Time step of each "clock tick" of the simulation
taus = np.array([0.5E-3, 0.75E-3, 0.5E-3]) # State dwell times in s

p_init = np.array([1/len(taus) for i in taus])


# Transition matrix for a linearly exchanging
# system: LF <-> MF <-> HF, not possible is LF <-> HF
p_trans = np.array([
        [0.0, 0.5, 0.5],
        [0.5, 0.0, 0.5],
        [0.5, 0.5, 0.0]
])

times_p, det_p, nanotimes_p = sim_state_switching(
    num_particles= (n_particles),
    times_states=(times_a, times_b, times_c),
    det_states=(det_a, det_b, det_c),
    par_states=(par_a, par_b, par_c),
    nanotimes_states= (nano_a, nano_b, nano_c),
    times_unit=t_step,
    p_trans=p_trans,
    p_init=p_init,
    dwell_times=taus,
    lts=lts,
    lt_rs=lt_rs,
    seed=seed
)


Simulating particle 0: timestamps..[done]residence..merge..[done]

Simulating particle 1: timestamps..[done]residence..merge..[done]

Simulating particle 2: timestamps..[done]residence..merge..[done]

Simulating particle 3: timestamps..[done]residence..merge..[done]

Simulating particle 4: timestamps..[done]residence..merge..[done]

Simulating particle 5: timestamps..[done]residence..merge..[done]

Simulating particle 6: timestamps..[done]residence..merge..[done]

Simulating particle 7: timestamps..[done]residence..merge..[done]

Simulating particle 8: timestamps..[done]residence..merge..[done]

Simulating particle 9: timestamps..[done]residence..merge..[done]

Simulating particle 10: timestamps..[done]residence..merge..[done]

Simulating particle 11: timestamps..[done]residence..merge..[done]

Simulating particle 12: timestamps..[done]residence..merge..[done]

Simulating particle 13: timestamps..[done]residence..merge..[done]

Simulating particle 14: timestamps..[done]residence..merg

In [19]:
assert all(all(np.diff(t) >= 0) for t in times_p)
assert len(times_p) == len(det_p) == n_particles

In [20]:
times_dyn = np.hstack(times_p)
det_dyn = np.hstack(det_p)
nanotimes_dyn = np.hstack(nanotimes_p)
argsort = times_dyn.argsort(kind='mergesort')
times_dyn = times_dyn[argsort]
det_dyn = det_dyn[argsort]
nanotimes_dyn = nanotimes_dyn[argsort]
par_dyn = np.hstack([det_p_i.size * [idx] for idx, det_p_i in enumerate(det_p)])
assert par_dyn.shape[0] == sum(d.size for d in det_p)
par_dyn = par_dyn[argsort]

In [21]:
# Save as hdf5
def make_photon_hdf5(times, det, par, nanotimes, times_unit, description, identity=None):
    photon_data = dict(
        timestamps = times,
        timestamps_specs = dict(timestamps_unit=times_unit/10),
        detectors = det,
        particles = par,
        nanotimes = nanotimes,
        nanotimes_specs = {
            'tcspc_unit': (1/20E6)/4096,
            'tcspc_range': (1/20E6),
            'tcspc_num_bins': 4096
        },
        measurement_specs = dict(
            measurement_type = 'smFRET-nsALEX',
            laser_repetition_rate = 20e6,
            detectors_specs = dict(spectral_ch1 = np.atleast_1d(0),
                                   spectral_ch2 = np.atleast_1d(1))))

    setup = dict(
        num_pixels = 1,
        num_spots = 1,
        num_spectral_ch = 2,
        num_polarization_ch = 1,
        num_split_ch = 1,
        modulated_excitation = True,
        lifetime = True,
        laser_repetition_rates = [20e6],
        excitation_alternated=(True,),
        excitation_cw=(False,)
    )

    provenance = dict(software='', software_version='', filename='')

    if identity is None:
        identity = dict()

    description = description
    acquisition_duration = (times[-1] - times[0]) * times_unit
    data = dict(
        acquisition_duration = round(acquisition_duration),
        description = description,
        photon_data = photon_data,
        setup=setup,
        provenance=provenance,
        identity=identity)
    return data

# Patching numpy asscalar
def patch_asscalar(a):
    return a.item()

setattr(np, "asscalar", patch_asscalar)


description = "Simulated data with simulated dye blinking"

In [22]:
# Write simulation to file
identity=dict(author='Max Mustermann', 
              author_affiliation='Simulated Data')
data = make_photon_hdf5(times_dyn, det_dyn, par_dyn, nanotimes_dyn, times_unit, description, identity=identity)
tau_string = ""
dyn_dwelltimes = filename_b[filename_b.find("taus_"):filename_b.find(".h5")]
blinking_string = ""
for tau in taus:
    blinking_string += str(tau*1000) + "ms"
h5_fname = "Blinking_Titration/" + filename_b[filename_b.find("smFRET"):filename_b.find("tau")] + dyn_dwelltimes + str("_blink_") +blinking_string
h5_fname_path = h5_fname + str(".h5")
print(h5_fname_path)

Blinking_Titration/smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_0.5ms0.75ms0.5ms.h5


In [23]:
phc.hdf5.save_photon_hdf5(data, h5_fname=h5_fname_path, overwrite=True)

         File info in provenance group will not be added.

Saving: Blinking_Titration/smFRET_aa3ff9_E30E50_2states_taus_2.5ms1.0ms0.0ms_blink_0.5ms0.75ms0.5ms.h5
