# ATS MDA workflow (expanded and optimized)

This notebook implements a **step-by-step, auditable workflow** for preparing an **ATS** model run directory for publication as a **Model Data Archive (MDA)** in **ESS-DIVE**.

The workflow is intentionally split into two complementary deliverables:

1. **A curated copy of your simulation directory** (the *data package directory*) that contains only files you intend to archive.
2. Two ESS-DIVE companion tables generated from that curated copy:
   - `flmd.csv` — *file-level metadata* (one row per file)
   - `dd.csv` — *data dictionary* for key tabular outputs (column names, units, definitions)

## What this notebook does (high level)

1. **Configuration & validation**
   - Read your scratch/work location from the environment (commonly `SCRATCH` on HPC systems).
   - Define: (a) the source simulation directory, (b) the destination data package directory.

2. **Copy only publishable artifacts**
   - Use `rsync` include/exclude rules (fast, preserves structure).
   - Optionally fall back to a pure-Python copier if `rsync` is unavailable.

3. **Post-copy cleanup**
   - Remove bulky or intermediate files that are not needed for publication (e.g., periodic checkpoints, visualization time-series, ParaView `.xmf`).
   - Keep final checkpoints (or any file patterns you explicitly choose).

4. **Enumerate files and generate `flmd.csv`**
   - Walk the curated data package directory.
   - Create a file manifest with paths plus a first-pass description field you can refine.

5. **Generate `dd.csv` for tabular outputs**
   - Locate observation/output CSVs (default: `water_balance*.csv`).
   - Parse the header into *parameter names* and *units*.
   - Auto-populate *definitions* from ATS input-spec documentation when possible.

## What you must still do manually

- Review and edit both CSVs (especially descriptions/definitions) to reflect your manuscript and study context.
- Add project-level metadata (authors, funding, spatial/temporal coverage, etc.) using the ESS-DIVE submission interface and templates.
- Upload the curated package and metadata to ESS-DIVE.

## Assumptions and expectations

- You have a simulation directory with a stable structure (often including run subfolders like `run0`, `run1`, etc.).
- You have sufficient disk quota to create a curated copy.
- You are comfortable adjusting a few configuration variables in the **Configuration** section.

## Safety and reproducibility notes

- This workflow is designed to be *non-destructive to the original simulation directory*.
- **Deletion steps only operate inside the destination data package directory**, not the source.
- If you enable optional CSV rewriting, the notebook defaults to writing a separate `*_tmp` file rather than overwriting in place.
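The cleanup cell calls `Path.unlink` on paths found under the detected run directories, which live inside `cfg.data_pkg_dir`. If you add your own cleanup rules, you can enforce the "delete only inside the destination" rule explicitly by resolving each path before deleting it. The sketch below uses two hypothetical helpers, `is_inside` and `safe_unlink`, which are not part of the notebook:

```python
from pathlib import Path


def is_inside(path: Path, root: Path) -> bool:
    """Return True if `path` resolves to `root` itself or to a location below it."""
    path_r = path.resolve()
    root_r = root.resolve()
    return path_r == root_r or root_r in path_r.parents


def safe_unlink(path: Path, data_pkg_dir: Path) -> bool:
    """Delete `path` only if it lives inside `data_pkg_dir` (hypothetical helper).

    Returns True if a file was removed and False if it did not exist.
    Raises PermissionError for anything outside the package directory.
    """
    if not is_inside(path, data_pkg_dir):
        raise PermissionError(f"Refusing to delete {path}: outside {data_pkg_dir}")
    if path.exists():
        path.unlink()
        return True
    return False
```

Resolving both paths first means tricks like `pkg/../sim/file.h5` are caught, because `..` components are collapsed before the comparison.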

---

Proceed top-to-bottom. Each section includes context, rationale, and what to verify before moving on.


In [None]:
# Core imports
from __future__ import annotations

import os
import re
import shutil
import subprocess
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

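import pandas as pd
import requests

# ----------------------------------------------------------------------------
# Logging
# ----------------------------------------------------------------------------
# `basicConfig` configures the root logger and is a no-op if the root logger
# already has handlers. Setting the level on this notebook's own logger ensures
# INFO records are not filtered out before they reach those handlers.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("ats-mda")
logger.setLevel(logging.INFO)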


def which(executable: str) -> Optional[str]:
    """Return the resolved path to an executable or None if not found."""
    return shutil.which(executable)


def run_cmd(cmd: List[str], *, cwd: Optional[Path] = None) -> str:
    """Run a command safely (no shell), log it, and return stdout.

    Raises
    ------
    subprocess.CalledProcessError
        If the command exits non-zero.
    """
    logger.info("Running: %s", " ".join(cmd))
    result = subprocess.run(
        cmd,
        cwd=str(cwd) if cwd else None,
        check=True,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.stderr.strip():
        # Surface stderr at INFO level, but do not treat it as a failure: the
        # exit code was 0 (check=True would have raised otherwise).
        logger.info("stderr: %s", result.stderr.strip())
    return result.stdout


def run_cmd_live(cmd: List[str], *, cwd: Optional[Path] = None) -> None:
    """Run a command and stream stdout/stderr live to the notebook.

    This is particularly useful for long-running tools (e.g., rsync) where
    users expect to see progress in real time.
    """
    logger.info("Running (live): %s", " ".join(cmd))
    subprocess.run(
        cmd,
        cwd=str(cwd) if cwd else None,
        check=True,
        text=True,
    )


def ensure_dir(path: Path) -> None:
    """Create a directory if it does not exist."""
    path.mkdir(parents=True, exist_ok=True)


def human_bytes(num_bytes: int) -> str:
    """Format bytes as a human-friendly string."""
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    size = float(num_bytes)
    for u in units:
        if size < 1024.0 or u == units[-1]:
            return f"{size:.2f} {u}"
        size /= 1024.0


def dir_size_bytes(root: Path) -> int:
    """Compute total size of all files under root."""
    total = 0
    for p in root.rglob("*"):
        if p.is_file():
            total += p.stat().st_size
    return total


def list_files(root: Path) -> List[Path]:
    """Return all files under root (sorted), as absolute Paths."""
    return sorted([p for p in root.rglob("*") if p.is_file()])


def rel_manifest_rows(root: Path, files: List[Path]) -> Tuple[List[str], List[str]]:
    """Return (filenames, file_paths) for an ESS-DIVE-like manifest.

    - File_Name is the basename
    - File_Path is a *relative* folder path with a trailing slash
    """
    names: List[str] = []
    paths: List[str] = []

    for p in files:
        rel = p.relative_to(root)
        names.append(rel.name)
        parent = rel.parent.as_posix()
        if parent in ("", "."):
            paths.append("./")
        else:
            paths.append(f"./{parent}/")
    return names, paths


def normalize_column_name(name: str) -> str:
    """Normalize a header label to a stable, machine-friendly column name.

    This follows the spirit of the original notebook (replace spaces with
    underscores), but is more explicit and predictable.
    """
    name = name.strip()
    name = re.sub(r"\s+", "_", name)
    name = re.sub(r"[^0-9A-Za-z_\.-]", "", name)
    name = re.sub(r"_+", "_", name)
    return name


def parse_ats_csv_header_line(line: str) -> Tuple[List[str], List[str]]:
    """Parse a CSV header of the form: `var [unit], var2 [unit2], ...`.

    Returns
    -------
    parameters : list[str]
        Normalized column names.
    units : list[str]
        Unit strings ("N/A" if missing).
    """
    tokens = [t.strip().strip("\ufeff") for t in line.strip().split(",")]

    params: List[str] = []
    units: List[str] = []

    pattern = re.compile(r"^(.*?)\s*\[(.*?)\]\s*$")

    for tok in tokens:
        m = pattern.match(tok)
        if m:
            raw_name, raw_unit = m.group(1), m.group(2)
            params.append(normalize_column_name(raw_name))
            units.append(raw_unit.strip() or "N/A")
        else:
            # No unit bracket detected; keep the full token as the name.
            params.append(normalize_column_name(tok))
            units.append("N/A")

    return params, units


def detect_header_line(lines: List[str], *, max_lines: int = 400) -> int:
    """Heuristically detect a header line index in ATS-style CSV outputs.

    Many ATS outputs include a preamble, then a single header line that contains
    multiple comma-separated fields, commonly with unit brackets `[ ... ]`.

    The heuristic below looks for the first line that:
    - contains commas (suggesting CSV), AND
    - contains at least one '[' and one ']' (suggesting `name [unit]` tokens)

    If your file does not follow this pattern, set a manual override.
    """
    scan = lines[:max_lines]
    for i, line in enumerate(scan):
        if "," in line and "[" in line and "]" in line:
            return i
    raise ValueError(
        "Could not auto-detect a header line. "
        "Set `header_line_hint` explicitly in the configuration section."
    )


def parse_org_table(text: str) -> pd.DataFrame:
    """Parse an org-mode pipe table into a DataFrame.

    This is a lightweight parser that:
    - keeps rows starting with '|'
    - discards separator rows (those that are mostly dashes/plus)
    """
    rows: List[List[str]] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line.startswith("|"):
            continue
        # Skip org separator lines like |-----+-----|
        if set(line.replace("|", "").strip()) <= set("-+"):
            continue
        parts = [p.strip() for p in line.strip("|").split("|")]
        rows.append(parts)

    if not rows:
        raise ValueError("No org-table rows found.")

    header = rows[0]
    data = rows[1:]

    df = pd.DataFrame(data, columns=header)
    # Drop empty column names if any
    df = df.loc[:, [c for c in df.columns if str(c).strip()]]
    return df


def build_definition_map_from_symbol_table(df_symbol: pd.DataFrame) -> Dict[str, str]:
    """Best-effort extraction of (variable_root -> description) from ATS symbol table."""

    # Try to locate columns by name first.
    var_col = None
    desc_col = None
    for c in df_symbol.columns:
        cl = str(c).lower()
        if var_col is None and "variable" in cl and "name" in cl:
            var_col = c
        if desc_col is None and "description" in cl:
            desc_col = c

    # Fall back to historical positional assumptions.
    if var_col is None or desc_col is None:
        if df_symbol.shape[1] >= 4:
            var_col = df_symbol.columns[1]
            desc_col = df_symbol.columns[3]
        else:
            raise ValueError(
                "Unexpected symbol table shape; cannot infer variable/description columns."
            )

    # Build map, removing obvious empties.
    out: Dict[str, str] = {}
    for v, d in zip(df_symbol[var_col], df_symbol[desc_col]):
        v = str(v).strip()
        d = str(d).strip()
        if v and v.lower() != "nan" and d and d.lower() != "nan":
            out[v] = d
    return out


def fill_definitions(parameters: List[str], definition_map: Dict[str, str]) -> List[str]:
    """Fill definitions by substring matching (longest-key-wins).

    This mirrors the intent of the original notebook (match symbol-table keys
    within column names) but avoids pandas chained assignment and is faster.
    """
    keys = sorted(definition_map.keys(), key=len, reverse=True)
    defs: List[str] = []
    for p in parameters:
        matched = ""
        for k in keys:
            if k in p:
                matched = definition_map[k]
                break
        defs.append(matched)
    return defs


## Configuration

This is the only section you should need to edit for a typical project.

### Key paths

- **`SCRATCH_ROOT`** (stored as `cfg.scratch_root`): The base directory under which, by default, both the simulation directory and the curated data package live.
  - On many HPC systems this is provided as an environment variable named `SCRATCH`.
  - If `SCRATCH` is not set, the notebook falls back to the current working directory.

- **`cfg.simulation_dir`**: The *source* directory containing your ATS run outputs.

- **`cfg.data_pkg_dir`**: The *destination* directory that becomes your MDA payload.
  - The notebook copies files into this directory and may delete intermediate artifacts *inside it*.
  - It must not be the same as, or nested inside, `cfg.simulation_dir`.

### File selection logic

- **`cfg.include_extensions`** controls which file types are copied.
- **`cfg.include_name_globs`** adds filename patterns; the default `slurm*` picks up Slurm stdout/stderr files.

### Observation/data-dictionary logic

- The data dictionary step targets one class of CSV outputs.
- By default, it searches for files matching `water_balance*.csv`.

If your simulation uses different names, change `obs_file_handle` and/or `obs_file_format` in the `Config(...)` call.
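Because `Config` is declared with `@dataclass(frozen=True)`, assigning to a field such as `cfg.obs_file_handle` after creation raises an error. Either edit the `Config(...)` call directly or derive a modified copy with `dataclasses.replace`. A minimal illustration, using a hypothetical two-field stand-in (`MiniConfig`, not part of the notebook) and a hypothetical output name `surface_flux`:

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class MiniConfig:
    """Hypothetical two-field stand-in for the notebook's frozen `Config`."""
    obs_file_handle: str = "water_balance"
    obs_file_format: str = "csv"


base = MiniConfig()

# Frozen dataclasses reject attribute assignment...
try:
    base.obs_file_handle = "surface_flux"  # type: ignore[misc]
except FrozenInstanceError:
    pass

# ...so derive a modified copy instead; `base` is left untouched.
custom = replace(base, obs_file_handle="surface_flux")
print(custom)
```

With the real notebook this would read `cfg = replace(cfg, obs_file_handle=...)`, after which the later cells pick up the new value.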


In [None]:
@dataclass(frozen=True)
class Config:
    # Root working location (HPC scratch or local path)
    scratch_root: Path

    # Source (simulation) and destination (curated data package)
    simulation_dir: Path
    data_pkg_dir: Path

    # What to copy
    include_extensions: Tuple[str, ...]
    include_name_globs: Tuple[str, ...]

    # Optional post-copy cleanup patterns
    #
    # Many ATS simulations write outputs under directories whose names contain
    # tokens like run0, run1, run2, etc. Not every user has every run token.
    # This notebook therefore treats these tokens as *optional*:
    #   - If at least one matching run directory is found, cleanup will run
    #     on the directories that exist.
    #   - If none are found, the cleanup cell raises an error (because it has
    #     nothing to operate on).
    run_tokens: Tuple[str, ...]
    keep_checkpoint_token: str

    # Data dictionary targeting
    obs_file_handle: str
    obs_file_format: str

    # Header parsing
    header_line_hint: Optional[int] = None
    header_probe_max_lines: int = 400

    # If you want to write cleaned CSVs (avoid in-place edits by default)
    write_new_csv: bool = False
    inplace: bool = False  # Only used if write_new_csv is True


SCRATCH_ROOT = Path(os.environ.get("SCRATCH") or Path.cwd())

cfg = Config(
    scratch_root=SCRATCH_ROOT,
    simulation_dir=SCRATCH_ROOT / "Naches",  # EDIT ME
    data_pkg_dir=SCRATCH_ROOT / "my_ATS_MDA",  # EDIT ME
    include_extensions=(
        "exo", "xml", "csv", "dat", "txt", "xmf", "h5", "out", "nc", "jpg", "png", "pdf"
    ),
    include_name_globs=("slurm*",),
    run_tokens=("run0", "run1", "run2"),
    keep_checkpoint_token="final",  # keep checkpoint*final*.h5
    obs_file_handle="water_balance",
    obs_file_format="csv",
    # If auto-detection fails, set a manual line index (0-based).
    # header_line_hint=112,
    write_new_csv=False,
    inplace=False,
)

cfg


## 1) Validate paths and prepare destination

Before copying anything, confirm:

- `cfg.simulation_dir` points to the directory that contains your finished ATS run outputs.
- `cfg.data_pkg_dir` is a **new or disposable** directory where the notebook can place a curated copy.

The notebook will:

- create `cfg.data_pkg_dir` if needed;
- log the source/destination sizes so you can sanity-check that the curated copy is smaller than the full run.


In [None]:
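# Validate source directory
if not cfg.simulation_dir.is_dir():
    raise FileNotFoundError(
        f"Simulation directory does not exist (or is not a directory): {cfg.simulation_dir}\n"
        "Update cfg.simulation_dir in the Configuration section."
    )
logger.info("Simulation directory: %s", cfg.simulation_dir)

# Guard against a destination that equals, or sits inside, the source: if they
# are the same, cleanup would delete files from the original run; if nested,
# repeated copies would recurse into earlier copies.
src_resolved = cfg.simulation_dir.resolve()
dst_resolved = cfg.data_pkg_dir.resolve()
if dst_resolved == src_resolved or src_resolved in dst_resolved.parents:
    raise ValueError(
        f"cfg.data_pkg_dir ({cfg.data_pkg_dir}) must not be, or be inside, cfg.simulation_dir."
    )

# Prepare destination
ensure_dir(cfg.data_pkg_dir)
logger.info("Data package directory: %s", cfg.data_pkg_dir)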

# Optional: report current size (destination might already have content)
try:
    src_size = dir_size_bytes(cfg.simulation_dir)
    dst_size = dir_size_bytes(cfg.data_pkg_dir)
    logger.info("Source size: %s", human_bytes(src_size))
    logger.info("Destination current size: %s", human_bytes(dst_size))
except Exception as e:
    logger.warning("Could not compute directory sizes: %s", e)


## 2) Copy publishable artifacts into the data package directory

This step creates the curated package by copying only selected files.

### Why use `rsync` include/exclude rules?

- It preserves directory structure.
- It is fast for large directories.
- It avoids copying unwanted file types up front.

The include rules are:

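- Include all directories (`--include=*/`) so rsync descends into every folder; without this rule, the final `--exclude=*` would also exclude directories and nothing beneath them would be copied.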
- Include file extensions listed in `cfg.include_extensions`.
- Include any filename globs listed in `cfg.include_name_globs` (e.g., `slurm*`).
- Exclude everything else.

If `rsync` is not available, the notebook falls back to a pure-Python copy that preserves relative paths.
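rsync acts on the *first* filter rule that matches a path, so the ordering above matters: the directory and extension includes must come before the catch-all `--exclude=*`. To preview the selection before copying anything, you can add rsync's `--dry-run` flag. The sketch below assembles the same command as the copy cell, and adds a hypothetical helper `would_copy` that approximates the rules for one path (patterns without a slash are matched against the filename only, as rsync does):

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath
from typing import List, Sequence


def build_rsync_cmd(src: str, dst: str, exts: Sequence[str], name_globs: Sequence[str],
                    dry_run: bool = False) -> List[str]:
    """Build the include/exclude rsync command used in this step."""
    cmd = ["rsync", "-avhP"]
    if dry_run:
        cmd.append("--dry-run")  # list what would be copied without writing anything
    cmd.append("--include=*/")  # descend into every directory
    cmd += [f"--include=*.{e}" for e in exts]
    cmd += [f"--include={g}" for g in name_globs]
    # Catch-all exclude must come last, followed by source (trailing slash) and destination.
    cmd += ["--exclude=*", "--prune-empty-dirs", src.rstrip("/") + "/", dst]
    return cmd


def would_copy(rel_path: str, exts: Sequence[str], name_globs: Sequence[str]) -> bool:
    """Approximate the rsync rules for one file path (case-sensitive, basename only)."""
    name = PurePosixPath(rel_path).name
    return any(fnmatchcase(name, f"*.{e}") for e in exts) or any(
        fnmatchcase(name, g) for g in name_globs
    )
```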


In [None]:
# Build include arguments
include_args: List[str] = []
for ext in cfg.include_extensions:
    include_args.append(f"--include=*.{ext}")
for glob_pat in cfg.include_name_globs:
    include_args.append(f"--include={glob_pat}")

rsync_path = which("rsync")

if rsync_path:
    # NOTE: `-P` enables `--partial --progress` (per-file progress).
    # We intentionally run rsync in "live" mode so the progress is visible.
    cmd = [
        "rsync",
        "-avhP",
        "--include=*/",
        *include_args,
        "--exclude=*",
        "--prune-empty-dirs",
        f"{str(cfg.simulation_dir).rstrip('/')}/",
        str(cfg.data_pkg_dir),
    ]

    run_cmd_live(cmd)
else:
    logger.warning("rsync not found; using a slower Python-based copy.")

    allowed_exts = {f".{e.lower()}" for e in cfg.include_extensions}

    for src in cfg.simulation_dir.rglob("*"):
        if not src.is_file():
            continue

        if src.suffix.lower() in allowed_exts or any(src.match(p) for p in cfg.include_name_globs):
            rel = src.relative_to(cfg.simulation_dir)
            dst = cfg.data_pkg_dir / rel
            ensure_dir(dst.parent)
            shutil.copy2(src, dst)

logger.info("Copy complete. Destination size: %s", human_bytes(dir_size_bytes(cfg.data_pkg_dir)))


## 3) Post-copy cleanup (remove bulky intermediates)

Publication packages should generally exclude:

- **Periodic checkpoints** that are not required to reproduce or understand final results.
- **Visualization time-series files** that can be regenerated and are very large.
- **ParaView `.xmf`** metadata files (optional; include them only if they are part of your reproducibility story).

### Run directories are not guaranteed

Different users and workflows may have different run directory layouts. A common convention is to place outputs under
folders whose names include tokens like `run0`, `run1`, or `run2`, but it is **not required** that all (or any specific)
run tokens exist.

This notebook therefore implements the following rule:

- If the package contains **at least one** directory whose name includes any of the tokens in `cfg.run_tokens`
  (default: `run0`, `run1`, `run2`), cleanup will operate on **only the directories that exist**.
- If the package contains **none** of those directories, the cleanup step raises an error.
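Because `find_run_dirs` matches by substring (`*{token}*`), the token `run1` also matches names such as `run10` or `rerun1`. If that matters for your layout, one option is to filter the candidates with a boundary-aware check. The sketch below assumes run tokens are delimited by string boundaries or by non-alphanumeric characters such as `_`, `-`, or `.`; `name_has_token` is a hypothetical helper:

```python
import re


def name_has_token(name: str, token: str) -> bool:
    """True if `token` appears in `name` without being glued to other letters or digits.

    Assumption: run tokens are delimited by string boundaries or by
    non-alphanumeric characters such as '_', '-', or '.'.
    """
    pattern = rf"(?<![0-9A-Za-z]){re.escape(token)}(?![0-9A-Za-z])"
    return re.search(pattern, name) is not None
```

Inside `find_run_dirs`, this could be applied as `candidates = [p for p in candidates if name_has_token(p.name, token)]`.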

### Conservative default cleanup rules

- Remove `checkpoint*.h5` that do **not** contain the token configured as `cfg.keep_checkpoint_token` (default: `final`).
- Remove all `*.xmf` under detected run directories.
- Remove all `ats_vis_*.h5` under detected run directories.

Adjust these rules to match your publication and reproducibility requirements.
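Because deletion cannot be undone, it can help to preview what these rules would remove. The sketch below applies the same glob patterns as the cleanup cell (`checkpoint*.h5` without the keep token, `*.xmf`, `ats_vis_*.h5`) to a list of relative paths without touching the disk. `preview_cleanup` is a hypothetical helper, and the file names in the usage are illustrative:

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath
from typing import Dict, Iterable, List


def preview_cleanup(rel_paths: Iterable[str], keep_token: str = "final") -> Dict[str, List[str]]:
    """Classify files the default cleanup rules would delete, without deleting anything."""
    plan: Dict[str, List[str]] = {"non_final_checkpoints": [], "xmf": [], "ats_vis": []}
    for rel in rel_paths:
        name = PurePosixPath(rel).name
        if fnmatchcase(name, "checkpoint*.h5") and keep_token not in name:
            plan["non_final_checkpoints"].append(rel)
        elif fnmatchcase(name, "*.xmf"):
            plan["xmf"].append(rel)
        elif fnmatchcase(name, "ats_vis_*.h5"):
            plan["ats_vis"].append(rel)
    return plan


plan = preview_cleanup([
    "run0/checkpoint00100.h5",
    "run0/checkpoint_final.h5",
    "run0/ats_vis_data.h5",
    "run0/ats_vis_data.VisIt.xmf",
    "run0/water_balance.csv",
])
```

Against the real package, you could pass `p.relative_to(cfg.data_pkg_dir).as_posix()` for every file under each directory in `run_dirs`.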


In [None]:
def find_run_dirs(tokens: Iterable[str], root: Path) -> Dict[str, Path]:
    """Find run directories whose names contain any of the provided tokens.

    Requirements
    ------------
    - Users may have *some* of run0/run1/run2 (or none).
    - If none are found, raise an error.
    - If one or more are found, return only the ones that exist.

    Notes
    -----
    We match directories by substring using `*{token}*` so layouts like
    `spinup_run1` or `caseA/run0_foo` are detected.
    """
    found: Dict[str, Path] = {}

    tokens_tuple = tuple(tokens)
    for token in tokens_tuple:
        candidates = [p for p in root.rglob(f"*{token}*") if p.is_dir()]
        if not candidates:
            continue

        # Prefer the shallowest path (fewest parts) to avoid nested hits.
        candidates = sorted(candidates, key=lambda p: (len(p.parts), str(p)))
        chosen = candidates[0]
        found[token] = chosen

        if len(candidates) > 1:
            logger.warning(
                "Multiple candidates found for %s; choosing %s (alternatives: %s)",
                token,
                chosen,
                ", ".join(str(c) for c in candidates[1:5]),
            )

    if not found:
        raise FileNotFoundError(
            f"Could not find any run directories matching {tokens_tuple} under {root}.\n"
            "Expected at least one directory containing one of: run0, run1, run2 (or the tokens you configured)."
        )

    return found


run_dirs = find_run_dirs(cfg.run_tokens, cfg.data_pkg_dir)
run_dirs


In [None]:
# Remove non-final checkpoints, xmf, and visualization files
removed = {
    "non_final_checkpoints": 0,
    "xmf": 0,
    "ats_vis": 0,
}

for token, run_dir in run_dirs.items():
    # 1) Checkpoints
    checkpoints = list(run_dir.rglob("checkpoint*.h5"))
    non_final = [p for p in checkpoints if cfg.keep_checkpoint_token not in p.name]
    for p in non_final:
        p.unlink(missing_ok=True)
        removed["non_final_checkpoints"] += 1

    # 2) XMF files
    for p in run_dir.rglob("*.xmf"):
        p.unlink(missing_ok=True)
        removed["xmf"] += 1

    # 3) Visualization time-series
    for p in run_dir.rglob("ats_vis_*.h5"):
        p.unlink(missing_ok=True)
        removed["ats_vis"] += 1

logger.info("Cleanup summary: %s", removed)
logger.info("Destination size after cleanup: %s", human_bytes(dir_size_bytes(cfg.data_pkg_dir)))


## 4) Enumerate all files and generate `flmd.csv`

The *file-level metadata* table is typically required by repositories to:

- provide a complete inventory of files;
- explain what each file is and how it relates to the dataset;
- support discovery and reuse.

This notebook generates a first-pass `flmd.csv` that you can edit.

### Notes on the fields

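- `File_Name`: basename only
- `File_Path`: relative folder path (with trailing slash; `./` for files at the package root)
- `File_Description`: intentionally left for you to refine, but a few hints are auto-filled from filename patterns (if several patterns match, the last one in `pattern_to_desc` wins)
- `Standard`: set to `N/A`
- `Header_Rows` / `Column_or_Row_Name_Position`: left as `-9999` placeholders (replace these for tabular files such as the `water_balance*.csv` outputs)
- `Start_Date` / `End_Date` are not generated; add these columns if you have file-level temporal coverage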

After creation, open the CSV in a spreadsheet editor and improve descriptions as needed.
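The `File_Path` convention (relative folder, trailing slash, `./` for the package root) can be checked on a few hypothetical paths using a standalone version of the logic in `rel_manifest_rows`. `manifest_row` is an illustrative helper, not part of the notebook:

```python
from pathlib import PurePosixPath
from typing import Tuple


def manifest_row(rel: str) -> Tuple[str, str]:
    """Return (File_Name, File_Path) for a path relative to the package root."""
    p = PurePosixPath(rel)
    parent = p.parent.as_posix()  # "." for files directly in the package root
    return p.name, "./" if parent in ("", ".") else f"./{parent}/"
```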


In [None]:
all_files = list_files(cfg.data_pkg_dir)
logger.info("Total files in package: %d", len(all_files))

file_names, file_paths = rel_manifest_rows(cfg.data_pkg_dir, all_files)

flmd = pd.DataFrame(
    {
        "File_Name": file_names,
        "File_Description": ["" for _ in all_files],
        "Standard": ["N/A" for _ in all_files],
        "Header_Rows": [-9999 for _ in all_files],
        "Column_or_Row_Name_Position": [-9999 for _ in all_files],
        "File_Path": file_paths,
    }
)

# Auto-fill some common patterns to reduce manual effort
pattern_to_desc = {
    "water_balance": "Model output ASCII/CSV file (water balance diagnostics).",
    "checkpoint": "ATS checkpoint file (binary restart/state).",
    "ats_vis": "ATS visualization output file (binary; often large).",
    "pflotran": "Output file from PFLOTRAN (biogeochemistry engine).",
    "slurm": "Scheduler (Slurm) stdout/stderr log for the run.",
    "xml": "ATS input/configuration file.",
    "exo": "Computational mesh file.",
    "LAI": "Leaf Area Index input forcing.",
}

for pat, desc in pattern_to_desc.items():
    mask = flmd["File_Name"].str.contains(pat, regex=False)
    flmd.loc[mask, "File_Description"] = desc

flmd_path = cfg.data_pkg_dir / "flmd.csv"
flmd.to_csv(flmd_path, index=False)
logger.info("Wrote: %s", flmd_path)

flmd.head(10)

## 5) Build a data dictionary (`dd.csv`) from a representative output CSV

This section focuses on tabular data intended for reuse (e.g., time series diagnostics).

### How the parsing works

1. Find files matching `f"{cfg.obs_file_handle}*.{cfg.obs_file_format}"`.
2. Read the reference file (the first match in sorted order) and scan its first `cfg.header_probe_max_lines` lines (default: 400).
3. Detect the header line (or use `cfg.header_line_hint` if set).
4. Parse header tokens of the form `name [unit]`.

The result is a `dd.csv` with:

- `Column_or_Row_Name`
- `Unit`
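- `Definition` (auto-filled in step 6 when possible)
- `Data_Type` (defaults to `numeric`)
- `Term_Type` (defaults to `column_header`)
- `Missing_Value_Code`
- `Reported_Precision` (defaults to `1E-16`; review this against the actual precision of your outputs)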

If your CSV format differs (e.g., units in a second header row), adjust the parsing functions in the setup cell.
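As an example of such an adjustment, here is a sketch for a hypothetical layout where names sit on one row and bracketed units on the next row, aligned by position. `parse_two_row_header` is illustrative and assumes both rows have the same number of comma-separated fields. It reuses the notebook's normalization rules:

```python
import re
from typing import List, Tuple


def _normalize(name: str) -> str:
    # Same normalization as `normalize_column_name` in the setup cell.
    name = re.sub(r"\s+", "_", name.strip())
    name = re.sub(r"[^0-9A-Za-z_\.-]", "", name)
    return re.sub(r"_+", "_", name)


def parse_two_row_header(name_line: str, unit_line: str) -> Tuple[List[str], List[str]]:
    """Parse a header where names and units sit on consecutive rows."""
    names = [_normalize(t.strip().strip("\ufeff")) for t in name_line.strip().split(",")]
    # Strip surrounding brackets from each unit; empty cells become "N/A".
    units = [t.strip().strip("[]").strip() or "N/A" for t in unit_line.strip().split(",")]
    if len(names) != len(units):
        raise ValueError(f"{len(names)} names but {len(units)} units")
    return names, units


params, units = parse_two_row_header("time, surface ponded depth, cycle\n", "[s], [m], \n")
```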


In [None]:
# Locate candidate CSVs
csv_files = sorted(cfg.data_pkg_dir.rglob(f"{cfg.obs_file_handle}*.{cfg.obs_file_format}"))
if not csv_files:
    raise FileNotFoundError(
        f"No files found matching {cfg.obs_file_handle}*.{cfg.obs_file_format} under {cfg.data_pkg_dir}"
    )

logger.info("Found %d candidate CSV files. Using the first as a schema reference: %s", len(csv_files), csv_files[0])

ref_csv = csv_files[0]
lines = ref_csv.read_text(errors="replace").splitlines(keepends=True)

if cfg.header_line_hint is not None:
    header_idx = cfg.header_line_hint
else:
    header_idx = detect_header_line(lines, max_lines=cfg.header_probe_max_lines)

logger.info("Using header line index: %d", header_idx)
header_line = lines[header_idx]

parameters, units = parse_ats_csv_header_line(header_line)
logger.info("Parsed %d parameters", len(parameters))

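# Optional rewriting of CSV headers (disabled by default).
# Assumes every matched file has its header at the same line index as the
# reference file. With inplace=False, output goes to e.g. `water_balance.csv_tmp`
# next to the original; remove or rename these files before generating checksums.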
if cfg.write_new_csv:
    for p in csv_files:
        content = p.read_text(errors="replace").splitlines(keepends=True)
        content[header_idx] = ",".join([f"{a} [{b}]" if b != "N/A" else a for a, b in zip(parameters, units)]) + "\n"

        out_path = p if cfg.inplace else p.with_name(p.name + "_tmp")
        out_path.write_text("".join(content))

# Create dd.csv skeleton

dd = pd.DataFrame(
    {
        "Column_or_Row_Name": parameters,
        "Unit": units,
        "Definition": ["" for _ in parameters],
        "Data_Type": ["numeric" for _ in parameters],
        "Term_Type": ["column_header" for _ in parameters],
        "Missing_Value_Code": ['"N/A"; "-9999"; ""; "NA"' for _ in parameters],
        "Reported_Precision": [1E-16 for _ in parameters],
    }
)

dd.head(15)

## 6) Auto-fill definitions from ATS input-spec documentation (best effort)

ATS publishes an input-specification *symbol table* that maps many variable root names to descriptions.

This notebook attempts to:

1. Download the symbol table (an org-mode table) from the ATS GitHub repository.
2. Parse it into a lookup map.
3. Fill `dd["Definition"]` by matching the *longest* symbol-table key that appears as a substring of each column name.
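The longest-key rule keeps a short, generic key from shadowing a more specific one. The sketch below is a compact restatement of the notebook's `fill_definitions`. The toy map is hypothetical: its keys and descriptions are placeholders, not entries from the ATS symbol table.

```python
from typing import Dict, List


def fill_definitions(parameters: List[str], definition_map: Dict[str, str]) -> List[str]:
    """Longest symbol-table key that appears as a substring of each column name wins."""
    keys = sorted(definition_map, key=len, reverse=True)
    return [next((definition_map[k] for k in keys if k in p), "") for p in parameters]


# Hypothetical entries; real keys and descriptions come from the ATS symbol table.
toy_map = {
    "depth": "generic depth description",
    "ponded_depth": "ponded water depth description",
}
defs = fill_definitions(["surface-ponded_depth", "snow-depth", "time"], toy_map)
```

Here `surface-ponded_depth` picks up the `ponded_depth` entry rather than `depth`. A very short key can still match unrelated names, which is why the resulting definitions need review.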

Important limitations:

- Not every output column will have a corresponding symbol-table entry.
- Matching is heuristic; you must review the definitions.
- Network access is required for the download step.

If you are working offline, skip the download and fill definitions manually.
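Compute nodes often lack internet access. A middle ground is to fetch the symbol table once, for example on a login node, and reuse a local copy afterwards. The sketch below swaps in the standard-library `urllib.request` instead of `requests`. The helper name and cache location are assumptions. The returned text can be passed to `parse_org_table`.

```python
import urllib.request
from pathlib import Path


def load_text_with_cache(url: str, cache_path: Path, timeout: float = 30.0) -> str:
    """Return the cached copy if present; otherwise download once and cache it."""
    if cache_path.exists():
        return cache_path.read_text(encoding="utf-8")
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        text = resp.read().decode("utf-8")
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_path.write_text(text, encoding="utf-8")
    return text
```

Keep the cache file outside `cfg.data_pkg_dir` so it does not end up in the published package.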


In [None]:
# Download and parse symbol table
symbol_table_url = (
    "https://raw.githubusercontent.com/amanzi/ats/refs/heads/master/"
    "docs/documentation/source/input_spec/symbol_table.org"
)

try:
    resp = requests.get(symbol_table_url, timeout=30)
    resp.raise_for_status()

    df_symbol = parse_org_table(resp.text)
    definition_map = build_definition_map_from_symbol_table(df_symbol)

    logger.info("Loaded %d symbol-table entries", len(definition_map))

    dd["Definition"] = fill_definitions(dd["Column_or_Row_Name"].tolist(), definition_map)

    missing = (dd["Definition"].str.strip() == "").sum()
    logger.info("Columns without auto-filled definitions: %d / %d", missing, len(dd))

except Exception as e:
    logger.warning("Could not download/parse symbol table: %s", e)
    logger.warning("Proceeding with empty definitions; fill dd.csv manually.")

# Write dd.csv

dd_path = cfg.data_pkg_dir / "dd.csv"
dd.to_csv(dd_path, index=False)
logger.info("Wrote: %s", dd_path)

dd.head(20)

## 7) Optional: package integrity checks (recommended)

Before submission, it is good practice to:

- spot-check the curated directory content;
- record the total size;
- (optionally) generate a checksum manifest.

The cell below generates a simple `sha256sums.txt` file for all files in the package.
This is optional, but can help you verify upload integrity.
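The manifest uses the "hash, two spaces, relative path" layout that GNU coreutils `sha256sum` emits. On systems with coreutils, running `sha256sum -c sha256sums.txt` from inside the package directory checks it. Where that tool is unavailable, a Python check could look like the sketch below (`sha256_of` and `verify_manifest` are hypothetical helpers):

```python
import hashlib
from pathlib import Path
from typing import List


def sha256_of(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Stream a file through SHA-256 in 1 MiB chunks."""
    h = hashlib.sha256()
    with path.open("rb") as fp:
        for chunk in iter(lambda: fp.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


def verify_manifest(root: Path, manifest: Path) -> List[str]:
    """Return relative paths that are missing or whose SHA-256 no longer matches."""
    bad: List[str] = []
    for line in manifest.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        digest, rel = line.split("  ", 1)
        target = root / rel
        if not target.is_file() or sha256_of(target) != digest:
            bad.append(rel)
    return bad
```

An empty list means every file listed in the manifest is present and unchanged.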


In [None]:
import hashlib

checksum_path = cfg.data_pkg_dir / "sha256sums.txt"

with checksum_path.open("w", encoding="utf-8") as f:
    for p in list_files(cfg.data_pkg_dir):
        # Skip checksum file itself to keep the manifest stable
        if p.name == checksum_path.name:
            continue

        h = hashlib.sha256()
        with p.open("rb") as fp:
            for chunk in iter(lambda: fp.read(1024 * 1024), b""):
                h.update(chunk)

        rel = p.relative_to(cfg.data_pkg_dir).as_posix()
        f.write(f"{h.hexdigest()}  {rel}\n")

logger.info("Wrote checksums: %s", checksum_path)
logger.info("Final package size: %s", human_bytes(dir_size_bytes(cfg.data_pkg_dir)))

checksum_path.read_text().splitlines()[:5]

## 8) Suggested next step: upload this ATS MDA package to ESS-DIVE

At this point, your curated ATS MDA directory (`cfg.data_pkg_dir`) should contain:

- The data files you intend to publish (e.g., model outputs, inputs needed for reproducibility)
- `flmd.csv` (File Level Metadata)
- `dd.csv` (Data Dictionary)
- (Optional) `sha256sums.txt` if you ran the integrity-check cell

### Option A: Submit with the ESS-DIVE online form (recommended for small/medium datasets)

1. Create (or sign in to) your ESS-DIVE contributor account.
2. Start a new dataset submission using the **online data submission form**.
3. Upload the files in `cfg.data_pkg_dir`.
4. Fill in dataset-level metadata (title, abstract, authors, funding award numbers, temporal/spatial coverage, etc.).
5. Initiate the review/publish cycle to request a DOI.

Helpful ESS-DIVE docs:
- "Get Started" (contributing data)
- "Submit Data with Online Form"
- "Publish your Dataset"

### Option B: Programmatic submission with the ESS-DIVE Dataset API (recommended for large, repeated, or automated submissions)

If you plan recurring uploads (e.g., many parameter sweeps) or want automated pipelines:

1. Ensure your dataset metadata meets ESS-DIVE requirements.
2. Prepare a JSON-LD metadata document (per ESS-DIVE documentation).
3. Use the Dataset API to upload files from a local directory and submit metadata.

Helpful ESS-DIVE docs:
- "Submit Data with the Dataset API"
- "ESS-DIVE Dataset API" overview

### Practical recommendations before you upload

- Keep `flmd.csv` and `dd.csv` alongside your data files so ESS-DIVE tooling can parse and index your dataset.
- If you publish multiple observation files, consider generating a data dictionary per file or using wildcards in dd fields (per ESS-DIVE guidance).
- For very large uploads (multi-GB to TB scale), contact ESS-DIVE support to choose the best transfer mechanism.
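When deciding how to transfer the package, it helps to know which file types dominate its size. A small summary could look like the sketch below. `size_by_extension` is a hypothetical helper that reports results sorted from largest to smallest:

```python
from collections import Counter
from pathlib import Path
from typing import Dict, Tuple


def size_by_extension(root: Path) -> Dict[str, Tuple[int, int]]:
    """Map lowercase extension -> (file count, total bytes), largest total first."""
    counts: Counter = Counter()
    sizes: Counter = Counter()
    for p in root.rglob("*"):
        if p.is_file():
            ext = p.suffix.lower() or "<none>"
            counts[ext] += 1
            sizes[ext] += p.stat().st_size
    return {ext: (counts[ext], sizes[ext]) for ext in sorted(sizes, key=sizes.get, reverse=True)}
```

For example, `size_by_extension(cfg.data_pkg_dir)` combined with `human_bytes` from the setup cell gives a quick overview before you contact ESS-DIVE about large transfers.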

### Optional: create a single archive for upload

Some users prefer to upload a single archive (and/or stage it somewhere for transfer):

```bash
cd "${DATA_PKG_DIR}"
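tar -czf ../ats_mda_package.tar.gz .
```

Replace `${DATA_PKG_DIR}` with your `cfg.data_pkg_dir` path. The archive is written to the parent directory so that `tar` does not try to include its own output file.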
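If you prefer to stay in Python, the standard-library `tarfile` module can produce an equivalent archive. The sketch below also refuses to write the archive inside the directory being archived. `archive_package` is a hypothetical helper and the archive name is only an example:

```python
import tarfile
from pathlib import Path


def archive_package(pkg_dir: Path, archive_path: Path) -> Path:
    """Write a .tar.gz of pkg_dir's contents; refuses to place the archive inside pkg_dir."""
    pkg = pkg_dir.resolve()
    out = archive_path.resolve()
    if pkg in out.parents:
        raise ValueError("Write the archive outside the package directory")
    with tarfile.open(out, "w:gz") as tar:
        # arcname="." stores members relative to the package root, like `tar -czf ... .`
        tar.add(pkg, arcname=".")
    return out
```

For example: `archive_package(cfg.data_pkg_dir, cfg.data_pkg_dir.parent / "ats_mda_package.tar.gz")`.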


## 9) Create per-subdirectory tarballs for transfer / upload (with automatic splitting > 5 GiB)

Large ATS MDA directories can be difficult to upload as a single payload, especially when working over slow links.
A common practice is to archive each *top-level subdirectory* (e.g., `run0/`, `run1/`, `run2/`) into its own compressed tarball:

- `run0.tar.gz`
- `run1.tar.gz`
- `run2.tar.gz`

This section:

1. Creates `*.tar.gz` archives **for each immediate subdirectory** under `cfg.data_pkg_dir`.
2. Reports the size of each archive.
3. If an archive exceeds **5 GiB**, splits it into `5G` parts using `split`, producing files like:
   - `run0.tar.gz.part001`, `run0.tar.gz.part002`, ...

You can upload either:

- the intact `*.tar.gz` files (when they are small enough), or
- the `*.partNNN` files (for large archives), and reconstruct them later.
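Reconstruction is plain byte concatenation in part order. In a shell, `cat run0.tar.gz.part* > run0.tar.gz` does this, because the shell expands the glob in sorted order and the zero-padded suffixes sort correctly. A Python equivalent is sketched below (`reassemble_parts` is a hypothetical helper):

```python
import shutil
from pathlib import Path
from typing import List


def reassemble_parts(parts_dir: Path, archive_name: str) -> Path:
    """Concatenate <archive_name>.partNNN files, in order, back into <archive_name>."""
    parts: List[Path] = sorted(parts_dir.glob(archive_name + ".part*"))
    if not parts:
        raise FileNotFoundError(f"No parts found for {archive_name} in {parts_dir}")
    out = parts_dir / archive_name
    with out.open("wb") as dst:
        for part in parts:  # zero-padded suffixes (part001, part002, ...) sort correctly
            with part.open("rb") as src:
                shutil.copyfileobj(src, dst)
    return out
```

After reassembly, comparing a checksum of the rebuilt archive with one recorded before splitting confirms the transfer.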


In [None]:
# --- Tarball + optional split utilities ---

def make_tar_gz_per_subdir(
    mda_dir: Path,
    *,
    output_dir: Optional[Path] = None,
    max_part_gib: int = 5,
    overwrite: bool = False,
    split_large_archives: bool = True,
    remove_original_after_split: bool = False,
) -> pd.DataFrame:
    """Create one .tar.gz per immediate subdirectory of ``mda_dir``.

    This is useful when you want to transfer or upload a large ATS MDA package in
    smaller, logically grouped pieces (e.g., one archive per ``runX/`` directory).

    Parameters
    ----------
    mda_dir
        The ATS MDA directory (e.g., ``cfg.data_pkg_dir``).
    output_dir
        Where to write archives. Defaults to ``mda_dir``.
    max_part_gib
        Split threshold and chunk size, in GiB. Default: 5.
    overwrite
        If True, overwrite existing ``*.tar.gz`` archives *and* any existing split parts.
    split_large_archives
        If True, archives larger than ``max_part_gib`` will be split with ``split -b {max_part_gib}G``.
    remove_original_after_split
        If True, delete the original ``*.tar.gz`` after splitting (saves space, but destructive).

    Returns
    -------
    pandas.DataFrame
        Summary table with sizes and split outputs.

    Notes
    -----
    - This function archives *each immediate subdirectory* under ``mda_dir``.
      Files located directly in ``mda_dir`` (e.g., ``flmd.csv`` and ``dd.csv``)
      are **not** included in these per-subdirectory archives.
    - The splitting step prefers GNU ``split`` flags to create ``.part001`` style
      suffixes. If those flags are not supported, it falls back to POSIX ``split``
      and renames outputs to ``.part001``, ``.part002``, ...
    """

    def _remove_existing_parts(out_dir: Path, tar_name: str) -> None:
        for p in out_dir.glob(tar_name + ".part*"):
            try:
                p.unlink()
            except Exception as e:
                logger.warning("Could not remove existing part %s: %s", p, e)

    def _split_archive(tar_path: Path, out_dir: Path) -> List[Path]:
        """Split ``tar_path`` into ``max_part_gib`` chunks, returning the part file paths."""
        prefix = str(tar_path) + ".part"  # desired prefix -> <tar>.part001

        # If parts already exist and we're not overwriting, reuse them.
        existing = sorted(out_dir.glob(tar_path.name + ".part*"))
        if existing and not overwrite:
            logger.info("Split parts already exist (reusing): %s (%d parts)", tar_path.name, len(existing))
            return existing

        # If overwriting, clean up old parts first.
        if existing and overwrite:
            _remove_existing_parts(out_dir, tar_path.name)

        # Preferred GNU split invocation.
        gnu_cmd = [
            split_exe,
            "-b",
            f"{max_part_gib}G",
            "--numeric-suffixes=1",
            "--suffix-length=3",
            str(tar_path),
            prefix,
        ]

        try:
            run_cmd(gnu_cmd)
            parts = sorted(out_dir.glob(tar_path.name + ".part*"))
            if parts:
                return parts
        except subprocess.CalledProcessError as e:
            logger.warning(
                "GNU-style split flags not supported on this system; falling back to POSIX split. Error: %s",
                str(e),
            )

        # POSIX fallback: POSIX `split -b` only guarantees the k and m size suffixes,
        # so express the chunk size in MiB (e.g., 5 GiB -> 5120m).
        posix_cmd = [split_exe, "-b", f"{max_part_gib * 1024}m", str(tar_path), prefix]
        run_cmd(posix_cmd)

        # POSIX split produces suffixes like aa, ab, ac... Rename to numeric partNNN.
        raw_parts = sorted(out_dir.glob(tar_path.name + ".part*"))
        if not raw_parts:
            raise RuntimeError(f"Split reported success but no part files found for: {tar_path}")

        # Rename deterministically in lexical order.
        renamed = []
        for i, p in enumerate(raw_parts, start=1):
            new = p.with_name(f"{tar_path.name}.part{i:03d}")
            if new.exists() and not overwrite:
                raise FileExistsError(
                    f"Refusing to overwrite existing part file: {new}. Set overwrite=True to replace."
                )
            p.rename(new)
            renamed.append(new)

        return renamed

    mda_dir = Path(mda_dir)
    if output_dir is None:
        output_dir = mda_dir
    output_dir = Path(output_dir)

    if not mda_dir.exists():
        raise FileNotFoundError(f"MDA directory does not exist: {mda_dir}")

    ensure_dir(output_dir)

    tar_exe = which("tar")
    if not tar_exe:
        raise RuntimeError("Required executable 'tar' not found on PATH.")

    split_exe = which("split")
    if split_large_archives and not split_exe:
        raise RuntimeError("Requested splitting, but required executable 'split' not found on PATH.")

    max_bytes = int(max_part_gib) * 1024**3

    subdirs = sorted([p for p in mda_dir.iterdir() if p.is_dir() and not p.name.startswith(".")])
    if not subdirs:
        raise FileNotFoundError(f"No subdirectories found under {mda_dir} (nothing to archive).")

    rows = []

    for sd in subdirs:
        tar_path = output_dir / f"{sd.name}.tar.gz"

        # Optionally clean up existing tar + parts if overwriting.
        if overwrite:
            if tar_path.exists():
                try:
                    tar_path.unlink()
                except Exception as e:
                    raise RuntimeError(f"Could not remove existing archive {tar_path}: {e}")
            _remove_existing_parts(output_dir, tar_path.name)

        # Parts left by an earlier run (e.g., one that used remove_original_after_split=True).
        existing_parts = sorted(output_dir.glob(tar_path.name + ".part*"))

        if tar_path.exists():
            logger.info("Archive exists (skipping creation): %s", tar_path)
        elif existing_parts:
            # Do not rebuild the tarball: it costs time and could disagree with the reused parts.
            logger.info(
                "Only split parts exist (skipping creation): %s (%d parts)",
                tar_path.name,
                len(existing_parts),
            )
        else:
            # Create archive with a stable relative layout: <subdir>/...
            # -C mda_dir ensures tar stores paths relative to mda_dir.
            cmd = [tar_exe, "-czf", str(tar_path), "-C", str(mda_dir), sd.name]
            logger.info("Creating archive: %s", tar_path)
            run_cmd(cmd)
            if not tar_path.exists():
                raise FileNotFoundError(f"Archive was not created: {tar_path}")

        if tar_path.exists():
            size_bytes = tar_path.stat().st_size
            did_split = False
            part_files: List[Path] = []
        else:
            # Original already removed after an earlier split: report the summed part sizes.
            size_bytes = sum(p.stat().st_size for p in existing_parts)
            did_split = True
            part_files = existing_parts

        if tar_path.exists() and split_large_archives and size_bytes > max_bytes:
            did_split = True
            logger.info(
                "Archive > %d GiB (%s). Splitting into %d GiB parts: %s",
                max_part_gib,
                human_bytes(size_bytes),
                max_part_gib,
                tar_path.name,
            )

            part_files = _split_archive(tar_path, output_dir)

            if remove_original_after_split:
                logger.info("Removing original archive after split: %s", tar_path)
                tar_path.unlink()

        rows.append(
            {
                "subdir": sd.name,
                "archive": tar_path.name,
                "archive_path": str(tar_path),
                "archive_size_bytes": size_bytes,
                "archive_size": human_bytes(size_bytes),
                "split": did_split,
                "num_parts": len(part_files),
                "parts": ", ".join([p.name for p in part_files]) if part_files else "",
            }
        )

    df = pd.DataFrame(rows).sort_values(["split", "archive_size_bytes"], ascending=[True, False])
    return df


# --- Run packaging on your curated MDA directory ---

df_archives = make_tar_gz_per_subdir(
    cfg.data_pkg_dir,
    output_dir=cfg.data_pkg_dir,
    max_part_gib=5,
    overwrite=False,
    split_large_archives=True,
    remove_original_after_split=False,
)

# Report sizes clearly
pd.set_option("display.max_colwidth", 140)
logger.info(
    "Archive summary\n%s",
    df_archives[["subdir", "archive", "archive_size", "split", "num_parts"]].to_string(index=False),
)

# Display as a table in the notebook
df_archives
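
Because the summary records both each archive and its parts, it can drive the upload list directly. The hypothetical `files_to_upload` below picks the parts for split archives and the tarball otherwise, so the same data is never uploaded twice. It relies only on the `archive`, `split`, and `parts` columns built by `make_tar_gz_per_subdir`.

```python
import pandas as pd


def files_to_upload(summary: pd.DataFrame) -> list:
    """Hypothetical helper: one representation per archive (parts if split, else the tarball)."""
    names = []
    for row in summary.itertuples(index=False):
        if row.split:
            # `parts` is the comma-separated list written by make_tar_gz_per_subdir.
            names.extend(p.strip() for p in row.parts.split(",") if p.strip())
        else:
            names.append(row.archive)
    return names
```

Usage: `files_to_upload(df_archives)`. Remember that `flmd.csv` and `dd.csv` are not inside the per-subdirectory archives, so add them to the upload separately. If you kept an original tarball that was also split, this helper chooses the parts; swap the branch if you prefer the intact tarball.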


### Reconstructing a split archive (`*.partNNN` → `*.tar.gz`)

If a tarball was split (e.g., `run0.tar.gz.part001`, `run0.tar.gz.part002`, ...), you can reassemble it on a Linux/HPC system as follows:

1. **Ensure all parts are present in the same directory**.
2. Reconstruct the tarball. The shell expands `part*` in lexical order, and the zero-padded numeric suffixes make lexical order match part order:

```bash
cat run0.tar.gz.part* > run0.tar.gz
```

3. (Optional) Verify the reconstructed archive:

```bash
gzip -t run0.tar.gz
# or list a few entries
tar -tzf run0.tar.gz | head
```

4. Extract it:

```bash
tar -xzf run0.tar.gz
```
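Where `cat`, `gzip`, or `tar` are unavailable (for example on Windows), steps 2 to 4 can be sketched in Python with the standard library. `join_parts`, `verify_tar_gz`, and `extract_tar_gz` are hypothetical helpers. `join_parts` also checks that part numbers run 001, 002, ... without gaps, because a missing middle part would otherwise produce a silently corrupt archive.

```python
import gzip
import shutil
import tarfile
from pathlib import Path


def join_parts(archive_path):
    """Step 2: concatenate <archive>.part001, .part002, ... into <archive>."""
    archive_path = Path(archive_path)
    parts = sorted(archive_path.parent.glob(archive_path.name + ".part*"))
    if not parts:
        raise FileNotFoundError(f"No parts found for {archive_path}")
    # Zero-padded suffixes sort lexically in numeric order; reject gaps such as 001, 003.
    expected = [f"{archive_path.name}.part{i:03d}" for i in range(1, len(parts) + 1)]
    if [p.name for p in parts] != expected:
        raise ValueError(f"Missing or misnamed parts: {[p.name for p in parts]}")
    with archive_path.open("wb") as dst:
        for part in parts:
            with part.open("rb") as src:
                shutil.copyfileobj(src, dst)
    return archive_path


def verify_tar_gz(path, n_preview=10):
    """Step 3: decompress the whole stream (like `gzip -t`), then list the first entries."""
    with gzip.open(path, "rb") as fh:
        # A truncated or corrupt file raises here (e.g., EOFError or gzip.BadGzipFile).
        while fh.read(16 * 1024**2):
            pass
    names = []
    with tarfile.open(path, "r:gz") as tar:
        for member in tar:
            names.append(member.name)
            if len(names) >= n_preview:
                break
    return names


def extract_tar_gz(path, dest):
    """Step 4: extract, refusing members that would land outside `dest`."""
    dest = Path(dest)
    dest.mkdir(parents=True, exist_ok=True)
    with tarfile.open(path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters exist in Python 3.12+ (and some 3.8 to 3.11 security releases).
            tar.extractall(dest, filter="data")
        else:
            # Minimal path check for older Pythons; it does not inspect link targets.
            root = dest.resolve()
            for member in tar.getmembers():
                target = (dest / member.name).resolve()
                if target != root and root not in target.parents:
                    raise ValueError(f"Unsafe path in archive: {member.name}")
            tar.extractall(dest)
```

The gap check cannot detect a missing *final* part; `verify_tar_gz` catches that case, because the decompressed stream then ends early.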

Notes:

- If you are transferring over a network, consider also transferring the `sha256sums.txt` manifest (Section 7) and verifying checksums after reconstruction.
- If you set `remove_original_after_split=True`, only the parts will remain (saving scratch space). If you keep the original tarball, upload either the whole tarball or its parts, but normally not both.
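
To verify checksums after reconstruction without relying on `sha256sum`, a streaming SHA-256 in Python is enough. The sketch below assumes the manifest uses the `sha256sum` output format (`<hex digest>  <file name>`, with an optional `*` before binary-mode names) and that it lists the file you are checking. Whether the Section 7 manifest covers the tarballs themselves or only the original files depends on how you generated it. `sha256_file` and `check_against_manifest` are hypothetical helpers.

```python
import hashlib
from pathlib import Path


def sha256_file(path, buf_bytes=16 * 1024**2):
    """Stream the file through SHA-256 so multi-GB archives are never fully loaded."""
    h = hashlib.sha256()
    with Path(path).open("rb") as fh:
        for chunk in iter(lambda: fh.read(buf_bytes), b""):
            h.update(chunk)
    return h.hexdigest()


def check_against_manifest(path, manifest):
    """Return True/False if `path` is listed in the manifest, or None if it is not listed.

    Assumes sha256sum-style lines: "<hex digest>  <name>" or "<hex digest> *<name>".
    Entries are matched on the file's base name.
    """
    path = Path(path)
    for line in Path(manifest).read_text().splitlines():
        fields = line.strip().split(maxsplit=1)
        if len(fields) != 2:
            continue
        digest, name = fields
        name = name[1:] if name.startswith("*") else name
        if Path(name).name == path.name:
            return digest.lower() == sha256_file(path)
    return None
```

Matching on the base name is a simplification; if your manifest contains several files with the same name in different directories, compare full relative paths instead.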
