In [4]:
import numpy as np
import torch
from scipy.stats import t, ttest_1samp, rankdata, spearmanr

# 1) BH on p-values (NumPy)
def bh_pvalue(pvals, q=0.05):
    M = pvals.size
    order = np.argsort(pvals)
    thresh = (np.arange(1, M+1) * q / M)
    leq = pvals[order] <= thresh
    keep = np.zeros(M, bool)
    if np.any(leq):
        k = np.max(np.where(leq)[0]) + 1
        keep[order[:k]] = True
    return keep

# 2) BH on t-stats (NumPy)
def bh_tstat_numpy(t_stats, df, q=0.05):
    M = t_stats.size
    order = np.argsort(-np.abs(t_stats))
    t_desc = np.abs(t_stats[order])
    alpha_r = np.arange(1, M+1) * q / M
    crit = t.ppf(1 - alpha_r/2, df=df)
    hits = t_desc >= crit
    keep = np.zeros(M, bool)
    if np.any(hits):
        r = np.max(np.where(hits)[0]) + 1
        keep[order[:r]] = True
    return keep

# 3) BH on t-stats (PyTorch) with precomputed critical values
def bh_tstat_torch_precomputed(t_stats: torch.Tensor, crit: torch.Tensor) -> torch.BoolTensor:
    flat = t_stats.flatten()
    M = flat.numel()
    device = flat.device

    abs_flat = flat.abs()
    order = torch.argsort(abs_flat, descending=True)
    t_desc = abs_flat[order]

    hits = t_desc >= crit
    keep_flat = torch.zeros(M, dtype=torch.bool, device=device)
    if hits.any():
        last = torch.nonzero(hits, as_tuple=False).max()
        keep_flat[order[: last.item() + 1 ]] = True

    return keep_flat.view(t_stats.shape)

# —— Main comparison script —— 
np.random.seed(0)
M, m, df, q = 20000, 30, 29, 0.05

# Simulate data & small true shifts
data = np.random.randn(M, m)
signal_idx = np.random.choice(M, size=int(0.05*M), replace=False)
data[signal_idx] += 0.5

# Compute t-stats & p-values
t_stats = np.empty(M); pvals = np.empty(M)
for i in range(M):
    t_stats[i], pvals[i] = ttest_1samp(data[i], popmean=0.0)

# Precompute critical curve once in NumPy, move to torch
alpha_r = np.arange(1, M+1) * (q / M)
crit_np  = t.ppf(1 - alpha_r/2, df=df)              # length-M
device   = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
crit_torch = torch.tensor(crit_np, device=device, dtype=torch.float32)

# Apply BH procedures
keep_p    = bh_pvalue(pvals, q=q)
keep_t_np = bh_tstat_numpy(t_stats, df=df, q=q)

t_torch   = torch.from_numpy(t_stats.astype(np.float32)).to(device)
keep_t_t  = bh_tstat_torch_precomputed(t_torch, crit_torch).cpu().numpy()

# Compare selections
print("Discoveries (p-value) :", keep_p.sum())
print("Discoveries (NumPy t) :", keep_t_np.sum())
print("Discoveries (Torch t) :", keep_t_t.sum())
print("NumPy vs Torch equal? :", np.array_equal(keep_t_np, keep_t_t))
print("p-value vs Torch equal?:", np.array_equal(keep_p, keep_t_t))

# Ranking Spearman ρ
rank_p = rankdata(pvals, method='ordinal')
rank_t = rankdata(-np.abs(t_stats), method='ordinal')
rho, _ = spearmanr(rank_p, rank_t)
print("Spearman ρ (p vs |t| ranks):", rho)


NB fits: 100%|███████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:00<00:00, 461.39it/s]


Mean relative error (means):     0.041
Mean relative error (variances): 0.100
Fraction KS rejections (α=0.05): 0.00%
Frobenius-norm error (corr):     0.055


  res = hypotest_fun_out(*samples, **kwds)


In [8]:
import numpy as np
import pandas as pd
from scipy import stats
from statsmodels.discrete.discrete_model import NegativeBinomial
from sklearn.covariance import LedoitWolf
from tqdm import tqdm


def _fit_nb_mle(counts_1d):
    """
    Fit an intercept-only Negative Binomial by MLE.
    Returns
    -------
    mu : float
        Estimated mean (exp(intercept)).
    size : float
        Estimated NB size (r = 1/alpha).
    """
    endog = counts_1d
    exog = np.ones((len(endog), 1))
    model = NegativeBinomial(endog, exog)
    res = model.fit(disp=False, maxiter=100, method="bfgs")
    intercept = res.params[0]
    mu = np.exp(intercept)
    # dispersion alpha is last parameter in statsmodels >=0.13
    alpha = res.params_alpha if hasattr(res, 'params_alpha') else res.params[-1]
    size = 1.0 / alpha
    return mu, size


def _counts_to_uniform(counts, mu, size, rng, eps=1e-6):
    """
    Randomized PIT for NB-distributed counts:
    maps integer counts to Uniform(eps,1-eps).
    """
    lo = stats.nbinom.cdf(counts - 1, size, size / (size + mu))
    hi = stats.nbinom.cdf(counts,     size, size / (size + mu))
    u = lo + rng.random(counts.shape) * (hi - lo)
    return np.clip(u, eps, 1 - eps)


def fit_nb_per_gene(counts_df, progress=True):
    """
    Fit NB to each gene (row) independently.
    Returns Series mu and size indexed by gene.
    """
    mus, sizes = [], []
    iterator = tqdm(counts_df.iterrows(), total=counts_df.shape[0],
                    disable=not progress, desc="NB fits")
    for gene, row in iterator:
        mu, size = _fit_nb_mle(row.values)
        mus.append(mu)
        sizes.append(size)
    return pd.Series(mus, index=counts_df.index, name="mu"), \
           pd.Series(sizes, index=counts_df.index, name="size")


def fit_correlation(counts_df, mu, size, seed=0):
    """
    Estimate latent Gaussian correlation via PIT + Ledoit-Wolf shrinkage.
    Returns
    -------
    R_shrunk : ndarray (G×G)
        Shrunk correlation matrix.
    lambda_opt : float
        Ledoit-Wolf shrinkage intensity.
    """
    rng = np.random.default_rng(seed)
    G, C = counts_df.shape
    U = np.empty((G, C), float)
    for j, g in enumerate(counts_df.index):
        U[j] = _counts_to_uniform(counts_df.loc[g].values, mu[g], size[g], rng)

    Z = stats.norm.ppf(U)

    # Ledoit-Wolf on cells×genes
    lw = LedoitWolf().fit(Z.T)
    Sigma = lw.covariance_
    d = np.sqrt(np.diag(Sigma))
    R_shrunk = Sigma / np.outer(d, d)

    return R_shrunk, lw.shrinkage_


class NBCopulaModel:
    """
    NB-Gaussian copula model: stores mu, size, and latent correlation R.
    """
    def __init__(self, mu, size, R):
        self.mu, self.size, self.R = mu, size, R
        self._chol = np.linalg.cholesky(R)

    def simulate(self, n_cells, seed=0):
        rng = np.random.default_rng(seed)
        G = len(self.mu)
        Z = self._chol @ rng.standard_normal((G, n_cells))
        U = stats.norm.cdf(Z)
        counts = np.empty_like(U, dtype=int)
        for j, g in enumerate(self.mu.index):
            counts[j] = stats.nbinom.ppf(
                U[j], self.size[g], self.size[g] / (self.size[g] + self.mu[g])
            )
        return pd.DataFrame(counts, index=self.mu.index,
                            columns=[f"cell_{i}" for i in range(n_cells)])

        mu_hat, size_hat = fit_nb_per_gene(df, progress=False)
        R_hat, lam = fit_correlation(df, mu_hat, size_hat, seed=seed+1)
        model = NBCopulaModel(mu_hat, size_hat, R_hat)
        sim = model.simulate(C, seed=seed+2)
def test_nb_copula(G=50, C=1000, n_reps=50, seed=0):
    """
    Empirical assessment of NB-copula fitting & simulation.
    Adds marginal NB goodness-of-fit checks.

    Returns dict of (mean, std) for each metric.
    """
    rng = np.random.default_rng(seed)

    # true NB params & correlation
    true_mu   = rng.uniform(1, 6,  G)
    true_size = rng.uniform(0.5, 3, G)
    eig = rng.dirichlet(np.ones(G)) * G
    R_true = stats.random_correlation(eig, seed=seed).rvs()

    # containers
    metrics = {k: [] for k in [
        'mean_err','var_err','ks_reject','corr_err','R_err','gof_reject'
    ]}

    for rep in range(n_reps):
        # simulate ground-truth counts
        L = np.linalg.cholesky(R_true)
        Zg = L @ rng.standard_normal((G, C))
        Ug = stats.norm.cdf(Zg)
        counts = np.array([stats.nbinom.ppf(Ug[j], true_size[j],
                          true_size[j]/(true_size[j]+true_mu[j]))
                          for j in range(G)])
        df = pd.DataFrame(counts, index=[f"g{j}" for j in range(G)])

        # fit & simulate
        mu_hat, size_hat = fit_nb_per_gene(df, progress=False)
        R_hat, lam = fit_correlation(df, mu_hat, size_hat, seed=seed+1)
        model = NBCopulaModel(mu_hat, size_hat, R_hat)
        sim = model.simulate(C, seed=seed+2)

        # metrics
        metrics['mean_err'].append(np.mean(np.abs(sim.mean(1)-df.mean(1))/df.mean(1)))
        metrics['var_err'].append(np.mean(np.abs(sim.var(1)-df.var(1))/df.var(1)))
        ks_p = [stats.ks_2samp(sim.loc[g], df.loc[g]).pvalue for g in df.index]
        metrics['ks_reject'].append(np.mean(np.array(ks_p) < 0.05))
        metrics['corr_err'].append(
            np.linalg.norm(np.corrcoef(sim)-np.corrcoef(df), ord='fro')/G
        )
        metrics['R_err'].append(
            np.linalg.norm(R_hat-R_true, ord='fro')/G
        )
        # marginal NB goodness-of-fit via one-sample KS on randomized PIT
        gof = []
        for j, g in enumerate(df.index):
            u = _counts_to_uniform(df.loc[g].values, mu_hat[g], size_hat[g], rng)
            # test Uniform(0,1)
            _, pval = stats.kstest(u, 'uniform', args=(0,1))
            gof.append(pval)
        metrics['gof_reject'].append(np.mean(np.array(gof) < 0.05))

    # summarize
    return {k: (np.mean(v), np.std(v)) for k, v in metrics.items()}


if __name__ == '__main__':
    res = test_nb_copula()
    for name, (m, s) in res.items():
        print(f"{name}: mean={m:.3f}, sd={s:.3f}")


  res = hypotest_fun_out(*samples, **kwds)
  res = hypotest_fun_out(*samples, **kwds)
  res = hypotest_fun_out(*samples, **kwds)
  res = hypotest_fun_out(*samples, **kwds)
  res = hypotest_fun_out(*samples, **kwds)
  res = hypotest_fun_out(*samples, **kwds)
  res = hypotest_fun_out(*samples, **kwds)


mean_err: mean=0.024, sd=0.001
var_err: mean=0.066, sd=0.004
ks_reject: mean=0.001, sd=0.004
corr_err: mean=0.040, sd=0.001
R_err: mean=0.037, sd=0.001
gof_reject: mean=0.000, sd=0.003


  res = hypotest_fun_out(*samples, **kwds)
