# 00 Bulk Data Acquisition

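One-click pipeline to fetch all transcriptomic, proteomic, metabolomic, and reference assets for the NETs project. Edit the master configuration first (the metabolomics URL manifest it points to must already exist, otherwise Step 2 stops with a `FileNotFoundError`), then execute the cells from top to bottom in Google Colab.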


In [None]:

# === MASTER CONFIGURATION ===
# 1. Base paths
PROJECT_DIR = "/content/drive/MyDrive/NetsAnalysisProject"
RAW_DATA_DIR = f"{PROJECT_DIR}/data/raw"

# 2. Transcriptomics data (GEO accession IDs)
GEO_IDS = [
    "GSE184869",
    "GSE125989",
    "GSE43837",
    "GSE14017",
    "GSE14018",
]

# 3. Proteomics data (PRIDE accession IDs)
PRIDE_IDS = [
    "PXD011796",  # NET reference proteome
    "PXD005719",  # Brain-metastasis membrane proteome
    "PXD046330",  # HER2+ brain secreted-factor model
    "PXD051579",  # HER2+ brain-metastasis BBB model
]

# 4. Metabolomics data (downloaded via a URL manifest file).
#    The file must already exist: one URL per line; blank lines and lines
#    starting with '#' are ignored when it is read.
METABOLOMICS_MANIFEST = f"{PROJECT_DIR}/resources/manifests/metabolomics_urls.txt"

# 5. Reference data (e.g. the human proteome FASTA)
REFERENCE_URLS = [
    "https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/reference_proteomes/Eukaryota/UP000005640/UP000005640_9606.fasta.gz",
]

# 6. Output locations for the generated download lists (created automatically)
DOWNLOAD_LIST_DIR = f"{PROJECT_DIR}/tmp"
ARIA2_PRIDE_LIST = f"{DOWNLOAD_LIST_DIR}/aria2c_pride_urls.txt"
ARIA2_GEO_LIST = f"{DOWNLOAD_LIST_DIR}/aria2c_geo_urls.txt"
ARIA2_REFERENCE_LIST = f"{DOWNLOAD_LIST_DIR}/aria2c_reference_urls.txt"
MASTER_DOWNLOAD_LIST = f"{DOWNLOAD_LIST_DIR}/MASTER_DOWNLOAD_LIST.txt"

# 7. Download control
SKIP_EXISTING = True  # Skip already-downloaded (non-empty) files when building master list


In [None]:

import os
import sys
import json
import shutil
import subprocess
from pathlib import Path
from typing import Iterable, List, Tuple
from urllib.parse import urlparse, unquote

import requests

# Report the installed requests version (useful when debugging API calls).
print(f"requests version: {requests.__version__}")

# pathlib views of the string paths set in the master configuration.
PROJECT_DIR_PATH = Path(PROJECT_DIR)
RAW_DATA_PATH = Path(RAW_DATA_DIR)
DOWNLOAD_LIST_PATH = Path(DOWNLOAD_LIST_DIR)
# One sub-directory per data modality underneath the raw-data root.
TRANSCRIPTOMICS_DIR = RAW_DATA_PATH / "transcriptomics"
PROTEOMICS_DIR = RAW_DATA_PATH / "proteomics"
METABOLOMICS_DIR = RAW_DATA_PATH / "metabolomics"
REFERENCE_DIR = RAW_DATA_PATH / "reference"


In [None]:

def ensure_dir(path: Path) -> Path:
    """Create ``path`` together with any missing parents and hand it back.

    Returning the same object lets callers write
    ``target = ensure_dir(base / "sub")`` in a single expression.
    """
    if not path.is_dir():
        path.mkdir(exist_ok=True, parents=True)
    return path

def infer_geo_series_folder(geo_id: str) -> str:
    """Return the GEO FTP "range" folder that contains a series accession.

    GEO groups series on its FTP site by replacing the last three digits of
    the accession number with ``nnn`` (``GSE184869`` -> ``GSE184nnn``).
    Accessions with three or fewer digits all live under ``GSEnnn``.

    Args:
        geo_id: Series accession such as ``"GSE43837"``.

    Returns:
        The range folder name, e.g. ``"GSE43nnn"``.

    Raises:
        ValueError: If ``geo_id`` is not ``GSE`` followed by at least one
            ASCII digit.
    """
    if not geo_id.startswith("GSE"):
        raise ValueError(f"Unexpected GEO accession format: {geo_id}")
    numeric = geo_id[3:]
    if not (numeric.isascii() and numeric.isdigit()):
        raise ValueError(f"Unexpected GEO accession format: {geo_id}")
    # Slicing off the last three digits yields "" for 1-3 digit accessions,
    # which is exactly the "GSEnnn" bucket used for GSE1..GSE999.
    head = numeric[:-3]
    return f"GSE{head}nnn"

def write_aria2_block(handle, target_dir: Path, filename: str, url: str) -> None:
    """Append one download entry to an open manifest.

    The entry is three lines -- ``dir=<target_dir>``, ``out=<filename>`` and
    the URL -- followed by a blank separator line.

    NOTE(review): aria2c's own input-file syntax is "URI line first, then
    option lines that start with whitespace". This dir/out/URL layout is the
    order the manifest readers in this notebook expect, so do not reorder it
    here without updating them.
    """
    entry = f"dir={target_dir}\nout={filename}\n{url}\n\n"
    handle.write(entry)

def generate_pride_urls(pxd_ids: Iterable[str], output_path: str, categories: Iterable[str] = ("RAW", "RESULT")) -> Tuple[Path, int]:
    """Write a download manifest for the files of the given PRIDE projects.

    For every accession the PRIDE archive ``/projects/<accession>/files``
    endpoint is queried page by page. Each returned record whose
    ``fileCategory.value`` is in ``categories`` and that exposes an
    "FTP Protocol" location is written to the manifest, targeting
    ``PROTEOMICS_DIR/<accession>/<fileName>``.

    NOTE(review): the code assumes the endpoint returns a JSON list of file
    records and honours ``offset``/``limit`` paging -- confirm against the
    current PRIDE API documentation.

    Args:
        pxd_ids: PRIDE project accessions; blank entries are ignored.
        output_path: Manifest file to (over)write; its parent is created.
        categories: File categories to keep (compared case-insensitively).
            An empty iterable disables the filter.

    Returns:
        ``(manifest_path, number_of_files_written)``.

    Raises:
        requests.HTTPError: For any non-404 error status from the API.
    """
    allowed = {c.upper() for c in categories}
    output = Path(output_path)
    ensure_dir(output.parent)
    ensure_dir(PROTEOMICS_DIR)
    total = 0
    limit = 200  # page size requested from the API
    # The session is a context manager so pooled connections are released
    # even when an API call raises.
    with requests.Session() as session, output.open("w", encoding="utf-8") as handle:
        handle.write("# PRIDE download manifest\n")
        for accession in pxd_ids:
            accession = accession.strip()
            if not accession:
                continue
            dataset_dir = ensure_dir(PROTEOMICS_DIR / accession)
            handle.write(f"# {accession}\n")
            url = f"https://www.ebi.ac.uk/pride/ws/archive/v2/projects/{accession}/files"
            offset = 0
            while True:
                resp = session.get(url, params={"offset": offset, "limit": limit}, timeout=60)
                if resp.status_code == 404:
                    # Unknown accession: warn and move on to the next project.
                    print(f"[warn] {accession} not found on PRIDE API")
                    break
                resp.raise_for_status()
                payload = resp.json()
                if not payload:
                    break
                for entry in payload:
                    category = (entry.get("fileCategory") or {}).get("value", "").upper()
                    if allowed and category not in allowed:
                        continue
                    # "or []" guards against an explicit null in the record,
                    # mirroring the fileCategory handling above.
                    ftp_locations = [
                        loc
                        for loc in (entry.get("publicFileLocations") or [])
                        if loc.get("name") == "FTP Protocol"
                    ]
                    if not ftp_locations:
                        continue
                    ftp_url = ftp_locations[0].get("value")
                    if not ftp_url:
                        continue
                    # Fall back to the URL basename when the record has no name.
                    filename = entry.get("fileName") or Path(urlparse(ftp_url).path).name
                    if not filename:
                        continue
                    write_aria2_block(handle, dataset_dir, filename, ftp_url)
                    total += 1
                if len(payload) < limit:
                    # A short page is treated as the last page.
                    break
                offset += limit
            handle.write("\n")
    print(f"[generate_pride_urls] {total} files written to {output}")
    return output, total

def generate_geo_urls(geo_ids: Iterable[str], output_path: str) -> Tuple[Path, int]:
    """Write a download manifest for the given GEO series.

    Every series gets its own folder below ``TRANSCRIPTOMICS_DIR``. For
    ``GSE184869`` only the normalised-expression Excel supplement is queued;
    for every other series the series matrix and the ``_RAW.tar`` supplement
    are queued. Blank accessions are ignored.

    Returns:
        ``(manifest_path, number_of_files_written)``.

    Raises:
        ValueError: Propagated from ``infer_geo_series_folder`` for a
            malformed accession.
    """
    output = Path(output_path)
    ensure_dir(output.parent)
    ensure_dir(TRANSCRIPTOMICS_DIR)
    total = 0
    with output.open("w", encoding="utf-8") as handle:
        handle.write("# GEO download manifest\n")
        for geo_id in (item.strip() for item in geo_ids):
            if not geo_id:
                continue
            series_folder = infer_geo_series_folder(geo_id)
            base_url = f"ftp://ftp.ncbi.nlm.nih.gov/geo/series/{series_folder}/{geo_id}"
            dataset_dir = ensure_dir(TRANSCRIPTOMICS_DIR / geo_id)
            handle.write(f"# {geo_id}\n")

            # Collect (filename, url) pairs first, then emit them uniformly.
            if geo_id == "GSE184869":
                excel_filename = "GSE184869_rna_seq_batch_corrected_log2_TMM_normalised_CPM_protein_coding_genes.xlsx"
                downloads = [(excel_filename, f"{base_url}/suppl/{excel_filename}")]
            else:
                downloads = [
                    (f"{geo_id}_series_matrix.txt.gz", f"{base_url}/matrix/{geo_id}_series_matrix.txt.gz"),
                    (f"{geo_id}_RAW.tar", f"{base_url}/suppl/{geo_id}_RAW.tar"),
                ]

            for filename, url in downloads:
                write_aria2_block(handle, dataset_dir, filename, url)
                total += 1
            handle.write("\n")
    print(f"[generate_geo_urls] {total} files written to {output}")
    return output, total

def derive_metabolomics_filename(url: str, counter: int) -> str:
    """Pick a local filename for a metabolomics download URL.

    When the URL path ends in a real file name, that name (percent-decoded)
    is used. For ``study_download.php`` style URLs, or URLs without a path
    name, the name is assembled from the ``STUDY_ID``, ``ANALYSIS_ID`` and
    ``DATA_TYPE`` query parameters (keys matched case-insensitively, first
    occurrence wins), with ``counter``-based placeholders for missing ones.
    """
    parsed = urlparse(url)
    basename = Path(parsed.path).name
    if basename and basename.lower() != "study_download.php":
        return unquote(basename)

    fields = {}
    for pair in parsed.query.split('&'):
        key, sep, value = pair.partition('=')
        if sep:
            # setdefault keeps the first value seen for a repeated key.
            fields.setdefault(key.upper(), value)

    study = fields['STUDY_ID'] if 'STUDY_ID' in fields else f'STUDY{counter:03d}'
    analysis = fields['ANALYSIS_ID'] if 'ANALYSIS_ID' in fields else f'AN{counter:03d}'
    data_type = fields.get('DATA_TYPE', 'data').replace('/', '_')
    return f"{study}_{analysis}_{data_type}.zip"

def generate_reference_urls(urls: Iterable[str], output_path: str) -> Tuple[Path, int]:
    """Write a download manifest for standalone reference files.

    Each non-blank URL is queued into ``REFERENCE_DIR`` under the
    percent-decoded basename of its path, or ``reference_asset_<n>.dat`` when
    the path has no basename (``n`` is the 1-based position in ``urls``).

    Returns:
        ``(manifest_path, number_of_files_written)``.
    """
    output = Path(output_path)
    ensure_dir(output.parent)
    ensure_dir(REFERENCE_DIR)
    total = 0
    with output.open("w", encoding="utf-8") as handle:
        handle.write("# Reference assets\n")
        for idx, raw_url in enumerate(urls, start=1):
            url = raw_url.strip()
            if url:
                basename = Path(urlparse(url).path).name
                filename = unquote(basename if basename else f"reference_asset_{idx}.dat")
                write_aria2_block(handle, REFERENCE_DIR, filename, url)
                total += 1
        handle.write("\n")
    print(f"[generate_reference_urls] {total} files written to {output}")
    return output, total

def _write_aria2_entry(handle, target_dir: Path, filename: str, url: str) -> None:
    """Write one download to ``handle`` in aria2c's input-file syntax.

    aria2c expects the URI on its own line, followed by that download's
    options on lines that begin with whitespace. Unindented ``dir=``/``out=``
    lines placed before the URI are not read as options.
    """
    handle.write(f"{url}\n")
    handle.write(f"  dir={target_dir}\n")
    handle.write(f"  out={filename}\n\n")

def merge_manifests(manifests: List[Path], metabolomics_manifest: Path, master_path: Path, skip_existing: bool = True) -> Tuple[Path, int, int]:
    """Combine all per-source manifests into the master aria2c input file.

    ``manifests`` are read in the ``dir=`` / ``out=`` / URL block layout
    (a ``dir=`` line stays in effect until the next one; a URL without a
    preceding ``out=`` is ignored). ``metabolomics_manifest`` is a plain list
    of URLs, one per line; its filenames come from
    ``derive_metabolomics_filename`` and its files go to ``METABOLOMICS_DIR``.
    Blank lines and ``#`` comments are ignored in both.

    Each queued download is written to ``master_path`` as a URI line followed
    by indented ``dir=`` and ``out=`` option lines, so aria2c places the file
    directly in its target directory.

    Args:
        manifests: Block-format manifest files to include, in order.
        metabolomics_manifest: Plain URL list for metabolomics downloads.
        master_path: Output file; overwritten, parent directory is created.
        skip_existing: When true, entries whose destination file already
            exists with a non-zero size are left out of the master list.

    Returns:
        ``(master_path, queued_count, skipped_count)``.
    """
    ensure_dir(master_path.parent)
    ensure_dir(METABOLOMICS_DIR)
    queued_entries = 0
    skipped_entries = 0
    skipped_paths: List[Path] = []

    def maybe_write(master, target_dir: Path, filename: str, url: str) -> None:
        """Queue one download unless a non-empty copy is already in place."""
        nonlocal queued_entries, skipped_entries
        dest = Path(target_dir) / filename
        if skip_existing and dest.exists() and dest.stat().st_size > 0:
            skipped_entries += 1
            skipped_paths.append(dest)
            return
        _write_aria2_entry(master, target_dir, filename, url)
        queued_entries += 1

    def display_path(path: Path) -> str:
        """Show ``path`` relative to the raw-data root when it lies inside it."""
        try:
            return str(path.relative_to(RAW_DATA_PATH))
        except ValueError:
            # A manifest may point outside RAW_DATA_PATH; fall back to the
            # full path instead of crashing the summary.
            return str(path)

    with master_path.open("w", encoding="utf-8") as master:
        master.write("# MASTER DOWNLOAD LIST\n\n")
        for manifest in manifests:
            master.write(f"# include {manifest.name}\n")
            current_dir: Path = RAW_DATA_PATH
            pending_out = None
            with manifest.open("r", encoding="utf-8") as handle:
                for raw_line in handle:
                    line = raw_line.strip()
                    if not line or line.startswith('#'):
                        continue
                    if line.startswith('dir='):
                        current_dir = Path(line.split('=', 1)[1])
                        continue
                    if line.startswith('out='):
                        pending_out = line.split('=', 1)[1]
                        continue
                    if pending_out is None:
                        # URL without an out= line: no filename to check or
                        # assign, so the entry is dropped.
                        continue
                    maybe_write(master, current_dir, pending_out, line)
                    pending_out = None
            master.write("\n")
        master.write("# metabolomics manifest\n")
        count = 0
        with metabolomics_manifest.open("r", encoding="utf-8") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                count += 1
                filename = derive_metabolomics_filename(line, count)
                maybe_write(master, METABOLOMICS_DIR, filename, line)
    print(f"[merge_manifests] Queued {queued_entries} downloads (skipped {skipped_entries}). -> {master_path}")
    if skipped_paths:
        preview = ', '.join(display_path(p) for p in skipped_paths[:5])
        if len(skipped_paths) > 5:
            preview += ', ...'
        print(f"[merge_manifests] Skipped existing files: {preview}")
    return master_path, queued_entries, skipped_entries


In [None]:

# Step 1: Environment preparation
# Mount Google Drive when running inside Colab; elsewhere just carry on.
try:
    import google.colab  # type: ignore
    from google.colab import drive  # type: ignore
    if Path("/content/drive").exists() and any(Path("/content/drive").glob("MyDrive")):
        print("[info] Google Drive already mounted.")
    else:
        drive.mount("/content/drive")
except ModuleNotFoundError:
    print("[info] google.colab package not available. Skipping automatic mount.")

# Make sure the whole directory layout exists before anything is written.
for directory in (
    PROJECT_DIR_PATH,
    RAW_DATA_PATH,
    DOWNLOAD_LIST_PATH,
    TRANSCRIPTOMICS_DIR,
    PROTEOMICS_DIR,
    METABOLOMICS_DIR,
    REFERENCE_DIR,
):
    ensure_dir(directory)
    print(f"[mkdir] {directory}")

# aria2c does the actual downloading; install it through apt when missing.
if shutil.which("aria2c") is not None:
    print("[info] aria2c already available.")
else:
    print("[setup] Installing aria2...")
    subprocess.run(["apt-get", "update"], check=True)
    subprocess.run(["apt-get", "install", "-y", "aria2"], check=True)


In [None]:

# Step 2: Generate download manifests
# Each generator writes its own manifest file and returns (path, file_count).
download_manifests = []
pride_path, pride_count = generate_pride_urls(PRIDE_IDS, ARIA2_PRIDE_LIST)
download_manifests.append(pride_path)
geo_path, geo_count = generate_geo_urls(GEO_IDS, ARIA2_GEO_LIST)
download_manifests.append(geo_path)
reference_path, reference_count = generate_reference_urls(REFERENCE_URLS, ARIA2_REFERENCE_LIST)
download_manifests.append(reference_path)

# The metabolomics manifest is maintained by hand, so only check that it
# exists and count its usable (non-blank, non-comment) lines.
metabolomics_manifest_path = Path(METABOLOMICS_MANIFEST)
if not metabolomics_manifest_path.exists():
    raise FileNotFoundError(f"Metabolomics manifest not found: {metabolomics_manifest_path}")
# Open through a context manager so the file handle is closed deterministically.
with metabolomics_manifest_path.open("r", encoding="utf-8") as manifest_handle:
    met_lines = sum(1 for line in manifest_handle if line.strip() and not line.strip().startswith("#"))
print(f"[info] metabolomics manifest ready: {metabolomics_manifest_path} ({met_lines} URLs)")


In [None]:

# Step 3: Merge manifests and run bulk download
# Fold every per-source manifest plus the metabolomics URL list into the
# single input file that aria2c consumes.
master_path, queued_count, skipped_count = merge_manifests(
    manifests=download_manifests,
    metabolomics_manifest=Path(METABOLOMICS_MANIFEST),
    master_path=Path(MASTER_DOWNLOAD_LIST),
    skip_existing=SKIP_EXISTING,
)
print(f"[info] MASTER_DOWNLOAD_LIST: {master_path}")
print(f"[info] Downloads queued this run: {queued_count}")
print(f"[info] Existing files skipped: {skipped_count}")


In [None]:
# Execute a single aria2c command. subprocess.run blocks until aria2c exits,
# so this cell may take a long time.
print("--- Starting Bulk Download ---")
aria2_cmd = [
    "aria2c",
    "-c",                 # resume partially downloaded files
    "-x", "16",           # max connections per server
    "-s", "16",           # split each download into up to 16 pieces
    "--max-tries=0",      # 0 = retry without limit
    "--retry-wait=30",    # seconds between retries
    f"--input-file={MASTER_DOWNLOAD_LIST}",
    f"--dir={RAW_DATA_DIR}",  # default target for entries without their own dir
    f"--log={RAW_DATA_DIR}/download.log",
    "--log-level=info",
]
# check=False: a failed download should be reported, not abort the notebook.
result = subprocess.run(aria2_cmd, check=False)
print(f"[aria2c] return code: {result.returncode}")
if result.returncode == 0:
    print("--- Bulk Download Finished. See download.log for details. ---")
else:
    # A non-zero exit status means at least one download did not complete.
    print(
        f"--- Bulk Download finished with errors (exit code {result.returncode}). "
        f"Inspect {RAW_DATA_DIR}/download.log; re-run this cell to resume. ---"
    )


In [None]:

# Step 3b: Reorganize downloaded files into target directories
from collections import defaultdict

def parse_master_manifest(manifest_path: Path) -> List[Tuple[Path, str]]:
    """List the ``(target_dir, filename)`` pairs declared in a manifest.

    Only ``dir=`` and ``out=`` lines are interpreted (leading/trailing
    whitespace is ignored); URLs, blank lines and ``#`` comments are skipped.
    A ``dir=`` line applies to every following ``out=`` line until the next
    ``dir=``; before the first one the target is ``RAW_DATA_PATH``.
    """
    targets = []
    active_dir = RAW_DATA_PATH
    with manifest_path.open('r', encoding='utf-8') as handle:
        for text in (raw.strip() for raw in handle):
            if text.startswith('#'):
                continue
            key, sep, value = text.partition('=')
            if not sep:
                # Blank line or a bare URL without '=': nothing to record.
                continue
            if key == 'dir':
                active_dir = Path(value)
            elif key == 'out':
                targets.append((active_dir, value))
    return targets

def locate_candidate(filename: str, dest_path: Path) -> Path | None:
    """Find a downloaded copy of ``filename`` in the top-level raw directory.

    Candidates are tried in this order:

    1. ``RAW_DATA_PATH/<filename>``
    2. ``<filename>.1`` (number appended at the end)
    3. ``<stem>.1<ext>`` (number inserted before the last extension, the
       form aria2c's auto-renaming uses, e.g. ``a.tar.gz`` -> ``a.tar.1.gz``)
    4. the first regular file, in sorted order, whose name starts with
       ``filename`` -- excluding ``dest_path`` itself and aria2c ``.aria2``
       control files, which are bookkeeping rather than data.

    Args:
        filename: Expected file name as written in the manifest.
        dest_path: Final destination; never returned as a candidate.

    Returns:
        The path of the best candidate, or ``None`` when nothing matches.
    """
    import glob as _glob  # local import: only needed for pattern escaping

    direct = RAW_DATA_PATH / filename
    if direct.exists():
        return direct

    name = Path(filename)
    renamed = [RAW_DATA_PATH / f"{filename}.1"]
    if name.suffix:
        # Use the *last* suffix together with stem so that
        # stem + suffix == original name; mixing stem with all suffixes
        # would duplicate the inner extensions of names like "a.tar.gz".
        renamed.append(RAW_DATA_PATH / name.with_name(f"{name.stem}.1{name.suffix}"))
    for candidate in renamed:
        if candidate.exists():
            return candidate

    # Escape the name so characters such as "[" or "*" are matched literally.
    pattern = f"{_glob.escape(filename)}*"
    matches = sorted(
        p
        for p in RAW_DATA_PATH.glob(pattern)
        if p != dest_path and p.is_file() and p.suffix != ".aria2"
    )
    return matches[0] if matches else None

def ensure_parent(path: Path) -> None:
    """Create the directory that will contain ``path`` (parents included)."""
    path.parent.mkdir(parents=True, exist_ok=True)

# Move every file listed in the master manifest from the top-level raw
# directory into its declared target directory, then report what was moved,
# what is missing, what is left over and which files are empty.
manifest_path = Path(MASTER_DOWNLOAD_LIST)
if not manifest_path.exists():
    raise FileNotFoundError(f"MASTER_DOWNLOAD_LIST not found: {manifest_path}")

# (target_dir, filename) for each download queued in the master list.
entries = parse_master_manifest(manifest_path)
move_records = []      # (source, destination) for each file moved
missing_entries = []   # (filename, target_dir) with no file found anywhere
already_correct = 0    # destinations that already existed

for target_dir, filename in entries:
    dest_path = Path(target_dir) / filename
    if dest_path.exists():
        # Existence is the only check here; empty files are reported below.
        already_correct += 1
        continue
    candidate = locate_candidate(filename, dest_path)
    if candidate and candidate.exists():
        ensure_parent(dest_path)
        shutil.move(str(candidate), str(dest_path))
        move_records.append((candidate, dest_path))
    else:
        missing_entries.append((filename, target_dir))

# Summary output; each list is capped at ten lines.
print(f"[organize] Already in place: {already_correct}")
print(f"[organize] Moved files: {len(move_records)}")
for src, dest in move_records[:10]:
    # NOTE(review): relative_to raises ValueError if dest is outside RAW_DATA_PATH.
    print(f"  moved -> {dest.relative_to(RAW_DATA_PATH)}")
if len(move_records) > 10:
    print(f"  ... {len(move_records) - 10} more")

if missing_entries:
    print(f"[organize] Missing files: {len(missing_entries)}")
    for fname, tdir in missing_entries[:10]:
        print(f"  missing {fname} -> {tdir}")
    if len(missing_entries) > 10:
        print(f"  ... {len(missing_entries) - 10} more")
else:
    print("[organize] No missing files detected.")

# Regular files still sitting directly in the raw directory after the moves
# (sub-directories are not inspected).
leftover = [p for p in RAW_DATA_PATH.iterdir() if p.is_file()]
if leftover:
    print(f"[organize] Unassigned files remaining in {RAW_DATA_PATH}: {len(leftover)}")
    for path in leftover[:10]:
        size = path.stat().st_size
        print(f"  leftover {path.name} (size={size})")
    if len(leftover) > 10:
        print(f"  ... {len(leftover) - 10} more")
else:
    print("[organize] No files left in the top-level raw directory.")

# Destinations that exist but are empty are flagged as failed downloads.
zero_bytes = []
for target_dir, filename in entries:
    dest_path = Path(target_dir) / filename
    if dest_path.exists() and dest_path.stat().st_size == 0:
        zero_bytes.append(dest_path)
if zero_bytes:
    print(f"[organize] Warning: {len(zero_bytes)} zero-byte files detected (failed downloads):")
    for path in zero_bytes[:10]:
        print(f"  zero {path.relative_to(RAW_DATA_PATH)}")


In [None]:
# Step 4: Post-download verification
# Print a recursive long listing of the raw-data tree for a manual check;
# check=False so a failing `ls` does not raise.
subprocess.run(["ls", "-lR", RAW_DATA_DIR], check=False)
