In [9]:
import session_info

import itertools
import os
import re
import shutil
import sys
from pathlib import Path
from typing import Iterable, Literal

import anndata as ad
import numpy as np
import scipy.sparse as sp
import duckdb
import pandas as pd
import requests
from appdirs import user_cache_dir
from tqdm import tqdm

In [10]:
# Public object store hosting cellNexus assays and metadata.
REMOTE_URL = "https://object-store.rc.nectar.org.au/v1/AUTH_06d6e008e3e642da99d806ba3ea629c5"
ASSAY_URL = f"{REMOTE_URL}/cellNexus-anndata"
METADATA_URL = f"{REMOTE_URL}/cellNexus-metadata/metadata.1.0.11.parquet"
# Cached assay files smaller than this many bytes are treated as incomplete downloads.
MIN_EXPECTED_SIZE = 5_000_000

# Supported assay names, each mapped to the remote subdirectory that holds it.
assay_map = {name: name for name in ("counts", "cpm")}

def is_parquet_valid(parquet_file) -> bool:
    """Return True if DuckDB can open ``parquet_file`` as a parquet relation.

    The relation is only constructed, never executed, so this is a cheap check.
    NOTE(review): it may not detect corruption inside data pages -- confirm if that matters.

    Args:
        parquet_file: Path (or str) of the local parquet file to check.

    Returns:
        True when opening succeeds; False (after logging the error to stderr) otherwise.
    """
    conn = None
    try:
        conn = duckdb.connect()
        conn.from_parquet(str(parquet_file))  # Try reading
        return True  # File is valid
    except Exception as e:
        print(f"Parquet file is corrupt: {e}", file=sys.stderr)
        return False  # File is corrupt
    finally:
        # Always release the throwaway in-memory connection (previously leaked per call).
        if conn is not None:
            conn.close()
        
def _get_default_cache_dir() -> Path:
    """Return the per-user cache directory for cellNexusPy downloads (resolved by appdirs)."""
    cache_root = user_cache_dir("cellNexusPy")
    return Path(cache_root)

def _sync_remote_file(full_url: str, output_file: Path):
    if not output_file.exists():
        output_dir = output_file.parent
        output_dir.mkdir(parents=True, exist_ok=True)
        print(f"Downloading {full_url} to {output_file}", file=sys.stderr)
        req = requests.get(full_url, stream=True, allow_redirects=True)
        req.raise_for_status()
        pbar = tqdm(total=int(req.headers.get("Content-Length", 0)))
        with pbar.wrapattr(req.raw, "read") as src, output_file.open("wb") as dest:
            shutil.copyfileobj(src, dest)

def get_metadata(
    parquet_url: str = METADATA_URL,
    cache_dir: os.PathLike[str] = _get_default_cache_dir(),
) -> tuple[duckdb.DuckDBPyConnection, duckdb.DuckDBPyRelation]:
    """Ensure the cellNexus metadata parquet is cached locally and open it with DuckDB.

    The file is downloaded when it is absent. It is re-downloaded when it is present
    but fails ``is_parquet_valid``.

    Args:
        parquet_url: Remote URL of the metadata parquet; its last path segment is the
            local file name.
        cache_dir: Directory holding the cached copy.

    Returns:
        ``(conn, relation)``: a fresh DuckDB connection and the parquet relation created
        on it. The connection is returned so the caller can keep it alive or close it.
    """
    parquet_local = Path(cache_dir) / parquet_url.split("/")[-1]

    if not parquet_local.exists():
        print("Metadata file is missing. Downloading...", file=sys.stderr)
        _sync_remote_file(parquet_url, parquet_local)
    elif not is_parquet_valid(parquet_local):
        print("Metadata file is corrupted. Re-downloading...", file=sys.stderr)
        parquet_local.unlink(missing_ok=True)  # _sync_remote_file skips existing paths
        _sync_remote_file(parquet_url, parquet_local)

    conn = duckdb.connect()
    return conn, conn.from_parquet(str(parquet_local))

def sync_assay_files(
    url: str = ASSAY_URL,
    cache_dir: Path = _get_default_cache_dir(),
    subdir: str = "",
    atlas: str = "",
    aggregation: str = "",
    files: Iterable[str] = (),
):
    """Lazily ensure each assay file is cached, yielding ``(subdir, local_path)`` per file.

    This is a generator: a file is only fetched when its tuple is consumed.

    Remote layout:
        ``<url>/<atlas>/<subdir>/<file>`` when ``aggregation == "single_cell"``,
        otherwise ``<url>/<atlas>/<aggregation>/<subdir>/<file>``.
    Local layout (always):
        ``<cache_dir>/<atlas>/<aggregation>/<subdir>/<file>``.

    A cached file smaller than ``MIN_EXPECTED_SIZE`` bytes is treated as an incomplete
    download: it is deleted and fetched again. NOTE: a file that is legitimately smaller
    than that threshold is therefore re-downloaded on every call.
    """
    cache_dir = Path(cache_dir)  # also accept str paths
    for file in files:
        if aggregation == "single_cell":
            sub_url = f"{url}/{atlas}/{subdir}/{file}"
        else:
            sub_url = f"{url}/{atlas}/{aggregation}/{subdir}/{file}"
        output_filepath = cache_dir / atlas / aggregation / subdir / file

        # _sync_remote_file skips any path that already exists, so an undersized file
        # must be removed first or it would never actually be re-fetched.
        if output_filepath.exists() and output_filepath.stat().st_size < MIN_EXPECTED_SIZE:
            output_filepath.unlink()
        if not output_filepath.exists():
            _sync_remote_file(sub_url, output_filepath)

        yield subdir, output_filepath

In [11]:
# Open the cached metadata parquet, downloading it on first use; `table` is a DuckDB relation.
conn, table = get_metadata(parquet_url=METADATA_URL)
table

┌────────────────────────────────────────────────────────────────────────┬──────────────────────────────────────┬────────────────────┬──────────────────────────────────┬──────────────────────────┬────────────────────────────┬──────────────────────────────────┬─────────────┬────────────────────────┬────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────────────────────────────┬──────────────────────────────────────┬───────────────────┬───────────────────────────┬────────────────────────────────────┬─────────────────────┬──────────────────────────┬───────────────────────┬───────────────┬──────────────────────────────────────────────────────────────────────────────┬───────────────┬─────────────┬──────────┬───────────

In [13]:
# Keep metadata rows for African donors, lung parenchyma tissue and CD4 cell types.
# `assay LIKE '%10%'` matches any assay label containing "10" (e.g. the 10x chemistries).
lung_cd4_filter = """
    self_reported_ethnicity = 'African'
    AND assay LIKE '%10%'
    AND tissue = 'lung parenchyma'
    AND cell_type LIKE '%CD4%'
"""
query = table.filter(lung_cd4_filter)
query

┌───────────────────────────────────────────────────────────────────────┬──────────────────────────────────────┬────────────────────┬──────────────────────────────────────────────────┬─────────────────────────────────┬────────────────────────────┬──────────────────────────────────┬───────────┬────────────────────────┬────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────────────────────────────┬──────────────────────────────────────┬───────────────────┬─────────────────────────┬────────────────────────────────────┬─────────┬──────────────────────────┬───────────────────────────────────────────────────────────────────────────────┬───────────────┬─────────────────────────────────────────────────────────────────────

In [20]:
# Parameters for fetching assay files for the metadata rows selected by `query`.
data = query
assay = "counts"            # presumably must be a key of assay_map (see the check in the next cell)
aggregation = "pseudobulk"
cache_directory = _get_default_cache_dir()
repository = ASSAY_URL
features = slice(None)      # keep every var column; replace with a selector to subset

In [45]:
# Validate the configuration before touching the network.
assert assay in assay_map, f"assay must be one of {sorted(assay_map)}"
assert isinstance(cache_directory, Path), "cache_directory must be a Path"

cache_directory.mkdir(exist_ok=True, parents=True)

# The file-id column follows the aggregation level (e.g. file_id_cellNexus_pseudobulk).
file_id_column = f"file_id_cellNexus_{aggregation}"
files_to_read = data.project(file_id_column).distinct().fetchdf()[file_id_column]

# Files are laid out per atlas, so the selection must resolve to exactly one atlas.
atlas_ids = data.project('"atlas_id"').distinct().fetchdf()["atlas_id"]
assert len(atlas_ids) == 1, f"expected exactly one atlas_id, got {list(atlas_ids)}"
atlas = atlas_ids.iloc[0]

# Materialise the generator: downloads happen here, and the next cell can be re-run
# (a bare generator is exhausted after a single pass).
synced = list(
    sync_assay_files(
        url=repository,
        cache_dir=cache_directory,
        atlas=atlas,
        subdir=assay,
        aggregation=aggregation,
        files=files_to_read,
    )
)

In [46]:
# NOTE(review): `filter_data` is defined in a later cell; under Restart & Run All this cell
# raises NameError until that definition is moved above it. TODO move it.
# Collect every synced file; the former groupby loop kept only the last subdir group.
ads = [filter_data(path) for _subdir, path in synced]
assert ads, "no assay files were synced for this query"

In [44]:
def _pseudobulk_obs_ids(frame: pd.DataFrame) -> pd.Series:
    """Build the ``<sample_id>___<cell_type_unified_ensemble>`` key used as pseudobulk obs name."""
    return frame["sample_id"].astype(str) + "___" + frame["cell_type_unified_ensemble"].astype(str)


def _pseudobulk_obs_metadata(cells: pd.DataFrame) -> pd.DataFrame:
    """Reduce per-row metadata to one row per pseudobulk observation, indexed by its obs key."""
    columns_to_remove = ["cell_id", "cell_type", "file_id_cellNexus_single_cell",
                         "cell_type_ontology_term_id",
                         "observation_joinid", "ensemble_joinid",
                         "nFeature_RNA", "data_driven_ensemble", "cell_type_unified",
                         "empty_droplet", "observation_originalid", "alive", "scDblFinder.class"]
    subdata = cells.drop(columns=[col for col in columns_to_remove if col in cells])

    # Also drop annotation/QC families matched by name fragment (case-insensitive).
    pattern = "|".join(re.escape(s) for s in ["metacell", "azimuth", "monaco", "blueprint", "subsets_", "high_"])
    cols_to_drop = subdata.columns[subdata.columns.str.contains(pattern, case=False, regex=True)]
    subdata = subdata.drop(columns=cols_to_drop)

    subdata = subdata.drop_duplicates(keep="last")
    # Same astype(str) key construction as the obs ids, so the two always line up.
    subdata.index = _pseudobulk_obs_ids(subdata)
    # Rows that still differ in some retained column share an obs key; keep the last one so
    # the index is unique (DataFrame.reindex raises on duplicate labels).
    return subdata[~subdata.index.duplicated(keep="last")]


def filter_data(file, relation=None, feature_index=None):
    """Load one pseudobulk ``.h5ad`` and attach the query's metadata as ``obs``.

    Args:
        file: Path to a cached pseudobulk file (as yielded by ``sync_assay_files``).
        relation: DuckDB relation with the selected metadata rows; defaults to the
            notebook-level ``data``.
        feature_index: Column (var) selector; defaults to the notebook-level ``features``.

    Returns:
        An in-memory AnnData restricted to the ``sample_id___cell_type_unified_ensemble``
        observations listed for this file, with ``obs`` replaced by the matching metadata.
    """
    if relation is None:
        relation = data
    if feature_index is None:
        feature_index = features

    # Escape single quotes so the file name is always a valid SQL string literal.
    escaped_name = Path(file).name.replace("'", "''")
    cells = relation.filter(f"file_id_cellNexus_pseudobulk = '{escaped_name}'").fetchdf()

    obs_ids = _pseudobulk_obs_ids(cells)
    pseudobulk = ad.read_h5ad(file)
    ann = pseudobulk[obs_ids.unique(), feature_index].copy()

    ann.obs = _pseudobulk_obs_metadata(cells).reindex(ann.obs.index)
    return ann

In [50]:
# Stack the per-file AnnData objects along obs. index_unique="_" appends "_<i>" (the
# position of the source object) to obs names so they stay unique across files.
# join="inner" (the default) keeps only the var columns shared by all inputs.
adatas = ad.concat(ads, axis=0, join="inner", index_unique="_")

In [51]:
# Inspect the merged per-observation metadata (obs names carry the "_<i>" suffix added by concat).
adatas.obs

Unnamed: 0,dataset_id,sample_id,sample_,assay,assay_ontology_term_id,cell_count,citation,collection_id,dataset_version_id,default_embedding,...,X_umap2,age_days,tissue_groups,atlas_id,cell_type_unified_ensemble,is_immune,sample_chunk,cell_chunk,sample_pseudobulk_chunk,file_id_cellNexus_pseudobulk
e4d7f8162faf68a85f61bdbd81dae627___other_0,9f222629-9e39-47d0-b83f-e08d610c7479,e4d7f8162faf68a85f61bdbd81dae627,e4d7f8162faf68a85f61bdbd81dae627,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,10585,respiratory system,cellxgene/01-05-2025,other,False,1.0,11.0,2.0,c25a6ea6b00d263d6cbb2d06a542a2c7___2.h5ad
d0a8856647d20b1fa1e83edb4bb9313e___other_0,9f222629-9e39-47d0-b83f-e08d610c7479,d0a8856647d20b1fa1e83edb4bb9313e,d0a8856647d20b1fa1e83edb4bb9313e,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,20075,respiratory system,cellxgene/01-05-2025,other,False,1.0,5.0,2.0,c25a6ea6b00d263d6cbb2d06a542a2c7___2.h5ad
d0a8856647d20b1fa1e83edb4bb9313e___cd4 th17 em_1,9f222629-9e39-47d0-b83f-e08d610c7479,d0a8856647d20b1fa1e83edb4bb9313e,d0a8856647d20b1fa1e83edb4bb9313e,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,20075,respiratory system,cellxgene/01-05-2025,cd4 th17 em,True,1.0,5.0,2.0,c25a6ea6b00d263d6cbb2d06a542a2c7___1.h5ad
d0a8856647d20b1fa1e83edb4bb9313e___cd4 th2 em_1,9f222629-9e39-47d0-b83f-e08d610c7479,d0a8856647d20b1fa1e83edb4bb9313e,d0a8856647d20b1fa1e83edb4bb9313e,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,20075,respiratory system,cellxgene/01-05-2025,cd4 th2 em,True,1.0,5.0,2.0,c25a6ea6b00d263d6cbb2d06a542a2c7___1.h5ad
4f067f7e5f960bc72b0710684a521e84____SC84___cd4 th17 em_1,9f222629-9e39-47d0-b83f-e08d610c7479,4f067f7e5f960bc72b0710684a521e84____SC84,4f067f7e5f960bc72b0710684a521e84,10x 3' v3,EFO:0009922,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,18980,respiratory system,cellxgene/01-05-2025,cd4 th17 em,True,1.0,14.0,2.0,c25a6ea6b00d263d6cbb2d06a542a2c7___1.h5ad
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
a2459ad4272363e6eb775e8e99607c3e___cd4 th1/th17 em_3,9f222629-9e39-47d0-b83f-e08d610c7479,a2459ad4272363e6eb775e8e99607c3e,a2459ad4272363e6eb775e8e99607c3e,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,19345,respiratory system,cellxgene/01-05-2025,cd4 th1/th17 em,True,1.0,1.0,1.0,a4942dd92de40cc23c6161df7e02ccc7___1.h5ad
bfe624d44f7e5868cc22e11ad0f13866___cd4 th1 em_3,9f222629-9e39-47d0-b83f-e08d610c7479,bfe624d44f7e5868cc22e11ad0f13866,bfe624d44f7e5868cc22e11ad0f13866,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,11315,respiratory system,cellxgene/01-05-2025,cd4 th1 em,True,1.0,1.0,1.0,a4942dd92de40cc23c6161df7e02ccc7___1.h5ad
a2459ad4272363e6eb775e8e99607c3e___treg_3,9f222629-9e39-47d0-b83f-e08d610c7479,a2459ad4272363e6eb775e8e99607c3e,a2459ad4272363e6eb775e8e99607c3e,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,19345,respiratory system,cellxgene/01-05-2025,treg,True,1.0,1.0,1.0,a4942dd92de40cc23c6161df7e02ccc7___1.h5ad
a2459ad4272363e6eb775e8e99607c3e___cd4 th2 em_3,9f222629-9e39-47d0-b83f-e08d610c7479,a2459ad4272363e6eb775e8e99607c3e,a2459ad4272363e6eb775e8e99607c3e,10x 5' v1,EFO:0011025,2282447,Publication: https://doi.org/10.1038/s41591-02...,6f6d381a-7701-4781-935c-db10d30de293,8d84ba15-d367-4dce-979c-85da70b868a2,X_umap,...,,19345,respiratory system,cellxgene/01-05-2025,cd4 th2 em,True,1.0,1.0,1.0,a4942dd92de40cc23c6161df7e02ccc7___1.h5ad
