In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy as sp
import seaborn as sns
from postprocessor.core.processes.fft import fft
from postprocessor.core.processes.findpeaks import findpeaks
from postprocessor.routines.mean_plot import mean_plot

from postprocessor.core.multisignal.crosscorr import crosscorr
# from crosscorr import crosscorr
from synthetic import (
    fitzhugh_nagumo,
    fitzhugh_nagumo_stochastic,
    gillespie_noise,
    gillespie_noise_raw,
    harmonic,
    harmonic_stochastic,
    sinusoid,
)
from utils import multiarray_random_shift, simple_median_plot, tile_signals

In [None]:
def logistic_envelope(timeaxis, k_min, k_max, tau):
    """Logistic function, to function as envelope function for oscillations"""
    return k_min + k_max * (1 - np.exp(-timeaxis / tau))

GENERATE SIGNALS

Step 1: Define parameters

Step 2: Generate arrays of signals

In [None]:
num_timeseries = 100
timeaxis = np.linspace(0, 500, 500)
#timeaxis = np.linspace(0, 100, 100)

Choice group A: fill with same signal
(thus using numpy.tile instead of for loop to make it fast)

Choice 1: Array of FHNs

In [None]:
fitzhugh_nagumo_single, _ = fitzhugh_nagumo(
    timeaxis=timeaxis, ext_stimulus=0.4, tau=12.5, a=0.7, b=0.82
)
fitzhugh_nagumo_single -= np.mean(fitzhugh_nagumo_single)
fitzhugh_nagumo_array = tile_signals([fitzhugh_nagumo_single], [num_timeseries])

Choice 2: Array of sinusoids

In [None]:
sinusoid_single = sinusoid(timeaxis=timeaxis, amp=1, freq=0.0235, phase=0)
sinusoid_array = tile_signals([sinusoid_single], [num_timeseries])

Choice 3: Mixed array of sinusoids

In [None]:
sinusoid_long = sinusoid(timeaxis=timeaxis, amp=1, freq=0.03, phase=0)
sinusoid_short = sinusoid(timeaxis=timeaxis, amp=1, freq=0.04, phase=0)
sinusoid_mixed_array = tile_signals([sinusoid_short, sinusoid_long], [20, 20])

Shift phases -- grouping pairs/triplets/tuples of signals that come from
the same sources

In [None]:
fitzhugh_nagumo_array, sinusoid_array = multiarray_random_shift(
    [fitzhugh_nagumo_array, sinusoid_array]
)

In [None]:
sinusoid_array = multiarray_random_shift([sinusoid_array])[0]

In [None]:
sinusoid_mixed_array = multiarray_random_shift([sinusoid_mixed_array])[0]

Choice group B: each row is different

Choice 4: Array of sinusoids, random phases

In [None]:
sinusoid_outofphase_array = np.empty((num_timeseries, len(timeaxis)), dtype=float)
for row_index in range(num_timeseries):
    phase = np.random.random() * 2 * np.pi
    sinusoid_outofphase_array[row_index] = sinusoid(
        timeaxis=timeaxis, amp=1, freq=0.03, phase=phase
    )

Choice 5: Mixed array of sinusoids, random phases

In [None]:
def generate_sinusoid_outofphase_array(num_timeseries, timeaxis, freq):
    sinusoid_outofphase_array = np.empty((num_timeseries, len(timeaxis)), dtype=float)
    for row_index in range(num_timeseries):
        phase = np.random.random() * 2 * np.pi
        sinusoid_outofphase_array[row_index] = sinusoid(
            timeaxis=timeaxis, amp=1, freq=freq, phase=phase
        )
    return sinusoid_outofphase_array


sinusoid_outofphase_long = generate_sinusoid_outofphase_array(200, timeaxis, 0.03)
sinusoid_outofphase_short = generate_sinusoid_outofphase_array(200, timeaxis, 0.04)
sinusoid_mixed_array = np.concatenate(
    (sinusoid_outofphase_short, sinusoid_outofphase_long)
)

Choice 6: Array of sinusoids with envelope function, random phases
(was functionalised, could make it function again if needed)

In [None]:
k_min = 1
k_max = 10
tau = 100
envelope = logistic_envelope(timeaxis=timeaxis, k_min=k_min, k_max=k_max, tau=tau)
nonstat_sinusoid_array = np.empty((num_timeseries, len(timeaxis)), dtype=float)
for row_index in range(num_timeseries):
    phase = np.random.random() * 2 * np.pi
    nonstat_sinusoid_array[row_index] = envelope * sinusoid(
        timeaxis=timeaxis, amp=1, freq=0.03, phase=phase
    )

Step 3: Generate arrays of noise

Choice 1: white/Gaussian noise

In [None]:
white_noise_std = 3
white_noise_array1 = np.random.normal(
    loc=0, scale=white_noise_std, size=(num_timeseries, len(timeaxis))
)
white_noise_array2 = np.random.normal(
    loc=0, scale=white_noise_std, size=(num_timeseries, len(timeaxis))
)

Choice 2: Gillespie noise

In [None]:
# SET PARAMETERS
noise_timescale = 20
noise_amp = 100
gill_time_final = 7500
gill_num_intervals = 5000

In [None]:
# GENERATE noise array
gill_noise_array = gillespie_noise(
    num_timeseries=num_timeseries,
    num_timepoints=len(timeaxis),
    noise_timescale=noise_timescale,
    noise_amp=noise_amp,
    time_final=gill_time_final,
    grid_num_intervals=gill_num_intervals,
)

In [None]:
# Filename generator
deathrate = 1 / noise_timescale
birthrate = noise_amp / noise_timescale
deathrate_str = str(deathrate).replace(".", "p")
birthrate_str = str(birthrate).replace(".", "p")
gill_noise_filename = "gillespienoise_k" + birthrate_str + "_d" + deathrate_str + ".csv"
# LOAD noise array
gill_noise_array = np.genfromtxt(gill_noise_filename, delimiter=",")

Step 4: Assign signal and noisy arrays, then construct final dataframes

Step 4.1: Assign signal arrays

In [None]:
signal_array1 = sinusoid_array
signal_array2 = fitzhugh_nagumo_array

Step 4.2: Assign noise arrays

In [None]:
noise_array1 = gill_noise_array

noise_array2 = gill_noise_array.copy()
rng = np.random.default_rng()
rng.shuffle(noise_array2)
# noise_array2 = white_noise_array2

In [None]:
# Filename generator for next two cells
# noise_timescale = 200
# noise_amp = 1000

deathrate = 1 / noise_timescale
birthrate = noise_amp / noise_timescale
deathrate_str = str(deathrate).replace(".", "p")
birthrate_str = str(birthrate).replace(".", "p")
filename = "gillespienoise_k" + birthrate_str + "_d" + deathrate_str + ".csv"
print(filename)

In [None]:
# Alternative: LOAD noise array(s)
gill_noise_array = np.genfromtxt(filename, delimiter=",")

In [None]:
# Optional: SAVE noise array(s)
# Useful for Gillespie noise because it takes time to generate,
# especially for long final times.
deathrate = 1 / noise_timescale
birthrate = noise_amp / noise_timescale
deathrate_str = str(deathrate).replace(".", "p")
birthrate_str = str(birthrate).replace(".", "p")
filename = "gillespienoise_k" + birthrate_str + "_d" + deathrate_str + ".csv"

np.savetxt(filename, gill_noise_array, delimiter=",")

Step 4.3: Add signal and noise

In [None]:
signal_array1 = signal_array1 + noise_array1
signal_array2 = signal_array2 + noise_array2

Alternative: harmonic oscillation with stochastic parameters

In [None]:
# Generate Gillespie noise (raw)
noise_timescale = 20
noise_amp = 500

gill_time_final = 750 #0
gill_num_intervals = 500 #0
gill_noise_time, gill_noise_list = gillespie_noise_raw(
    num_timeseries=num_timeseries,
    noise_timescale=noise_timescale,
    noise_amp=noise_amp,
    time_final=gill_time_final,
)

print("Gillespie noise generated.")

# Model parameter
ang_freq = 0.3
# Noise parameter
std = 0.015

# Scale Gillespie time axis to fit time axis
for gill_time_element in gill_noise_time:
    gill_time_element -= gill_time_element[0]
    gill_time_element *= timeaxis[-1] / gill_time_element[-1]

# Scale noise array to create angular frequency array
# ang_freq_2darray = (gill_noise_array * std) + ang_freq
ang_freq_2darray = [
    (gill_noise_element * std) + ang_freq for gill_noise_element in gill_noise_list
]

# Generate sinusoids via harmonic DEs, with stochastic angular frequency
# defined by gill_noise_array
# TODO: Make this faster, this is ridiculously slow
stoch_signal_list = []
for row_index in range(num_timeseries):
    # Random phase shift
    # Determine initial conditions...
    # (add code here)

    # Generate sinusoid
    print(f"Generating time series {row_index+1} of {num_timeseries}")
    print("Stochastic")
    stoch_signal, _ = harmonic_stochastic(
        timeaxis=gill_noise_time[row_index],
        ang_freq_array=ang_freq_2darray[row_index],
    )
    stoch_signal_list.append(stoch_signal)
    print("Done")
    
determ_signal, _ = harmonic(
    timeaxis=timeaxis,
    ang_freq=ang_freq,
)

In [None]:
plt.subplots(figsize=(10,5))
plt.step(gill_noise_time[0], gill_noise_list[0])
#plt.plot(gill_noise_time[0], gill_noise_list[0])
plt.title('gillespie noise')

plt.subplots()
plt.plot(np.diff(gill_noise_time[0]))
plt.title('time interval over time point')

plt.subplots(figsize=(10,5))
plt.step(gill_noise_time[0], ang_freq_2darray[0])
#plt.plot(gill_noise_time[0], ang_freq_2darray[0])
plt.title('angular frequency')

plt.subplots(figsize=(10,5))
plt.plot(gill_noise_time[0], stoch_signal_list[0], '-o')
plt.title('stochastic simulated signal')

plt.subplots(figsize=(10,5))
plt.plot(timeaxis, determ_signal, '-o')
plt.title('deterministic simulated signal')

plt.subplots()
plt.plot(np.diff(timeaxis))
plt.title('time interval over time point')

Alternative: FitzHugh-Nagumo model with stochastic parameters

In [None]:
# TECH DEBT: HARD-CODING
num_timeseries = 1
timeaxis = np.linspace(0, 100, 100)

# Generate Gillespie noise (raw)
noise_timescale = 20
noise_amp = 500

gill_time_final = 750 #0
gill_num_intervals = 500 #0
gill_noise_time, gill_noise_list = gillespie_noise_raw(
    num_timeseries=num_timeseries,
    noise_timescale=noise_timescale,
    noise_amp=noise_amp,
    time_final=gill_time_final,
)

print("Gillespie noise generated.")

# Model parameters
ext_stimulus = 0.4
tau = 12.5
a = 0.7
b = 0.8
# Noise parameter
std = 0.015

# Scale noise array to create ext_stimulus array
# TODO: Use a different gill_noise_array per each parameter -- i.e.
# generate a 4x size gill_noise_array and slice that into four

ext_stimulus_2darray = [
    (gill_noise_element * std) + ext_stimulus for gill_noise_element in gill_noise_list
]
tau_2darray = [
    (gill_noise_element * std) + tau for gill_noise_element in gill_noise_list
]
a_2darray = [
    (gill_noise_element * std) + a for gill_noise_element in gill_noise_list
]
b_2darray = [
    (gill_noise_element * std) + b for gill_noise_element in gill_noise_list
]

#ext_stimulus_array = (gill_noise_list[0] * std) + ext_stimulus
#tau_array = (gill_noise_list[1] * std) + tau
#a_array = (gill_noise_list[2] * std) + a
#b_array = (gill_noise_list[3] * std) + b

# Generate time series via FHN DEs, with stochastic parameters
# defined by gill_noise_array
# TODO: Make this faster, this is ridiculously slow
stoch_signal_list = []
for row_index in range(num_timeseries):
    # Random phase shift
    # Determine initial conditions...
    # (add code here)

    # Generate signal
    print(f"Generating time series {row_index+1} of {num_timeseries}")
    print("Stochastic")
    stoch_signal, _ = fitzhugh_nagumo_stochastic(
        timeaxis=gill_noise_time[0],
        ext_stimulus_array=ext_stimulus_2darray[row_index],
        tau_array=tau_2darray[row_index],
        a_array=a_2darray[row_index],
        b_array=b_2darray[row_index],
    )
    stoch_signal_list.append(stoch_signal)
    print("Done")

determ_signal, _ = fitzhugh_nagumo(
    timeaxis=timeaxis, ext_stimulus=0.4, tau=12.5, a=0.7, b=0.8
)

In [None]:
plt.subplots(figsize=(10,5))
plt.step(gill_noise_time[0], gill_noise_list[0])
#plt.plot(gill_noise_time[0], gill_noise_list[0])
plt.title('gillespie noise')

plt.subplots()
plt.plot(np.diff(gill_noise_time[0]))
plt.title('time interval over time point')

plt.subplots(figsize=(10,5))
plt.step(gill_noise_time[0], ext_stimulus_2darray[0])
#plt.plot(gill_noise_time[0], ang_freq_2darray[0])
plt.title('ext stimulus')

plt.subplots(figsize=(10,5))
plt.plot(gill_noise_time[0], stoch_signal_list[0], '-o')
plt.title('stochastic simulated signal')

plt.subplots(figsize=(10,5))
plt.plot(timeaxis, determ_signal, '-o')
plt.title('deterministic simulated signal')

plt.subplots()
plt.plot(np.diff(timeaxis))
plt.title('time interval over time point')

Step 4.4: Construct dataframes for correlation processes

In [None]:
signal_df1 = pd.DataFrame(signal_array1)
signal_df2 = pd.DataFrame(signal_array2)

Step 5: Autocorrelation & cross-correlation

Autocorrelation

In [None]:
autocorr_result = crosscorr.as_function(
    signal_df1, stationary=False, normalised=True, only_pos=True
)

Cross-correlation

In [None]:
crosscorr_result = crosscorr.as_function(signal_df1, signal_df2)

Mean across replicates

In [None]:
mean_across_replicates = np.nanmean(signal_array1, axis=0).reshape(
    (1, signal_array1.shape[1])
)
mean_across_replicates = mean_across_replicates.T

PLOTTING

input data

In [None]:
sns.heatmap(signal_df1)

In [None]:
sns.heatmap(signal_df2)

gillespie noise

In [None]:
gill_array = signal_array1
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(
    mean_across_replicates,
    linewidth=3,
    label=f"mean across {gill_array.shape[0]} replicates",
)
ax.plot(gill_array[0], linewidth=1, label="replicate 1")
ax.plot(gill_array[1], linewidth=1, label="replicate 2")
ax.set_xlabel("Time point")
ax.set_ylabel("Signal")
fig.legend()

acf of signals with gillespie noise, with options

In [None]:
fit_exp_decay = True
scale_lag_by_pd = True
freq = 0.03

fig, ax = plt.subplots()

# scale lag axis by sinusoid period
autocorr_result_scaled = autocorr_result.copy()
if scale_lag_by_pd:
    freq = freq
    xlabel = "Lag (periods)"
    plt.vlines(x=[1, 2, 3, 4], ymin=-1, ymax=1, ls="--")
else:
    freq = 1
    xlabel = "Lag (time points)"
autocorr_result_scaled.columns *= freq

# fit exp decay
if fit_exp_decay:
    decayrate = (gill_time_final / (gill_num_intervals - 1)) * (1 / noise_timescale)
    t = autocorr_result.columns.to_numpy()
    decay_function = np.exp(-(decayrate) * t)
    ax.plot(t * freq, decay_function, color="r")

# draw acf
simple_median_plot(
    autocorr_result_scaled,
    xlabel=xlabel,
    ylabel="Autocorrelation function",
    ax=ax,
)
# and axes
plt.axhline(0, color="k")
plt.axvline(0, color="k")

at longer lags, fewer data points are used to compute acf, and thus the std dev across replicate acfs at those points are greater

In [None]:
lag = np.linspace(0, 499, 500)
num_datapoints = signal_array1.shape[1] * np.linspace(500, 1, 500)
acf_variation = np.std(autocorr_result)

fig, ax = plt.subplots()
ax.plot(num_datapoints, acf_variation)
ax.set_xlabel("Number of data points used at lag value")
ax.set_ylabel("Standard deviation of\nautocorrelation function values at lag value")

variation between acfs, expressed as area the curve of std dev change over lag time

In [None]:
std_array = autocorr_result.std(axis=0)
plt.plot(std_array)
auc = np.trapz(std_array)
print(auc)

robustness: fft computed and power at oscillation frequency examined

In [None]:
fft_freqs, fft_power = fft.as_function(autocorr_result, sampling_period=1)

fig_fft, ax_fft = plt.subplots()
mean_plot(
    fft_power,
    unit_scaling=fft_freqs.iloc[0, 1],
    label="",
    xlabel="Frequency ($\mathrm{min}^{-1}$)",
    ylabel="Power",
    plot_title=f"Mean Fourier spectrum across all time series\n(n = {len(fft_power)})",
    ax=ax_fft,
)
ax_fft.axvline(
    freq,
    color="r",
    linestyle="--",
)

index_freq = np.argwhere(fft_freqs.iloc[0].to_numpy() == 0.03)[0][0]
powers_at_freq = fft_power.iloc[:, index_freq]
print(f"mean power at freq = {np.mean(powers_at_freq)}")
print(f"std dev of power at freq = {np.std(powers_at_freq)}")

envelope function

In [None]:
# find peaks & troughs
mean_acf_df = autocorr_result.mean().to_frame().T
peaks_df = findpeaks.as_function(mean_acf_df)
troughs_df = findpeaks.as_function(-mean_acf_df)

# datatype conversions
timeaxis = mean_acf_df.columns.to_numpy()
mean_acf = mean_acf_df.to_numpy()[0]
peaks_mask = peaks_df.to_numpy()[0] != 0
troughs_mask = troughs_df.to_numpy()[0] != 0

In [None]:
# https://stackoverflow.com/questions/3938042/fitting-exponential-decay-with-no-initial-guessing
def model_func(t, A, K, C):
    return A * np.exp(-K * t) + C


def fit_exp_linear(t, y, C=0):
    y = y - C
    y = np.log(y)
    K, A_log = np.polyfit(t, y, 1)
    A = np.exp(A_log)
    return A, K


def fit_exp_nonlinear(t, y, p0):
    opt_parms, parm_cov = sp.optimize.curve_fit(model_func, t, y, p0, maxfev=1000)
    A, K, C = opt_parms
    return A, K, C


# initial guess is the decay function in acf plot
central_decay_rate = (gill_time_final / (gill_num_intervals - 1)) * (
    1 / noise_timescale
)
initial_A = 1
initial_K = central_decay_rate
initial_C = 0
initial_guess = [initial_A, initial_K, initial_C]

# fit peaks
upper_A, upper_K, upper_C = fit_exp_nonlinear(
    timeaxis[peaks_mask],
    mean_acf[peaks_mask],
    p0=initial_guess,
)
upper_func = model_func(timeaxis, upper_A, upper_K, upper_C)

# fit troughs
lower_A, lower_K, lower_C = fit_exp_nonlinear(
    timeaxis[troughs_mask],
    -mean_acf[troughs_mask],
    p0=initial_guess,
)
lower_func = -model_func(timeaxis, lower_A, lower_K, lower_C)

In [None]:
plt.plot(timeaxis, mean_acf)
plt.scatter(timeaxis[peaks_mask], mean_acf[peaks_mask])
plt.scatter(timeaxis[troughs_mask], mean_acf[troughs_mask])
plt.plot(timeaxis, upper_func)
plt.plot(timeaxis, lower_func)

print(f"upper envelope: {upper_A:.4f} * exp(- {upper_K:.4f}) + {upper_C:.4f}")
print(f"lower envelope: {lower_A:.4f} * exp(- {lower_K:.4f}) + {lower_C:.4f}")

cross-correlation

In [None]:
plt.plot(sinusoid_array[0])
plt.plot(fitzhugh_nagumo_array[0])

In [None]:
plt.plot(signal_df1[0])
plt.plot(signal_df2[0])

In [None]:
simple_median_plot(
    crosscorr_result, ylabel="Cross correlation", xlabel="Lag (time points)"
)
plt.axhline(0, color="k")
plt.axvline(0, color="k")