# Extract BioProject datasets pipeline

This notebook restores the original interactive pipeline helpers for querying NCBI BioProject and SRA metadata.


In [None]:

"""Utilities for extracting BioProject datasets from NCBI."""
from __future__ import annotations

from pathlib import Path
from typing import Dict, Iterable, List, Sequence
import xml.etree.ElementTree as ET

import pandas as pd
from Bio import Entrez


__all__ = [
    "BIOPROJECT_QUERY_DEFAULT",
    "extract_project_data_types",
    "fetch_bioproject_details",
    "fetch_srr_details",
    "load_bioproject_table",
    "map_bioproject_to_biosample",
    "map_bioproject_to_srr",
    "run_pipeline",
    "search_bioprojects",
    "setup_entrez",
    "summarize_samples",
    "write_bioproject_srr_mapping",
    "write_bioproject_table",
]


BIOPROJECT_QUERY_DEFAULT = '("Exaiptasia diaphana"[Organism] AND microbiome)'


def setup_entrez(email: str) -> None:
    """Register *email* as the contact address used for Entrez requests.

    The value is stored on the imported ``Entrez`` module itself, so it is
    shared by every Entrez call made afterwards in the same process.

    Args:
        email: Contact address to assign to ``Entrez.email``.
    """
    Entrez.email = email


def search_bioprojects(query: str, retmax: int = 100) -> Dict:
    """Search the ``bioproject`` Entrez database and return the parsed reply.

    Args:
        query: Entrez search term, forwarded unchanged as ``term``.
        retmax: Forwarded to ``Entrez.esearch`` as ``retmax``.

    Returns:
        Whatever ``Entrez.read`` produces for the search response.
    """
    response = Entrez.esearch(db="bioproject", term=query, retmax=retmax)
    try:
        parsed = Entrez.read(response)
    finally:
        # Release the response handle whether or not parsing succeeded.
        response.close()
    return parsed


def fetch_bioproject_details(id_list: Sequence[str]) -> List[Dict]:
    """Retrieve the document summaries for the given BioProject identifiers.

    Args:
        id_list: BioProject identifiers as strings; they are comma-joined into
            a single ``efetch`` request.

    Returns:
        The ``DocumentSummary`` entry of the parsed ``DocumentSummarySet``, or
        an empty list when *id_list* is empty (no request is sent then).
    """
    if not id_list:
        return []

    joined_ids = ",".join(id_list)
    response = Entrez.efetch(
        db="bioproject", id=joined_ids, rettype="docsum", retmode="xml"
    )
    try:
        parsed = Entrez.read(response)
        summaries = parsed["DocumentSummarySet"]["DocumentSummary"]
    finally:
        # Release the response handle whether or not parsing succeeded.
        response.close()
    return summaries


def extract_project_data_types(project: Dict) -> List[str]:
    """Collect the data-type labels recorded on a BioProject summary.

    Labels are read from ``project["ProjectDataTypeSet"]["ProjectDataType"]``
    (a single mapping or a list of mappings). If that yields nothing, the
    first usable text found under ``project["ProjectType"]`` is returned
    instead.

    Args:
        project: Mapping describing one BioProject.

    Returns:
        Whitespace-stripped labels in first-seen order, without duplicates;
        an empty list when nothing usable is present.
    """

    def _label(node) -> str:
        # Mappings carry their text under "#text" or "@value"; anything that
        # is not a string after unwrapping counts as "no label".
        if isinstance(node, dict):
            node = node.get("#text") or node.get("@value")
        return node.strip() if isinstance(node, str) else ""

    found: List[str] = []

    type_set = project.get("ProjectDataTypeSet")
    entries = type_set.get("ProjectDataType") if isinstance(type_set, dict) else None
    if isinstance(entries, dict):
        # A lone entry is handled like a one-element list.
        entries = [entries]
    if not isinstance(entries, list):
        entries = []

    for entry in entries:
        if not isinstance(entry, dict):
            continue
        # Keep every truthy value stored under one of the known key spellings.
        raw_values = list(
            filter(None, map(entry.get, ("DataType", "data_type", "Data_Type")))
        )
        if not raw_values:
            # No named key matched: fall back to the entry's own text.
            own_text = entry.get("#text") or entry.get("@value")
            raw_values = [own_text] if own_text else []
        for raw in raw_values:
            label = _label(raw)
            if label and label not in found:
                found.append(label)

    if found:
        return found

    # Nothing found above: use the first non-empty text under "ProjectType".
    project_type = project.get("ProjectType")
    if not isinstance(project_type, dict):
        return found
    for key in ("ProjectTypeSubmission", "SubmissionType", "#text", "@value"):
        label = _label(project_type.get(key))
        if label:
            found.append(label)
            return found
    return found


def write_bioproject_table(projects: Iterable[Dict], path: Path) -> pd.DataFrame:
    """Flatten BioProject summaries into a table and save it as TSV.

    Args:
        projects: Mappings describing BioProjects; fields that are absent
            become empty strings.
        path: Destination file. Missing parent directories are created.

    Returns:
        The DataFrame that was written (tab-separated, without the index).
    """
    # (output column, key in the project mapping), in output order.
    field_sources = (
        ("ProjectId", "Project_Id"),
        ("ProjectAcc", "Project_Acc"),
        ("ProjectDate", "Registration_Date"),
        ("ProjectTitle", "Project_Title"),
        ("ProjectDescription", "Project_Description"),
        ("OrganismName", "Organism_Name"),
        ("OrganismStrain", "Organism_Strain"),
    )
    header = [column for column, _ in field_sources] + ["ProjectDataTypes"]

    rows = []
    for entry in projects:
        # All data types of a project share one cell, separated by "; ".
        joined_types = "; ".join(extract_project_data_types(entry))
        row = {column: entry.get(source, "") for column, source in field_sources}
        row["ProjectDataTypes"] = joined_types
        rows.append(row)

    table = pd.DataFrame(rows, columns=header)
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    table.to_csv(destination, sep="\t", index=False)
    return table


def load_bioproject_table(path: Path) -> pd.DataFrame:
    """Read a tab-separated BioProject table from *path*.

    Args:
        path: Location of the TSV file to read.

    Returns:
        The file contents as parsed by ``pandas.read_csv`` with its default
        type inference.
    """
    table = pd.read_csv(path, sep="\t")
    return table


## Example execution

Uncomment the call below to run the pipeline with your email and output directory.


In [None]:
# Example usage:
# results = run_pipeline(
#     email="lukas.becker@hhu.de",
#     output_dir=Path("../data"),
# )
# results["sample_summary"].head()
