In [None]:
from synrxn.data_loader import DataLoader
dl = DataLoader(task="property")
dl.print_names()

In [None]:
from synrxn.split.repeated_kfold import RepeatedKFoldsSplitter
df = dl.load("b97xd3")
splitter = RepeatedKFoldsSplitter(
            n_splits=5,
            n_repeats=5,
            ratio=(8, 1, 1),
            shuffle=True,
            random_state=42,
        )

splitter.split(df, stratify_col=None)

train_df, val_df, test_df = splitter.get_split(repeat=0, fold=0, as_frame=True)

In [None]:
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import io
import re
import math
import hashlib
import requests
import pandas as pd

_GH_RAW_TPL = "https://raw.githubusercontent.com/{owner}/{repo}/refs/{ref_type}/{ref}/Data"
_GH_API_TPL = "https://api.github.com/repos/{owner}/{repo}/contents/Data/{task}?ref={ref}"
_ZENODO_API_TPL = "https://zenodo.org/api/records/{record_id}"


class DataLoader:
    def __init__(
        self,
        task: str,
        source: str = "github",                 # "github" or "zenodo"
        owner: str = "TieuLongPhan",
        repo: str = "SynRXN",
        ref: str = "main",                      # branch, tag, or commit SHA (when source="github")
        ref_type: str = "heads",                # "heads" for branches/SHAs, "tags" for releases
        zenodo_record: Optional[int] = None,    # e.g. 17297723
        zenodo_doi: Optional[str] = None,       # e.g. "10.5281/zenodo.17297723" (auto-parsed)
        cache_dir: Optional[Path] = None,
        timeout: int = 20,
        user_agent: str = "SynRXN-DataLoader/1.0",
        max_workers: int = 6,
    ) -> None:
        self.task = str(task).strip("/")

        self.source = source.lower().strip()
        if self.source not in {"github", "zenodo"}:
            raise ValueError("source must be 'github' or 'zenodo'")

        # GitHub config
        self.owner = owner
        self.repo = repo
        self.ref = ref
        self.ref_type = ref_type  # "heads" or "tags"
        self._gh_raw_base = _GH_RAW_TPL.format(owner=self.owner, repo=self.repo, ref_type=self.ref_type, ref=self.ref)
        self._gh_api_url = _GH_API_TPL.format(owner=self.owner, repo=self.repo, task=self.task, ref=self.ref)

        # Zenodo config
        self.zenodo_record = self._infer_zenodo_record(zenodo_record, zenodo_doi)

        # IO / network
        self.timeout = int(timeout)
        self.headers = {"User-Agent": user_agent}
        self.max_workers = int(max_workers)

        # cache
        self.cache_dir: Optional[Path] = Path(cache_dir).expanduser().resolve() if cache_dir else None
        if self.cache_dir:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

        # memo
        self._names_cache: Optional[List[str]] = None
        self._zenodo_file_index: Optional[Dict[str, Dict]] = None  # maps file key -> file meta (from zenodo)

    # ------------- OOP niceties -------------
    def __repr__(self) -> str:
        z = self.zenodo_record if self.zenodo_record is not None else None
        return (f"DataLoader(task={self.task!r}, source={self.source!r}, "
                f"github={self.owner}/{self.repo}@{self.ref_type}/{self.ref}, "
                f"zenodo_record={z}, cache_dir={str(self.cache_dir) if self.cache_dir else None})")

    def __str__(self) -> str:
        if self.source == "github":
            return f"<DataLoader github:{self.owner}/{self.repo}@{self.ref_type}/{self.ref} task={self.task}>"
        else:
            return f"<DataLoader zenodo:{self.zenodo_record} task={self.task}>"

    def __len__(self) -> int:
        return len(self.names)

    def __contains__(self, name: str) -> bool:
        return name in self.names

    def __iter__(self):
        yield from self.names

    # ------------- Properties -------------
    @property
    def names(self) -> List[str]:
        return self.available_names()

    @property
    def raw_base(self) -> str:
        return self._gh_raw_base

    @property
    def api_url(self) -> str:
        return self._gh_api_url

    # ------------- Public API -------------
    def available_names(self, refresh: bool = False) -> List[str]:
        if self._names_cache is not None and not refresh:
            return list(self._names_cache)

        if self.source == "github":
            names = self._fetch_names_github()
        else:
            names = self._fetch_names_zenodo()

        self._names_cache = sorted(set(names))
        return list(self._names_cache)

    def refresh_names(self) -> List[str]:
        return self.available_names(refresh=True)

    def suggest(self, name: str, n: int = 5) -> List[str]:
        import difflib
        names = self.available_names()
        if not names:
            return []
        return difflib.get_close_matches(name, names, n=n, cutoff=0.4)

    def print_names(self, cols: int = 3, show_count: bool = True) -> None:
        names = self.available_names()
        if show_count:
            print(f"Datasets in task '{self.task}': {len(names)}")
        if not names:
            print("  (no names found)")
            return
        rows = math.ceil(len(names) / cols)
        padded = names + [""] * (rows * cols - len(names))
        matrix = [padded[i : i + rows] for i in range(0, rows * cols, rows)]
        for r in range(rows):
            row_items = [matrix[c][r].ljust(30) for c in range(cols) if matrix[c][r]]
            print("  " + "  ".join(row_items))

    def load(
        self,
        name: str,
        use_cache: bool = True,
        dtype: Optional[Dict[str, object]] = None,
        **pd_kw,
    ) -> pd.DataFrame:
        urls, checksum = self._urls_for(name)
        cache_path = self._cache_path_for(name)

        if use_cache and cache_path is not None and cache_path.exists():
            return pd.read_csv(cache_path, compression="gzip", dtype=dtype, **pd_kw)

        last_err = None
        for ext in [".csv.gz", ".csv"]:
            url = urls.get(ext)
            if not url:
                continue
            try:
                resp = requests.get(url, headers=self.headers, timeout=self.timeout, stream=False)
            except requests.RequestException as e:
                last_err = e
                continue

            if resp.status_code == 200:
                content = resp.content

                # Verify checksum if coming from Zenodo and checksum is known
                if self.source == "zenodo" and checksum is not None and ext in checksum:
                    algo, expected = checksum[ext]
                    if not self._verify_checksum(content, algo, expected):
                        last_err = RuntimeError(f"Checksum mismatch for {name}{ext}")
                        continue

                # cache gz bytes if applicable
                if use_cache and cache_path is not None and ext == ".csv.gz":
                    try:
                        cache_path.write_bytes(content)
                    except Exception:
                        pass

                buf = io.BytesIO(content)
                if ext == ".csv.gz":
                    return pd.read_csv(buf, compression="gzip", dtype=dtype, **pd_kw)
                else:
                    return pd.read_csv(buf, compression=None, dtype=dtype, **pd_kw)
            else:
                last_err = RuntimeError(f"HTTP {resp.status_code} for {url}")

        avail = self.available_names(refresh=True)
        suggestions = self.suggest(name) if avail else []
        tried = [u for u in [urls.get(".csv.gz"), urls.get(".csv")] if u]
        msg_lines = [
            f"Failed to fetch dataset '{name}' for task '{self.task}'.",
            f"Source: {self.source}",
            "Tried URLs (in order):",
        ] + [f"  {u}" for u in tried]

        if avail:
            msg_lines.append("")
            msg_lines.append("Available dataset names:")
            if len(avail) > 200:
                msg_lines.append(f"  (showing first 200 of {len(avail)}):")
                avail_display = avail[:200]
            else:
                avail_display = avail
            msg_lines += [f"  {n}" for n in avail_display]
            if suggestions:
                msg_lines.append("")
                msg_lines.append(f"Did you mean: {suggestions} ?")

        msg_lines.append("")
        msg_lines.append(f"Last error: {last_err!s}")
        raise FileNotFoundError("\n".join(msg_lines))

    def load_many(
        self,
        names: Iterable[str],
        use_cache: bool = True,
        dtype: Optional[Dict[str, object]] = None,
        parallel: bool = True,
        **pd_kw,
    ) -> Dict[str, pd.DataFrame]:
        names_list = list(names)
        results: Dict[str, pd.DataFrame] = {}

        if not parallel or self.max_workers <= 1 or len(names_list) == 1:
            for nm in names_list:
                try:
                    results[nm] = self.load(nm, use_cache=use_cache, dtype=dtype, **pd_kw)
                except Exception as e:
                    raise RuntimeError(f"Failed to load {self.task}/{nm}: {e}") from e
            return results

        with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            futures = {
                ex.submit(self.load, nm, use_cache, dtype, **pd_kw): nm
                for nm in names_list
            }
            for fut in as_completed(futures):
                nm = futures[fut]
                try:
                    results[nm] = fut.result()
                except Exception as e:
                    raise RuntimeError(f"Failed to load {self.task}/{nm}: {e}") from e
        return results

    # ------------- Internal: URL building -------------
    def _urls_for(self, name: str) -> Tuple[Dict[str, str], Optional[Dict[str, Tuple[str, str]]]]:
        if self.source == "zenodo":
            urls, checksums = self._zenodo_urls_for(name)
            # fallback to GitHub if not found in Zenodo (optional, keep behavior consistent)
            if urls:
                # ensure both keys exist for stable iteration order in load()
                if ".csv.gz" not in urls:
                    urls[".csv.gz"] = f"{self._gh_raw_base}/{self.task}/{name}.csv.gz"
                if ".csv" not in urls:
                    urls[".csv"] = f"{self._gh_raw_base}/{self.task}/{name}.csv"
                return urls, checksums
        # default: GitHub raw
        base = f"{self._gh_raw_base}/{self.task}/{name}"
        urls = {".csv.gz": f"{base}.csv.gz", ".csv": f"{base}.csv"}
        return urls, None

    def _zenodo_urls_for(self, name: str) -> Tuple[Dict[str, str], Dict[str, Tuple[str, str]]]:
        if not self.zenodo_record:
            return {}, {}
        index = self._zenodo_file_index or self._build_zenodo_index()
        if not index:
            return {}, {}

        rel_gz = f"Data/{self.task}/{name}.csv.gz"
        rel_csv = f"Data/{self.task}/{name}.csv"
        urls: Dict[str, str] = {}
        checksums: Dict[str, Tuple[str, str]] = {}

        if rel_gz in index:
            f = index[rel_gz]
            urls[".csv.gz"] = f["links"]["download"]
            algo, hexdigest = self._parse_checksum(f.get("checksum", ""))
            if algo and hexdigest:
                checksums[".csv.gz"] = (algo, hexdigest)

        if rel_csv in index:
            f = index[rel_csv]
            urls[".csv"] = f["links"]["download"]
            algo, hexdigest = self._parse_checksum(f.get("checksum", ""))
            if algo and hexdigest:
                checksums[".csv"] = (algo, hexdigest)

        return urls, checksums

    def _build_zenodo_index(self) -> Dict[str, Dict]:
        api = _ZENODO_API_TPL.format(record_id=self.zenodo_record)
        try:
            r = requests.get(api, headers=self.headers, timeout=self.timeout)
            r.raise_for_status()
            meta = r.json()
        except requests.RequestException:
            self._zenodo_file_index = {}
            return {}

        files = meta.get("files", [])
        # Build index of {key -> file_meta}
        idx = {f.get("key", ""): f for f in files if f.get("key")}
        self._zenodo_file_index = idx
        return idx

    # ------------- Internal: names fetching -------------
    def _fetch_names_github(self) -> List[str]:
        try:
            r = requests.get(self._gh_api_url, headers=self.headers, timeout=self.timeout)
            r.raise_for_status()
            items = r.json()
        except requests.RequestException:
            return []

        names = set()
        for it in items:
            nm = it.get("name", "")
            if nm.endswith(".csv.gz"):
                names.add(nm[:-len(".csv.gz")])
            elif nm.endswith(".csv"):
                names.add(nm[:-len(".csv")])
        return sorted(names)

    def _fetch_names_zenodo(self) -> List[str]:
        index = self._zenodo_file_index or self._build_zenodo_index()
        base = f"Data/{self.task}/"
        names = set()
        for key in index.keys():
            if not key.startswith(base):
                continue
            if key.endswith(".csv.gz"):
                names.add(key[len(base):-len(".csv.gz")])
            elif key.endswith(".csv"):
                names.add(key[len(base):-len(".csv")])
        return sorted(names)

    # ------------- Internal: cache, checksum, helpers -------------
    def _cache_path_for(self, name: str) -> Optional[Path]:
        if not self.cache_dir:
            return None
        return (self.cache_dir / f"{self.task}__{name}.csv.gz").resolve()

    def _verify_checksum(self, data: bytes, algo: str, expected_hex: str) -> bool:
        algo = algo.lower()
        if algo in {"md5", "sha1", "sha224", "sha256", "sha384", "sha512"}:
            h = hashlib.new(algo)
            h.update(data)
            return h.hexdigest().lower() == expected_hex.lower()
        return False

    def _parse_checksum(self, checksum_field: str) -> Tuple[Optional[str], Optional[str]]:
        # Zenodo format: "md5:abcdef123..." or "sha256:abcdef..."
        if not checksum_field:
            return None, None
        m = re.match(r"^(md5|sha1|sha224|sha256|sha384|sha512):([0-9A-Fa-f]+)$", checksum_field.strip())
        if not m:
            return None, None
        return m.group(1), m.group(2)

    def _infer_zenodo_record(self, record: Optional[int], doi: Optional[str]) -> Optional[int]:
        if record is not None:
            return int(record)
        if not doi:
            return None
        # Accept forms like "10.5281/zenodo.17297723" or "https://doi.org/10.5281/zenodo.17297723"
        # Extract the trailing numeric ID
        m = re.search(r"zenodo\.(\d+)$", doi)
        if m:
            return int(m.group(1))
        m = re.search(r"/records/(\d+)$", doi)
        if m:
            return int(m.group(1))
        # Fallback: digits at the end
        m = re.search(r"(\d+)$", doi)
        return int(m.group(1)) if m else None


# ---------------------------
# Usage examples
# ---------------------------
# 1) Latest (mutable) from GitHub main:
# dl = DataLoader(task="aam", source="github", ref="main", ref_type="heads")
# df = dl.load("ecoli")

# 2) Reproducible GitHub tag:
# dl = DataLoader(task="aam", source="github", ref="v0.0.5", ref_type="tags")
# df = dl.load("ecoli")

# 3) Archival Zenodo version (with checksum validation):
# dl = DataLoader(task="aam", source="zenodo", zenodo_record=17297723)
# df = dl.load("ecoli")

# 4) Zenodo by DOI (auto-parse record id):
# dl = DataLoader(task="aam", source="zenodo", zenodo_doi="10.5281/zenodo.17297723")
# df = dl.load("ecoli")


In [None]:
# 3) Archival Zenodo version (with checksum validation):
dl = DataLoader(task="aam", source="zenodo", zenodo_record=17297723)
df = dl.load("ecoli")
df

In [None]:
dl = DataLoader(task="aam", source="zenodo", zenodo_doi="10.5281/zenodo.17297723")
df = dl.load("ecoli")

In [None]:
df

In [11]:
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import io
import re
import math
import hashlib
import requests
import pandas as pd

_ZENODO_RECORD_API = "https://zenodo.org/api/records/{record_id}"
_ZENODO_SEARCH_API = "https://zenodo.org/api/records"
_GH_RAW_TPL = "https://raw.githubusercontent.com/{owner}/{repo}/refs/{ref_type}/{ref}/Data"
_GH_API_TPL = "https://api.github.com/repos/{owner}/{repo}/contents/Data/{task}?ref={ref}"

class DataLoader:
    def __init__(
        self,
        task: str,
        concept_doi: str = "10.5281/zenodo.17297258",
        version: Optional[str] = None,
        cache_dir: Optional[Path] = None,
        timeout: int = 20,
        user_agent: str = "SynRXN-DataLoader/1.1",
        max_workers: int = 6,
        gh_owner: str = "TieuLongPhan",
        gh_repo: str = "SynRXN",
        gh_ref: Optional[str] = None,
        gh_enable: bool = True,
    ) -> None:
        self.task = str(task).strip("/")
        self.concept_doi = concept_doi.strip()
        self.version = version.strip() if isinstance(version, str) else None
        self.timeout = int(timeout)
        self.headers = {"User-Agent": user_agent}
        self.max_workers = int(max_workers)

        # GitHub fallback config
        self.gh_enable = bool(gh_enable)
        self.gh_owner = gh_owner
        self.gh_repo = gh_repo
        # If user didnâ€™t provide a ref: derive from version (vX first, then X), else default main
        self._gh_try_refs: List[Tuple[str, str]] = []  # list of (ref_type, ref)
        if self.gh_enable:
            if gh_ref:
                self._gh_try_refs = [("heads", gh_ref)]
            elif self.version:
                self._gh_try_refs = [("tags", f"v{self.version}"), ("tags", self.version)]
            else:
                self._gh_try_refs = [("heads", "main")]

        self.cache_dir: Optional[Path] = Path(cache_dir).expanduser().resolve() if cache_dir else None
        if self.cache_dir:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Resolved Zenodo record + file index
        self._record_id: Optional[int] = self._resolve_record_id(self.concept_doi, self.version)
        self._file_index: Dict[str, Dict] = self._build_file_index(self._record_id)

        # Names caches
        self._names_cache_zenodo: Optional[List[str]] = None
        self._names_cache_github: Optional[List[str]] = None

    def __repr__(self) -> str:
        return (f"DataLoader(task={self.task!r}, concept_doi={self.concept_doi!r}, "
                f"version={self.version!r}, record_id={self._record_id}, "
                f"github={self.gh_owner}/{self.gh_repo}, gh_refs={self._gh_try_refs}, "
                f"cache_dir={str(self.cache_dir) if self.cache_dir else None})")

    def __str__(self) -> str:
        ver = self.version or "latest"
        return f"<DataLoader zenodo:{self.concept_doi} [{ver}] + github:{self.gh_owner}/{self.gh_repo} task={self.task}>"

    def __len__(self) -> int:
        return len(self.names)

    def __contains__(self, name: str) -> bool:
        return name in self.names

    def __iter__(self):
        yield from self.names

    @property
    def names(self) -> List[str]:
        return self.available_names()

    def available_names(self, refresh: bool = False) -> List[str]:
        z_names = self._available_names_zenodo(refresh=refresh)
        g_names = self._available_names_github(refresh=refresh) if self.gh_enable else []
        return sorted(set(z_names).union(g_names))

    def refresh_names(self) -> List[str]:
        self._file_index = self._build_file_index(self._record_id)
        self._names_cache_zenodo = None
        self._names_cache_github = None
        return self.available_names(refresh=True)

    def suggest(self, name: str, n: int = 5) -> List[str]:
        import difflib
        names = self.available_names()
        if not names:
            return []
        return difflib.get_close_matches(name, names, n=n, cutoff=0.4)

    def print_names(self, cols: int = 3, show_count: bool = True) -> None:
        names = self.available_names()
        if show_count:
            print(f"Datasets in task '{self.task}': {len(names)}")
        if not names:
            print("  (no names found)")
            return
        rows = math.ceil(len(names) / cols)
        padded = names + [""] * (rows * cols - len(names))
        matrix = [padded[i: i + rows] for i in range(0, rows * cols, rows)]
        for r in range(rows):
            row_items = [matrix[c][r].ljust(30) for c in range(cols) if matrix[c][r]]
            print("  " + "  ".join(row_items))

    def load(self, name: str, use_cache: bool = True, dtype: Optional[Dict[str, object]] = None, **pd_kw) -> pd.DataFrame:
        urls, checksums = self._urls_for(name)
        cache_path = self._cache_path_for(name)

        if use_cache and cache_path is not None and cache_path.exists():
            return pd.read_csv(cache_path, compression="gzip", dtype=dtype, **pd_kw)

        last_err = None
        tried = []
        for ext in [".csv.gz", ".csv"]:
            url = urls.get(ext)
            if not url:
                continue
            tried.append(url)
            try:
                resp = requests.get(url, headers=self.headers, timeout=self.timeout)
            except requests.RequestException as e:
                last_err = e
                continue
            if resp.status_code == 200:
                content = resp.content
                if ext in checksums:
                    algo, expected = checksums[ext]
                    if not self._verify_checksum(content, algo, expected):
                        last_err = RuntimeError(f"Checksum mismatch for {name}{ext}")
                        continue
                if use_cache and cache_path is not None and ext == ".csv.gz":
                    try:
                        cache_path.write_bytes(content)
                    except Exception:
                        pass
                buf = io.BytesIO(content)
                if ext == ".csv.gz":
                    return pd.read_csv(buf, compression="gzip", dtype=dtype, **pd_kw)
                else:
                    return pd.read_csv(buf, compression=None, dtype=dtype, **pd_kw)
            else:
                last_err = RuntimeError(f"HTTP {resp.status_code} for {url}")

        avail = self.available_names(refresh=True)
        suggestions = self.suggest(name) if avail else []
        msg = [
            f"Failed to fetch dataset '{name}' for task '{self.task}'.",
            f"Concept DOI: {self.concept_doi}",
            f"Version: {self.version or 'latest'} (record {self._record_id})",
            "Tried URLs:",
            *[f"  {u}" for u in tried] if tried else ["  (none found at Zenodo/GitHub lookup paths)"],
        ]
        if avail:
            msg.append("")
            msg.append("Available dataset names:")
            if len(avail) > 200:
                msg.append(f"  (showing first 200 of {len(avail)}):")
                avail_display = avail[:200]
            else:
                avail_display = avail
            msg += [f"  {n}" for n in avail_display]
            if suggestions:
                msg.append("")
                msg.append(f"Did you mean: {suggestions} ?")
        if last_err:
            msg.append("")
            msg.append(f"Last error: {last_err!s}")
        raise FileNotFoundError("\n".join(msg))

    def load_many(self, names: Iterable[str], use_cache: bool = True, dtype: Optional[Dict[str, object]] = None, parallel: bool = True, **pd_kw) -> Dict[str, pd.DataFrame]:
        names_list = list(names)
        results: Dict[str, pd.DataFrame] = {}
        if not parallel or self.max_workers <= 1 or len(names_list) == 1:
            for nm in names_list:
                try:
                    results[nm] = self.load(nm, use_cache=use_cache, dtype=dtype, **pd_kw)
                except Exception as e:
                    raise RuntimeError(f"Failed to load {self.task}/{nm}: {e}") from e
            return results
        with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            futures = {ex.submit(self.load, nm, use_cache, dtype, **pd_kw): nm for nm in names_list}
            for fut in as_completed(futures):
                nm = futures[fut]
                try:
                    results[nm] = fut.result()
                except Exception as e:
                    raise RuntimeError(f"Failed to load {self.task}/{nm}: {e}") from e
        return results

    # ---------- URL builders ----------
    def _urls_for(self, name: str) -> Tuple[Dict[str, str], Dict[str, Tuple[str, str]]]:
        urls: Dict[str, str] = {}
        checksums: Dict[str, Tuple[str, str]] = {}

        # Zenodo first
        rel_gz = f"Data/{self.task}/{name}.csv.gz"
        rel_csv = f"Data/{self.task}/{name}.csv"
        if rel_gz in self._file_index:
            f = self._file_index[rel_gz]
            urls[".csv.gz"] = f["links"]["download"]
            algo, hex_ = self._parse_checksum(f.get("checksum", ""))
            if algo and hex_:
                checksums[".csv.gz"] = (algo, hex_)
        if rel_csv in self._file_index and ".csv.gz" not in urls:
            f = self._file_index[rel_csv]
            urls[".csv"] = f["links"]["download"]
            algo, hex_ = self._parse_checksum(f.get("checksum", ""))
            if algo and hex_:
                checksums[".csv"] = (algo, hex_)

        # GitHub fallback (if missing)
        if self.gh_enable and (".csv.gz" not in urls or ".csv" not in urls):
            for ref_type, ref in self._gh_try_refs:
                base = _GH_RAW_TPL.format(owner=self.gh_owner, repo=self.gh_repo, ref_type=ref_type, ref=ref)
                if ".csv.gz" not in urls:
                    urls[".csv.gz"] = f"{base}/{self.task}/{name}.csv.gz"
                if ".csv" not in urls:
                    urls[".csv"] = f"{base}/{self.task}/{name}.csv"
                # Only one ref set; if network fails it will move to next in load()
                break

        return urls, checksums

    # ---------- Names ----------
    def _available_names_zenodo(self, refresh: bool = False) -> List[str]:
        if self._names_cache_zenodo is not None and not refresh:
            return list(self._names_cache_zenodo)
        base = f"Data/{self.task}/"
        names = set()
        for key in self._file_index.keys():
            if not key.startswith(base):
                continue
            if key.endswith(".csv.gz"):
                names.add(key[len(base):-len(".csv.gz")])
            elif key.endswith(".csv"):
                names.add(key[len(base):-len(".csv")])
        self._names_cache_zenodo = sorted(names)
        return list(self._names_cache_zenodo)

    def _available_names_github(self, refresh: bool = False) -> List[str]:
        if self._names_cache_github is not None and not refresh:
            return list(self._names_cache_github)
        names = set()
        for ref_type, ref in self._gh_try_refs:
            api_url = _GH_API_TPL.format(owner=self.gh_owner, repo=self.gh_repo, task=self.task, ref=ref)
            try:
                r = requests.get(api_url, headers=self.headers, timeout=self.timeout)
                r.raise_for_status()
                items = r.json()
            except requests.RequestException:
                continue
            for it in items:
                nm = it.get("name", "")
                if nm.endswith(".csv.gz"):
                    names.add(nm[:-len(".csv.gz")])
                elif nm.endswith(".csv"):
                    names.add(nm[:-len(".csv")])
            break  # only first successful ref
        self._names_cache_github = sorted(names)
        return list(self._names_cache_github)

    # ---------- Zenodo resolution ----------
    def _resolve_record_id(self, concept_doi: str, version: Optional[str]) -> int:
        params = {"q": f'conceptdoi:"{concept_doi}"', "all_versions": 1, "size": 200}
        r = requests.get(_ZENODO_SEARCH_API, params=params, headers=self.headers, timeout=self.timeout)
        r.raise_for_status()
        hits = r.json().get("hits", {}).get("hits", [])
        if not hits:
            raise RuntimeError(f"No Zenodo records found for concept DOI {concept_doi}")
        if version:
            target = self._normalize_version(version)
            for h in hits:
                meta_ver = self._normalize_version(h.get("metadata", {}).get("version", ""))
                if meta_ver == target:
                    return int(h["id"])
            # fallback to raw compare
            for h in hits:
                raw = str(h.get("metadata", {}).get("version", "")).strip()
                if raw == version or raw == f"v{version}" or f"v{raw}" == version:
                    return int(h["id"])
            raise RuntimeError(f"Version '{version}' not found under {concept_doi}. "
                               f"Available: {sorted({h.get('metadata', {}).get('version','') for h in hits})}")
        hits_sorted = sorted(hits, key=lambda h: h.get("updated", h.get("created", "")), reverse=True)
        return int(hits_sorted[0]["id"])

    def _build_file_index(self, record_id: Optional[int]) -> Dict[str, Dict]:
        if record_id is None:
            return {}
        url = _ZENODO_RECORD_API.format(record_id=record_id)
        r = requests.get(url, headers=self.headers, timeout=self.timeout)
        r.raise_for_status()
        meta = r.json()
        files = meta.get("files", [])
        return {f.get("key", ""): f for f in files if f.get("key")}

    # ---------- Utils ----------
    def _normalize_version(self, v: str) -> str:
        v = str(v).strip()
        if v.lower().startswith("v"):
            v = v[1:]
        return v

    def _parse_checksum(self, checksum_field: str) -> Tuple[Optional[str], Optional[str]]:
        if not checksum_field:
            return None, None
        m = re.match(r"^(md5|sha1|sha224|sha256|sha384|sha512):([0-9A-Fa-f]+)$", checksum_field.strip())
        if not m:
            return None, None
        return m.group(1), m.group(2)

    def _verify_checksum(self, data: bytes, algo: str, expected_hex: str) -> bool:
        algo = algo.lower()
        if algo in {"md5", "sha1", "sha224", "sha256", "sha384", "sha512"}:
            h = hashlib.new(algo)
            h.update(data)
            return h.hexdigest().lower() == expected_hex.lower()
        return False

    def _cache_path_for(self, name: str) -> Optional[Path]:
        if not self.cache_dir:
            return None
        return (self.cache_dir / f"{self.task}__{name}.csv.gz").resolve()

# -------------------------
# Usage
# -------------------------
# 1) Pin to specific Zenodo version, fallback to GitHub tag v{version}:
# dl = DataLoader(task="aam", concept_doi="10.5281/zenodo.17297258", version="0.0.5")
# df = dl.load("ecoli")

# 2) Latest Zenodo, fallback to GitHub main:
# dl = DataLoader(task="aam", concept_doi="10.5281/zenodo.17297258", version=None)
# df = dl.load("some_name")


SyntaxError: invalid syntax (1844242779.py, line 166)

In [10]:
# 2) Pin to a specific version label (e.g., "0.0.5" or "v0.0.5"):
dl = DataLoader(task="aam", concept_doi="10.5281/zenodo.17297258", version="0.0.5")
df = dl.load("ecoli")


FileNotFoundError: Failed to fetch dataset 'ecoli' for task 'aam'.
Concept DOI: 10.5281/zenodo.17297258
Version: 0.0.5 (record 17297723)
Tried URLs:

Last error: None