In [2]:
import scanpy as sc

train = sc.read_h5ad("100_ind_subset.h5ad")
test = sc.read_h5ad("onek1k_annotated_test.h5ad")
train = train[train.obs.cell_type == 0]
test = test[test.obs.cell_type == 0]

In [3]:
import rdata
model = rdata.read_rds("./100_ind_0.rds")['0']
genes = model['gene_sel1']
sigGenes = [s.item() for s in model['marginal_param1']['dim_0']]
covMat = model['cov_mat']

In [4]:
combined = sc.concat([train, test], join="outer", label="batch")
combined = combined[:,sigGenes]
combined

View of AnnData object with n_obs × n_vars = 279535 × 58
    obs: 'individual', 'cell_type', 'cell_label', 'barcode_col', 'n_genes_by_counts', 'total_counts', 'total_counts_mt', 'pct_counts_mt', 'batch'

In [9]:
import numpy as np
import pandas as pd
import scipy.sparse as sp
from numpy.linalg import slogdet, pinv
import math

# -------------------------
# Helpers for sparse
# -------------------------
def is_sparse(X):
    return sp.issparse(X)

def center_sparse_rows(X):
    """Center rows of sparse matrix X (n×m) by subtracting column means."""
    if not is_sparse(X):
        return X - X.mean(axis=0), np.array(X.mean(axis=0)).ravel()
    col_means = np.array(X.mean(axis=0)).ravel()
    # subtract col_means from each row efficiently
    Xc = X - sp.csr_matrix(col_means).multiply(np.ones((X.shape[0], 1)))
    return Xc, col_means

def sample_cov_unbiased(X):
    """Compute unbiased covariance from observations × features sparse matrix."""
    n = X.shape[0]
    if n < 2:
        raise ValueError("Need at least 2 observations to compute sample covariance")
    if is_sparse(X):
        Xc, _ = center_sparse_rows(X)
        # (m×n) @ (n×m) = m×m covariance numerator
        cov = (Xc.T @ Xc) / (n - 1)
        return cov.toarray()  # result must be dense m×m
    else:
        Xc = X - X.mean(axis=0, keepdims=True)
        return (Xc.T @ Xc) / (n - 1)

def stable_logdet(A, eps=1e-12):
    A = np.asarray(A)
    A_j = A + eps * np.eye(A.shape[0])
    sign, ld = slogdet(A_j)
    if sign <= 0:
        vals = np.linalg.eigvalsh(A_j)
        vals = np.clip(vals, a_min=eps, a_max=None)
        return np.sum(np.log(vals))
    return ld

def frobenius_distance(A, B):
    return np.linalg.norm(A - B, ord="fro")

def logdet_difference(A, B):
    return stable_logdet(A) - stable_logdet(B)

def symmetric_kl_gaussian(S, T):
    p = S.shape[0]
    S_inv = pinv(S)
    T_inv = pinv(T)
    tr1 = np.trace(T_inv @ S)
    tr2 = np.trace(S_inv @ T)
    ldS = stable_logdet(S)
    ldT = stable_logdet(T)
    kl1 = 0.5 * (tr1 - p + ldT - ldS)
    kl2 = 0.5 * (tr2 - p + ldS - ldT)
    return kl1 + kl2

# -------------------------
# Sparse-aware main attack
# -------------------------
def per_individual_attack_sparse(
    adata,
    individual_key,
    S_published,
    n,
    n_trials=2000,
    metric="fro",
    seed=None
):
    if seed is not None:
        np.random.seed(seed)

    X = adata.X  # may be sparse
    n_prime, m = X.shape
    labels = np.asarray(adata.obs[individual_key])

    unique_inds = np.unique(labels)
    records = []

    # leave-one-individual-out scores
    for ind in unique_inds:
        print(ind)
        mask = (labels == ind)
        X_minus = X[~mask, :]
        if X_minus.shape[0] < 2:
            cov_minus = np.full((m, m), np.nan)
        else:
            cov_minus = sample_cov_unbiased(X_minus)

        if metric == "fro":
            score = frobenius_distance(S_published, cov_minus)
        elif metric == "logdet":
            score = abs(logdet_difference(S_published, cov_minus))
        elif metric == "kl":
            score = symmetric_kl_gaussian(S_published, cov_minus)
        else:
            raise ValueError("Unsupported metric")

        records.append({
            "individual": ind,
            "score_obs": score,
            "n_cells": mask.sum()
        })

    df = pd.DataFrame(records)

    # Monte-Carlo null distribution
    indices = np.arange(n_prime)
    sample_scores = np.empty(n_trials)
    for t in range(n_trials):
        chosen = np.random.choice(indices, size=n, replace=False)
        cov_chosen = sample_cov_unbiased(X[chosen, :])
        if metric == "fro":
            sample_scores[t] = frobenius_distance(S_published, cov_chosen)
        elif metric == "logdet":
            sample_scores[t] = abs(logdet_difference(S_published, cov_chosen))
        else:
            sample_scores[t] = symmetric_kl_gaussian(S_published, cov_chosen)

    # empirical p-values
    df["p_value_empirical"] = [
        (sample_scores <= s).mean() for s in df["score_obs"]
    ]

    return df, sample_scores


In [10]:
out = per_individual_attack_sparse(combined, "individual", covMat, 100)

1000_1001
1001_1002
1002_1003
1004_1005
1005_1006
1006_1007
1007_1008
1008_1009
1009_1010
1010_1011
1014_1015
1016_1017
1018_1019
101_101
1021_1022
1026_1027
1032_1033
1034_1035
1035_1036
1038_1039
1040_1041
1041_1042
1044_1045
1045_1046
1048_1049
1049_1050
104_104
1050_1051
1051_1052
1052_1053
1053_1054
1054_1055
1055_1056
1057_1058
1058_1059
1059_1060
105_105
1060_1061
1061_1062
1065_1066
1068_1069
1070_1071
1071_1072
1072_1073
1073_1074
1074_1075
1076_1077
1077_1078
1079_1080
1081_1082
108_108
109_109
111_111
112_112
114_114
116_116
117_117
119_119
11_11
120_120
121_121
122_122
123_123
125_125
126_126
130_130
131_131
132_132
133_133
134_134
138_138
139_139
144_144
145_145
151_151
152_152
154_154
159_159
15_15
161_161
163_163
165_166
166_167
168_169
169_170
16_16
171_172
176_177
178_179
179_180
17_17
183_184
184_185
186_187
187_188
189_190
190_191
193_194
196_197
197_198
19_19
1_1
200_201
203_204
204_205
208_209
211_212
213_214
214_215
215_216
219_220
220_221
222_223
223_224
224_225


In [38]:
from sklearn.metrics import roc_auc_score

pred_members = set(out[0].sort_values('p_value_empirical')[:100].individual)
pred_arr = [ind in pred_members for ind in combined.obs.individual.unique()]
actual_members = set(train.obs.individual.unique())
actual_arr = [ind in actual_members for ind in combined.obs.individual.unique()]
auroc = roc_auc_score(actual_arr, pred_arr)
print(auroc)

0.4944602851323828
