In [None]:
# Standard library
# (none in this case)

# Third-party libraries
import numpy as np
import pandas as pd
import scanpy as sc
import scipy.sparse as sp
from sklearn.metrics import (
    precision_recall_curve,
    average_precision_score,
    roc_auc_score,
)

# Local modules
from cna_inferer.segmentation import call_cnas
from cna_inferer.main import process_and_call_cnas


In [2]:

def simulate_cna(adata, chrom, start_bp, end_bp,
                 effect='gain', fold_change=1.5,
                 cell_fraction=0.5, random_seed=None):
    if random_seed is not None:
        np.random.seed(random_seed)

    # —— 1) 找到要注入的基因下标 ——
    mask = (
        (adata.var['chromosome'] == chrom) &
        (adata.var['start']      >= start_bp) &
        (adata.var['end']        <= end_bp)
    ).values
    gene_idxs = np.nonzero(mask)[0]
    if gene_idxs.size == 0:
        raise ValueError(f"No genes in {chrom}:{start_bp}-{end_bp}")

    # —— 2) 随机选细胞 —— 
    n_cells  = adata.n_obs
    n_target = int(np.round(n_cells * cell_fraction))
    cell_idxs = np.random.choice(n_cells, n_target, replace=False)

    # —— 3) 转成 dense，做切片乘除 —— 
    X = adata.X
    if sp.issparse(X):
        X = X.toarray()       # ← 转成 (n_cells × n_genes) 的 ndarray
    else:
        X = X.copy()

    # 用 ix 同时索引行列，一次性批量修改
    if effect == 'gain':
        X[np.ix_(cell_idxs, gene_idxs)] *= fold_change
    else:
        X[np.ix_(cell_idxs, gene_idxs)] /= fold_change

    # —— 4) 保存回去 + 存真值 —— 
    adata.X = X
    ev = {
        'chromosome': chrom,
        'start_bp':   start_bp,
        'end_bp':     end_bp,
        'effect':     effect,
        'cells':      cell_idxs.tolist()
    }
    adata.uns.setdefault('simulated_cna', []).append(ev)
    return adata


def get_segment_by_gene_count(adata, chrom, gene_count, start_idx=0):
    """
    根据某染色体的基因数量，自动返回 start_bp/end_bp。
    """
    idxs = np.where(adata.var['chromosome'] == chrom)[0]
    idxs = np.sort(idxs)
    if start_idx + gene_count > len(idxs):
        raise ValueError(f"Not enough genes on chr{chrom} for count {gene_count}")
    seg = idxs[start_idx:start_idx + gene_count]
    start_bp = adata.var.iloc[seg[0]]['start']
    end_bp   = adata.var.iloc[seg[-1]]['end']
    return start_bp, end_bp


def evaluate_cnvs(adata, window_size):
    """
    基于 adata.uns['simulated_cna'] 和 adata.uns['cna_events']，
    构建 y_true/y_pred，并返回 AUPR/AUROC。
    """
    bin_info = adata.uns['bin_info']
    n_bins   = bin_info.shape[0]
    n_cells  = adata.n_obs

    # 真值矩阵
    y_true = np.zeros((n_cells, n_bins), dtype=int)
    for ev in adata.uns.get('simulated_cna', []):
        chrom   = ev['chromosome']
        start_bp = ev['start_bp']
        end_bp   = ev['end_bp']
        cells   = ev['cells']
        # 找到对应 bins
        mask = (
            (adata.var['chromosome'] == chrom) &
            (adata.var['start'] >= start_bp) &
            (adata.var['end']   <= end_bp)
        ).values
        gene_idxs = np.where(mask)[0]
        bin_idxs  = np.unique(gene_idxs // window_size)
        for ci in cells:
            y_true[ci, bin_idxs] = 1

    # 预测矩阵
    # cell_name -> idx
    cell_to_idx = {c: i for i, c in enumerate(adata.obs_names)}
    y_pred = np.zeros_like(y_true)
    df = adata.uns.get('cna_events', pd.DataFrame())
    if not df.empty:
        for _, row in df.iterrows():
            ci    = cell_to_idx[row['cell']]
            start = int(row['start_bin'])
            end   = int(row['end_bin'])
            y_pred[ci, start:end+1] = 1

    # 计算指标
    y_true_flat = y_true.ravel()
    y_pred_flat = y_pred.ravel()
    precision, recall, _ = precision_recall_curve(y_true_flat, y_pred_flat)
    aupr  = average_precision_score(y_true_flat, y_pred_flat)
    try:
        auroc = roc_auc_score(y_true_flat, y_pred_flat)
    except ValueError:
        auroc = np.nan
    return {'AUPR': aupr, 'AUROC': auroc}


def annotate_genes_from_gtf_pandas(adata, gtf_path):
    cols = ["chrom","source","feature","start","end","score","strand","frame","attribute"]
    gtf = pd.read_csv(gtf_path, sep="\t", comment="#", header=None, names=cols, low_memory=False)
    # 只保留 gene
    genes = gtf[gtf.feature=='gene'].copy()
    # 从 attribute 抽 id/name
    genes['gene_id']   = genes.attribute.str.extract('gene_id "([^"]+)"')
    genes['gene_name'] = genes.attribute.str.extract('gene_name "([^"]+)"')
    # 重命名 & 索引
    genes = genes.set_index('gene_id')[['gene_name','chrom','start','end']]
    genes = genes.rename(columns={'chrom':'chromosome'})
    # join 回 adata.var
    #   这里用 gene_ids（Ensembl ID）做 join，避免 gene_name 可能重复
    adata.var = adata.var.join(
        genes, 
        on='gene_ids', 
        how='left', 
        rsuffix='_gtf'
    )
    return adata




In [3]:
adata = process_and_call_cnas("GSM3814888_day8_rep1_filtered_gene_bc_matrices_h5.h5")

  utils.warn_names_duplicates("var")
INFO:root:Extracted GTF attributes: ['gene_id', 'gene_version', 'gene_name', 'gene_source', 'gene_biotype', 'transcript_id', 'transcript_version', 'transcript_name', 'transcript_source', 'transcript_biotype', 'tag', 'ccds_id', 'transcript_support_level', 'exon_number', 'exon_id', 'exon_version', 'protein_id', 'protein_version']
  adata.var["n_cells"] = number


✅ Processed 0 cells...
✅ Processed 500 cells...
✅ Processed 1000 cells...
✅ Processed 1500 cells...
✅ Processed 2000 cells...
✅ Processed 2500 cells...
✅ Processed 3000 cells...
✅ Processed 3500 cells...
✅ Processed 4000 cells...
📊 Total CNA events in GSM3814888_day8_rep1_filtered_gene_bc_matrices_h5.h5: 1229


In [4]:
chroms = adata.var["chromosome"].dropna().unique()
chroms = np.sort(chroms)

print(f"🧬 Total chromosomes in this dataset: {len(chroms)}")
print("→ Chromosome list:", chroms)

🧬 Total chromosomes in this dataset: 24
→ Chromosome list: ['1' '10' '11' '12' '13' '14' '15' '16' '17' '18' '19' '2' '20' '21' '22'
 '3' '4' '5' '6' '7' '8' '9' 'X' 'Y']


In [5]:
import random
valid_chroms = [str(i) for i in range(1, 23)] + ["X", "Y"]
selected_chroms = random.sample(valid_chroms, 7)
print("Selected chromosomes:", selected_chroms)

Selected chromosomes: ['20', 'Y', '8', '17', '6', '2', '14']


In [6]:
small_scenarios  = []
medium_scenarios = []
large_scenarios  = []

for chrom in selected_chroms:
    start_idx = 0
    for name, gene_count, frac, fc, group in [
        ("small_high",  50,   0.8, 2.0, small_scenarios),
        ("medium_mid", 300,   0.4, 1.5, medium_scenarios),
        ("large_low",  1000,  0.1, 1.5, large_scenarios),
    ]:
        try:
            start, end = get_segment_by_gene_count(adata, chrom, gene_count, start_idx)
            group.append({
                "name": name,
                "chrom": chrom,
                "start": start,
                "end": end,
                "effect": "gain",
                "fold_change": fc,
                "cell_fraction": frac,
                "gene_count": gene_count,
            })
            start_idx += gene_count
        except ValueError:
            continue


In [9]:

def evaluate_cnvs(adata, window_size):
    """
    Evaluate CNA (Copy Number Alteration) prediction by comparing simulated ground truth (adata.uns['simulated_cna'])
    and predicted CNAs (adata.uns['cna_events']) in binned expression space.

    Parameters:
    - adata: AnnData object with .var['chromosome'], ['start'], ['end'] for genes
    - window_size: int, number of genes per bin (used to map genes to bins)

    Returns:
    - dict with AUPR and AUROC scores
    """
    bin_info = adata.uns['bin_info']
    n_bins = adata.obsm["X_binned"].shape[1]
    n_cells  = adata.n_obs

    # Ground truth matrix
    y_true = np.zeros((n_cells, n_bins), dtype=int)
    for ev in adata.uns.get('simulated_cna', []):
        chrom   = ev['chromosome']
        start_bp = ev['start_bp']
        end_bp   = ev['end_bp']
        cells   = ev['cells']
        # Identify genes within the specified region
        mask = (
            (adata.var['chromosome'] == chrom) &
            (adata.var['start'] >= start_bp) &
            (adata.var['end']   <= end_bp)
        ).values
        gene_idxs = np.where(mask)[0]
        bin_idxs = np.clip(gene_idxs // window_size, 0, n_bins - 1)
        for ci in cells:
            y_true[ci, bin_idxs] = 1

    # Prediction matrix
    cell_to_idx = {c: i for i, c in enumerate(adata.obs_names)}
    y_pred = np.zeros_like(y_true)
    df = adata.uns.get('cna_events', pd.DataFrame())
    if not df.empty:
        for _, row in df.iterrows():
            ci    = cell_to_idx[row['cell']]
            start = int(row['start_bin'])
            end   = int(row['end_bin'])
            y_pred[ci, start:end+1] = 1

    # Evaluation metrics
    y_true_flat = y_true.ravel()
    y_pred_flat = y_pred.ravel()
    precision, recall, _ = precision_recall_curve(y_true_flat, y_pred_flat)
    aupr  = average_precision_score(y_true_flat, y_pred_flat)
    try:
        auroc = roc_auc_score(y_true_flat, y_pred_flat)
    except ValueError:
        auroc = np.nan

    return {'AUPR': aupr, 'AUROC': auroc}

In [10]:
results = []

for scenario_group, label in zip(
    [small_scenarios, medium_scenarios, large_scenarios],
    ["small_high", "medium_mid", "large_low"]
):
    ad = adata.copy()

   
    for s in scenario_group:
        ad = simulate_cna(ad, s["chrom"], s["start"], s["end"],
                          s["effect"], s["fold_change"], s["cell_fraction"], random_seed=42)

    injected_cells = sum(len(ev["cells"]) for ev in ad.uns["simulated_cna"])

    
    ad = call_cnas(
        ad,
        window_size=100,
        z_thresh=1.5,
        min_bins=3
    )
    
    detected_events = ad.uns["cna_events"].shape[0]
    m = evaluate_cnvs(ad, window_size=100)

    results.append({
        "group": label,
        "n_segments": len(scenario_group),
        "injected_cells": injected_cells,
        "detected_events": detected_events,
        "AUPR": m["AUPR"],
        "AUROC": m["AUROC"]
    })

pd.DataFrame(results)


✅ Processed 0 cells...
✅ Processed 500 cells...
✅ Processed 1000 cells...
✅ Processed 1500 cells...
✅ Processed 2000 cells...
✅ Processed 2500 cells...
✅ Processed 3000 cells...
✅ Processed 3500 cells...
✅ Processed 4000 cells...
✅ Processed 0 cells...
✅ Processed 500 cells...
✅ Processed 1000 cells...
✅ Processed 1500 cells...
✅ Processed 2000 cells...
✅ Processed 2500 cells...
✅ Processed 3000 cells...
✅ Processed 3500 cells...
✅ Processed 4000 cells...
✅ Processed 0 cells...
✅ Processed 500 cells...
✅ Processed 1000 cells...
✅ Processed 1500 cells...
✅ Processed 2000 cells...
✅ Processed 2500 cells...
✅ Processed 3000 cells...
✅ Processed 3500 cells...
✅ Processed 4000 cells...


Unnamed: 0,group,n_segments,injected_cells,detected_events,AUPR,AUROC
0,small_high,6,20862,1843,0.028981,0.492203
1,medium_mid,6,10428,1843,0.049087,0.496971
2,large_low,1,435,1843,0.007978,0.492252
