<a href="https://colab.research.google.com/github/DMZ0/individuality-power-analysis/blob/main/power_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [18]:
import numpy as np
import pandas as pd
from scipy.stats import zscore, rankdata, beta
from scipy.special import logit, polygamma
from google.colab import files

rng = np.random.default_rng()

def corr(x, y, axis=-1, corr_type='pearson'):
    if corr_type == 'spearman':
        x = rankdata(x, axis=axis)
        y = rankdata(y, axis=axis)
    z_x = zscore(x, axis=axis, ddof=1)
    z_y = zscore(y, axis=axis, ddof=1)
    n = z_x.size if axis is None else z_x.shape[axis]
    return np.sum(z_x * z_y, axis=axis) / (n - 1)

def power_sim(rho, mu_p=0.5, sig_p=0.1, mu_n=180, sig_n=120, n_min=50,
              num_behavior=48, num_dissect=48, num_sim=2000, num_perm=2000, alpha=0.05,
              pval_method='normal', alternative='two-sided', corr_type='pearson'):
    """
    Estimates the power to reject the null hypothesis of zero correlation in a typical experiment
    to measure the correlation across individuals between some proportion-based behavioral metric
    and a neural signal of interest.

    Assumes that the behavioral metric p_obs is a noisy estimate of the logistic transform p of a
    latent (time-independent) bias parameter x = logit(p), which exhibits a true (population)
    correlation `rho` with the neural signal of interest y. The latter is assumed to follow a
    normal distribution conditional on x. The value of p for an individual fly is drawn from a
    beta distribution with mean `mu_p` and `std sig_p`, then estimated, up to binomial noise, by
    observing the outcome of some number n of binary decisions. The number of decisions n is also
    random and assumed to follow a negative binomial distribution with mean `mu_n` and std `sig_n`,
    except that we truncate the left tail of the negative binomial by ignoring flies with fewer
    than `n_min` observed decisions.

    Of our initial dataset of `num_behavior` flies with at least `n_min` decisions, we measure the
    neural signal y in the top `num_dissect` most biased flies (i.e., the `num_dissect` flies whose
    estimated x values are highest in absolute z-score). After applying an optional rank transform
    to the simulated data (x, y), we obtain a P-value either by approximating the logistic-beta
    distribution of x as a Gaussian and using the analytic form of the resulting null distribution
    of the sample correlation, or by a permutation test (i.e., comparing the observed sample
    correlation to that obtained after shuffling the labels of the simulated data). The power is
    then estimated as the fraction of simulations on which the P-value is less than `alpha`.

    Parameters
    ----------
    rho : float
        Effect size, measured as the population correlation coefficient (Pearson) between the
        individual bias parameter p and the neural signal of interest.
    mu_p : float, optional
        Mean of the (beta) distribution of p ∈ [0, 1]. Default is 0.5.
    sig_p : float, optional
        Std of the (beta) distribution of p. Must be less than `(mu_p * (1 - mu_p))**0.5`.
        Default is 0.1.
    mu_n : float, optional
        Mean of the (neg binom) distribution of the num of observed decisions n per individual
        before enforcing cutoff of n_min. Default is 180.
    sig_n : float, optional
        Std of the (neg binom) distribution of n before enforcing cutoff of n_min. Must be
        greater than `mu_n**0.5`. Default is 120.
    n_min : int, optional
        Minimum value of n for individuals in each simulated behavioral dataset. Default is 50.
    num_behavior : int, optional
        Number of individuals in each simulated behavioral dataset. Default is 48.
    num_dissected : int, optional
        Number of individuals sampled for dissection (i.e., processing for measurement of the
        neural signal of interest) from the behavioral dataset. Individuals are sampled in order
        of the magnitude of their z-scored estimated bias. Must not be greater than num_behavior.
        Default is 48.
    num_sim : int, optional
        Number of simulations over which to average for calculating the power. Default is 2000.
    num_perm : int, optional
        Number of permutations of a simulated dataset over which to average for calculating the
        P-value of the null hypothesis H_0. Default is 2000.
        This option is ignored when `method == 'normal'`.
    alpha : float, optional
        Significance level at which to test H_0. Default is 0.05.
    pval_method : {'normal', 'permutation'}, optional
        Defines the method used to calculate the P-value of the null hypothesis. Default is
        'normal'. If 'permutation', then the P-value is calculated by averaging over num_perm
        shuffles of each simulated dataset (can be quite expensive).
    alternative : {'two-sided', 'one-sided'}, optional
        Defines the alternative hypothesis H_1. Default is 'two-sided'. If 'one-sided', then H_1
        assumes that the correlation has the same sign as `rho`.
    corr_type : {'pearson', 'spearman'}, optional
        Defines the test statistic. Default is 'pearson'. If 'spearman', then the simulated data
        are rank-transformed prior to computing the usual Pearson correlation.

    Returns
    -------
    power : float
        Estimated power to reject the null hypothesis of zero correlation, assuming a true
        correlation `rho`.
    """
    q = mu_n**2 / (sig_n**2 - mu_n)
    n = rng.negative_binomial(q, mu_n/sig_n**2, size=(num_sim, num_behavior))
    while np.any(n < n_min):
        n[n < n_min] = rng.negative_binomial(q, mu_n/sig_n**2, size=n[n < n_min].size)
    nu = mu_p*(1-mu_p)/sig_p**2 - 1
    a = mu_p * nu
    b = (1-mu_p) * nu
    p = rng.beta(a, b, size=n.shape)
    x = logit(p)
    mu_x = polygamma(0, a) - polygamma(0, b)
    sig_x = np.sqrt(polygamma(1, a) + polygamma(1, b))
    p_obs = rng.binomial(n, p) / n
    x_obs = logit(p_obs)
    z_obs = zscore(x_obs, axis=-1, ddof=1)
    min_ord = num_behavior - num_dissect
    ind = np.argpartition(z_obs**2, min_ord)[:,min_ord:]
    x_ext = np.take_along_axis(x, ind, axis=1)
    x_obs_ext = np.take_along_axis(x_obs, ind, axis=1)
    y = rng.normal(loc=rho * (x_ext-mu_x) / sig_x, scale=np.sqrt(1 - rho**2))
    r = corr(x_obs_ext, y, corr_type=corr_type)
    if pval_method == 'permutation':
        y_perm = rng.permuted(np.tile(y, (num_perm, 1, 1)), axis=-1)
        r_perm = corr(x_obs_ext, y_perm, corr_type=corr_type)
        if alternative == 'two-sided':
            P = np.mean(np.abs(r_perm) > np.abs(r), axis=0)
        else:
            rho_sign = np.sign(rho)
            P = np.mean(rho_sign*r_perm > rho_sign*r, axis=0)
    else:
        s = num_dissect/2 - 1
        P = beta.cdf(-np.abs(r), s, s, loc=-1, scale=2)
        if alternative == 'two-sided':
            P *= 2
    return np.mean(P < alpha)

def get_power_data(rhos, nums_dissect, nums_behavior, filename=None, download=False, **kwargs):
    """
    Wrapper function that calls `power_sim` on a rectangular grid of values for the params
    `rho`, `num_dissect`, and `num_behavior`. Returns power estimates in the form of a pandas
    dataframe. Optionally saves and/or downloads the data as a csv file.
    """
    power_data = []
    for rho in rhos:
        for num_dissect in nums_dissect:
            for num_behavior in nums_behavior:
                power_data.append({"num_behavior": num_behavior,
                                   "num_dissect": num_dissect,
                                   "rho": rho,
                                   "power": power_sim(rho,
                                                      num_behavior=num_behavior,
                                                      num_dissect=num_dissect,
                                                      **kwargs)})
    df = pd.DataFrame(power_data)
    if filename is not None:
        df.to_csv(filename, index=False)
        if download:
            files.download(filename)
    return df

# this example takes ~30 min to run due to the large value of num_sim
# df = get_power_data(rhos=[0.1, 0.15, 0.2, 0.25, 0.3, 0.4],
#                     nums_dissect=[12, 24, 48, 72, 96, 144, 192],
#                     nums_behavior=[200, 400, 800, 1200, 1800, 2600, 4000],
#                     filename="power_data.csv", download=True,
#                     mu_p=0.585, sig_p=0.085, mu_n=180, sig_n=120, n_min=40, num_sim=10**4)

df = pd.read_csv(
    "https://raw.githubusercontent.com/DMZ0/individuality-power-analysis/main/power_data.csv"
)

In [None]:
# @title
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.interpolate import RectBivariateSpline

rho_vals = df["rho"].unique()

steps = []
initial_step = 0
num_traces_within_step = 2

fig = make_subplots(
    rows=1, cols=2,
    column_widths=[0.55, 0.45],
    specs=[[{'type': 'scene'}, {'type': 'xy'}]],
)

for j, rho in enumerate(rho_vals):
    df_sub = df[df["rho"] == rho]
    df_sub_wide = df_sub.pivot(index="num_behavior", columns="num_dissect", values="power").dropna()
    x = df_sub_wide.index.to_numpy()
    y = df_sub_wide.columns.to_numpy()
    interp = RectBivariateSpline(x, y, df_sub_wide.to_numpy(), s=0.0005, kx=3, ky=3)
    xx = np.linspace(x.min(), x.max(), num=50)
    yy = np.linspace(y.min(), y.max(), num=50)
    ff = interp(xx, yy)
    fig.add_trace(
        go.Surface(
            visible=(j == initial_step), x=xx, y=yy, z=ff, cmin=0, cmax=1, showscale=False,
            contours = {'z': {'show': True, 'start': 0.05, 'end': 0.95, 'size': 0.1}},
        ), row=1, col=1
    )
    fig.add_trace(
        go.Contour(
            visible=(j == initial_step), x=xx, y=yy, z=ff,
            contours={'start': 0.05, 'end': 0.95, 'size': 0.1},
            colorbar={'title': "Power",
                      'titleside': 'top',
                      'titlefont': {'family': 'Old Standard TT', 'size': 18},
                      'tickvals': np.linspace(0.05, 0.95, num=10)
            }
        ), row=1, col=2
    )
    step = {
        'method': 'update',
        'args': [{'visible': [False] * num_traces_within_step * len(rho_vals)}],
        'label': f"{rho}"
    }
    for k in range(num_traces_within_step):
        step['args'][0]['visible'][num_traces_within_step*j + k] = True
    steps.append(step)

fig.update_layout(
    sliders=[{
        'active': initial_step,
        'currentvalue': {'prefix': "rho = "},
        'pad': {'t': 50},
        'steps': steps
    }],
    scene_camera={
        'up': {'x': 0, 'y': 0, 'z': 1},
        'center': {'x': -0.07, 'y': 0.05, 'z': -0.2},
        'eye': {'x': -1.3, 'y': -1.3, 'z': 0.6}
    },
    autosize=False,
    width=920,
    height=500,
    scene={'zaxis': {'range': [0, 1]}},
    margin={'l': 20, 'r': 20, 't':40, 'b':20},
    xaxis={'title': "$N_\\text{behavior}$"},
    yaxis={'title': "$N_\\text{dissect}$"}
)

fig.update_scenes(
    xaxis_title_text="num behavior",
    yaxis_title_text="num dissected",
    zaxis_title_text="power"
)

fig.show()

In [None]:
# @title
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from matplotlib import cm

df2 = df.copy()
df2.loc[df2["num_behavior"]==df2["num_dissect"], "num_behavior"] = "num dissect"
num_behavior_vals = df2["num_behavior"].unique()
num_dissect_vals = df2["num_dissect"].unique()
rho_vals = df2["rho"].unique()

steps = []
initial_step = 0
num_traces_within_step = len(rho_vals) + len(num_dissect_vals)

fig = make_subplots(rows=1, cols=2)

for i, num_behavior in enumerate(num_behavior_vals):
    for j, rho in enumerate(rho_vals):
        df_sub = df2[(df2["num_behavior"] == num_behavior) & (df2["rho"] == rho)]
        fig.add_trace(
            go.Scatter(
                visible=(i == initial_step),
                name=f"$\\rho = {rho}$",
                x=df_sub["num_dissect"],
                y=df_sub["power"],
                legendgroup="rho",
                line={'color': f'rgb{cm.plasma(0.8*rho/rho_vals.max())[:3]}'}
            ), row=1, col=1
        )
    for j, num_dissect in enumerate(num_dissect_vals):
        df_sub = df2[(df2["num_behavior"]==num_behavior) & (df2["num_dissect"]==num_dissect)]
        fig.add_trace(
            go.Scatter(
                visible=(i == initial_step),
                name=f"$N_\\text{{dissect}} = {num_dissect}$",
                x=df_sub["rho"],
                y=df_sub["power"],
                legendgroup="num dissected",
                line={'color': f'rgb{cm.viridis(0.2 + 0.6*num_dissect/num_dissect_vals.max())[:3]}'}
            ), row=1, col=2
        )
    step = {
        'method': 'update',
        'args': [{'visible': [False] * num_traces_within_step * len(num_behavior_vals)}],
        'label': f"{num_behavior}"
    }
    for k in range(num_traces_within_step):
        step['args'][0]['visible'][num_traces_within_step*i + k] = True
    steps.append(step)

fig.update_layout(
    sliders=[{
        'active': initial_step,
        'currentvalue': {'prefix': "num behavior = "},
        'pad': {'t': 50},
        'steps': steps
    }],
    autosize=False,
    width=800,
    height=500,
    xaxis={'title': "$N_\\text{dissected}$"},
    xaxis2={'title': "$\\text{Effect size}~\\rho$"},
    yaxis={
        'title_text': "$\\text{Power}$",
        'tickmode': 'array',
        'titlefont': {'size': 16},
        'range': [0, 1]
    },
    yaxis2={'range': [0, 1]}
)

fig.show()

In [None]:
# @title
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from matplotlib import cm

num_behavior_vals = df["num_behavior"].unique()
num_dissect_vals = df["num_dissect"].unique()
rho_vals = df["rho"].unique()

steps = []
initial_step = 2
num_traces_within_step = len(rho_vals) + len(num_behavior_vals)

fig = make_subplots(rows=1, cols=2)

for i, num_dissect in enumerate(num_dissect_vals):
    for j, rho in enumerate(rho_vals):
        df_sub = df[(df["num_dissect"] == num_dissect) & (df["rho"] == rho)]
        fig.add_trace(
            go.Scatter(
                visible=(i == initial_step),
                name=f"$\\rho = {rho}$",
                x=df_sub["num_behavior"],
                y=df_sub["power"],
                legendgroup="rho",
                line={'color': f'rgb{cm.plasma(0.8*rho/rho_vals.max())[:3]}'}
            ), row=1, col=1
        )
    for j, num_behavior in enumerate(num_behavior_vals):
        df_sub = df[(df["num_behavior"] == num_behavior) & (df["num_dissect"] == num_dissect)]
        fig.add_trace(
            go.Scatter(
                visible=(i == initial_step),
                name=f"$N_\\text{{behavior}} = {num_behavior}$",
                x=df_sub["rho"],
                y=df_sub["power"],
                legendgroup="num behavior",
                line={'color': f'rgb{cm.viridis(np.log(num_behavior)/np.log(num_behavior_vals.max()) - 0.1)[:3]}'}
            ), row=1, col=2
        )
    step = {
        'method': 'update',
        'args': [{"visible": [False] * num_traces_within_step * len(num_dissect_vals)}],
        'label': f"{num_dissect}"
    }
    for j in range(num_traces_within_step):
        step['args'][0]['visible'][num_traces_within_step*i + j] = True
    steps.append(step)

fig.update_layout(
    sliders=[{
        'active': initial_step,
        'currentvalue': {'prefix': 'num dissect = '},
        'pad': {'t': 50},
        'steps': steps
    }],
    autosize=False,
    width=800,
    height=500,
    xaxis={'title': "$N_\\text{behavior}$"},
    xaxis2={'title': "$\\text{Effect size}~\\rho$"},
    yaxis={
        'title_text': "$\\text{Power}$",
        'tickmode': 'array',
        'titlefont': {'size': 16},
        'range': [0, 1]
    },
    yaxis2={'range': [0, 1]}
)

fig.show()