# Experiment: Cell-Type-Adjusted Spatial Stickiness

Use this notebook to compute **cell-type-adjusted stickiness** (spatial structure beyond cell-type composition), inspect significance, and run quick diagnostics.


In [35]:
from __future__ import annotations

from pathlib import Path
import importlib
import sys

import numpy as np
import pandas as pd
import scanpy as sc

def find_repo_root(start: Path) -> Path:
    for candidate in (start, *start.parents):
        if (candidate / "celltype_adjusted_stickiness.py").exists():
            return candidate
    raise FileNotFoundError("Could not find repo root from current working directory.")

REPO_ROOT = find_repo_root(Path.cwd().resolve())
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

import celltype_adjusted_stickiness as ctas
ctas = importlib.reload(ctas)

stickiness = ctas.stickiness
stickiness_diagnostics = ctas.stickiness_diagnostics

print(f"Repo root: {REPO_ROOT}")


Repo root: /Users/christoffer/work/karolinska/development/NeighborNorm


## Load AnnData
Set `DATA_PATH` to your `.h5ad` file.


In [36]:
DATA_PATH = "/Volumes/processing2/BALO/baloMS_indep_clust_balo_MANA_SC.h5ad"  # change this
adata = sc.read_h5ad(DATA_PATH)
adata


  utils.warn_names_duplicates("obs")


AnnData object with n_obs × n_vars = 202751 × 5101
    obs: 'x_centroid', 'y_centroid', 'transcript_counts', 'control_probe_counts', 'genomic_control_counts', 'control_codeword_counts', 'unassigned_codeword_counts', 'deprecated_codeword_counts', 'total_counts', 'cell_area', 'nucleus_area', 'nucleus_count', 'segmentation_method', 'run', 'sample_id', 'n_genes_by_counts', 'n_counts', 'n_genes', 'leiden_0.5', 'leiden_1', 'leiden_1.5', 'leiden_2', 'gmm_mana_5', 'gmm_mana_8', 'gmm_mana_10', 'gmm_mana_12', 'gmm_mana_15', 'gmm_mana_20'
    var: 'n_cells_by_counts', 'mean_counts', 'pct_dropout_by_counts', 'total_counts', 'highly_variable', 'highly_variable_rank', 'means', 'variances', 'variances_norm'
    uns: 'X_mana_gauss_params', 'gmm_mana_10_colors', 'gmm_mana_15_colors', 'gmm_mana_5_colors', 'gmm_mana_8_colors', 'hvg', 'leiden_0.5', 'leiden_0.5_colors', 'leiden_1', 'leiden_1.5', 'leiden_1.5_colors', 'leiden_1_colors', 'leiden_2', 'leiden_2_colors', 'log1p', 'neighbors', 'pca', 'rank_genes_

In [37]:
print(f"n_obs={adata.n_obs:,}, n_vars={adata.n_vars:,}")
print("obs columns (first 20):", list(adata.obs.columns)[:20])
print("obsm keys:", list(adata.obsm.keys()))
print("obsp keys:", list(adata.obsp.keys()))

if "spatial" not in adata.obsm:
    raise KeyError("adata.obsm['spatial'] is required.")

CTAS_CELL_TYPE_KEY = "cell_type" if "cell_type" in adata.obs else "leiden_2"
if CTAS_CELL_TYPE_KEY not in adata.obs:
    raise KeyError("Need a cell type column (expected 'cell_type' or 'leiden_2').")

CTAS_CONN_KEY = (
    "spatial_neighbors_connectivities"
    if "spatial_neighbors_connectivities" in adata.obsp
    else "connectivities"
)
if CTAS_CONN_KEY not in adata.obsp:
    raise KeyError("Need a connectivities graph in adata.obsp.")

print("Using cell_type_key:", CTAS_CELL_TYPE_KEY)
print("Using connectivities_key:", CTAS_CONN_KEY)


n_obs=202,751, n_vars=5,101
obs columns (first 20): ['x_centroid', 'y_centroid', 'transcript_counts', 'control_probe_counts', 'genomic_control_counts', 'control_codeword_counts', 'unassigned_codeword_counts', 'deprecated_codeword_counts', 'total_counts', 'cell_area', 'nucleus_area', 'nucleus_count', 'segmentation_method', 'run', 'sample_id', 'n_genes_by_counts', 'n_counts', 'n_genes', 'leiden_0.5', 'leiden_1']
obsm keys: ['X_mana_gauss', 'X_pca', 'X_scVI', 'X_umap', 'spatial']
obsp keys: ['connectivities', 'distances', 'spatial_connectivities', 'spatial_distances']
Using cell_type_key: leiden_2
Using connectivities_key: connectivities


## Run Cell-Type-Adjusted Stickiness
Optional prefiltering below can drastically reduce runtime on large datasets.


In [None]:
USE_PREFILTER = True
MIN_DETECTED_CELLS = 50
MAX_DETECT_FRAC = 0.98
N_TOP_HVG = 4000

adata_run = adata
if USE_PREFILTER:
    mat = adata.layers["counts"] if "counts" in adata.layers else adata.X
    detected = np.asarray((mat > 0).sum(axis=0)).ravel()
    detect_frac = detected / max(1, adata.n_obs)

    gene_mask = detected >= MIN_DETECTED_CELLS
    if MAX_DETECT_FRAC is not None:
        gene_mask &= detect_frac <= MAX_DETECT_FRAC

    adata_pref = adata[:, gene_mask].copy()
    layer_for_hvg = "counts" if "counts" in adata_pref.layers else None

    if adata_pref.n_vars > N_TOP_HVG:
        sc.pp.highly_variable_genes(
            adata_pref,
            layer=layer_for_hvg,
            flavor="seurat_v3",
            n_top_genes=min(N_TOP_HVG, adata_pref.n_vars),
            inplace=True,
        )
        adata_run = adata_pref[:, adata_pref.var["highly_variable"].values].copy()
    else:
        adata_run = adata_pref

print(f"Running stickiness on {adata_run.n_vars:,} genes (from {adata.n_vars:,}).")


In [None]:
ctas_df = stickiness(
    adata_run,
    layer="counts" if "counts" in adata_run.layers else None,
    cell_type_key=CTAS_CELL_TYPE_KEY,
    total_counts_key="total_counts",
    connectivities_key=CTAS_CONN_KEY,
    key_added="stickiness",
    n_perm=100,  # raise to 200+ for final results
    random_state=0,
    compute_within_cell_type=False,  # set True if you need withinCT_* columns
    min_cells_per_type=50,
    store_residuals=False,
    copy=False,
    show_progress=True,
)

ctas_df.sort_values("z", ascending=False).head(30)


In [None]:
print("Stored in adata_run.varm keys:", list(adata_run.varm.keys()))
print("Stored in adata_run.uns keys:", [k for k in adata_run.uns.keys() if k.startswith("stickiness")])

pd.DataFrame.from_records(adata_run.varm["stickiness"]).head()


## Diagnostics
Check whether detection-rate coupling drops after adjustment and whether marker genes drop in rank.


In [None]:
marker_genes = ["COL4A1", "MBP", "GFAP"]
diag = stickiness_diagnostics(ctas_df, marker_genes=marker_genes)

print(
    "Spearman corr(detection_rate, naive stickiness):",
    diag["corr_detection_naive_spearman"],
)
print(
    "Spearman corr(detection_rate, residual stickiness):",
    diag["corr_detection_resid_spearman"],
)

diag["marker_rank_shift"]


In [None]:
top_adjusted = ctas_df.sort_values("z", ascending=False)[["gene", "z", "qval", "rank_resid"]].head(20)
top_naive = ctas_df.sort_values("stickiness_naive", ascending=False)[["gene", "stickiness_naive", "rank_naive"]].head(20)

print("Top by adjusted z:")
top_adjusted

print("Top by naive stickiness:")
top_naive


## Optional: Spatial Visualization of Top Adjusted Genes


In [None]:
top_genes = ctas_df.sort_values("z", ascending=False)["gene"].head(6).tolist()
adata_run.obsm["X_spatial"] = np.asarray(adata_run.obsm["spatial"])
sc.pl.embedding(adata_run, basis="spatial", color=top_genes, size=20.0, cmap="viridis")


## Optional: Save Result


In [None]:
OUT_PATH = REPO_ROOT / "data" / "your_data.sticky_adjusted.h5ad"
# adata_run.write_h5ad(OUT_PATH)
# print(f"Saved: {OUT_PATH}")
