In [None]:
# Colab-only bootstrap: mount Google Drive so the repository and data stored
# under /content/drive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

# Switch to the repository root so the relative requirements.txt path resolves,
# then install the packages it lists (%pip targets the running kernel's environment).
%cd /content/drive/MyDrive/repos/Epilepsy_Microglia
%pip install -q -r requirements.txt

**3. Downstream**
1. scANVI rough labeling
2. Reference label transfer



In [None]:
import torch

# Single Cell Libraries
import scvi
import scanpy as sc
import anndata as ad

# Data Processing and Plotting
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import igraph
import leidenalg
import random

# File grab
import os
import tempfile
import pooch
import shutil, subprocess, glob
import gzip

# Report the runtime: torch version, scvi-tools version, CUDA availability.
for _runtime_fact in (torch.__version__, scvi.__version__, torch.cuda.is_available()):
    print(_runtime_fact)

# Random seed shared by every stochastic library used in this notebook
SEED = 42

# Seed Python, NumPy and torch (CPU + all CUDA devices) with the same value.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(SEED)

# Request deterministic cuDNN behaviour and turn off cuDNN benchmark mode.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# scvi-tools keeps its own seed setting.
scvi.settings.seed = SEED

In [None]:
# not for github

# Paths / settings
SOURCEDIR = "/content/drive/MyDrive/datas/epilepsy_microglia/processed/GSE201048/"
FILE_LABEL = "kumar_v2_20251020"
model_dir = os.path.join(SOURCEDIR, "model_v2")

# Figure export: 300 dpi on save, no axes frame, written to the repo's figures folder.
sc.set_figure_params(dpi_save=300, frameon=False)
sc.settings.figdir = "/content/drive/MyDrive/repos/Epilepsy_Microglia/figures"

# Read the processed AnnData snapshot, then restore the trained scVI model against it.
adata = sc.read_h5ad(os.path.join(SOURCEDIR, FILE_LABEL + ".h5ad"))
model = scvi.model.SCVI.load(model_dir, adata=adata)

# Show the AnnData summary as the cell output.
adata

In [None]:
# not for github
# Write the current AnnData as the 20251025 snapshot and re-save the model,
# overwriting whatever is already stored in model_v2.
_snapshot_path = os.path.join(SOURCEDIR, "kumar_v2_20251025.h5ad")
_model_path = os.path.join(SOURCEDIR, "model_v2")
adata.write(_snapshot_path)
model.save(_model_path, overwrite=True)

In [None]:
# One-off cleanup of the superseded 20251020 snapshot.
# Guarded so that (a) re-running this cell, or Run All, does not raise
# FileNotFoundError once the file is gone, and (b) the old snapshot is never
# deleted unless the 20251025 replacement is already on disk.
# NOTE: the loading cell reads FILE_LABEL = "kumar_v2_20251020"; point it at the
# new snapshot before a fresh Restart & Run All, or the load will fail.
_old_snapshot = "/content/drive/MyDrive/datas/epilepsy_microglia/processed/GSE201048/kumar_v2_20251020.h5ad"
_new_snapshot = os.path.join(os.path.dirname(_old_snapshot), "kumar_v2_20251025.h5ad")

if not os.path.exists(_old_snapshot):
    print(f"Already removed: {_old_snapshot}")
elif not os.path.exists(_new_snapshot):
    print(f"Keeping {_old_snapshot}: replacement {_new_snapshot} not found")
else:
    os.remove(_old_snapshot)

In [None]:
# Key under which the scVI embedding is stored in adata.obsm
SCVI_LATENT_KEY = "X_scVI"

# Pull the latent representation from the loaded model and attach it to adata.
adata.obsm[SCVI_LATENT_KEY] = latent = model.get_latent_representation()

# Display the embedding's shape as the cell output.
latent.shape

In [None]:
# Neighbour graph on the scVI latent space, then a seeded UMAP embedding.
sc.pp.neighbors(adata, use_rep="X_scVI")
sc.tl.umap(adata, random_state=SEED)

# Leiden clustering (igraph flavour) at resolution 0.5, stored under its own obs key.
_leiden_kwargs = dict(
    resolution=0.5,
    flavor="igraph",
    random_state=SEED,
    key_added="leiden_v2_0.5",
)
sc.tl.leiden(adata, **_leiden_kwargs)

In [None]:
# Rank genes per Leiden cluster with the Wilcoxon method.
_cluster_key = "leiden_v2_0.5"
sc.tl.rank_genes_groups(adata, _cluster_key, method="wilcoxon")

# Plot the top 10 genes per cluster with independent y-axes and save the figure.
sc.pl.rank_genes_groups(
    adata,
    n_genes=10,
    sharey=False,
    save="20251025_scvi_leiden0.5_rank_genes_groups",
)

In [None]:
# 0. basic downstream

# The three calls below are kept commented out for reference only: the
# neighbors / UMAP / Leiden steps are already executed in an earlier cell.
#sc.pp.neighbors(adata, use_rep="X_scVI", random_state=SEED)
#sc.tl.umap(adata, random_state=SEED)
#sc.tl.leiden(adata, resolution=0.5, flavor="igraph", random_state=SEED)
# Colour the existing UMAP embedding by CD19 expression.
sc.pl.umap(adata, color=["CD19"])

In [None]:
# Annotating schema v0.2 (20251016)

import scipy.sparse as sp

def get_bool(adata, gene, thr):
    """Return a 1-D boolean mask of cells whose expression of ``gene`` exceeds ``thr``.

    Parameters
    ----------
    adata
        AnnData-like object; ``adata[:, gene].X`` must yield the expression
        column either as a SciPy sparse container or as a dense array.
    gene
        Variable (gene) name used to slice ``adata``.
    thr
        Threshold; the comparison is strict (``>``).

    Returns
    -------
    numpy.ndarray
        Flat boolean array with one entry per cell.
    """
    x = adata[:, gene].X
    if sp.issparse(x):
        # Compare while still sparse and densify only the boolean result.
        # The former ``mask.A1`` fallback is dropped: it could never run for
        # 2-D sparse input, and sparse containers do not provide ``.A1``.
        return np.ravel((x > thr).toarray())
    return np.ravel(x) > thr

# Per-cell marker gates: one boolean mask per (gene, threshold) pair.
cd45_hi, cd14_hi, cd3_hi, cd56_hi, cd19_hi, cd20_hi = (
    get_bool(adata, gene, cutoff)
    for gene, cutoff in (
        ('PTPRC', 1.5),
        ('CD14', 1.0),
        ('CD3E', 1.0),
        ('NCAM1', 1.0),
        ('CD19', 0.5),
        ('MS4A1', 0.5),
    )
)

# Every cell starts as "Unknown"; the gates below overwrite it.
adata.obs['manual_label_v1'] = "Unknown"

# immune cell : CD45hi
# Applied in order, so a later gate wins when a cell passes more than one.
_immune_gates = (
    ("Mono_Mac", cd45_hi & cd14_hi),
    ("T", cd45_hi & cd3_hi),
    ("NK", cd45_hi & cd56_hi & ~cd3_hi),
    ("B", cd45_hi & cd19_hi & cd20_hi),
)
for _label, _gate in _immune_gates:
    adata.obs.loc[_gate, 'manual_label_v1'] = _label

def sparse_any_gt(adata, genes, thr):
    """Per-cell flag: does any of ``genes`` have expression strictly above ``thr``?

    ``adata[:, genes].X`` may be dense or SciPy-sparse. For sparse input the
    comparison result stays sparse and is reduced by counting hits per row.
    """
    sub = adata[:, genes].X
    hits = sub > thr
    if sp.issparse(sub):
        # Row-wise count of genes above threshold; a cell passes with >= 1 hit.
        hits_per_cell = np.array(hits.sum(axis=1)).ravel()
        return hits_per_cell > 0
    return hits.any(axis=1)

# Both glial gates require the cell NOT to be CD45hi.
_cd45_not_hi = ~cd45_hi

# microglia: CD45lo but microglia RNA genes
mg = _cd45_not_hi & sparse_any_gt(adata, ['P2RY12', 'TMEM119'], 0.5)
# oligo (RNA)
oligo = _cd45_not_hi & sparse_any_gt(adata, ['PLP1','MBP','MOG'], 0.5)

# Microglia first, Oligo second: a cell passing both gates ends up as Oligo.
for _label, _gate in (('Microglia', mg), ('Oligo', oligo)):
    adata.obs.loc[_gate, 'manual_label_v1'] = _label

In [None]:
# Cells per manual label. Uses Series.value_counts() because the top-level
# pd.value_counts() function is deprecated in recent pandas releases.
print(adata.obs["manual_label_v1"].value_counts())

In [None]:
# 2. scANVI label stabilization

# Register adata for scANVI: manual_label_v1 provides the seed labels, cells
# labelled "Unknown" are treated as unlabeled, and sample_id is the batch key.
# NOTE(review): from_scvi_model below is given the same labels_key /
# unlabeled_category; confirm against the scvi-tools docs whether this explicit
# setup call is still needed or is redundant.
scvi.model.SCANVI.setup_anndata(
    adata,
    labels_key="manual_label_v1",
    unlabeled_category="Unknown",
    batch_key="sample_id"
)

# Initialise the scANVI model from the already-loaded scVI model.
sanvi = scvi.model.SCANVI.from_scvi_model(
    model,
    adata=adata,
    unlabeled_category="Unknown",
    labels_key="manual_label_v1"
)

# Train for at most 200 epochs, running validation every 10 epochs.
# n_samples_per_label=100 presumably caps how many cells per label are sampled
# for the labelled loader; verify against the scvi-tools documentation.
sanvi.train(
    max_epochs=200,
    check_val_every_n_epoch=10,
    n_samples_per_label=100
)
# Store the model's predicted label for every cell.
adata.obs['label_scanvi'] = sanvi.predict()

In [None]:
# Check results

# Rebuild the neighbour graph on the scVI latent space and recompute the seeded
# UMAP, so the plots that follow use the scVI-based embedding.
sc.pp.neighbors(adata, use_rep="X_scVI")
sc.tl.umap(adata, random_state=SEED)

In [None]:
# UMAP coloured by the scANVI-predicted label stored in adata.obs["label_scanvi"].
sc.pl.umap(adata, color=["label_scanvi"])

In [None]:
# UMAP coloured by PTPRC expression (the gene used for the CD45 gate in the manual labelling).
sc.pl.umap(adata, color=["PTPRC"])

In [None]:
# Sanity check: is PTPRC among the variables (genes) of adata? Displays True/False.
"PTPRC" in adata.var_names

In [None]:
# Comparison view: rebuild the neighbour graph on the PCA representation and
# recompute the UMAP, then colour it by the scANVI labels.
# NOTE: this overwrites the scVI-based neighbour graph and UMAP stored in adata,
# so any later plot uses the PCA-based embedding until those are recomputed.
# NOTE(review): requires adata.obsm["X_pca"] to exist already; no PCA is computed
# before this cell in the visible notebook, so it presumably comes from the
# loaded .h5ad. Confirm.
sc.pp.neighbors(adata, use_rep="X_pca", random_state=SEED)
sc.tl.umap(adata, random_state=SEED)
sc.pl.umap(adata, color = ["label_scanvi"])

In [None]:
# @title
# schema v0.1 (20251010) Dumped 20251012

# Marker-panel schema for rough (L0) cell-type annotation.
#
# Layout:
#   annotation_schema["version"]            schema version string
#   annotation_schema["levels"]["L0_major"] cell type -> {"positive": [...], "negative": [...]}
#   annotation_schema["rules"]              scoring thresholds and doublet flags
#
# Fix: a misplaced closing brace previously nested "rules" inside "levels", so
# annotation_schema["rules"] raised KeyError. "rules" is now a top-level
# sibling of "levels", as the original indentation intended.
annotation_schema = {
    "version": "0.1",
    "levels": {
        "L0_major": {
            "Microglia": {
                "positive": ["P2RY12", "TMEM119", "CX3CR1", "TREM2", "CSF1R"],
                #"negative": ["PTPRC", "MBP", "AQP4", "CLDN5", "VWF", "SLC17A7", "GAD1", "MOG"],
                "scores": [
                    {
                        "module": "PIC_core",  # e.g., [AIF1, TYROBP, CYBB, NFKBIA, IL1B, CCL3...]
                        "use_for_subtype": True,
                    }
                ],
            },
            "Astro": {
                "positive": ["GFAP", "AQP4", "ALDH1L1", "S100B", "SOX9"],
                #"negative": ["P2RY12", "MBP", "MOG", "CLDN5", "KDR"],
            },
            "Oligo": {
                "positive": ["PLP1", "MBP", "MOG"],
                #"negative": ["AQP4", "P2RY12"],
            },
            "Endo": {
                "positive": ["CLDN5", "KDR", "FOXF2"],
                #"negative": ["P2RY12", "MBP"],
            },
            # NOTE(review): this panel is identical to "Endo". Any consumer that
            # assigns labels panel-by-panel in dict order will relabel every
            # Endo cell as Pericyte. Replace with pericyte-specific markers.
            "Pericyte": {
                "positive": ["CLDN5", "KDR", "FOXF2"],
                #"negative": ["P2RY12", "MBP"],
            },
            "Mono/Macro": {
                "positive": ["LST1", "PTPRC"],
                "negative": [],
            },
            "Immune_T_NK": {
                "positive": ["CD2", "GNLY", "NKG7"],
                "negative": ["P2RY12"],
            },
            # TODO: no markers defined yet; consumers see empty positive/negative lists.
            "B/Plasma": {},
        },
    },
    "rules": {
        "scoring": {
            "method": "rank_pct",  # percentile ranks, not raw cutoffs
            "min_pos_pct": 0.90,
            "max_neg_pct": 0.50,
        },
        "doublet_flags": [
            {"and": ["P2RY12", "GNLY"]},  # Microglia × T/NK co-expression
            {"and": ["P2RY12", "MBP"]},   # Microglia × Oligo
        ],
    },
}

# Report, per cell type, any panel gene (positive or negative) that is absent
# from adata.var_names. Cell types with nothing missing print nothing.
L0_major = annotation_schema["levels"]["L0_major"]

for celltype, panel in L0_major.items():
    panel_genes = panel.get("positive", []) + panel.get("negative", [])
    missing = [gene for gene in panel_genes if gene not in adata.var_names]
    if not missing:
        continue
    print(f"{celltype}: missing {len(missing)} -> {missing}")

In [None]:
# @title
# Annotation 20251010, dumped 20251012

# 1. Annotate the cells based on the labeling panel

# anndata is normalized and log-transformed
L0_major = annotation_schema["levels"]["L0_major"]

# (1) Build a center assignment: a cell is given a cell type when at least
#     `min_pos_genes` of that type's positive markers reach `threshold`.
threshold = 2.0      # expression cutoff (hoisted; it does not change per panel)
min_pos_genes = 2    # how many positive markers must reach the cutoff

rough_labels = np.full(adata.n_obs, "Unknown", dtype=object)

for celltype, panel in L0_major.items():
    pos_genes = [g for g in panel.get("positive", []) if g in adata.var_names]
    if len(pos_genes) == 0:
        continue

    # cells x positive-marker expression, densified if stored sparse
    expr = adata[:, pos_genes].X
    if not isinstance(expr, np.ndarray):
        expr = expr.toarray()

    # count how many markers reach the threshold in each cell
    mark = (expr >= threshold).sum(axis=1)
    mask = mark >= min_pos_genes
    # Panels are applied in dict order: a cell passing several panels keeps the
    # label of the LAST one it passes.
    rough_labels[mask] = f"{celltype}"

adata.obs["rough_annot_center"] = rough_labels
# Series.value_counts() replaces the deprecated top-level pd.value_counts().
print(adata.obs["rough_annot_center"].value_counts())

# (2) Build PCA neighbors (this overwrites any existing X_pca and neighbour graph)
sc.pp.pca(adata, n_comps=30)
sc.pp.neighbors(adata, use_rep="X_pca", n_neighbors=15)

# (3) Apply neighborhood purity filter : demote a cell to "Unknown" when fewer
#     than 50% of its graph neighbours carry the same label.
n_neighbors = adata.uns["neighbors"]["params"]["n_neighbors"]
connectivities = adata.obsp["connectivities"]

# for each cell, compute fraction of neighbors with same label
labels = adata.obs["rough_annot_center"].values
pure = np.full(len(labels), True)

# NOTE(review): labels default to the string "Unknown", never NaN, so the isna
# checks below do not filter anything. As a result "Unknown" neighbours count
# as differently-labelled neighbours for a labelled cell. Confirm this is the
# intended purity definition.
for i in range(len(labels)):
    if pd.isna(labels[i]):
        continue
    # indices of the cells connected to cell i in the neighbour graph
    neigh_idx = connectivities[i].indices
    neigh_labels = labels[neigh_idx]
    same = np.sum(neigh_labels == labels[i])
    valid = np.sum(~pd.isna(neigh_labels))
    if valid > 0 and same / valid < 0.5:
        pure[i] = False

adata.obs.loc[~pure, "rough_annot_center"] = "Unknown"

# summary
print(adata.obs["rough_annot_center"].value_counts())

In [None]:
# @title
# not for github
# check double-positive cell proportion

import numpy as np
import scipy.sparse as sp

# Flatten the MS4A1 and CD19 expression columns to dense 1-D arrays.
x, y = (
    np.ravel(col.toarray() if sp.issparse(col) else col)
    for col in (adata[:, "MS4A1"].X, adata[:, "CD19"].X)
)

# A cell is double positive when BOTH genes are strictly above 0.5.
bcell_pos = (x > 0.5) & (y > 0.5)

n_pos = int(bcell_pos.sum())
print(f"B-cell double positive: {n_pos} cells")
