In [1]:
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple
import math
import random

from bravado.client import SwaggerClient
cbioportal = SwaggerClient.from_url('https://www.cbioportal.org/api/v2/api-docs',
                                config={"validate_requests":False,"validate_responses":False,"validate_swagger_spec":False})

In [2]:
def fetch_binary_matrix(
    cbioportal,
    study_id: str,
    sample_list_id: str,
    molecular_profile_id: str,
    prefix_patient_ids: bool = True,
) -> pd.DataFrame:
    mutations = cbioportal.Mutations.getMutationsInMolecularProfileBySampleListIdUsingGET(
        molecularProfileId=molecular_profile_id,
        sampleListId=sample_list_id,
        projection='DETAILED'
    ).result()

    mutation_rows = []
    for m in mutations:
        gene = m["gene"]["hugoGeneSymbol"]
        patient = m["patientId"]
        proteinChange = m["proteinChange"]
        proteinPosStart = m['proteinPosStart']
        variantType = m['variantType']
        if prefix_patient_ids:
            patient = f"{study_id}__{patient}"

        if variantType == 'SNP':
            mutation = f"{gene}_{proteinPosStart}"
            mutation_rows.append({"patientId": patient, "gene": mutation})

    df = pd.DataFrame(mutation_rows)
    if df.empty:
        return pd.DataFrame()

    binary_matrix = (
        df.assign(value=1)
          .drop_duplicates(["patientId", "gene"]) 
          .pivot_table(
              index="patientId",
              columns="gene",
              values="value",
              aggfunc="max",
              fill_value=0
           )
          .astype(int)
    )

    binary_matrix.columns.name = None
    binary_matrix.index.name = None

    return binary_matrix


def build_background_matrix(
    cbioportal,
    study_specs: List[Dict[str, str]]
) -> pd.DataFrame:

    matrices = []
    for spec in study_specs:
        mat = fetch_binary_matrix(
            cbioportal,
            study_id=spec["study_id"],
            sample_list_id=spec["sample_list_id"],
            molecular_profile_id=spec["molecular_profile_id"],
            prefix_patient_ids=True
        )
        if mat is not None and mat.shape[0] > 0:
            matrices.append(mat)

    if not matrices:
        return pd.DataFrame()

    
    all_genes = sorted(set.union(*(set(m.columns) for m in matrices))) # union-align columns
    mats_aligned = [m.reindex(columns=all_genes, fill_value=0).astype(int) for m in matrices]

    background = pd.concat(mats_aligned, axis=0)
    background = background.groupby(background.index).max()

    return background


def split_members(
    matrix_D: pd.DataFrame,
    frac_train: float = 0.8,
    rng_seed: int = 0
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    rng = np.random.default_rng(rng_seed)
    patient_ids = matrix_D.index.to_numpy()
    rng.shuffle(patient_ids)

    split_idx = int(len(patient_ids) * frac_train)
    train_ids = patient_ids[:split_idx]
    heldout_ids = patient_ids[split_idx:]

    train_in = matrix_D.loc[train_ids].copy()
    held_out_members = matrix_D.loc[heldout_ids].copy()

    return train_in, held_out_members

def align_gene_space(
    train_in: pd.DataFrame,
    held_out_members: pd.DataFrame,
    background_out: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.Index]:
    """
    Ensures all dataframes have the same number of genes
    """
    all_genes = sorted(
        set(train_in.columns)
        | set(held_out_members.columns)
        | set(background_out.columns)
    )

    def _align(df):
        return df.reindex(columns=all_genes, fill_value=0).astype(int)

    return _align(train_in), _align(held_out_members), _align(background_out), pd.Index(all_genes)


In [3]:
def estimate_freqs(
    matrix: pd.DataFrame,
    alpha: float = 1.0
) -> pd.Series:
    """
    p(gene) = (count_mutated + alpha) / (num_patients + 2*alpha). Laplace-smoothing for rounding issues
    """
    num_patients = matrix.shape[0]
    gene_counts = matrix.sum(axis=0)
    p = (gene_counts + alpha) / (num_patients + 2 * alpha)
    return p.astype(float)


def llr_score(
    observed_genes: List[str],
    p_in: pd.Series,
    p_out: pd.Series
) -> float:
    """
    Compute sum_{g in observed_genes} log( p_in[g] / p_out[g] ).
    """
    score = 0.0
    for g in observed_genes:
        if g not in p_in.index or g not in p_out.index:
            continue
        pin = p_in[g]
        pout = p_out[g]
        if pin <= 0 or pout <= 0:
            continue
        score += math.log(pin / pout)
    return -1.0 * score

def sample_variant_subset(
    row: pd.Series,
    m: int,
    rng: np.random.Generator
) -> List[str]:
    mutated_genes = list(row.index[row.values == 1])
    if len(mutated_genes) == 0:
        return []
    if m >= len(mutated_genes):
        return mutated_genes
    # sample without replacement
    subset = rng.choice(mutated_genes, size=m, replace=False)
    return list(subset)


def get_score_distributions(
    members_df: pd.DataFrame,
    nonmembers_df: pd.DataFrame,
    p_in: pd.Series,
    p_out: pd.Series,
    m: int,
    draws_per_individual: int = 5,
    rng_seed: int = 0
) -> Tuple[List[float], List[float]]:
    rng = np.random.default_rng(rng_seed)

    member_scores = []
    for pid, row in members_df.iterrows():
        for _ in range(draws_per_individual):
            genes_subset = sample_variant_subset(row, m, rng)
            if len(genes_subset) == 0:
                continue
            score = llr_score(genes_subset, p_in, p_out)
            member_scores.append(score)

    nonmember_scores = []
    for pid, row in nonmembers_df.iterrows():
        for _ in range(draws_per_individual):
            genes_subset = sample_variant_subset(row, m, rng)
            if len(genes_subset) == 0:
                continue
            score = llr_score(genes_subset, p_in, p_out)
            nonmember_scores.append(score)

    return member_scores, nonmember_scores


In [4]:
def roc_curve_from_scores(
    member_scores: List[float],
    nonmember_scores: List[float]
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:

    scores = np.array(member_scores + nonmember_scores, dtype=float)
    labels = np.array([1]*len(member_scores) + [0]*len(nonmember_scores), dtype=int)
    thresholds = np.sort(np.unique(scores))[::-1]

    tpr_list = []
    fpr_list = []

    for thr in thresholds:
        preds = (scores >= thr).astype(int)

        tp = np.sum((preds == 1) & (labels == 1))
        fp = np.sum((preds == 1) & (labels == 0))
        fn = np.sum((preds == 0) & (labels == 1))
        tn = np.sum((preds == 0) & (labels == 0))

        tpr = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0

        tpr_list.append(tpr)
        fpr_list.append(fpr)

    tpr_arr = np.array(tpr_list)
    fpr_arr = np.array(fpr_list)

    order = np.argsort(fpr_arr)
    fpr_sorted = fpr_arr[order]
    tpr_sorted = tpr_arr[order]
    auc = np.trapz(tpr_sorted, fpr_sorted)

    return thresholds, fpr_arr, tpr_arr, auc


def best_operating_point(
    thresholds: np.ndarray,
    fpr_arr: np.ndarray,
    tpr_arr: np.ndarray,
    max_fpr: float = 0.01
) -> Dict[str, float]:
    """
    Among all thresholds that keep FPR <= max_fpr,
    return the one with the highest TPR.

    If none satisfy, return {}.
    """
    ok = np.where(fpr_arr <= max_fpr)[0]
    if len(ok) == 0:
        return {}
    idx = ok[np.argmax(tpr_arr[ok])]
    return {
        "threshold": float(thresholds[idx]),
        "fpr": float(fpr_arr[idx]),
        "tpr": float(tpr_arr[idx]),
    }

def run_open_world_eval(
    cbioportal,
    target_study: Dict[str, str],
    background_studies: List[Dict[str, str]],
    m_list: List[int] = [1,2,3,5,10,20,30],
    frac_train: float = 0.8,
    draws_per_individual: int = 5,
    rng_seed: int = 0
):
    """
    Full pipeline:
    1. Fetch dataset D (target_study).
    2. Fetch background (concat all background_studies).
    3. Split D into train_in vs held_out_members.
    4. Align gene columns.
    5. Estimate p_in, p_out.
    6. For each m, simulate attack, compute ROC, AUC, best operating point.
    """

    # 1. pull D
    D_matrix = fetch_binary_matrix(
        cbioportal,
        study_id=target_study["study_id"],
        sample_list_id=target_study["sample_list_id"],
        molecular_profile_id=target_study["molecular_profile_id"],
        prefix_patient_ids=True
    )

    # 2. pull background
    B_matrix = build_background_matrix(cbioportal, background_studies)

    # 3. split D into train (to estimate p_in) and held-out positives (eval)
    train_in, held_out_members = split_members(D_matrix, frac_train=frac_train, rng_seed=rng_seed)
    outsiders = B_matrix.copy()
    
    # 4. align gene space
    train_in, held_out_members, outsiders, genes = align_gene_space(train_in, held_out_members, outsiders)
    train_out, held_out_outsiders = split_members(outsiders, frac_train=frac_train, rng_seed=rng_seed)
    
    # 5. estimate p_in and p_out
    p_in = estimate_freqs(train_in, alpha=1.0)
    p_out = estimate_freqs(train_out, alpha=1.0)

    results = {}

    for m in m_list:
        # 6a. sample subsets and build score distributions
        member_scores, nonmember_scores = get_score_distributions(
            held_out_members,
            held_out_outsiders,
            p_in,
            p_out,
            m=m,
            draws_per_individual=draws_per_individual,
            rng_seed=rng_seed
        )

        # If either is empty (edge case: no mutations), skip
        if len(member_scores) == 0 or len(nonmember_scores) == 0:
            results[m] = {
                "auc": None,
                "best_op_at_1pctFPR": None,
                "member_scores": member_scores,
                "nonmember_scores": nonmember_scores,
            }
            continue

        # 6b. ROC
        thresholds, fpr_arr, tpr_arr, auc = roc_curve_from_scores(member_scores, nonmember_scores)

        # 6c. pick operating point with FPR <= 1%
        op = best_operating_point(thresholds, fpr_arr, tpr_arr, max_fpr=0.01)

        results[m] = {
            "auc": auc,
            "best_op_at_1pctFPR": op,
            "member_scores": member_scores,
            "nonmember_scores": nonmember_scores,
            "roc_thresholds": thresholds,
            "roc_fpr": fpr_arr,
            "roc_tpr": tpr_arr,
        }

    return {
        "train_in_shape": train_in.shape,
        "held_out_members_shape": held_out_members.shape,
        "outsiders_shape": outsiders.shape,
        "p_in": p_in,
        "p_out": p_out,
        "results": results,
    }

In [None]:
target_study = {
    "study_id": "breast_msk_2025",
    "sample_list_id": "breast_msk_2025_all",
    "molecular_profile_id": "breast_msk_2025_mutations"
}

background_studies = [
    {
        "study_id": "breast_cptac_gdc",
        "sample_list_id": "breast_cptac_gdc_all",
        "molecular_profile_id": "breast_cptac_gdc_mutations"
    },
    {
        "study_id": "brca_hta9_htan_2022",
        "sample_list_id": "brca_hta9_htan_2022_all",
        "molecular_profile_id": "brca_hta9_htan_2022_mutations"
    },
    {
        "study_id": "brca_metabric",
        "sample_list_id": "brca_metabric",
        "molecular_profile_id": "brca_metabric_mutations"
    },
    {
        "study_id": "breast_msk_2018",
        "sample_list_id": "breast_msk_2018_all",
        "molecular_profile_id": "breast_msk_2018_mutations"
    },
    {
        "study_id": "brca_pareja_msk_2020",
        "sample_list_id": "brca_pareja_msk_2020_all",
        "molecular_profile_id": "brca_pareja_msk_2020_mutations"
    },
    {
        "study_id": "breast_alpelisib_2020",
        "sample_list_id": "breast_alpelisib_2020_all",
        "molecular_profile_id": "breast_alpelisib_2020_mutations"
    },

]


attack_report = run_open_world_eval(
    cbioportal,
    target_study=target_study,
    background_studies=background_studies,
    m_list=[1,2,3,5,10,20,30,40,50],
    frac_train=0.8,
    draws_per_individual=10,
    rng_seed=42
)

In [None]:
m = 10
res_m10 = attack_report["results"][m]
print("AUC:", res_m10["auc"])
print("Operating point @1% FPR:", res_m10["best_op_at_1pctFPR"])

AUC: 0.9426487554419837
Operating point @1% FPR: {'threshold': 0.9890249733355252, 'fpr': 0.044444444444444446, 'tpr': 0.7189097103918228}


In [None]:
D_matrix = fetch_binary_matrix(
    cbioportal,
    study_id=target_study["study_id"],
    sample_list_id=target_study["sample_list_id"],
    molecular_profile_id=target_study["molecular_profile_id"],
    prefix_patient_ids=True
)

In [None]:
def posterior_from_llr(llr: float, prior_in: float) -> float:
    """
    Turn an LLR score into "probability they're in the dataset",
    under a prior P(in)=prior_in.

    Using Bayes:
    posterior = 1 / (1 + ((1-prior_in)/prior_in) * exp(-LLR))

    prior_in should reflect how plausible it is *a priori*
    that the person is in D (e.g. |D| / population_of_interest ).
    """
    odds_prior = prior_in / (1.0 - prior_in)
    odds_post = odds_prior * math.exp(llr)
    posterior = odds_post / (1.0 + odds_post)
    return posterior

def score_new_target(
    observed_genes: List[str],
    p_in: pd.Series,
    p_out: pd.Series,
    decision_threshold: float,
    prior_in: float = 0.01
):
    llr = llr_score(observed_genes, p_in, p_out)
    post = posterior_from_llr(llr, prior_in)
    is_member_call = (llr >= decision_threshold)
    return {
        "llr": llr,
        "posterior_prob_member": post,
        "call_member?": is_member_call
    }