In [1]:
import scanpy as sc
import numpy as np
import os
import json
import glob

In [2]:
# Input dataset and output root both live under the same datasets directory.
DATASETS_ROOT = "/cs/labs/mornitzan/yam.arieli/datasets"
FILE_PATH = f"{DATASETS_ROOT}/Oesinghaus/Parse_10M_PBMC_cytokines.h5ad"
BASE_PATH = f"{DATASETS_ROOT}/Oesinghaus_pseudotubes"

# --- Configuration ---
# Upper bound on the cells drawn from each cell type for one pseudo-tube.
N_PER_CELL_TYPE = 30
# A cell type with fewer cells than this is left out of the tube entirely.
MIN_CELLS_THRESHOLD = 10
# How many pseudo-tubes are built for every (donor, cytokine) pair.
N_PSEUDO_TUBES = 10
# Seed of the single random generator shared by all sampling and shuffling.
RANDOM_SEED = 42
# ---------------------

rng = np.random.default_rng(seed=RANDOM_SEED)

In [3]:
# Open the dataset in read-only backed mode ('r') rather than loading it whole;
# per-donor subsets are pulled into memory later with `.to_memory()`.
adata_backed = sc.read_h5ad(filename=FILE_PATH, backed="r")

In [4]:
# ---------------------------------------------------------------------------
# Resume helpers
# ---------------------------------------------------------------------------

def _is_cytokine_complete(base_path, donor_name, cytokine, n_pseudo_tubes):
    """Return True if all N pseudo-tube files already exist on disk."""
    safe_cyt = str(cytokine).replace(" ", "_").replace("/", "_")
    folder = os.path.join(base_path, donor_name, safe_cyt)
    if not os.path.exists(folder):
        return False
    existing = glob.glob(os.path.join(folder, "pseudotube_*.h5ad"))
    return len(existing) >= n_pseudo_tubes


def _get_eligible_cell_types(obs_df, min_cells_threshold):
    """Return list of cell types with at least min_cells_threshold cells."""
    return [
        ct for ct in obs_df['cell_type'].unique()
        if (obs_df['cell_type'] == ct).sum() >= min_cells_threshold
    ]


def _compute_n_cells(obs_df, eligible_cell_types, n_per_cell_type):
    """
    Compute expected cells per pseudo-tube deterministically from obs.

    Each tube samples min(N_PER_CELL_TYPE, n_available) cells per eligible
    cell type, so the total is fixed for a given (donor, cytokine) pair.
    """
    return sum(
        min(n_per_cell_type, int((obs_df['cell_type'] == ct).sum()))
        for ct in eligible_cell_types
    )


# ---------------------------------------------------------------------------
# Build manifest — skip donors / cytokines that are already on disk
# ---------------------------------------------------------------------------

manifest = []


def _cytokine_folder(base_path, donor_name, cytokine):
    """Return the output folder of one (donor, cytokine) pair.

    Spaces and slashes in the cytokine label are replaced by underscores, the
    same sanitisation `_is_cytokine_complete` applies.
    """
    safe_cyt = str(cytokine).replace(" ", "_").replace("/", "_")
    return os.path.join(base_path, donor_name, safe_cyt)


def _manifest_entries(job, donor_name, n_pseudo_tubes):
    """Return one manifest record per pseudo-tube of a planned cytokine."""
    return [
        {
            "path": os.path.join(job["folder"], f"pseudotube_{tube_idx}.h5ad"),
            "donor": donor_name,
            "cytokine": job["cytokine"],
            "n_cells": job["n_cells"],
            "cell_types_included": job["cell_types"],
            "tube_idx": tube_idx,
        }
        for tube_idx in range(n_pseudo_tubes)
    ]


def _plan_donor(donor_obs, donor_name):
    """Work out, from obs alone, which cytokines of a donor can be built.

    Cytokines without any eligible cell type are reported and dropped here.
    They never get an output folder, so counting them as "incomplete" would
    force the donor's expression matrix to be reloaded on every resume.

    Returns a list of dicts (sorted by cytokine) with the keys ``cytokine``,
    ``folder``, ``cell_types``, ``n_cells`` and ``complete``.
    """
    jobs = []
    for cytokine in sorted(donor_obs['cytokine'].unique()):
        cyt_obs = donor_obs[donor_obs['cytokine'] == cytokine]
        cell_types = _get_eligible_cell_types(cyt_obs, MIN_CELLS_THRESHOLD)
        if not cell_types:
            print(f"  Skipping {cytokine}: no eligible cell types")
            continue
        jobs.append({
            "cytokine": cytokine,
            "folder": _cytokine_folder(BASE_PATH, donor_name, cytokine),
            "cell_types": cell_types,
            "n_cells": _compute_n_cells(cyt_obs, cell_types, N_PER_CELL_TYPE),
            "complete": _is_cytokine_complete(
                BASE_PATH, donor_name, cytokine, N_PSEUDO_TUBES
            ),
        })
    return jobs


def _write_pending_tubes(adata_donor, jobs):
    """Sample and write the pseudo-tubes of every job not yet complete.

    `adata_donor` is the in-memory AnnData of a single donor. Jobs flagged
    ``complete`` are only reported. For the others all N_PSEUDO_TUBES files
    are (re)written.

    NOTE: all sampling draws from the single module-level `rng`, consumed in
    job order. Skipped cytokines consume nothing, so the cells drawn for a
    given cytokine depend on which cytokines were still pending in this run.
    """
    for job in jobs:
        cytokine = job["cytokine"]
        if job["complete"]:
            print(f"  {cytokine}: already complete (skipped)")
            continue

        folder = job["folder"]
        os.makedirs(folder, exist_ok=True)

        adata_cyt = adata_donor[adata_donor.obs['cytokine'] == cytokine]
        # The per-cell-type subsets do not depend on the tube index, so they
        # are built once instead of once per tube.
        ct_views = [
            adata_cyt[adata_cyt.obs['cell_type'] == ct]
            for ct in job["cell_types"]
        ]

        for tube_idx in range(N_PSEUDO_TUBES):
            sampled_adatas = []
            for ct_adata in ct_views:
                n_available = ct_adata.n_obs
                n_sample = min(N_PER_CELL_TYPE, n_available)
                chosen_idx = rng.choice(n_available, size=n_sample, replace=False)
                sampled_adatas.append(ct_adata[chosen_idx].copy())

            # Concatenate all cell types into one pseudo-tube
            pseudo_tube = sc.concat(sampled_adatas, join='outer')

            # Shuffle cell order so cell type isn't implicit in position
            shuffle_idx = rng.permutation(pseudo_tube.n_obs)
            pseudo_tube = pseudo_tube[shuffle_idx].copy()

            # The manifest records the size predicted from obs; make sure the
            # tube really has that size before anything is written.
            assert pseudo_tube.n_obs == job["n_cells"], (
                f"{cytokine}: tube has {pseudo_tube.n_obs} cells, "
                f"expected {job['n_cells']}"
            )

            # Write under a temporary name and rename afterwards, so an
            # interrupted write never leaves a truncated pseudotube_*.h5ad
            # that a later resume would accept as finished.
            out_path = os.path.join(folder, f"pseudotube_{tube_idx}.h5ad")
            tmp_path = os.path.join(folder, f".tmp_pseudotube_{tube_idx}.h5ad")
            pseudo_tube.write_h5ad(tmp_path, compression='gzip')
            os.replace(tmp_path, out_path)

        print(f"  {cytokine}: {N_PSEUDO_TUBES} pseudo-tubes, "
              f"{len(job['cell_types'])} cell types, "
              f"{job['n_cells']} cells each")


for donor_name in sorted(adata_backed.obs['donor'].unique()):
    print(f"\n{'='*50}")
    print(f"Processing {donor_name}...")

    # adata_backed.obs is in-memory (pandas DataFrame) — no I/O needed here
    donor_mask = adata_backed.obs['donor'] == donor_name
    jobs = _plan_donor(adata_backed.obs[donor_mask], donor_name)
    n_complete = sum(1 for job in jobs if job["complete"])

    if n_complete == len(jobs):
        # Entire donor already done — the manifest is rebuilt from obs,
        # no need to load the expression matrix into memory.
        print(f"  All {len(jobs)} cytokines already complete — "
              f"rebuilding manifest entries (no data load needed)")
    else:
        # Donor is partially (or not at all) processed — load it once.
        # The in-memory donor is passed straight into the call, so no
        # notebook-level name keeps it alive while the next donor loads.
        print(f"  {n_complete}/{len(jobs)} cytokines already complete — "
              f"loading donor into memory")
        _write_pending_tubes(adata_backed[donor_mask, :].to_memory(), jobs)

    for job in jobs:
        manifest.extend(_manifest_entries(job, donor_name, N_PSEUDO_TUBES))

# Save manifest (temporary file + rename, so a failed dump cannot clobber an
# existing manifest with a half-written one)
os.makedirs(BASE_PATH, exist_ok=True)
manifest_path = os.path.join(BASE_PATH, "manifest.json")
manifest_tmp_path = manifest_path + ".tmp"
with open(manifest_tmp_path, "w") as f:
    json.dump(manifest, f, indent=2)
os.replace(manifest_tmp_path, manifest_path)

print(f"\nDone. {len(manifest)} pseudo-tubes saved.")
print(f"Manifest: {manifest_path}")


Processing Donor1...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor10...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor11...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor12...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor2...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor3...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor4...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor5...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Processing Donor6...
  All 91 cytokines already complete — rebuilding manifest entries (no data load needed)

Proces