In [None]:
"""
configs
"""

import sys
import platform, os, psutil, sys
import importlib.metadata as im, platform, sys

print(sys.executable)
print("OS :", platform.system(), platform.release())
print("CPU:", platform.processor())
print("RAM:", round(psutil.virtual_memory().total / 1e9, 1), "GB")

pkgs = [
    "scanpy",
    "anndata",
    "h5py",
    "pandas",
    "numpy",
    "scipy",
    "matplotlib",
    "scrublet",
    "scikit-misc",
]

print("Python", platform.python_version(), "\n")
for p in pkgs:
    try:
        print(f"{p}=={im.version(p)}")
    except im.PackageNotFoundError:
        print(f"{p} not installed")

In [None]:
"""
Build a publication‑grade AnnData (.h5ad) for SSv4 mouse MOp (GSE185862)
"""

#paths
H5_PATH   = "GSE185862_expression_matrix_SSv4.hdf5"
META_PATH = "GSE185862_metadata_ssv4.csv.gz"
GROUP_PATH= "GSE185862_sample_group_mapping.csv.gz"
UMAP_PATH = "GSE185862_umap2d_ssv4.csv.gz"
H5AD_PATH = "GSE185862_SSv4.h5ad"

#imports
import h5py, numpy as np, scipy.sparse as sp, pandas as pd, anndata as ad

#helpers
clean = lambda s: pd.Index(s).astype(str).str.replace("-1$", "", regex=True)
def short_cat(s):   # short & low‑cardinality object columns
    return s.dtype == "object" and s.str.len().max() <= 32 and s.nunique() < 500

#counts matrix
with h5py.File(H5_PATH, "r") as f:
    nc, ng = f["data/exon/dims"][:]
    X = sp.csc_matrix((f["data/exon/x"][:],
                       f["data/exon/i"][:].astype(np.int32),
                       f["data/exon/p"][:].astype(np.int64)),
                      shape=(nc, ng)).tocsr()
    genes = f["gene_names"][:].astype(str)
    bars  = f["sample_names"][:].astype(str)
    tot_ct= f["data/total_exon_counts"][:]

adata = ad.AnnData(X,
                   obs=pd.DataFrame(index=clean(bars)),
                   var=pd.DataFrame(index=genes))
adata.obs["total_counts"] = tot_ct
adata.var_names_make_unique()

#metadata join
meta = pd.read_csv(META_PATH, low_memory=False)

meta["sample_name"] = clean(meta["sample_name"])
meta = meta.set_index("sample_name", drop=True)              
meta = meta.loc[~meta.index.duplicated(keep="first")]

adata.obs = adata.obs.join(meta, how="left")

#fill missing keys with 'Unknown'
for c in ["subclass_label", "class_label", "donor_id", "donor_sex_label"]:
    if c in adata.obs:
        adata.obs[c] = adata.obs[c].fillna("Unknown")

#choose best label column for .cell_type
priority = ["subclass_label", "cluster_label",
            "cell_type_alias_label", "class_label"]
for c in priority:
    if c in adata.obs and adata.obs[c].notna().sum() > 1:
        adata.obs["cell_type"] = adata.obs[c].fillna("Unknown").astype("string")
        print("cell_type source →", c)
        break
else:
    adata.obs["cell_type"] = pd.Series("Unknown",
                                       index=adata.obs.index,
                                       dtype="string")

print("unique cell_type:", adata.obs["cell_type"].nunique())

#sample‑group mapping
adata.obs["sample_id"] = adata.obs.index.str.split("_").str[0]
groups = pd.read_csv(GROUP_PATH, low_memory=False)
adata.obs = adata.obs.merge(groups[["sample", "sample_group"]],
                            left_on="sample_id", right_on="sample",
                            how="left").drop(columns="sample")

#UMAP coordinates
umap = pd.read_csv(UMAP_PATH, index_col=0)
umap.index = clean(umap.index)
adata.obsm["X_umap"] = umap.reindex(adata.obs_names).values

#type‑casting
for col, s in adata.obs.items():
    if col != "cell_type" and short_cat(s):
        adata.obs[col] = s.astype("category")
    elif s.dtype == "object":
        adata.obs[col] = s.astype("string")

#write
ad.settings.allow_write_nullable_strings = True
adata.write(H5AD_PATH, compression="gzip")
ad.read_h5ad(H5AD_PATH, backed="r").file.close()

print("written:", H5AD_PATH,
      "| cells:", adata.n_obs,
      " genes:", adata.n_vars,
      "| unique cell_type:", adata.obs['cell_type'].nunique())

In [None]:
import anndata as ad
adn = ad.read_h5ad("GSE185862_SSv4.h5ad")
assert "sample_id" in adn.obs, "batch key missing"
assert adn.obs["cell_type"].nunique() > 1, "cell_type still empty!"
print("Builder OK – QC script can proceed")

In [None]:
"""
Smart‑seq4 preprocessing with ribosomal %, Scrublet, Seurat‑v3 HVG
"""

#paths
RAW      = "GSE185862_SSv4.h5ad"
FILTERED = "GSE185862_SSv4_filtered.h5ad"
NORM     = "GSE185862_SSv4_norm.h5ad"
HVG      = "GSE185862_SSv4_hvg.h5ad"
FIGDIR   = "Figures"

#config
MIN_GENES = 5_000; MAX_GENES = 15_000
MIN_COUNTS = 50_000; MAX_COUNTS = 2_000_000
MAX_PCT_MT = 1; MAX_PCT_RB = 20
CHUNK = 10_000

RUN_SCRUBLET = True
SCRUB_N = 20_000
SCRUB_THRESHOLD = 0.25          
EXPECTED_RATE = 0.05

BATCH_KEY = "sample_id"         

#imports
import os, numpy as np, scanpy as sc, anndata as ad, importlib
import matplotlib.pyplot as plt, tqdm, scrublet as scr
os.makedirs(FIGDIR, exist_ok=True); plt.rcParams["figure.dpi"] = 600

#QC
adata = ad.read_h5ad(RAW, backed="r")
adata.var["mt"] = adata.var_names.str.lower().str.startswith("mt-")
adata.var["rb"] = adata.var_names.str.lower().str.startswith(("rpl", "rps"))
mt_mask, rb_mask = adata.var["mt"].values, adata.var["rb"].values
n = adata.n_obs

genes_pc  = np.zeros(n, np.int32)
counts_pc = np.zeros(n, np.int64)
mt_cnts   = np.zeros(n, np.int64)
rb_cnts   = np.zeros(n, np.int64)

for s in tqdm.trange(0, n, CHUNK, desc="QC metrics"):
    e = min(s + CHUNK, n); X = adata.X[s:e]
    genes_pc[s:e]  = X.getnnz(1)
    counts_pc[s:e] = np.asarray(X.sum(1)).flatten()
    mt_cnts[s:e]   = np.asarray(X[:, mt_mask].sum(1)).flatten()
    rb_cnts[s:e]   = np.asarray(X[:, rb_mask].sum(1)).flatten()

pct_mt, pct_rb = mt_cnts/counts_pc*100, rb_cnts/counts_pc*100
adata.file.close()

#filter
adata = ad.read_h5ad(RAW)
adata.obs["n_genes"] = genes_pc; adata.obs["n_counts"] = counts_pc
adata.obs["pct_mt"]  = pct_mt;   adata.obs["pct_ribo"] = pct_rb

mask = (
    (adata.obs.n_genes.between(MIN_GENES, MAX_GENES)) &
    (adata.obs.n_counts.between(MIN_COUNTS, MAX_COUNTS)) &
    (adata.obs.pct_mt < MAX_PCT_MT) &
    (adata.obs.pct_ribo < MAX_PCT_RB)
)
adata = adata[mask].copy()
sc.pp.filter_genes(adata, min_cells=3)
adata.layers["counts"] = adata.X.copy()
adata.X = adata.X.astype(np.float32)       
adata.write(FILTERED, compression="gzip")

#Scrublet
if RUN_SCRUBLET:
    sub = np.random.choice(adata.n_obs, SCRUB_N, replace=False)
    scb = scr.Scrublet(adata.X[sub, :].astype(np.float32),
                       expected_doublet_rate=EXPECTED_RATE)
    dscore, _ = scb.scrub_doublets()
    scb.plot_histogram(); plt.tight_layout()
    plt.savefig(f"{FIGDIR}/doublet_hist.png"); plt.close()
    preds = dscore > SCRUB_THRESHOLD
    adata.obs["doublet_score"] = np.nan
    adata.obs.loc[adata.obs.index[sub], "doublet_score"] = dscore
    adata.obs["predicted_doublet"] = False
    adata.obs.loc[adata.obs.index[sub], "predicted_doublet"] = preds
    adata = adata[~adata.obs.predicted_doublet].copy()

#normalise & log1p
adata.layers["counts"] = adata.X.copy()     # keep raw counts
sc.pp.normalize_total(adata, target_sum=1_000_000)
sc.pp.log1p(adata)
adata.raw = adata                             # log‑normalised stored
adata.write(NORM, compression="gzip")

#HVG
adata.layers["counts"] = adata.layers["counts"].astype(np.int32, copy=False)

sc.pp.highly_variable_genes(
    adata,
    flavor="seurat_v3",
    n_top_genes=3_000,
    layer="counts",    
    subset=True,
)
adata.write(HVG)
print("Written:", HVG) 

In [None]:
adata.obs.groupby("donor_sex_label")["external_donor_name_label"].nunique()