# AORTA FSDP2 Training Launcher

Use this notebook to orchestrate AORTA's multi-stream FSDP2 workload from a Jupyter session. It wraps the existing CLI so you can launch short sanity runs, full-scale multi-GPU jobs, and cross-accelerator experiments directly from the notebook interface.


## Workflow Overview

1. Run the setup cell to register the repository package path and inspect the active accelerator.
2. Adjust the configuration path or overrides to match the experiment you want to execute.
3. Invoke one of the helper functions:
   - `run_training_single_process(...)` for quick smoke tests on a single GPU or CPU, launched as a child process of the notebook kernel.
   - `run_training_torchrun(...)` to launch distributed jobs (NVIDIA/AMD) via `torchrun` while still streaming logs back into the notebook.
4. Optionally analyse the produced artefacts (logs, JSONL traces) with the reporting utilities once a run completes.

Each helper accepts the same CLI arguments as `train.py`, so you can reuse your existing YAML configs and override syntax.
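
As a rough illustration of the override syntax, the sketch below expands a mapping or a list of `key=value` strings into repeated `--override` arguments, mirroring the logic of the helper cell later in this notebook. The name `overrides_to_cli` is hypothetical and used only for this example.

```python
from typing import Mapping, Sequence, Union

OverrideValue = Union[str, int, float, bool]


def overrides_to_cli(overrides: Union[Mapping[str, OverrideValue], Sequence[str]]) -> list[str]:
    """Expand overrides into repeated `--override key=value` arguments."""
    if isinstance(overrides, Mapping):
        pairs = []
        for key, value in overrides.items():
            # Booleans are lower-cased so they read as YAML-style literals.
            text = ("true" if value else "false") if isinstance(value, bool) else str(value)
            pairs.append(f"{key}={text}")
        pairs.sort()  # Sorted for a deterministic command line.
    else:
        pairs = list(overrides)
    args: list[str] = []
    for pair in pairs:
        args.extend(["--override", pair])
    return args


print(overrides_to_cli({"training.max_steps": 5, "profiling.enabled": True}))
# ['--override', 'profiling.enabled=true', '--override', 'training.max_steps=5']
```

A list such as `["training.max_steps=5"]` passes through unchanged, so either form can be used with the helpers.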


In [7]:
# --- Environment bootstrap -------------------------------------------------
import os
import sys
import json
import shlex
import subprocess
from pathlib import Path

try:
    import torch
except ImportError:  # pragma: no cover - defensive fallback
    torch = None


def _find_repo_root(start: Path) -> Path:
    """Walk up from `start` to the first directory that looks like the repository root."""
    markers = [start] + list(start.parents)
    for candidate in markers:
        if (candidate / 'config' / 'default.yaml').exists():
            return candidate
        if (candidate / '.git').exists() and (candidate / 'src').exists():
            return candidate
    return start


REPO_ROOT = _find_repo_root(Path.cwd().resolve())
SRC_ROOT = REPO_ROOT / 'src'
if SRC_ROOT.exists() and str(SRC_ROOT) not in sys.path:
    sys.path.insert(0, str(SRC_ROOT))

print(f"Repository root: {REPO_ROOT}")
print(f"Python executable: {sys.executable}")

if torch is None:
    print("PyTorch import failed; install torch before running training.")
else:
    accelerator = 'cpu'
    device_name = 'CPU'
    if torch.cuda.is_available():
        accelerator = 'nvidia'
        if getattr(torch.version, 'hip', None):
            accelerator = 'amd'
        try:
            device_name = torch.cuda.get_device_name(0)
        except Exception:  # pragma: no cover - query best effort
            device_name = 'Unknown GPU'
        device_count = torch.cuda.device_count()
    else:
        device_count = 0
    print(f"Detected accelerator: {accelerator} | device_count={device_count} | primary_device={device_name}")


Repository root: /manrao/jsr_perf_max/aorta
Python executable: /opt/conda/envs/py_3.10/bin/python
Detected accelerator: amd | device_count=8 | primary_device=AMD Instinct MI350X


In [8]:
import torch
torch.__version__

'2.8.0a0+git0bf8d8e'

## Inspect / Edit Configuration

The helpers expect a valid config file (default: `config/default.yaml`). Update the path or override dictionary in the next cell to customise hyperparameters, profiling options, or output directory for your run.
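
To preview what a set of dotted overrides would change before launching, a minimal sketch like the following can apply them to a nested dictionary such as `base_config`. It is illustrative only: `apply_dotted_overrides` is a hypothetical helper, the sample values are stand-ins, and the merge logic inside `train.py` may differ.

```python
import copy
from typing import Any, Mapping


def apply_dotted_overrides(config: Mapping[str, Any], overrides: Mapping[str, Any]) -> dict:
    """Return a deep copy of `config` with each dotted key set to its override value."""
    merged = copy.deepcopy(dict(config))
    for dotted_key, value in overrides.items():
        node = merged
        *parents, leaf = dotted_key.split(".")
        for name in parents:
            # Create intermediate sections that the base config does not define.
            if not isinstance(node.get(name), dict):
                node[name] = {}
            node = node[name]
        node[leaf] = value
    return merged


# Stand-in values for illustration only; they are not the contents of default.yaml.
sample_config = {"training": {"max_steps": 1000, "log_interval": 10}}
preview = apply_dotted_overrides(sample_config, {"training.max_steps": 5})
print(preview["training"])
# {'max_steps': 5, 'log_interval': 10}
```

The original dictionary is left untouched, so the preview can be repeated with different override sets.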


In [9]:

import yaml

CONFIG_PATH = REPO_ROOT / "config" / "default.yaml"
print(f"Using config: {CONFIG_PATH}")

with CONFIG_PATH.open("r", encoding="utf-8") as handle:
    base_config = yaml.safe_load(handle)

# Display the top-level keys for quick reference
print("Top-level config sections:", ", ".join(base_config.keys()))


Using config: /manrao/jsr_perf_max/aorta/config/default.yaml
Top-level config sections: logging, training, optimizer, scheduler, dataset, model, fsdp, compile, dataloader, profiling


## Training Launch Helpers

The functions below wrap `train.py` with environment setup and argument handling. The example cells that follow pass overrides for short runs (for example `training.max_steps=5`) to keep notebook executions lightweight; update or remove the overrides as needed for full-scale experiments.


In [12]:
import os
import shlex
import subprocess
import sys
from pathlib import Path
from typing import Mapping, Optional, Sequence, Union

# Fallbacks so this cell still works when the earlier setup cells were skipped.
if 'SRC_ROOT' not in globals():
    notebook_root = Path.cwd().resolve()
    guessed_src = notebook_root / 'src'
    SRC_ROOT = guessed_src if guessed_src.exists() else notebook_root
if 'CONFIG_PATH' not in globals():
    repo_root = Path.cwd().resolve()
    default_config = repo_root / 'config' / 'default.yaml'
    CONFIG_PATH = default_config if default_config.exists() else repo_root / 'config.yaml'


def _normalise_overrides(overrides: Optional[Union[Mapping[str, Union[str, int, float, bool]], Sequence[str]]]) -> Sequence[str]:
    """Convert override inputs into CLI-friendly dotted assignments."""

    if overrides is None:
        return []
    if isinstance(overrides, Mapping):
        items = []
        for key, value in overrides.items():
            if isinstance(value, bool):
                val_str = 'true' if value else 'false'
            else:
                val_str = str(value)
            items.append(f"{key}={val_str}")
        return sorted(items)
    return list(overrides)


def _prepare_env(extra_env: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Copy the current environment, prepend SRC_ROOT to PYTHONPATH, and apply `extra_env`."""
    env = os.environ.copy()
    pythonpath = env.get('PYTHONPATH', '')
    parts = [str(SRC_ROOT)] + ([pythonpath] if pythonpath else [])
    env['PYTHONPATH'] = os.pathsep.join(parts)
    if extra_env:
        env.update(extra_env)
    return env


def run_training_single_process(
    config_path: Union[str, Path] = CONFIG_PATH,
    overrides: Optional[Union[Mapping[str, Union[str, int, float, bool]], Sequence[str]]] = None,
    *,
    enable_rocm_metrics: bool = False,
    env: Optional[Mapping[str, str]] = None,
):
    """Execute `train.py` as a child process of the notebook kernel with rank 0 and world size 1."""

    override_args = _normalise_overrides(overrides)
    cmd: list[str] = [sys.executable, '../train.py', '--config', str(config_path)]
    for item in override_args:
        cmd.extend(['--override', item])
    if enable_rocm_metrics:
        cmd.append('--enable-rocm-metrics')

    base_env = {
        'MASTER_ADDR': '127.0.0.1',
        'MASTER_PORT': os.environ.get('MASTER_PORT', '29500'),
        'RANK': '0',
        'WORLD_SIZE': '1',
        'LOCAL_RANK': '0',
    }

    complete_env = _prepare_env(base_env)
    if env:
        complete_env.update(env)

    print('Launching single-process training: ' + ' '.join(shlex.quote(token) for token in cmd))
    return subprocess.run(cmd, env=complete_env, check=False)


def run_training_torchrun(
    *,
    num_processes: int,
    config_path: Union[str, Path] = CONFIG_PATH,
    overrides: Optional[Union[Mapping[str, Union[str, int, float, bool]], Sequence[str]]] = None,
    enable_rocm_metrics: bool = False,
    extra_torchrun_args: Optional[Sequence[str]] = None,
    env: Optional[Mapping[str, str]] = None,
):
    """Launch distributed training via torchrun while streaming stdout/stderr into the notebook."""

    override_args = _normalise_overrides(overrides)
    cmd: list[str] = [
        'torchrun',
        '--standalone',
        '--nproc_per_node',
        str(num_processes),
    ]
    if extra_torchrun_args:
        cmd.extend(list(extra_torchrun_args))
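    # Use the same script path as the single-process helper so both resolve from the notebook's directory.
    cmd.extend(['../train.py', '--config', str(config_path)])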
    for item in override_args:
        cmd.extend(['--override', item])
    if enable_rocm_metrics:
        cmd.append('--enable-rocm-metrics')

    complete_env = _prepare_env(env)

    print('Launching torchrun job: ' + ' '.join(shlex.quote(token) for token in cmd))
    return subprocess.run(cmd, env=complete_env, check=False)
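
The single-process helper falls back to `MASTER_PORT=29500`, which can collide with another job on the same host. One possible workaround, sketched here with a hypothetical `find_free_port` function, is to ask the operating system for an unused port and pass it through the helper's `env` argument.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port on `host`."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # Port 0 lets the OS choose a free port.
        return sock.getsockname()[1]


# Environment mapping that could be passed as `env=port_env` to the helper.
port_env = {"MASTER_PORT": str(find_free_port())}
print(port_env)
```

The port is released before the training process binds it, so another process could still claim it in between; treat this as a convenience rather than a guarantee.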


## Example: Quick Smoke Test (Single GPU)

Set `run = True` in the cell below to execute a short (5-step) run from the notebook. Increase `training.max_steps` or remove the overrides for longer profiling sessions.


In [13]:

# Example overrides tailored for a fast validation run.
quick_overrides = {
    "training.max_steps": 5,
    "training.log_interval": 1,
    "training.output_dir": "notebook_artifacts",
}

# Set `run = True` when you're ready to launch.
run = True

if run:
    result = run_training_single_process(
        overrides=quick_overrides,
        enable_rocm_metrics=False,
    )
    print(f"Process exited with return code {result.returncode}")
else:
    print("Set `run = True` to kick off the sample training run.")


Launching single-process training: /opt/conda/envs/py_3.10/bin/python ../train.py --config /manrao/jsr_perf_max/aorta/config/default.yaml --override training.log_interval=1 --override training.max_steps=5 --override training.output_dir=notebook_artifacts


2025-09-30 22:57:58,424 | INFO | aorta.training.fsdp_trainer | Initialised distributed training | backend=nccl rank=0 world=1 local_rank=0 device=cuda:0
2025-09-30 22:58:01,189 | INFO | aorta.training.fsdp_trainer | epoch=0 step=0 loss=0.35691 lr=0.000006 overlap=0.552ms compute=1215.788ms
2025-09-30 22:58:01,313 | INFO | aorta.training.fsdp_trainer | epoch=0 step=5 loss=0.34644 lr=0.000021 overlap=0.277ms compute=18.715ms
2025-09-30 22:58:01,430 | INFO | aorta.training.fsdp_trainer | epoch=0 step=10 loss=0.33926 lr=0.000036 overlap=0.300ms compute=18.395ms
2025-09-30 22:58:01,539 | INFO | aorta.training.fsdp_trainer | epoch=0 step=15 loss=0.35623 lr=0.000051 overlap=0.308ms compute=18.479ms
2025-09-30 22:58:01,647 | INFO | aorta.training.fsdp_trainer | epoch=0 step=20 loss=0.33731 lr=0.000066 overlap=0.321ms compute=18.776ms
2025-09-30 22:58:01,755 | INFO | aorta.training.fsdp_trainer | epoch=0 step=25 loss=0.34871 lr=0.000081 overlap=0.297ms compute=18.563ms
2025-09-30 22:58:01,863 |

Process exited with return code 0
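
Because the helpers call `subprocess.run(..., check=False)`, a failed run only shows up in `result.returncode`. If later cells should stop on failure, a small wrapper along these lines can turn a non-zero exit code into an exception. `require_success` is a hypothetical name, and the demo uses a trivial child process rather than a real training run.

```python
import subprocess
import sys


def require_success(result: subprocess.CompletedProcess) -> subprocess.CompletedProcess:
    """Raise `subprocess.CalledProcessError` if the launched process failed."""
    result.check_returncode()
    return result


# Demonstration with a trivial child process instead of a real training run.
demo = subprocess.run([sys.executable, "-c", "raise SystemExit(3)"], check=False)
try:
    require_success(demo)
except subprocess.CalledProcessError as err:
    print(f"Run failed with return code {err.returncode}")
# Run failed with return code 3
```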


## Example: Multi-GPU Launch with torchrun

Use this helper when you want the notebook to orchestrate a full distributed job across all visible GPUs. Ensure that the notebook kernel is running on the head node with access to the target devices.
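
Before launching, it can help to check the obvious preconditions. The sketch below is one way to do that under stated assumptions: `launch_problems` is a hypothetical helper, and the device count is passed in explicitly so the check works without importing PyTorch.

```python
import shutil
from typing import Optional


def launch_problems(num_processes: int, device_count: int, torchrun_path: Optional[str]) -> list[str]:
    """Return human-readable reasons why a torchrun launch is likely to fail."""
    problems = []
    if torchrun_path is None:
        problems.append("torchrun was not found on PATH")
    if num_processes < 1:
        problems.append("num_processes must be at least 1")
    elif device_count and num_processes > device_count:
        problems.append(f"num_processes={num_processes} exceeds the {device_count} visible devices")
    return problems


# device_count=8 matches the setup cell output above; use torch.cuda.device_count() in practice.
print(launch_problems(num_processes=8, device_count=8, torchrun_path=shutil.which("torchrun")))
```

An empty list means none of these checks failed; it does not guarantee that the job itself will start cleanly.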


In [15]:

# Example torchrun invocation (edit `num_processes` to match your GPU count).
# To enable ROCm metrics collection add `enable_rocm_metrics=True`.

multi_gpu_overrides = {
    "training.max_steps": 50,
    "training.output_dir": "notebook_artifacts_multi",
}

# Change to True when you want to launch.
launch_multi_gpu = False

if launch_multi_gpu:
    if torch is None:
        raise RuntimeError("PyTorch is required to discover device count before launching torchrun.")
    gpu_count = torch.cuda.device_count() if torch.cuda.is_available() else 1
    result = run_training_torchrun(
        num_processes=gpu_count,
        overrides=multi_gpu_overrides,
        extra_torchrun_args=("--rdzv_backend", "c10d"),
    )
    print(f"torchrun exited with return code {result.returncode}")
else:
    print("Set `launch_multi_gpu = True` to start the distributed run.")


Set `launch_multi_gpu = True` to start the distributed run.


## Next Steps

- Inspect the generated artefacts (logs, JSONL timelines, traces) inside the configured `training.output_dir`.
- Use `analysis/overlap_report.py` from the notebook (for example `!python analysis/overlap_report.py ...`) to compare ROCm vs CUDA runs once you have both datasets.
- Update the overrides to toggle profiling features (`profiling.enabled`, `profiling.chrome_trace`, etc.) or to adjust model scale for stress testing.
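
For a first look at the JSONL artefacts, a generic reader such as the sketch below may be enough. The file names and record schema are not shown in this notebook, so the code only assumes one JSON object per line in `*.jsonl` files under the output directory; `iter_jsonl_records` is a hypothetical helper.

```python
import json
from pathlib import Path
from typing import Iterator


def iter_jsonl_records(output_dir: Path) -> Iterator[dict]:
    """Yield one dict per non-empty line from every `*.jsonl` file under `output_dir`."""
    output_dir = Path(output_dir)
    if not output_dir.is_dir():
        return  # Nothing to read if the run has not produced an output directory yet.
    for path in sorted(output_dir.rglob("*.jsonl")):
        with path.open("r", encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:
                    yield json.loads(line)


# "notebook_artifacts" is the output_dir used in the smoke-test overrides.
records = list(iter_jsonl_records(Path("notebook_artifacts")))
print(f"Loaded {len(records)} records")
```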
