In [None]:
import numpy as np
from scipy.stats import maxwell, multivariate_normal
from scipy.stats import mannwhitneyu
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt
from tqdm import tqdm

from calmutils.simulation import runit_vec


def simulate_distance_pairs(bias=0, noise_sd=0, mu=100, delta_mu=10, N=1000, dimensionality=3, N2=None):

    # if N of second group is not given, use same
    if N2 is None:
        N2 = N

    # make bias in random direction (will be the same for all vectors)
    bias = (bias * runit_vec(d=3)).squeeze()

    # Maxwell a parameter so we get desired means (https://en.wikipedia.org/wiki/Maxwell%E2%80%93Boltzmann_distribution)
    a1 = mu * np.sqrt(np.pi) / np.sqrt(2**3)
    a2 = (mu + delta_mu) * np.sqrt(np.pi) / np.sqrt(2**3)

    # Maxwell random distances with desired means
    d1 = maxwell.rvs(size=N, scale=a1)
    d2 = maxwell.rvs(size=N2, scale=a2)

    # random vectors with those distances
    v1 = (runit_vec(N, dimensionality).T * d1).T
    v2 = (runit_vec(N2, dimensionality).T * d2).T

    # add multivariate normal with mean=bias, and cov to correspond to noise sd (we use symmetical, TODO: also asymmetrical?)
    v1_noise = v1 + multivariate_normal.rvs(mean=bias, cov=np.diag(np.full(dimensionality, noise_sd**2)), size=N)
    v2_noise = v2 + multivariate_normal.rvs(mean=bias, cov=np.diag(np.full(dimensionality, noise_sd**2)), size=N2)

    # lengths of noisy vectors
    d1_noise = np.linalg.norm(v1_noise, axis=1)
    d2_noise = np.linalg.norm(v2_noise, axis=1)

    return d1, d2, d1_noise, d2_noise


def power_simulation(N_sims, bias, noise_sd, mu, delta_mu, N, N2, dimensionality=3):
    """
    Repeat distance pair simulation with localization inaccuracies N_sims times.
    Collect mean differences of noisy data and noiseless
    and Mann-Whitney-U pvalues for each simulation.
    """

    # Maxwell distribution is not defined for mean distances < 0, return dummy result
    if mu < 0 or mu + delta_mu < 0:
        return [1] * N_sims, [0] * N_sims

    pvals = []
    mean_diffs = []
    for _ in range(N_sims):
        d1, d2, d1_noise, d2_noise = simulate_distance_pairs(bias, noise_sd, mu, delta_mu, N, dimensionality, N2)
        mean_diffs.append(np.mean(np.concat([d1_noise - d1, d2_noise - d2])))
        pvals.append(mannwhitneyu(d1_noise, d2_noise).pvalue)
    return pvals, mean_diffs

In [None]:
## PARAMETERS

# should be 3
dimensionality = 3

# data point pairs to simulate
N = 5_000
N2 = None

# mean distance of 1st set
mu = 300
# extra distance of 2nd set
delta_mu = 100

# std. dev. of normal noise to be added to 
noise_sd = 100

# bias of measurement (in random direction)
bias = 0


## RUN SIMULATION
d1, d2, d1_noise, d2_noise = simulate_distance_pairs(bias, noise_sd, mu, delta_mu, N, dimensionality, N2)

### PLOT & ANALYSIS

# make temp df from distances (each as own column -> will be melted for plot)
df = pd.DataFrame.from_dict({
    'd1_noiseless': d1,
    'd2_noiseless': d2,
    'd1_withnoise': d1_noise,
    'd2_withnoise': d2_noise
})

df = df.melt()
df[["measurement", "noise"]] = df["variable"].str.split("_", expand=True)
# sns.histplot(df, x='value', hue="variable", element='poly', fill=None)

# Significance test with "clean" and "noisy" distances
mannwhitneyu(d1, d2), mannwhitneyu(d1_noise, d2_noise)


def get_histogram_df(df, grouping_columns, value_column, bins=25, cumulative=False):

    df_stats = []

    for i, dfi in df.groupby(grouping_columns):

        counts, bin_centers = np.histogram(dfi[value_column], bins=np.linspace(0, df[value_column].max(), bins+1))
        probs = counts / counts.sum()
        probs_cumulative = np.cumsum(probs)
        bin_centers = (bin_centers[:-1] + bin_centers[1:]) / 2

        df_hist = pd.DataFrame({"prob": probs, value_column: bin_centers})
        df_hist['stat'] = 'probability'

        df_hist_cum = pd.DataFrame({"prob": probs_cumulative, value_column: bin_centers})
        df_hist_cum['stat'] = 'probability_cumulative'

        df_stats_i = df_hist_cum if cumulative else df_hist
        df_stats_i[grouping_columns] = i

        df_stats.append(df_stats_i)

    return pd.concat(df_stats)

df = get_histogram_df(df, ["measurement", "noise"], "value", bins=40)

sns.lineplot(df, x="value", y="prob", hue="measurement", style="noise")
sns.despine()
plt.savefig('/home/david/Documents/PromoterEnhancer_Revision/PowerFigure/power_analysis_example.pdf')

In [None]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from itertools import product

N_sims = 1_000
significance_cutoff = 0.05

# should be 3
dimensionality = 3


# N: data point pairs to simulate
# mu: mean distance of 1st set
# delta_mu: extra distance of 2nd set
# noise_sd: std. dev. of normal noise to be added to
# bias: bias of measurement (in random direction)

# Parameters for STED transcribing vs. non-transcribing
# N(non-tr): ~5000, n2(tr): ~150, mu(for non-tr): 430nm
simulation_parameters_sted = {
    "N": 5000,
    "N2": 150,
    "mu": 430,
    "delta_mu": np.arange(-150, 151, 10),
    "noise_sd": np.arange(0, 151, 5),
    "bias": 0
}

simulation_parameters_spining_disk = {
    "N": 1900,
    "N2": 1900,
    "mu": 300,
    "delta_mu": np.arange(-150, 151, 10),
    "noise_sd": np.arange(0, 251, 10),
    "bias": 0
}

simulation_parameters_spining_disk_varmu = {
    "N": 1500,
    "N2": 1500,
    "delta_mu": np.arange(-150, 151, 10),
    "mu": np.arange(0, 501, 10),
    "noise_sd": 40,
    "bias": 0
}

simulation_parameters_sted_varmu = {
    "N": 5000,
    "N2": 150,
    "delta_mu": np.arange(-150, 151, 10),
    "mu": np.arange(0, 501, 10),
    "noise_sd": 10,
    "bias": 0
}


# pick one of the parameter sets
simulation_parameters = simulation_parameters_sted_varmu

# check that we have two parameters with multiple values for which we do grid of simulations
if not np.sum([not np.isscalar(v) for v in simulation_parameters.values()]) == 2:
    raise ValueError("provide lists of values for two parameters")

tested_parameters_names = [k for k,v in simulation_parameters.items() if not np.isscalar(v)]
values_p1, values_p2 = [v for v in simulation_parameters.values() if not np.isscalar(v)]

significance_heatmap = np.zeros((len(values_p1), len(values_p2)))
mean_diff_heatmap = np.zeros((len(values_p1), len(values_p2)))

futures = []

# thread pool does not work nicely e.g. on my Linux machine 
# -> use process pool if necessary

# with ThreadPoolExecutor() as tpe:
with ProcessPoolExecutor() as tpe:
    for i, p1 in enumerate(values_p1):
        for j, p2 in enumerate(values_p2):
            
            # get parameter values from loop values or scalars from dict
            bias = p1 if tested_parameters_names[0] == "bias" else (p2 if tested_parameters_names[1] == "bias" else simulation_parameters["bias"])
            noise_sd = p1 if tested_parameters_names[0] == "noise_sd" else (p2 if tested_parameters_names[1] == "noise_sd" else simulation_parameters["noise_sd"])
            mu = p1 if tested_parameters_names[0] == "mu" else (p2 if tested_parameters_names[1] == "mu" else simulation_parameters["mu"])
            delta_mu = p1 if tested_parameters_names[0] == "delta_mu" else (p2 if tested_parameters_names[1] == "delta_mu" else simulation_parameters["delta_mu"])
            N = p1 if tested_parameters_names[0] == "N" else (p2 if tested_parameters_names[1] == "N" else simulation_parameters["N"])
            N2 = p1 if tested_parameters_names[0] == "N2" else (p2 if tested_parameters_names[1] == "N2" else simulation_parameters["N2"])
 
            futures.append(tpe.submit(power_simulation, N_sims, bias, noise_sd, mu, delta_mu, N, N2, dimensionality))
            
    n_combos = len(values_p1) * len(values_p2)

    for f, (i, j) in tqdm(zip(futures, product(range(len(values_p1)), range(len(values_p2)))), total=n_combos):
        pvals, mean_diffs = f.result()
        significance_heatmap[i,j] = (np.array(pvals) < significance_cutoff).mean()
        mean_diff_heatmap[i,j] = np.array(mean_diffs).mean()

In [None]:
plt.figure(figsize=(12,8))

plt.imshow(significance_heatmap , cmap='magma')
plt.colorbar(shrink=0.7)

plt.xticks(range(len(values_p2)), values_p2, rotation=90);
plt.xlabel(tested_parameters_names[1])

plt.yticks(range(len(values_p1)), values_p1);
plt.ylabel(tested_parameters_names[0])

plt.clim(0, 1)

other_param_summary = ', '.join(f"{k}: {v}" for k,v in simulation_parameters.items() if np.isscalar(v))
plt.title(f"Fraction of significant tests\n{other_param_summary}")

# annot_coord = (5, 17)
# val_at_arrow = significance_heatmap[*annot_coord[::-1]] 
# plt.annotate(f"observed data: {val_at_arrow}", annot_coord, arrowprops=dict(arrowstyle='->'), xytext=(3, 23))

plt.savefig('/home/david/Documents/PromoterEnhancer_Revision/PowerFigure/power_analysis_sted_1_power.pdf')

In [None]:
plt.figure(figsize=(12,8))
plt.imshow(mean_diff_heatmap, cmap='RdBu')

plt.colorbar(shrink=0.7)

plt.xticks(range(len(values_p2)), values_p2, rotation=90);
plt.xlabel(tested_parameters_names[1])

plt.yticks(range(len(values_p1)), values_p1);
plt.ylabel(tested_parameters_names[0])

plt.clim(-100, 100)

other_param_summary = ', '.join(f"{k}: {v}" for k,v in simulation_parameters.items() if np.isscalar(v))
plt.title(f"Difference in mean noiseless vs. noisy\n{other_param_summary}")

# annot_coord = (5, 17)
# val_at_arrow = mean_diff_heatmap[*annot_coord[::-1]] 
# plt.annotate(f"observed data: {val_at_arrow:.3f}", annot_coord, arrowprops=dict(arrowstyle='->'), xytext=(3, 23))

plt.savefig('/home/david/Documents/PromoterEnhancer_Revision/PowerFigure/power_analysis_sted_2_meandiff.pdf')

## OLD: hardcoded combinations

In [None]:
from concurrent.futures import ThreadPoolExecutor
from itertools import product

N_sims = 500
significance_cutoff = 0.05

# should be 3
dimensionality = 3

# data point pairs to simulate
N = 500

# mean distance of 1st set
mu = 150
# extra distance of 2nd set
delta_mus = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

# std. dev. of normal noise to be added to 
noise_sds = [0, 20, 40, 60, 80, 100, 120, 140, 160, 180, 200]

# bias of measurement (in random direction)
bias = 0


significance_heatmap = np.zeros((len(delta_mus), len(noise_sds)))

futures = []

with ThreadPoolExecutor() as tpe:
    for i, delta_mu in enumerate(delta_mus):
        for j, noise_sd in enumerate(noise_sds):
            
            def _sim_thread(noise_sd, delta_mu):
                pvals = []
                for _ in range(N_sims):
                    _, _, d1_noise, d2_noise = simulate_distance_pairs(bias, noise_sd, mu, delta_mu, N, dimensionality)
                    pvals.append(mannwhitneyu(d1_noise, d2_noise).pvalue)
                return pvals
            
            futures.append(tpe.submit(_sim_thread, noise_sd, delta_mu))
            
    n_combos = len(delta_mus) * len(noise_sds)

    for f, (i, j) in tqdm(zip(futures, product(range(len(delta_mus)), range(len(noise_sds)))), total=n_combos):
        pvals = f.result()
        significance_heatmap[i,j] = (np.array(pvals) < significance_cutoff).mean()

# plt.imshow(result_heatmap)

In [None]:
plt.imshow(significance_heatmap, cmap='magma')
plt.colorbar()

plt.xticks(range(len(noise_sds)), noise_sds);
plt.xlabel("Noise SD")

plt.yticks(range(len(delta_mus)), delta_mus);
plt.ylabel("distance diff")

plt.clim(0, 1)
plt.title("Fraction of significant tests")

In [None]:
from concurrent.futures import ThreadPoolExecutor
from itertools import product

N_sims = 500
significance_cutoff = 0.05

# should be 3
dimensionality = 3

# data point pairs to simulate
N = 500

# mean distance of 1st set
mu = 150
# extra distance of 2nd set
delta_mus = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

# std. dev. of normal noise to be added to 
noise_sd = 0

# bias of measurement (in random direction)
biases = [0, 20, 40, 60, 80, 100, 120, 140, 160, 180, 200]


significance_heatmap = np.zeros((len(delta_mus), len(biases)))

futures = []

with ThreadPoolExecutor() as tpe:
    for i, delta_mu in enumerate(delta_mus):
        for j, bias in enumerate(biases):
            
            def _sim_thread(bias, delta_mu):
                pvals = []
                for _ in range(N_sims):
                    _, _, d1_noise, d2_noise = simulate_distance_pairs(bias, noise_sd, mu, delta_mu, N, dimensionality)
                    pvals.append(mannwhitneyu(d1_noise, d2_noise).pvalue)
                return pvals
            
            futures.append(tpe.submit(_sim_thread, bias, delta_mu))
            
    n_combos = len(delta_mus) * len(biases)

    for f, (i, j) in tqdm(zip(futures, product(range(len(delta_mus)), range(len(biases)))), total=n_combos):
        pvals = f.result()
        significance_heatmap[i,j] = (np.array(pvals) < significance_cutoff).mean()

In [None]:
plt.imshow(significance_heatmap, cmap='magma')
plt.colorbar()

plt.xticks(range(len(biases)), biases);
plt.xlabel("bias")

plt.yticks(range(len(delta_mus)), delta_mus);
plt.ylabel("distance diff")

plt.clim(0, 1)
plt.title("Fraction of significant tests")


In [None]:
from concurrent.futures import ThreadPoolExecutor
from itertools import product

N_sims = 500
significance_cutoff = 0.05

# should be 3
dimensionality = 3

# data point pairs to simulate
Ns = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]

# mean distance of 1st set
mu = 150

# extra distance of 2nd set
delta_mus = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

# std. dev. of normal noise to be added to 
noise_sd = 0

# bias of measurement (in random direction)
bias = 0


significance_heatmap = np.zeros((len(delta_mus), len(Ns)))

futures = []

with ThreadPoolExecutor() as tpe:
    for i, delta_mu in enumerate(delta_mus):
        for j, N in enumerate(Ns):
            
            def _sim_thread(N, delta_mu):
                pvals = []
                for _ in range(N_sims):
                    _, _, d1_noise, d2_noise = simulate_distance_pairs(bias, noise_sd, mu, delta_mu, N, dimensionality)
                    pvals.append(mannwhitneyu(d1_noise, d2_noise).pvalue)
                return pvals
            
            futures.append(tpe.submit(_sim_thread, N, delta_mu))
            
    n_combos = len(delta_mus) * len(Ns)

    for f, (i, j) in tqdm(zip(futures, product(range(len(delta_mus)), range(len(Ns)))), total=n_combos):
        pvals = f.result()
        significance_heatmap[i,j] = (np.array(pvals) < significance_cutoff).mean()

In [None]:
plt.imshow(significance_heatmap, cmap='magma')
plt.colorbar()

plt.xticks(range(len(Ns)), Ns);
plt.xlabel("N")

plt.yticks(range(len(delta_mus)), delta_mus);
plt.ylabel("distance diff")

plt.clim(0, 1)
plt.title("Fraction of significant tests")

### Define own version of Maxwell

(assumed bug in builtin but was due to wrong parameterization, no longer necessary)

In [None]:
from scipy.stats import rv_continuous

class maxwell2(rv_continuous):
    def _pdf(self, x, a):
        return np.sqrt(2.0/np.pi) * x**2 / a**3 * np.exp(-x**2/2/a**2)