# Notebook 1 — HyperPod + NeMo Capability Map (Executable)


What to prove :
- The Space can see and write to the hyperpod shared filessytem **FSx for Lustre**.
- A HyperPod training pod can mount that same FSx path and write back to it.
- The cluster has the add-ons we rely on (Training Operator + Task Governance/Kueue).
- The **customised images** for this cluster is discoverable and used for jobs.

## 0) Inputs, toggles, and fail-fast checks

**Do this first:**
- In SMUS, open the HyperPod connection panel.
- Copy the **HyperPod cluster name** and the **project S3 bucket/path**.
- Paste them into `HYPERPOD_CLUSTER_NAME` 

In [1]:
%pip install -Uq sagemaker-hyperpod==3.5.0 datasets fsspec==2023.12.2 transformers huggingface-hub 

Note: you may need to restart the kernel to use updated packages.


In [2]:
from datetime import datetime
from pathlib import Path
import os
import re
import uuid

def require(cond, msg):
    if not cond:
        raise ValueError(msg)

AWS_ACCOUNT_ID_RE = re.compile(r'\b\d{12}\b')
AWS_ARN_RE = re.compile(r'arn:aws:[^\s\"\']+')

HF_ACCESS_TOKEN_RE = re.compile(r'(recipes\.run\.hf_access_token=)(\S+)')
HF_ENV_TOKEN_RE = re.compile(r'(\+env_vars\.HF_TOKEN=)(\S+)')
HF_BARE_TOKEN_RE = re.compile(r'(\bHF_TOKEN=)(\S+)')

def rdct_sens(value):
    if value is None:
        return value
    text = rdct_aws(value)
    text = HF_ACCESS_TOKEN_RE.sub(r'\1<HF_TOKEN>', text)
    text = HF_ENV_TOKEN_RE.sub(r'\1<HF_TOKEN>', text)
    text = HF_BARE_TOKEN_RE.sub(r'\1<HF_TOKEN>', text)
    return text


def rdct_aws(value):
    if value is None:
        return value
    text = str(value)
    text = AWS_ARN_RE.sub('<ARN>', text)
    text = AWS_ACCOUNT_ID_RE.sub('<ACCOUNT_ID>', text)
    return text

import boto3
# --- USER INPUTS (paste from HyperPod connection panel) ---
REGION = ''  # paste from HyperPod connection panel (optional; auto-detect if empty)
HYPERPOD_CLUSTER_NAME = os.environ.get('HYPERPOD_CLUSTER_NAME', 'nemo-hyperpod-drdwirm8n64vqf').strip()  # e.g., nemo-hyperpod-xxxx

# --- REGION AUTO-DETECT (if empty) ---
if not REGION:
    REGION = (os.environ.get('AWS_REGION') or os.environ.get('AWS_DEFAULT_REGION') or '').strip()
if not REGION:
    try:
        REGION = boto3.Session().region_name or ''
    except Exception:
        REGION = ''

# --- DERIVED DEFAULTS ---
RUN_ID = datetime.utcnow().strftime('%Y%m%d-%H%M%S') + '-' + uuid.uuid4().hex[:6]
REPOS_DIR = str(Path.home() / 'smus-repos')

# --- SECTION TOGGLES (set False to skip) ---
RUN_VALIDATE_ENV = True
RUN_FSX_CHECKS = False
RUN_KUEUE_TOPOLOGY = True
RUN_KUBECTL_CONTEXT = True

# --- FAIL-FAST VALIDATION ---
require(REGION.strip(), 'Missing REGION. Paste it from HyperPod connection panel or set AWS_REGION.')
require(HYPERPOD_CLUSTER_NAME.strip(), 'Missing HYPERPOD_CLUSTER_NAME. Paste it from HyperPod connection panel.')

print('Inputs:')
print(f'  HYPERPOD_CLUSTER_NAME = {rdct_aws(HYPERPOD_CLUSTER_NAME)}')
print('Derived:')

print(f'  REPOS_DIR            = {REPOS_DIR}')
print('Toggles:')
print({
    'RUN_VALIDATE_ENV': RUN_VALIDATE_ENV,
    'RUN_FSX_CHECKS': RUN_FSX_CHECKS,
    'RUN_KUEUE_TOPOLOGY': RUN_KUEUE_TOPOLOGY,
    'RUN_KUBECTL_CONTEXT': RUN_KUBECTL_CONTEXT,
})

Inputs:
  HYPERPOD_CLUSTER_NAME = nemo-hyperpod-drdwirm8n64vqf
Derived:
  REPOS_DIR            = /home/sagemaker-user/smus-repos
Toggles:
{'RUN_VALIDATE_ENV': True, 'RUN_FSX_CHECKS': False, 'RUN_KUEUE_TOPOLOGY': True, 'RUN_KUBECTL_CONTEXT': True}


## 1) Base imports + notebook utilities

Before we touch the cluster, we set up a few tiny helpers used throughout the notebook:

- `print_header(...)` makes logs readable when you scroll.
- `run(...)` executes shell commands and surfaces stdout/stderr on failure.
- `ensure_dir(...)` creates local folders (for repo clones, temp files, etc.).

Just some plumbing for the rest of the notebook

In [3]:
import json
import os
import subprocess
import textwrap
import time
from pathlib import Path

def print_header(title: str):
    bar = '=' * len(title)
    print(f'\n{title}\n{bar}')

def run(cmd, check=True, capture=True):
    """Run a shell command and return (returncode, stdout)."""
    result = subprocess.run(
        cmd,
        shell=True,
        check=False,
        text=True,
        stdout=subprocess.PIPE if capture else None,
        stderr=subprocess.STDOUT if capture else None,
    )
    if check and result.returncode != 0:
        raise RuntimeError(f'Command failed ({result.returncode}): {cmd}\n{result.stdout}')
    return result.returncode, (result.stdout or '').strip()

def get_hyp_job_status(job_name, namespace):
    if not job_name:
        return None, []
    cmd = f"kubectl get hyp-pytorch-job {job_name} -n {namespace} -o json"
    rc, out = run(cmd, check=False)
    if rc != 0 or not out:
        return None, []
    try:
        data = json.loads(out)
    except Exception:
        return None, []
    status = data.get('status', {}) or {}
    phase = status.get('phase') or status.get('jobStatus') or status.get('status')
    conds = [(c.get('type'), c.get('status')) for c in status.get('conditions', []) or []]
    return phase, conds

def _parse_hyp_list_line(line, namespace):
    if not line or not line.strip():
        return None
    if line.strip().startswith('---') or line.strip().startswith('NAME'):
        return None
    if namespace and namespace in line:
        idx = line.find(namespace)
        name = line[:idx].strip()
        rest = line[idx + len(namespace):].strip()
        tokens = rest.split()
        status = tokens[0] if tokens else None
        age = tokens[1] if len(tokens) > 1 else None
        return name, status, age, line
    parts = line.split()
    if not parts:
        return None
    name = parts[0]
    status = None
    age = None
    if namespace and len(parts) > 2 and parts[1] == namespace:
        status = parts[2]
        age = parts[3] if len(parts) > 3 else None
    else:
        status = parts[2] if len(parts) > 2 else (parts[1] if len(parts) > 1 else None)
        age = parts[-1] if len(parts) > 1 else None
    return name, status, age, line

def get_hyp_list_status(job_name, namespace):
    if not job_name:
        return None, None, None
    cmd = f"hyp list hyp-pytorch-job -n {namespace}"
    try:
        result = subprocess.run(['bash', '-lc', cmd], text=True, capture_output=True)
    except Exception:
        return None, None, None
    out = result.stdout or ''
    if result.returncode != 0 or not out:
        return None, None, None
    for line in out.splitlines():
        parsed = _parse_hyp_list_line(line, namespace)
        if not parsed:
            continue
        name, status, age, raw = parsed
        if name == job_name:
            return status, age, raw
    return None, None, None
def ensure_dir(path: str | Path):
    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    return p

print_header('Environment summary')
print('cwd:', os.getcwd())
print('user:', os.environ.get('USER', 'unknown'))
print('python:', run('python --version')[1])

# Ensure local repo workspace exists
try:
    REPOS_DIR
except NameError:
    REPOS_DIR = str(Path.home() / 'smus-repos')
    print(f'Repos dir not set yet; defaulting to {REPOS_DIR}')

ensure_dir(REPOS_DIR)
print('REPOS_DIR ready:', REPOS_DIR)



Environment summary
cwd: /home/sagemaker-user/bobber/notebooks
user: sagemaker-user
python: Python 3.11.11
REPOS_DIR ready: /home/sagemaker-user/smus-repos


## 2) FSx for Lustre detection (shared storage: Space path <-> pod path)

HyperPod needs a **shared, POSIX filesystem** so *every* training pod can **read the same data** and **write to the same checkpoint/artifact directory** across nodes (critical for restarts/resume). ([AWS Documentation][1])
This is especially important for **NeMo/Megatron distributed checkpointing**: NeMo’s *Fully Parallel Saving* has each data-parallel rank write its checkpoint shard **independently to shared storage** (many concurrent writers). ([NVIDIA Docs][2])

We use **FSx for Lustre** because it’s a fully managed, **POSIX-compliant parallel filesystem** optimized for low-latency / high-throughput GPU workloads, and it integrates with SageMaker HyperPod (and can link to S3 as a data repository). ([Amazon Web Services, Inc.][3])

In this project, we also mount the same FSx into the **Studio Space**, so the *same* filesystem shows up under different paths:

* **Space:** `~/custom-file-systems/fsx_lustre/<fs-id>` (or `/mnt/...`)
* **HyperPod pods:** a fixed mount like `/fsx/...` (often via a PVC) with `subPath=<fs-id>`

This cell detects the FSx mount **as the Space sees it**, then derives the values we plug into pod specs:
`FSX_SPACE_ROOT`, `FSX_SPACE_REAL`, `FSX_FILE_SYSTEM_ID (fs-...)`, `FSX_POD_PREFIX`, `FSX_PVC_SUBPATH`.

**Success looks like:** at least one Lustre mount is found, the resolved FSx path exists, and the derived ID starts with `fs-`.

[1]: https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-lifecycle-best-practices-slurm-slurm-setup-with-fsx.html "Mounting Amazon FSx for Lustre to a HyperPod cluster - Amazon SageMaker AI"
[2]: https://docs.nvidia.com/nemo-framework/user-guide/latest/nemotoolkit/checkpoints/dist_ckpt.html "NeMo Distributed Checkpoint User Guide — NVIDIA NeMo Framework User Guide"
[3]: https://aws.amazon.com/fsx/lustre/ "Amazon FSx for Lustre | Cloud File Storage Integrated with S3 | AWS"


In [4]:
import json
import os
import re
from pathlib import Path

def detect_lustre_mounts():
    mounts = []
    with open('/proc/mounts', 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.split()
            if len(parts) < 3:
                continue
            src, mnt, fstype = parts[:3]
            if fstype.lower() == 'lustre':
                mounts.append({"source": src, "mount": mnt, "fstype": fstype})
    return mounts

def find_fsx_lustre_space_paths():
    bases = [
        Path.home() / 'custom-file-systems' / 'fsx_lustre',
        Path('/home/sagemaker-user/custom-file-systems/fsx_lustre'),
        Path('/mnt/custom-file-systems/fsx_lustre'),
    ]
    candidates = []
    for base in bases:
        if not base.exists():
            continue
        for p in base.iterdir():
            if p.is_dir() and re.match(r'^fs-[0-9a-f]+$', p.name):
                candidates.append(p)
    # Deduplicate while keeping order
    uniq = []
    seen = set()
    for p in candidates:
        k = str(p)
        if k in seen:
            continue
        seen.add(k)
        uniq.append(p)
    return uniq

lustre_mounts = detect_lustre_mounts()
print('Lustre mounts (from /proc/mounts):')
print(json.dumps(lustre_mounts, indent=2) if lustre_mounts else '(none)')

fsx_candidates = find_fsx_lustre_space_paths()
print('FSx Lustre candidates (as seen in the Space):')
print('\n'.join(str(p) for p in fsx_candidates) if fsx_candidates else '(none)')

FSX_ID_HINT = (os.environ.get('FSX_ID') or os.environ.get('FSX_FILE_SYSTEM_ID') or '').strip()
chosen_space_path = None
if FSX_ID_HINT:
    for p in fsx_candidates:
        if p.name == FSX_ID_HINT:
            chosen_space_path = p
            break
if not chosen_space_path and fsx_candidates:
    home_base = str(Path.home())
    chosen_space_path = next((p for p in fsx_candidates if str(p).startswith(home_base)), fsx_candidates[0])

if not chosen_space_path:
    require(lustre_mounts, 'No Lustre mounts detected and no FSx custom-file-systems path found. Is FSx attached?')
    chosen = next((m for m in lustre_mounts if 'fsx' in (m['source'].lower() + ' ' + m['mount'].lower())), lustre_mounts[0])
    FSX_SPACE_ROOT = Path(chosen['mount']).resolve()
    FSX_SPACE_REAL = FSX_SPACE_ROOT
else:
    FSX_SPACE_ROOT = chosen_space_path
    FSX_SPACE_REAL = Path(os.path.realpath(FSX_SPACE_ROOT)).resolve()

require(FSX_SPACE_ROOT.exists(), f'FSx Space path does not exist: {FSX_SPACE_ROOT}')
print(f'FSx path (Space-visible): {FSX_SPACE_ROOT}')
print(f'FSx path (resolved):      {FSX_SPACE_REAL}')

FSX_FILE_SYSTEM_ID = FSX_SPACE_ROOT.name if FSX_SPACE_ROOT.name.startswith('fs-') else ''
if not FSX_FILE_SYSTEM_ID and chosen_space_path:
    FSX_FILE_SYSTEM_ID = chosen_space_path.name
print(f"FSX_FILE_SYSTEM_ID={FSX_FILE_SYSTEM_ID or '(unknown)'}")

FSX_FILE_SYSTEM_PATH = os.environ.get('FSX_FILE_SYSTEM_PATH', '').strip() or f'/{FSX_FILE_SYSTEM_ID}'
if FSX_FILE_SYSTEM_PATH and not FSX_FILE_SYSTEM_PATH.startswith('/'):
    FSX_FILE_SYSTEM_PATH = f'/{FSX_FILE_SYSTEM_PATH}'
print(f'FSX_FILE_SYSTEM_PATH={FSX_FILE_SYSTEM_PATH}')

FSX_POD_PREFIX = '/fsx'
FSX_PVC_SUBPATH = os.environ.get('FSX_PVC_SUBPATH', '').strip() or FSX_FILE_SYSTEM_PATH.lstrip('/')
require(FSX_PVC_SUBPATH, 'Could not determine FSX_PVC_SUBPATH; set env var FSX_PVC_SUBPATH')
print(f'FSX_POD_PREFIX={FSX_POD_PREFIX}')
print(f'FSX_PVC_SUBPATH={FSX_PVC_SUBPATH}')

# Ensure /fsx/<subpath> exists in the Space for local staging (pods still use /fsx)
pod_fsx_root = Path(FSX_POD_PREFIX) / FSX_PVC_SUBPATH
if not pod_fsx_root.exists():
    try:
        import subprocess
        subprocess.run(['sudo', 'mkdir', '-p', FSX_POD_PREFIX], check=True)
        subprocess.run(['sudo', 'ln', '-s', str(FSX_SPACE_REAL), str(pod_fsx_root)], check=True)
    except Exception as e:
        print(f'WARNING: could not create local alias {pod_fsx_root} -> {FSX_SPACE_REAL}: {e}')
require(pod_fsx_root.exists(), f'Local pod FSx path not available: {pod_fsx_root}. You may need to create a /fsx symlink with sudo.')

FSX_BASE_DIRNAME = 'smus-nemo-smoke'
FSX_BASE_DIR = FSX_SPACE_ROOT / FSX_BASE_DIRNAME
FSX_RUN_DIR = None

print(f'FSX_BASE_DIR (intended): {FSX_BASE_DIR}')
print(f'Writable now? {os.access(FSX_SPACE_ROOT, os.W_OK)} (root), {os.access(FSX_BASE_DIR.parent, os.W_OK)} (parent)')


Lustre mounts (from /proc/mounts):
[
  {
    "source": "10.38.246.180@tcp:/gu7rfamv/fs-03b1953d09801303c",
    "mount": "/mnt/custom-file-systems/fsx_lustre/fs-03b1953d09801303c",
    "fstype": "lustre"
  }
]
FSx Lustre candidates (as seen in the Space):
/home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c
/mnt/custom-file-systems/fsx_lustre/fs-03b1953d09801303c
FSx path (Space-visible): /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c
FSx path (resolved):      /mnt/custom-file-systems/fsx_lustre/fs-03b1953d09801303c
FSX_FILE_SYSTEM_ID=fs-03b1953d09801303c
FSX_FILE_SYSTEM_PATH=/fs-03b1953d09801303c
FSX_POD_PREFIX=/fsx
FSX_PVC_SUBPATH=fs-03b1953d09801303c
FSX_BASE_DIR (intended): /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke
Writable now? True (root), True (parent)


## 3) Custom container discovery (from SSM)

In this project we build a LLMFT llama recipe based training image and publish it to **ECR** (for example, adding EFA + AWS-OFI-NCCL bits for high-performance multi-node training).

We store the current image URI in SSM at:

`/sagemaker/hyperpod/<cluster-name>/llmft-container-uri`

This cell looks up that value and sets `LLMFT_IMAGE_CUSTOM`, which is the image we pass into `HyperPodPytorchJob` specs.

**Override option:** if you’re testing a custom image, set the `LLMFT_IMAGE_CUSTOM` environment variable and this section will use it instead. Dockerfile for image is in llmft-container-pipeline/Dockerfile

In [5]:
import boto3

# --- Discover the NeMo container URI from SSM (best-effort) ---
# Stored at: /sagemaker/hyperpod/<cluster-name>/nemo-container-uri
ssm = boto3.client('ssm', region_name=REGION)

nemo_param_names = []
paginator = ssm.get_paginator('get_parameters_by_path')
for page in paginator.paginate(Path='/sagemaker/hyperpod/', Recursive=True, WithDecryption=False):
    for p in page.get('Parameters', []):
        name = p.get('Name', '')
        if name.endswith('/nemo-container-uri'):
            nemo_param_names.append(name)

nemo_param_names = sorted(set(nemo_param_names))
print('Found NeMo container SSM parameters:')
print('\n'.join(nemo_param_names) if nemo_param_names else '(none found under /sagemaker/hyperpod/)')

NEMO_IMAGE = os.environ.get('NEMO_IMAGE', '').strip()
NEMO_IMAGE_SOURCE = ''

if not NEMO_IMAGE:
    if len(nemo_param_names) == 1:
        NEMO_IMAGE_SOURCE = nemo_param_names[0]
        NEMO_IMAGE = ssm.get_parameter(Name=NEMO_IMAGE_SOURCE)['Parameter']['Value'].strip()
    elif len(nemo_param_names) > 1:
        raise ValueError(
            'Multiple nemo-container-uri parameters found. Set NEMO_IMAGE env var or reduce to one. '
            + str(nemo_param_names)
        )

require(NEMO_IMAGE, 'Could not determine NeMo container image. Set env var NEMO_IMAGE or ensure SSM has */nemo-container-uri.')
print(f'Using NeMo image: {rdct_aws(NEMO_IMAGE)}')
if NEMO_IMAGE_SOURCE:
    print(f'NEMO_IMAGE_SOURCE (SSM): {NEMO_IMAGE_SOURCE}')

# --- Discover the LLMFT custom container URI from SSM (best-effort) ---
# Stored at: /sagemaker/hyperpod/<cluster-name>/llmft-container-uri

llmft_param_names = []
paginator = ssm.get_paginator('get_parameters_by_path')
for page in paginator.paginate(Path='/sagemaker/hyperpod/', Recursive=True, WithDecryption=False):
    for p in page.get('Parameters', []):
        name = p.get('Name', '')
        if name.endswith('/llmft-container-uri'):
            llmft_param_names.append(name)

llmft_param_names = sorted(set(llmft_param_names))
print('Found LLMFT container SSM parameters:')
print('\n'.join(llmft_param_names) if llmft_param_names else '(none found under /sagemaker/hyperpod/)')

LLMFT_IMAGE_CUSTOM = os.environ.get('LLMFT_IMAGE_CUSTOM', '').strip()
LLMFT_IMAGE_CUSTOM_SOURCE = ''

if not LLMFT_IMAGE_CUSTOM:
    if len(llmft_param_names) == 1:
        LLMFT_IMAGE_CUSTOM_SOURCE = llmft_param_names[0]
        LLMFT_IMAGE_CUSTOM = ssm.get_parameter(Name=LLMFT_IMAGE_CUSTOM_SOURCE)['Parameter']['Value'].strip()
    elif len(llmft_param_names) > 1:
        raise ValueError(
            'Multiple llmft-container-uri parameters found. Set LLMFT_IMAGE_CUSTOM env var or reduce to one. '
            + str(llmft_param_names)
        )

require(LLMFT_IMAGE_CUSTOM, 'Could not determine LLMFT container image. Set env var LLMFT_IMAGE_CUSTOM or ensure SSM has */llmft-container-uri.')
print(f'Using LLMFT image: {rdct_aws(LLMFT_IMAGE_CUSTOM)}')
if LLMFT_IMAGE_CUSTOM_SOURCE:
    print(f'LLMFT_IMAGE_CUSTOM_SOURCE (SSM): {LLMFT_IMAGE_CUSTOM_SOURCE}')


Found NeMo container SSM parameters:
/sagemaker/hyperpod/nemo-hyperpod-drdwirm8n64vqf/nemo-container-uri
Using NeMo image: <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/nemo-framework-hyperpod:25.04-eks
NEMO_IMAGE_SOURCE (SSM): /sagemaker/hyperpod/nemo-hyperpod-drdwirm8n64vqf/nemo-container-uri
Found LLMFT container SSM parameters:
/sagemaker/hyperpod/nemo-hyperpod-drdwirm8n64vqf/llmft-container-uri
Using LLMFT image: <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/hyperpod-recipes-llmft-custom:llmft-v1.0.0-llama-custom
LLMFT_IMAGE_CUSTOM_SOURCE (SSM): /sagemaker/hyperpod/nemo-hyperpod-drdwirm8n64vqf/llmft-container-uri


## 3b) Hugging Face access token (Secrets Manager)

Some models/datasets (for example, Llama) require an authenticated Hugging Face token.

This notebook reads the token from AWS Secrets Manager (`nemo-container-build/hf-access-token`) and sets standard environment variables used by Hugging Face libraries.

We also point Hugging Face caches at FSx so downloads persist across runs.

**Security note:** the token is never printed.

In [6]:
import base64
import json
import boto3

print_header('Hugging Face token + cache config')

HF_SECRET_ID = os.environ.get('HF_SECRET_ID', 'nemo-container-build/hf-access-token').strip()
secrets = boto3.client('secretsmanager', region_name=REGION)

hf_token = ''
try:
    resp = secrets.get_secret_value(SecretId=HF_SECRET_ID)
    secret_str = resp.get('SecretString')
    if not secret_str and resp.get('SecretBinary'):
        secret_str = base64.b64decode(resp['SecretBinary']).decode('utf-8')
    secret_str = (secret_str or '').strip()
    if secret_str:
        try:
            data = json.loads(secret_str)
            hf_token = (
                data.get('token')
                or data.get('hf_token')
                or data.get('HUGGING_FACE_HUB_TOKEN')
                or data.get('HF_TOKEN')
                or ''
            ).strip()
        except Exception:
            hf_token = secret_str
except Exception as e:
    print(f'WARNING: Could not read HF token secret ({HF_SECRET_ID}): {e}')

if hf_token:
    os.environ['HF_TOKEN'] = hf_token
    print(f'Resolved HF token from Secrets Manager: {HF_SECRET_ID} (length={len(hf_token)})')
else:
    print('HF token not set. Public models/datasets will still work, but gated repos will fail.')

# Put Hugging Face caches on FSx so downloads persist (Space + jobs share the same filesystem).
HF_HOME_DIR = FSX_BASE_DIR / 'hf-home'
HF_HUB_CACHE_DIR = HF_HOME_DIR / 'hub'
HF_DATASETS_CACHE_DIR = HF_HOME_DIR / 'datasets'
HF_ASSETS_CACHE_DIR = HF_HOME_DIR / 'assets'
for d in [HF_HOME_DIR, HF_HUB_CACHE_DIR, HF_DATASETS_CACHE_DIR, HF_ASSETS_CACHE_DIR]:
    ensure_dir(d)

os.environ['HF_HOME'] = str(HF_HOME_DIR)
os.environ['HF_HUB_CACHE'] = str(HF_HUB_CACHE_DIR)
os.environ['HF_DATASETS_CACHE'] = str(HF_DATASETS_CACHE_DIR)
os.environ['HF_ASSETS_CACHE'] = str(HF_ASSETS_CACHE_DIR)
# Optional compatibility for older Transformers docs/code: keep this aligned with HF_HUB_CACHE
os.environ.setdefault('TOKENIZERS_PARALLELISM', 'false')

print('HF_HOME:', os.environ['HF_HOME'])
print('HF_HUB_CACHE:', os.environ['HF_HUB_CACHE'])
print('HF_DATASETS_CACHE:', os.environ['HF_DATASETS_CACHE'])
print('HF_ASSETS_CACHE:', os.environ['HF_ASSETS_CACHE'])



Hugging Face token + cache config
Resolved HF token from Secrets Manager: nemo-container-build/hf-access-token (length=37)
HF_HOME: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/hf-home
HF_HUB_CACHE: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/hf-home/hub
HF_DATASETS_CACHE: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/hf-home/datasets
HF_ASSETS_CACHE: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/hf-home/assets


## 4) Platform validation (HyperPod + EKS add-ons)

Now we sanity-check the *platform* before we spend time running workloads.

This section:
1. Describes the SageMaker HyperPod cluster and confirms it’s using an **EKS orchestrator**.
2. Extracts the underlying EKS cluster name and lists installed add-ons.
3. Sets guardrails we use later:
   - `HAS_TRAINING_OPERATOR` means the HyperPod **training operator** add-on is installed (it provides the `HyperPodPytorchJob` Kubernetes API + controller that turns a job spec into real training pods, and adds resiliency features like surgical recovery and hang monitoring).
   - `HAS_TASK_GOV` means HyperPod **task governance** is installed (it integrates with Kueue for admission/priority/preemption; when it’s present, Kueue queue/priority labels and topology hints are meaningful).
4. (Optional) Updates your local `kubectl` context so we can inspect Kubernetes objects when troubleshooting.

**Note:** some kubectl builds don’t support `kubectl version --client --short`; that warning is harmless.

In [7]:
import boto3

sm = boto3.client('sagemaker', region_name=REGION)
eks = boto3.client('eks', region_name=REGION)

print_header('HyperPod cluster')
cluster_desc = sm.describe_cluster(ClusterName=HYPERPOD_CLUSTER_NAME)
CLUSTER_STATUS = cluster_desc.get('ClusterStatus')
ORCHESTRATOR = cluster_desc.get('Orchestrator', {})
ORCHESTRATOR_TYPE = 'EKS' if ORCHESTRATOR.get('Eks') else None
EKS_CLUSTER_ARN = ORCHESTRATOR.get('Eks', {}).get('ClusterArn', '')
EKS_CLUSTER_NAME = EKS_CLUSTER_ARN.split('/')[-1] if EKS_CLUSTER_ARN else ''
print('ClusterStatus:', CLUSTER_STATUS)
print('OrchestratorType:', ORCHESTRATOR_TYPE)

require(ORCHESTRATOR_TYPE == 'EKS', f'Expected EKS orchestrator, got: {ORCHESTRATOR}')
require(EKS_CLUSTER_NAME, 'Could not derive EKS cluster name from Orchestrator.Eks.ClusterArn')

print_header('EKS add-ons')
addons = eks.list_addons(clusterName=EKS_CLUSTER_NAME).get('addons', [])
print('Add-ons:', addons)

HAS_TRAINING_OPERATOR = 'amazon-sagemaker-hyperpod-training-operator' in addons
HAS_TASK_GOV = 'amazon-sagemaker-hyperpod-taskgovernance' in addons
print('HAS_TRAINING_OPERATOR:', HAS_TRAINING_OPERATOR)
print('HAS_TASK_GOV:', HAS_TASK_GOV)

TRAINING_OPERATOR_VERSION = None
TASK_GOV_VERSION = None
if HAS_TRAINING_OPERATOR:
    TRAINING_OPERATOR_VERSION = eks.describe_addon(
        clusterName=EKS_CLUSTER_NAME,
        addonName='amazon-sagemaker-hyperpod-training-operator'
    )['addon'].get('addonVersion')
if HAS_TASK_GOV:
    TASK_GOV_VERSION = eks.describe_addon(
        clusterName=EKS_CLUSTER_NAME,
        addonName='amazon-sagemaker-hyperpod-taskgovernance'
    )['addon'].get('addonVersion')

print('TRAINING_OPERATOR_VERSION:', TRAINING_OPERATOR_VERSION)
print('TASK_GOV_VERSION:', TASK_GOV_VERSION)

if not HAS_TRAINING_OPERATOR:
    print('WARNING: Training operator not found. Recipe-based HyperPod jobs will be skipped.')
if not HAS_TASK_GOV:
    print('WARNING: Task governance not found. Kueue/topology scheduling will be skipped.')


print_header('kubectl context')
import shutil
from urllib.request import urlopen, urlretrieve
import platform

if RUN_KUBECTL_CONTEXT:
    # kubeconfig update does not require kubectl
    cmd = f"aws eks update-kubeconfig --name {EKS_CLUSTER_NAME} --region {REGION}"
    rc, out = run(cmd, check=False)
    print(rdct_aws(out))
    if rc != 0:
        raise RuntimeError('Failed to update kubeconfig. Check AWS credentials and permissions.')

    if shutil.which('kubectl'):
        print('kubectl already available:', run('kubectl version --client --short', check=False)[1])
    else:
        require(EKS_CLUSTER_NAME, 'Could not resolve EKS cluster name for kubectl install')
        eks = boto3.client('eks', region_name=REGION)
        k8s_version = eks.describe_cluster(name=EKS_CLUSTER_NAME)['cluster']['version']  # e.g., '1.30'
        major_minor = '.'.join(k8s_version.split('.')[:2])

        # Prefer EKS-hosted kubectl if path is provided; otherwise derive upstream version
        eks_s3_path = os.environ.get('KUBECTL_EKS_S3_PATH', '').strip()
        upstream_version = ''
        if not eks_s3_path:
            try:
                with urlopen(f'https://dl.k8s.io/release/stable-{major_minor}.txt') as r:
                    upstream_version = r.read().decode().strip()
            except Exception as e:
                print('WARN: Could not resolve upstream kubectl version:', e)

        arch = platform.machine().lower()
        arch = 'amd64' if arch in ('x86_64', 'amd64') else 'arm64' if arch in ('aarch64', 'arm64') else 'amd64'
        os_name = 'linux'

        url = ''
        if eks_s3_path:
            s3_region = os.environ.get('KUBECTL_S3_REGION', '').strip() or REGION
            url = f'https://s3.{s3_region}.amazonaws.com/amazon-eks/{eks_s3_path}/bin/{os_name}/{arch}/kubectl'
        elif upstream_version:
            url = f'https://dl.k8s.io/release/{upstream_version}/bin/{os_name}/{arch}/kubectl'
        else:
            print('Could not determine kubectl download URL; skipping install.')

        if url:
            dest_dir = Path.home() / '.local' / 'bin'
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest = dest_dir / 'kubectl'
            print('Downloading kubectl:', url)
            urlretrieve(url, dest)
            dest.chmod(0o755)
            os.environ['PATH'] = f'{dest_dir}:' + os.environ.get('PATH', '')
            print('kubectl installed at', dest)
            print('kubectl version:', run('kubectl version --client', check=False)[1])

    if shutil.which('kubectl'):
        _, ctx = run('kubectl config current-context', check=False)
        print('Current kubectl context:', rdct_aws(ctx))
else:
    print('Skipping kubectl setup (RUN_KUBECTL_CONTEXT=False).')



HyperPod cluster
ClusterStatus: InService
OrchestratorType: EKS

EKS add-ons
Add-ons: ['amazon-sagemaker-hyperpod-observability', 'amazon-sagemaker-hyperpod-taskgovernance', 'amazon-sagemaker-hyperpod-training-operator', 'aws-fsx-csi-driver', 'cert-manager', 'coredns', 'eks-pod-identity-agent', 'kube-proxy', 'vpc-cni']
HAS_TRAINING_OPERATOR: True
HAS_TASK_GOV: True
TRAINING_OPERATOR_VERSION: v1.2.0-eksbuild.1
TASK_GOV_VERSION: v1.3.1-eksbuild.1

kubectl context
Updated context <ARN> in /home/sagemaker-user/.kube/config
Downloading kubectl: https://dl.k8s.io/release/v1.33.7/bin/linux/amd64/kubectl
kubectl installed at /home/sagemaker-user/.local/bin/kubectl
kubectl version: Client Version: v1.33.7
Kustomize Version: v5.6.0
Current kubectl context: <ARN>


## 5) Storage & scheduling validation (FSx + real HyperPod pod)

We do two writes to the same FSx location:
- Write from the Space (local FSx mount) → proves the Space has read/write access.
- Write from a HyperPod training pod → proves pods mount the same filesystem and can write back.



### 5a) FSx local sanity check (Space → FSx)

We create a run-specific directory on FSx and write a small “sentinel” file from the Space.

Pay attention to:
- `FSX_BASE_DIR`: where we write (under FSx)
- `RUN_ID`: part of the filename, so reruns don’t overwrite earlier runs

**Success looks like:** the cell prints a file path under `.../smus-nemo-smoke/` and reads back the timestamp it just wrote. You should also be able to see the file in the Jupyter file browser under the home symlink to FSx (for example `~/custom-file-systems/fsx_lustre/<fs-id>/smus-nemo-smoke/`).

In [8]:
import time
from datetime import datetime

# --- Local FSx sanity check ---
if RUN_FSX_CHECKS:
    print_header('FSx local sanity check')
    ensure_dir(FSX_BASE_DIR)
    sentinel = FSX_BASE_DIR / f'space_sentinel_{RUN_ID}.txt'
    sentinel.write_text(f'space-write {datetime.utcnow().isoformat()}\n')
    print('Wrote:', sentinel)
    print('Read:', sentinel.read_text().strip())
else:
    print('Skipping FSx local sanity check (RUN_FSX_CHECKS=False).')


Skipping FSx local sanity check (RUN_FSX_CHECKS=False).


### 5b) HyperPod SDK + FSx PVC (pinned + predictable)

Before submitting anything, we do two practical checks:

1. The FSx PersistentVolumeClaim exists in your user namespace (default: `fsx-claim`).
2. The notebook can import the pinned HyperPod SDK (`sagemaker-hyperpod==3.5.0`) using the stable module layout:
   `from sagemaker.hyperpod.training import HyperPodPytorchJob`

**Parameters you might tweak:**
- `HYPERPOD_NAMESPACE` (defaults to `hyperpod-ns-datascientist1`)
- `FSX_PVC_NAME` (defaults to `fsx-claim`)

**Success looks like:** `kubectl get pvc ...` shows `Bound`, and the import cell runs without errors.

In [9]:
namespace = os.environ.get('HYPERPOD_NAMESPACE', '').strip() or 'hyperpod-ns-datascientist1'
os.environ['HYPERPOD_NAMESPACE'] = namespace

In [10]:
!kubectl get pvc -n $HYPERPOD_NAMESPACE

NAME        STATUS   VOLUME                              CAPACITY   ACCESS MODES   STORAGECLASS   VOLUMEATTRIBUTESCLASS   AGE
fsx-claim   Bound    fsx-pv-hyperpod-ns-datascientist1   1200Gi     RWX                           <unset>                 2d12h


In [11]:
# --- HyperPod pod write/read check (Python SDK) ---
RUN_FSX_POD_TEST = RUN_FSX_CHECKS and HAS_TRAINING_OPERATOR
if RUN_FSX_POD_TEST:
    print_header('FSx pod write test (HyperPodPytorchJob)')
    try:
        from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob
        from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
            ReplicaSpec, Template, Spec, Containers, Resources, RunPolicy
        )
    except Exception as e:
        raise e

    import json
    import shutil
else:
    if not RUN_FSX_CHECKS:
        print('Skipping FSx pod test (RUN_FSX_CHECKS=False).')
    elif not HAS_TRAINING_OPERATOR:
        print('Skipping FSx pod test (training operator not available).')


Skipping FSx pod test (RUN_FSX_CHECKS=False).


### 5c) Kueue/task governance discovery (queue, priority, topology)

If Task Governance is installed, HyperPod uses **Kueue** to decide whether your job is admitted and *where* it can run.

This cell auto-discovers (via `kubectl`) some defaults for this namespace:
- LocalQueue name (used for admission)
- Priority class (used to sort work)
- Topology label (optional) for topology-aware placement

**Overrides (optional):**
- `KUEUE_QUEUE_NAME`
- `KUEUE_PRIORITY_CLASS`
- `KUEUE_TOPOLOGY_LABEL`
- `KUEUE_TOPOLOGY_MODE` = `preferred` or `required`

In [12]:
if RUN_FSX_POD_TEST:
    print_header('Kueue discovery (namespace-scoped)')

    # --- Env overrides (optional) ---
    KUEUE_QUEUE_NAME = os.environ.get('KUEUE_QUEUE_NAME', '').strip()
    KUEUE_PRIORITY_CLASS = os.environ.get('KUEUE_PRIORITY_CLASS', 'interactive-priority').strip()
    KUEUE_TOPOLOGY_LABEL = os.environ.get('KUEUE_TOPOLOGY_LABEL', '').strip()
    KUEUE_TOPOLOGY_MODE = (os.environ.get('KUEUE_TOPOLOGY_MODE', '') or 'preferred').strip().lower()
    if KUEUE_TOPOLOGY_MODE not in ['preferred', 'required']:
        KUEUE_TOPOLOGY_MODE = 'preferred'

    DEFAULT_KUEUE_PRIORITY_CLASS = os.environ.get('DEFAULT_KUEUE_PRIORITY_CLASS', 'training-priority').strip() or 'training-priority'
    job_namespace = os.environ.get('HYPERPOD_NAMESPACE', '').strip() or 'hyperpod-ns-datascientist1'

    def _extract_json(blob):
        if not blob:
            return None
        for idx, ch in enumerate(blob):
            if ch in '{[':
                try:
                    return json.loads(blob[idx:])
                except Exception:
                    return None
        return None

    def _kubectl_json(cmd, label=None):
        if not shutil.which('kubectl'):
            return None
        rc, out = run(cmd, check=False)
        if rc != 0 or not out:
            if label:
                print(f"{label} not visible: {out or '(no output)'}")
            return None
        data = _extract_json(out)
        if data is None and label:
            print(f"{label} not visible (non-JSON output): {out[:200]}")
        return data

    def _can_i(resource, namespace=None):
        cmd = f"kubectl auth can-i list {resource}"
        if namespace:
            cmd += f" -n {namespace}"
        rc, out = run(cmd, check=False)
        if rc != 0:
            return False
        last = out.strip().splitlines()[-1].strip().lower() if out else ''
        return last == 'yes'

    def _list_localqueues(ns):
        if not _can_i('localqueues.kueue.x-k8s.io', namespace=ns):
            return []
        data = _kubectl_json(f"kubectl get localqueue -n {ns} -o json", label=f"LocalQueues in {ns}")
        if not data:
            return []
        items = data.get('items', [])
        return [i.get('metadata', {}).get('name', '') for i in items if i.get('metadata', {}).get('name', '')]

    def _list_workloadpriorityclasses():
        if not _can_i('workloadpriorityclasses.kueue.x-k8s.io'):
            return []
        data = _kubectl_json('kubectl get workloadpriorityclass -o json', label='WorkloadPriorityClasses')
        if not data:
            return []
        items = data.get('items', [])
        result = []
        for item in items:
            name = item.get('metadata', {}).get('name', '')
            value = item.get('value') or 0
            if name:
                result.append({'name': name, 'value': value})
        return result

    def _list_priorityclasses():
        if not _can_i('priorityclasses.scheduling.k8s.io'):
            return []
        data = _kubectl_json('kubectl get priorityclass -o json', label='PriorityClasses')
        if not data:
            return []
        items = data.get('items', [])
        result = []
        for item in items:
            name = item.get('metadata', {}).get('name', '')
            value = item.get('value') or 0
            if name:
                result.append({'name': name, 'value': value})
        return result

    def _list_topology_labels():
        if not _can_i('nodes'):
            return {}
        data = _kubectl_json('kubectl get nodes -o json', label='Topology labels')
        if not data:
            return {}
        labels_map = {}
        for node in data.get('items', []):
            labels = node.get('metadata', {}).get('labels', {}) or {}
            for key, value in labels.items():
                if key.startswith('topology.') or key.startswith('topology.k8s.aws/'):
                    labels_map.setdefault(key, set()).add(value)
        return {k: sorted(v) for k, v in labels_map.items()}

    def _auto_localqueue(localqueues):
        if not localqueues:
            return ''
        expected = f"{job_namespace}-localqueue"
        for name in localqueues:
            if name == expected:
                return name
        return localqueues[0]

    def _auto_workload_priority(items):
        if not items:
            return ''
        def score(item):
            name = item.get('name', '')
            value = item.get('value') or 0
            return (1 if 'high' in name else 0, value)
        items = sorted(items, key=score, reverse=True)
        return items[0].get('name', '')

    def _auto_priority_class(items):
        if not items:
            return ''
        candidates = [i for i in items if not i.get('name', '').startswith('system-')]
        if not candidates:
            candidates = items
        candidates = sorted(candidates, key=lambda i: i.get('value') or 0, reverse=True)
        return candidates[0].get('name', '')

    def _auto_topology_label(labels_map):
        if not labels_map:
            return ''
        preferred_order = [
            'topology.kubernetes.io/zone',
            'topology.k8s.aws/zone-id',
            'topology.k8s.aws/network-node-layer-3',
            'topology.k8s.aws/network-node-layer-2',
            'topology.k8s.aws/network-node-layer-1',
        ]
        for key in preferred_order:
            if key in labels_map:
                return key
        for key in sorted(labels_map.keys()):
            if key.startswith('topology.k8s.aws/'):
                return key
        return ''

    if HAS_TASK_GOV:
        print('Using namespace:', job_namespace)
        localqueues = _list_localqueues(job_namespace)
        workload_priorities = _list_workloadpriorityclasses()
        priority_classes = _list_priorityclasses()
        topology_labels = _list_topology_labels()

        print(f"LocalQueues in {job_namespace}:", localqueues or '(none visible)')
        if workload_priorities:
            print('WorkloadPriorityClasses:', [(i['name'], i['value']) for i in workload_priorities])
        else:
            print('WorkloadPriorityClasses: (none visible)')
        if priority_classes:
            print('PriorityClasses:', [(i['name'], i['value']) for i in priority_classes])
        else:
            print('PriorityClasses: (none visible)')
        if topology_labels:
            print('Topology labels (key -> values):')
            for key in sorted(topology_labels.keys()):
                print(f"  {key}: {topology_labels[key]}")
        else:
            print('Topology labels: (none visible)')

        print_header('Kueue defaults (selected for this run)')
        if not KUEUE_QUEUE_NAME:
            KUEUE_QUEUE_NAME = _auto_localqueue(localqueues)
        if not KUEUE_PRIORITY_CLASS:
            KUEUE_PRIORITY_CLASS = _auto_workload_priority(workload_priorities) or _auto_priority_class(priority_classes) or DEFAULT_KUEUE_PRIORITY_CLASS
        if RUN_KUEUE_TOPOLOGY and not KUEUE_TOPOLOGY_LABEL:
            KUEUE_TOPOLOGY_LABEL = _auto_topology_label(topology_labels)

        print('Kueue queue:', KUEUE_QUEUE_NAME or '(not set)')
        print('Kueue priority class:', KUEUE_PRIORITY_CLASS or '(not set)')
        if RUN_KUEUE_TOPOLOGY:
            print('Kueue topology label:', KUEUE_TOPOLOGY_LABEL or '(not set)')
    else:
        print('Task governance not enabled; skipping Kueue discovery.')


### 5d) Build a minimal HyperPod job spec (with labels/annotations)

Here we assemble the smallest possible “real” HyperPod job:
- 1 replica, 1 process (`nprocPerNode=1`)
- Mounts the FSx PVC at `/fsx`
- Requests 1 GPU (to land on training nodes)
- Runs a short bash command that writes `pod_sentinel_<RUN_ID>.txt` into FSx

When Task Governance is enabled, we also attach:
- `kueue.x-k8s.io/queue-name`
- `kueue.x-k8s.io/priority-class`
- `kueue.x-k8s.io/podset-(preferred|required)-topology` (if configured)

In [13]:
if RUN_FSX_POD_TEST:
    pod_labels = {}
    pod_annotations = {}
    job_labels = {}
    job_annotations = {}
    if HAS_TASK_GOV and KUEUE_QUEUE_NAME:
        pod_labels['kueue.x-k8s.io/queue-name'] = KUEUE_QUEUE_NAME
        job_labels['kueue.x-k8s.io/queue-name'] = KUEUE_QUEUE_NAME
    if HAS_TASK_GOV and KUEUE_PRIORITY_CLASS:
        pod_labels['kueue.x-k8s.io/priority-class'] = KUEUE_PRIORITY_CLASS
        job_labels['kueue.x-k8s.io/priority-class'] = KUEUE_PRIORITY_CLASS
    if HAS_TASK_GOV and RUN_KUEUE_TOPOLOGY and KUEUE_TOPOLOGY_LABEL:
        key = 'kueue.x-k8s.io/podset-preferred-topology' if KUEUE_TOPOLOGY_MODE == 'preferred' else 'kueue.x-k8s.io/podset-required-topology'
        pod_annotations[key] = KUEUE_TOPOLOGY_LABEL
        job_annotations[key] = KUEUE_TOPOLOGY_LABEL

    FSX_TEST_GPU = '1'  # request GPU to force scheduling on training nodes
    FSX_PVC_NAME = os.environ.get('FSX_PVC_NAME', 'fsx-claim')
    fsx_pod_path = f"{FSX_POD_PREFIX}/{FSX_PVC_SUBPATH}/{FSX_BASE_DIRNAME}"
    sentinel_name = f'pod_sentinel_{RUN_ID}.txt'

    inner_cmd = (
        f'mkdir -p {fsx_pod_path}; '
        f'echo pod-write $(hostname) $(date -u +%Y-%m-%dT%H:%M:%SZ) > {fsx_pod_path}/{sentinel_name}; '
        f'ls -l {fsx_pod_path}/{sentinel_name}'
    )

    cmd = (
        'set -euo pipefail; '
        'if ! command -v hyperpodrun >/dev/null 2>&1; then '
        '  echo "hyperpodrun not found; installing hyperpod-elastic-agent"; '
        '  python -m pip install -q hyperpod-elastic-agent; '
        'fi; '
        f'hyperpodrun --nnodes 1 --nproc-per-node 1 --rdzv-backend hyperpod --no-python /bin/bash -lc \"{inner_cmd}\"'
    )

    replica_specs = [
        ReplicaSpec(
            name='pod',
            replicas=1,
            template=Template(
                metadata={
                    'labels': pod_labels if pod_labels else None,
                    'annotations': pod_annotations if pod_annotations else None,
                },
                spec=Spec(
                    containers=[
                        Containers(
                            name='fsx-test',
                            image=LLMFT_IMAGE_CUSTOM,
                            image_pull_policy='Always',
                            resources=Resources(
                                requests={'nvidia.com/gpu': FSX_TEST_GPU},
                                limits={'nvidia.com/gpu': FSX_TEST_GPU},
                            ),
                            command=['bash', '-lc'],
                            args=[cmd],
                            volume_mounts=[{'name': 'fsx-claim', 'mount_path': FSX_POD_PREFIX}],
                        )
                    ],
                    volumes=[{'name': 'fsx-claim', 'persistent_volume_claim': {'claim_name': FSX_PVC_NAME}}],
                ),
            ),
        ),
    ]

    job_name = f'fsx-test-{RUN_ID}'

    metadata = {
        'name': job_name,
        'namespace': job_namespace or None,
        'labels': job_labels if job_labels else None,
        'annotations': job_annotations if job_annotations else None,
    }

    job_kwargs = dict(
        metadata=metadata,
        nproc_per_node='1',
        replica_specs=replica_specs,
        run_policy=RunPolicy(clean_pod_policy='OnlyComplete'),
    )
    model_fields = getattr(HyperPodPytorchJob, 'model_fields', None) or getattr(HyperPodPytorchJob, '__fields__', {})


    pytorch_job = HyperPodPytorchJob(**job_kwargs)
    print_header('FSx test job spec (computed)')
    try:
        job_spec = pytorch_job.model_dump(exclude_none=True)
    except Exception:
        job_spec = job_kwargs
    print(rdct_aws(json.dumps(job_spec, indent=2, default=str)))


### 5e) Submit, poll, and verify on FSx

We submit the job via the Python SDK, poll until it completes, then verify the sentinel file is visible from the Space FSx mount.

What you might see while polling:
- `Suspended`: waiting for Kueue admission / scheduling (common when the cluster is busy)
- `Completed`: success
- `Faulted`: something went wrong (check pod logs / events)

**Success looks like:** the notebook prints `Pod wrote file: .../pod_sentinel_<RUN_ID>.txt` and shows its contents (hostname + timestamp). You should also see the pod written file in the mounted fsx file system folder

In [14]:
if RUN_FSX_POD_TEST:
    if 'pytorch_job' not in globals():
        pytorch_job = HyperPodPytorchJob(**job_kwargs)
    print('Submitting job:', job_name)
    pytorch_job.create()

    os.environ['FSX_TEST_JOB_NAME'] = job_name
    os.environ['HYP_JOB_NAME'] = job_name
    os.environ['FSX_TEST_SENTINEL_PATH'] = str(FSX_BASE_DIR / sentinel_name)
    os.environ['FSX_TEST_POD_PATH'] = str(fsx_pod_path)
    print('FSX_TEST_JOB_NAME set:', job_name)
    print('Note: first run may take ~10 minutes due to initial image pull.')
    print('Next: run the status cell below to check progress and locate the sentinel file.')


In [15]:
# --- FSx test job status (async) ---
job_name = os.environ.get('FSX_TEST_JOB_NAME', '').strip()
if not job_name:
    print('FSX_TEST_JOB_NAME not set. Run the submission cell first.')
else:
    os.environ['FSX_TEST_JOB_NAME'] = job_name
    !hyp list hyp-pytorch-job -n ${HYPERPOD_NAMESPACE} | grep -F -- "${FSX_TEST_JOB_NAME}" || echo '(job not found)'
    list_status, list_age, _line = get_hyp_list_status(job_name, namespace)
    phase, conds = get_hyp_job_status(job_name, namespace)
    phase = phase or list_status
    print('Job:', job_name)
    print('Phase:', phase if phase is not None else '(unavailable)')
    if list_age:
        print('Age:', list_age)
    if conds:
        print('Conditions:', conds)
    if phase == 'Completed':
        sentinel_path = os.environ.get('FSX_TEST_SENTINEL_PATH', '').strip()
        if sentinel_path:
            print('Job completed. Check the shared FSx folder for:', sentinel_path)
        else:
            print('Job completed. Check the shared FSx folder for the sentinel file.')
    elif phase in {'Failed', 'Faulted'}:
        print('Job failed. Re-run this cell or use hyp describe for details.')
    elif phase is None:
        print('Status unavailable (kubectl not ready?). You can use hyp describe in a terminal if needed (may show env vars).')
    else:
        print('Job still running. Re-run this cell in a few minutes.')


FSX_TEST_JOB_NAME not set. Run the submission cell first.


# Section 7 —  Training with HyperPod Recipes (Primary Path)

This section is the **training** for this notebook. We use **HyperPod Recipes** end‑to‑end to show:

- **SFT LoRA** (multi‑node + elastic training)
- **DPO** (preference alignment)
- **Checkpointless training** (fast recovery)
- **Kueue + topology‑aware scheduling**
- **Cluster‑based inference at multiple stages** (base → SFT → DPO)

We will dig into the recipes and hyperpod adapter for nemo afterwards

In [16]:
# =============================================================================
# Section 7a: Shared config + toggles (Recipes primary)
# =============================================================================

print_header('Section 7 setup (Recipes primary)')

# --- Section 7 toggles ---
RUN_RECIPE_SFT = bool(str(os.environ.get('RUN_RECIPE_SFT', 'true')).lower() in {'1','true','yes'})
RUN_RECIPE_DPO = bool(str(os.environ.get('RUN_RECIPE_DPO', 'true')).lower() in {'1','true','yes'})
RUN_CHECKPOINTLESS_RAW = os.environ.get('RUN_CHECKPOINTLESS', 'auto').strip().lower()
if RUN_CHECKPOINTLESS_RAW in {'1','true','yes'}:
    RUN_CHECKPOINTLESS = True
elif RUN_CHECKPOINTLESS_RAW in {'0','false','no'}:
    RUN_CHECKPOINTLESS = False
else:
    RUN_CHECKPOINTLESS = None  # auto

RUN_INFER_BASELINE = bool(str(os.environ.get('RUN_INFER_BASELINE', 'true')).lower() in {'1','true','yes'})
RUN_INFER_POST_SFT = bool(str(os.environ.get('RUN_INFER_POST_SFT', 'true')).lower() in {'1','true','yes'})
RUN_INFER_POST_DPO = bool(str(os.environ.get('RUN_INFER_POST_DPO', 'true')).lower() in {'1','true','yes'})

# --- Namespace / Kueue / Topology ---
HYPERPOD_NAMESPACE = os.environ.get('HYPERPOD_NAMESPACE', 'hyperpod-ns-datascientist1').strip()
KUEUE_QUEUE_NAME = os.environ.get('KUEUE_QUEUE_NAME', '').strip() or f"{HYPERPOD_NAMESPACE}-localqueue"
KUEUE_PRIORITY_CLASS_TRAIN = os.environ.get('KUEUE_PRIORITY_CLASS', 'training-priority').strip()
KUEUE_PRIORITY_CLASS_INFER = os.environ.get('KUEUE_PRIORITY_CLASS_INFER', 'inference-priority').strip()

KUEUE_TOPOLOGY_LABEL_TRAIN = os.environ.get('KUEUE_TOPOLOGY_LABEL_TRAIN', 'topology.k8s.aws/network-node-layer-3').strip()
KUEUE_TOPOLOGY_LABEL_INFER = os.environ.get('KUEUE_TOPOLOGY_LABEL_INFER', 'topology.kubernetes.io/zone').strip()
KUEUE_TOPOLOGY_MODE_TRAIN = (os.environ.get('KUEUE_TOPOLOGY_MODE_TRAIN', 'preferred') or 'preferred').strip().lower()
KUEUE_TOPOLOGY_MODE_INFER = (os.environ.get('KUEUE_TOPOLOGY_MODE_INFER', 'preferred') or 'preferred').strip().lower()

if KUEUE_TOPOLOGY_MODE_TRAIN not in {'required', 'preferred'}:
    KUEUE_TOPOLOGY_MODE_TRAIN = 'preferred'
if KUEUE_TOPOLOGY_MODE_INFER not in {'required', 'preferred'}:
    KUEUE_TOPOLOGY_MODE_INFER = 'preferred'

# --- FSx pod-visible root ---
POD_FSX_ROOT = f"{FSX_POD_PREFIX}/{FSX_PVC_SUBPATH}/{FSX_BASE_DIRNAME}"

# --- Hugging Face cache paths (pod-visible) ---
POD_HF_HOME = f"{POD_FSX_ROOT}/hf-home"
POD_HF_HUB_CACHE = f"{POD_HF_HOME}/hub"
POD_HF_DATASETS_CACHE = f"{POD_HF_HOME}/datasets"
POD_HF_ASSETS_CACHE = f"{POD_HF_HOME}/assets"

HF_ENV = [
    {'name': 'HF_TOKEN', 'value': os.environ.get('HF_TOKEN','')},
    {'name': 'HF_HOME', 'value': POD_HF_HOME},
    {'name': 'HF_HUB_CACHE', 'value': POD_HF_HUB_CACHE},
    {'name': 'HF_DATASETS_CACHE', 'value': POD_HF_DATASETS_CACHE},
    {'name': 'HF_ASSETS_CACHE', 'value': POD_HF_ASSETS_CACHE},
    {'name': 'TOKENIZERS_PARALLELISM', 'value': 'false'},
]

SECTION7_DIR = FSX_BASE_DIR / 'section7'
RECIPE_WORK_DIR = SECTION7_DIR / 'recipes'
DATA_WORK_DIR = SECTION7_DIR / 'data'
INFER_WORK_DIR = SECTION7_DIR / 'inference'

for d in [SECTION7_DIR, RECIPE_WORK_DIR, DATA_WORK_DIR, INFER_WORK_DIR]:
    ensure_dir(d)

# --- Instance type discovery (required by recipes launcher) ---
import json as _json
import shutil as _shutil

def _find_recipes_src():
    env_val = os.environ.get('RECIPES_SRC', '').strip() or os.environ.get('HYPERPOD_RECIPES_ROOT', '').strip()
    if env_val:
        p = Path(env_val).expanduser().resolve()
        if p.exists():
            return p
        raise FileNotFoundError(f'RECIPES_SRC not found: {p}')
    candidates = []
    for base in [Path.cwd(), *Path.cwd().parents]:
        cand = base / 'sagemaker-hyperpod-recipes'
        candidates.append(cand)
        if cand.exists():
            return cand
    raise FileNotFoundError(
        'Missing repo: searched ' + ', '.join(str(c) for c in candidates) +
        '. Set RECIPES_SRC to the local clone path.'
    )

RECIPES_SRC = _find_recipes_src()

GPU_PER_NODE_MAP = {
    'p5.48xlarge': 8,
    'p5e.48xlarge': 8,
    'p5en.48xlarge': 8,
    'p4d.24xlarge': 8,
    'p4de.24xlarge': 8,
    'g5.48xlarge': 8,
    'g5.12xlarge': 4,
}

def _detect_instance_type_and_count():
    env_val = os.environ.get('HYPERPOD_INSTANCE_TYPE', '').strip()
    if env_val:
        return env_val.replace('ml.', ''), None
    if _shutil.which('kubectl'):
        rc, out = run('kubectl get nodes -o json', check=False)
        if rc == 0 and out:
            try:
                data = _json.loads(out)
                items = data.get('items', [])
                itype = None
                count = 0
                for n in items:
                    labels = n.get('metadata', {}).get('labels', {})
                    cur = labels.get('node.kubernetes.io/instance-type') or labels.get('beta.kubernetes.io/instance-type')
                    if cur:
                        cur = cur.replace('ml.', '')
                        if itype is None:
                            itype = cur
                        if cur == itype:
                            count += 1
                return itype or 'p5.48xlarge', count or None
            except Exception:
                pass
    return 'p5.48xlarge', None

INSTANCE_TYPE, INSTANCE_TYPE_COUNT = _detect_instance_type_and_count()
GPUS_PER_NODE = int(os.environ.get('GPUS_PER_NODE', '0')) or GPU_PER_NODE_MAP.get(INSTANCE_TYPE, 8)

# --- Recipe IDs (primary path) ---
RECIPE_SFT_ID = os.environ.get(
    'RECIPE_SFT_ID',
    'fine-tuning/llama/llmft_llama3_1_8b_instruct_seq4k_gpu_sft_lora'
).strip()
RECIPE_DPO_ID = os.environ.get(
    'RECIPE_DPO_ID',
    'fine-tuning/llama/llmft_llama3_1_8b_instruct_seq4k_gpu_dpo'
).strip()
RECIPE_CKPTLESS_ID = os.environ.get(
    'RECIPE_CKPTLESS_ID',
    'fine-tuning/llama/checkpointless_llama3_70b_lora'
).strip()

# --- Elastic settings for SFT/DPO ---
LLMFT_ELASTIC = bool(str(os.environ.get('LLMFT_ELASTIC', 'true')).lower() in {'1','true','yes'})
LLMFT_ELASTIC_MIN = int(os.environ.get('LLMFT_ELASTIC_MIN', '1'))
LLMFT_ELASTIC_MAX = int(os.environ.get('LLMFT_ELASTIC_MAX', '16'))
LLMFT_NUM_NODES = int(os.environ.get('LLMFT_NUM_NODES', str(LLMFT_ELASTIC_MIN)))

# Enforce at least 1 node; set >=2 for multi-node-only runs
require(LLMFT_NUM_NODES >= 1, 'LLMFT_NUM_NODES must be >=1; set >=2 for multi-node-only runs.')

# --- Checkpointless settings ---
CKPTLESS_NUM_NODES = int(os.environ.get('CKPTLESS_NUM_NODES', '2'))
require(CKPTLESS_NUM_NODES >= 2, 'CKPTLESS_NUM_NODES must be >=2 for checkpointless peer recovery.')
CKPTLESS_MAX_STEPS = int(os.environ.get('CKPTLESS_MAX_STEPS', '10'))
CHECKPOINTLESS_MODEL_NAME_OR_PATH = os.environ.get(
    'CHECKPOINTLESS_MODEL_NAME_OR_PATH',
    'meta-llama/Llama-3-70B-Instruct'
).strip()

# --- Base model for inference (no shortcuts) ---
BASE_MODEL_ID = os.environ.get('BASE_MODEL_ID', 'meta-llama/Llama-3.1-8B-Instruct').strip()

# --- Run directories (pod-visible) ---
POD_RESULTS_DIR = f"{POD_FSX_ROOT}/section7/recipes/results"
POD_SFT_TRAIN_DIR = f"{POD_RESULTS_DIR}/sft-{RUN_ID}"
POD_DPO_TRAIN_DIR = f"{POD_RESULTS_DIR}/dpo-{RUN_ID}"
POD_CKPTLESS_DIR = f"{POD_RESULTS_DIR}/checkpointless-{RUN_ID}"

POD_INFER_DIR = f"{POD_FSX_ROOT}/section7/inference"

# --- Dataset paths (pod-visible) ---
POD_SFT_DATA_TRAIN_DIR = f"{POD_FSX_ROOT}/section7/data/llmft_sft/train"
POD_SFT_DATA_VAL_DIR = f"{POD_FSX_ROOT}/section7/data/llmft_sft/val"
POD_DPO_DATA_TRAIN_DIR = f"{POD_FSX_ROOT}/section7/data/llmft_dpo/train"
POD_DPO_DATA_VAL_DIR = f"{POD_FSX_ROOT}/section7/data/llmft_dpo/val"
POD_CKPTLESS_DATA_DIR = f"{POD_FSX_ROOT}/section7/data/checkpointless_mmap"

# --- Resolve container images ---

LLMFT_IMAGE = os.environ.get('RECIPES_LLMFT_IMAGE', '').strip()
if not LLMFT_IMAGE:
    params_path = RECIPES_SRC / 'launcher' / 'recipe_templatization' / 'llmft' / 'llmft_regional_parameters.json'
    with open(params_path, 'r') as f:
        params = _json.load(f)
    llama_keys = sorted([k for k in params.keys() if k.startswith('llmft_llama')])
    image = ''
    for k in llama_keys:
        region_map = params.get(k, {}).get('k8s', {}).get('container_image', {}).get('prod', {})
        if REGION in region_map:
            image = region_map[REGION]
            break
    if not image:
        region_map = params.get('llmft', {}).get('k8s', {}).get('container_image', {}).get('prod', {})
        image = region_map.get(REGION, '')
    LLMFT_IMAGE = image

require(LLMFT_IMAGE, 'Could not resolve LLMFT container image. Set RECIPES_LLMFT_IMAGE env var.')

CKPTLESS_IMAGE = os.environ.get('RECIPES_CHECKPOINTLESS_IMAGE', '').strip()
available_ckpt_regions = []
if not CKPTLESS_IMAGE:
    ckpt_params_path = RECIPES_SRC / 'launcher' / 'recipe_templatization' / 'checkpointless' / 'checkpointless_regional_parameters.json'
    with open(ckpt_params_path, 'r') as f:
        ckpt_params = _json.load(f)
    region_map = ckpt_params.get('checkpointless_nemo', {}).get('k8s', {}).get('container_image', {}).get('prod', {})
    available_ckpt_regions = sorted(region_map.keys())
    CKPTLESS_IMAGE = region_map.get('us-west-2', '')

if RUN_CHECKPOINTLESS is None:
    RUN_CHECKPOINTLESS = bool(CKPTLESS_IMAGE)
    if not RUN_CHECKPOINTLESS:
        print(
            f'Checkpointless disabled: no image for region {REGION}. '
            f'Available regions: {available_ckpt_regions}. '
            'Set RECIPES_CHECKPOINTLESS_IMAGE or switch region.'
        )
elif RUN_CHECKPOINTLESS and not CKPTLESS_IMAGE:
    raise ValueError(
        f'No checkpointless container found for region {REGION}. '
        f'Available regions: {available_ckpt_regions}. '
        'Set RECIPES_CHECKPOINTLESS_IMAGE or switch region.'
    )

print('Section 7 toggles:')
print({
    'RUN_RECIPE_SFT': RUN_RECIPE_SFT,
    'RUN_RECIPE_DPO': RUN_RECIPE_DPO,
    'RUN_CHECKPOINTLESS': RUN_CHECKPOINTLESS,
    'RUN_INFER_BASELINE': RUN_INFER_BASELINE,
    'RUN_INFER_POST_SFT': RUN_INFER_POST_SFT,
    'RUN_INFER_POST_DPO': RUN_INFER_POST_DPO,
})
print('Kueue/Topology:')
print({
    'HYPERPOD_NAMESPACE': HYPERPOD_NAMESPACE,
    'KUEUE_QUEUE_NAME': KUEUE_QUEUE_NAME,
    'KUEUE_PRIORITY_CLASS_TRAIN': KUEUE_PRIORITY_CLASS_TRAIN,
    'KUEUE_PRIORITY_CLASS_INFER': KUEUE_PRIORITY_CLASS_INFER,
    'KUEUE_TOPOLOGY_LABEL_TRAIN': KUEUE_TOPOLOGY_LABEL_TRAIN,
    'KUEUE_TOPOLOGY_LABEL_INFER': KUEUE_TOPOLOGY_LABEL_INFER,
    'KUEUE_TOPOLOGY_MODE_TRAIN': KUEUE_TOPOLOGY_MODE_TRAIN,
    'KUEUE_TOPOLOGY_MODE_INFER': KUEUE_TOPOLOGY_MODE_INFER,
})
print('Instance type:', INSTANCE_TYPE, 'count:', INSTANCE_TYPE_COUNT)
print('GPUs per node:', GPUS_PER_NODE)
print('LLMFT nodes:', LLMFT_NUM_NODES, '(elastic:', LLMFT_ELASTIC, f'{LLMFT_ELASTIC_MIN}-{LLMFT_ELASTIC_MAX})')
print('Checkpointless nodes:', CKPTLESS_NUM_NODES)
print('POD_FSX_ROOT:', POD_FSX_ROOT)
print('LLMFT_IMAGE:', LLMFT_IMAGE)
print('CKPTLESS_IMAGE:', CKPTLESS_IMAGE)



Section 7 setup (Recipes primary)


Section 7 toggles:
{'RUN_RECIPE_SFT': True, 'RUN_RECIPE_DPO': True, 'RUN_CHECKPOINTLESS': True, 'RUN_INFER_BASELINE': True, 'RUN_INFER_POST_SFT': True, 'RUN_INFER_POST_DPO': True}
Kueue/Topology:
{'HYPERPOD_NAMESPACE': 'hyperpod-ns-datascientist1', 'KUEUE_QUEUE_NAME': 'hyperpod-ns-datascientist1-localqueue', 'KUEUE_PRIORITY_CLASS_TRAIN': 'training-priority', 'KUEUE_PRIORITY_CLASS_INFER': 'inference-priority', 'KUEUE_TOPOLOGY_LABEL_TRAIN': 'topology.k8s.aws/network-node-layer-3', 'KUEUE_TOPOLOGY_LABEL_INFER': 'topology.kubernetes.io/zone', 'KUEUE_TOPOLOGY_MODE_TRAIN': 'preferred', 'KUEUE_TOPOLOGY_MODE_INFER': 'preferred'}
Instance type: p4d.24xlarge count: 3
GPUs per node: 8
LLMFT nodes: 1 (elastic: True 1-16)
Checkpointless nodes: 2
POD_FSX_ROOT: /fsx/fs-03b1953d09801303c/smus-nemo-smoke
LLMFT_IMAGE: 327873000638.dkr.ecr.us-east-1.amazonaws.com/hyperpod-recipes:llmft-v1.0.0-llama
CKPTLESS_IMAGE: 839249767557.dkr.ecr.us-west-2.amazonaws.com/hyperpod-checkpointless-training:v1.0.0


In [17]:
# =============================================================================
# Section 7b: Sync recipes repo to FSx + toolchain
# =============================================================================

print_header('Sync recipes repo to FSx + toolchain')

RECIPES_FSX_DIR = RECIPE_WORK_DIR / 'sagemaker-hyperpod-recipes'

# Copy recipes repo to FSx so pods can access the same code paths
import shutil

def _sync_tree(src: Path, dst: Path):
    if not src.exists():
        raise FileNotFoundError(f"Missing repo: {src}")
    ensure_dir(dst)
    if shutil.which('rsync'):
        run(f"rsync -a --delete --exclude='.git' {src}/ {dst}/")
    else:
        shutil.copytree(src, dst, dirs_exist_ok=True)

_sync_tree(RECIPES_SRC, RECIPES_FSX_DIR)

print('RECIPES_FSX_DIR:', RECIPES_FSX_DIR)
os.environ['HYPERPOD_RECIPES_ROOT'] = str(RECIPES_FSX_DIR)

# Create/ensure venv for recipes launcher deps
VENV_DIR = Path('.venv').resolve()
VENV_PY = VENV_DIR / 'bin' / 'python'
VENV_PIP = VENV_DIR / 'bin' / 'pip'
if not VENV_PY.exists():
    run('python -m venv .venv')

# Install recipes deps into venv
run(f"{VENV_PIP} install -q -e {RECIPES_SRC}")

# Ensure helm is available (recipes launcher uses helm template)
dest_dir = Path.home() / '.local' / 'bin'
if str(dest_dir) not in os.environ.get('PATH', '').split(':'):
    os.environ['PATH'] = f"{dest_dir}:" + os.environ.get('PATH', '')

if not shutil.which('helm'):
    helm_version = os.environ.get('HELM_VERSION', 'v3.14.4')
    arch = 'amd64'
    os_name = 'linux'
    url = f"https://get.helm.sh/helm-{helm_version}-{os_name}-{arch}.tar.gz"
    dest_dir.mkdir(parents=True, exist_ok=True)
    tar_path = dest_dir / 'helm.tgz'
    run(f"curl -sSL {url} -o {tar_path}")
    run(f"tar -xzf {tar_path} -C {dest_dir}")
    helm_bin = dest_dir / f"{os_name}-{arch}" / 'helm'
    if helm_bin.exists():
        run(f"mv {helm_bin} {dest_dir / 'helm'}")
        run(f"rm -rf {dest_dir / (os_name + '-' + arch)}")
    (dest_dir / 'helm').chmod(0o755)

print('helm:', run('helm version --short', check=False)[1])



Sync recipes repo to FSx + toolchain
RECIPES_FSX_DIR: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section7/recipes/sagemaker-hyperpod-recipes
helm: v3.14.4+g81c902a


In [18]:
# =============================================================================
# Section 7c: Prepare datasets (SFT + DPO) and raw text for checkpointless
# =============================================================================

print_header('Prepare datasets (SFT + DPO + checkpointless raw text)')

from datasets import load_dataset
from huggingface_hub import hf_hub_download
from transformers import AutoTokenizer
import json as _json
import re as _re

LLMFT_SFT_DIR = DATA_WORK_DIR / 'llmft_sft'
LLMFT_DPO_DIR = DATA_WORK_DIR / 'llmft_dpo'
LLMFT_SFT_TRAIN_DIR = LLMFT_SFT_DIR / 'train'
LLMFT_SFT_VAL_DIR = LLMFT_SFT_DIR / 'val'
LLMFT_DPO_TRAIN_DIR = LLMFT_DPO_DIR / 'train'
LLMFT_DPO_VAL_DIR = LLMFT_DPO_DIR / 'val'
CKPTLESS_RAW_DIR = DATA_WORK_DIR / 'checkpointless_raw'

for d in [
    LLMFT_SFT_DIR,
    LLMFT_DPO_DIR,
    LLMFT_SFT_TRAIN_DIR,
    LLMFT_SFT_VAL_DIR,
    LLMFT_DPO_TRAIN_DIR,
    LLMFT_DPO_VAL_DIR,
    CKPTLESS_RAW_DIR,
]:
    ensure_dir(d)


def _clear_json(dir_path):
    for p in dir_path.glob('*.json'):
        p.unlink()


_clear_json(LLMFT_SFT_TRAIN_DIR)
_clear_json(LLMFT_SFT_VAL_DIR)
_clear_json(LLMFT_DPO_TRAIN_DIR)
_clear_json(LLMFT_DPO_VAL_DIR)

# --- Tokenizer setup ---
TOKENIZER_ID = os.environ.get('TOKENIZER_ID', BASE_MODEL_ID).strip() or BASE_MODEL_ID
HF_TOKEN = os.environ.get('HF_TOKEN') or None

tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_ID, token=HF_TOKEN)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# Use right padding during training (collator will pad dynamically)
tokenizer.padding_side = 'right'

SFT_MAX_LEN = int(os.environ.get('SFT_MAX_LEN', '4096'))
DPO_MAX_PROMPT_LEN = int(os.environ.get('DPO_MAX_PROMPT_LEN', '2048'))
DPO_MAX_COMPLETION_LEN = int(os.environ.get('DPO_MAX_COMPLETION_LEN', '2048'))


def _apply_chat_template(messages, add_generation_prompt):
    if hasattr(tokenizer, 'apply_chat_template'):
        return tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=add_generation_prompt,
        )
    # Fallback for tokenizers without chat templates
    parts = []
    for msg in messages:
        role = msg.get('role', 'user')
        prefix = 'User: ' if role == 'user' else 'Assistant: '
        parts.append(prefix + msg.get('content', ''))
    if add_generation_prompt:
        parts.append('Assistant: ')
    return '\n\n'.join(parts)


def _tokenize(text, max_len=None, truncation_side='left'):
    tokenizer.truncation_side = truncation_side
    if max_len:
        return tokenizer(text, add_special_tokens=False, truncation=True, max_length=max_len).input_ids
    return tokenizer(text, add_special_tokens=False).input_ids


# --- SFT dataset: Dolly -> tokenized JSON ---
TRAIN_SAMPLES = int(os.environ.get('TRAIN_SAMPLES', '2000'))
VAL_SAMPLES = int(os.environ.get('VAL_SAMPLES', '200'))
SEED = int(os.environ.get('DATASET_SEED', '42'))

SFT_DATASET = os.environ.get('SFT_DATASET', 'databricks/databricks-dolly-15k')
SFT_DATAFILE = os.environ.get('SFT_DATAFILE', 'databricks-dolly-15k.jsonl')

try:
    raw = load_dataset(SFT_DATASET, split='train')
except Exception as e:
    print(f'WARNING: load_dataset failed for {SFT_DATASET} ({type(e).__name__}: {e})')
    print('Falling back to direct JSONL download via huggingface_hub...')
    data_path = hf_hub_download(
        repo_id=SFT_DATASET,
        filename=SFT_DATAFILE,
        token=HF_TOKEN,
    )
    raw = load_dataset('json', data_files=data_path, split='train')
raw = raw.shuffle(seed=SEED)

train_end = min(TRAIN_SAMPLES, len(raw))
val_end = min(TRAIN_SAMPLES + VAL_SAMPLES, len(raw))

train_raw = raw.select(range(0, train_end))
val_raw = raw.select(range(train_end, val_end))


def to_messages(example):
    instruction = example.get('instruction', '')
    context = example.get('context', '') or ''
    response = example.get('response', '')
    if context:
        user_text = f"{instruction}\n\nContext: {context}"
    else:
        user_text = instruction
    return {
        'messages': [
            {'role': 'user', 'content': user_text},
            {'role': 'assistant', 'content': response},
        ]
    }


train_messages = train_raw.map(to_messages, remove_columns=train_raw.column_names)
val_messages = val_raw.map(to_messages, remove_columns=val_raw.column_names)


def _build_sft_record(example):
    messages = example['messages']
    prompt_text = _apply_chat_template(messages[:-1], add_generation_prompt=True)
    full_text = _apply_chat_template(messages, add_generation_prompt=False)

    prompt_ids = _tokenize(prompt_text, max_len=SFT_MAX_LEN, truncation_side='left')
    full_ids = _tokenize(full_text, max_len=SFT_MAX_LEN, truncation_side='left')

    labels = list(full_ids)
    prompt_len = min(len(prompt_ids), len(labels))
    for i in range(prompt_len):
        labels[i] = -100

    return {
        'input_ids': list(full_ids),
        'attention_mask': [1] * len(full_ids),
        'labels': labels,
    }


train_sft = [
    _build_sft_record(ex) for ex in train_messages
]
val_sft = [
    _build_sft_record(ex) for ex in val_messages
]

SFT_TRAIN_JSON = LLMFT_SFT_TRAIN_DIR / 'train.json'
SFT_VAL_JSON = LLMFT_SFT_VAL_DIR / 'val.json'

with open(SFT_TRAIN_JSON, 'w') as f:
    for ex in train_sft:
        f.write(_json.dumps(ex) + '\n')

with open(SFT_VAL_JSON, 'w') as f:
    for ex in val_sft:
        f.write(_json.dumps(ex) + '\n')

print('SFT Train dir:', LLMFT_SFT_TRAIN_DIR, 'rows:', len(train_sft))
print('SFT Val dir:', LLMFT_SFT_VAL_DIR, 'rows:', len(val_sft))
print('SFT Sample (tokenized):', train_sft[0])

# --- DPO dataset: Anthropic HH -> tokenized JSON ---
DPO_DATASET = os.environ.get('DPO_DATASET', 'Anthropic/hh-rlhf')
DPO_TRAIN_SAMPLES = int(os.environ.get('DPO_TRAIN_SAMPLES', '2000'))
DPO_VAL_SAMPLES = int(os.environ.get('DPO_VAL_SAMPLES', '200'))

raw_dpo = load_dataset(DPO_DATASET)
train_split = raw_dpo['train']
val_split = raw_dpo.get('test') or raw_dpo.get('validation') or raw_dpo['train']

train_split = train_split.shuffle(seed=SEED)
val_split = val_split.shuffle(seed=SEED)

train_split = train_split.select(range(0, min(DPO_TRAIN_SAMPLES, len(train_split))))
val_split = val_split.select(range(0, min(DPO_VAL_SAMPLES, len(val_split))))

START_PROMPT_FORMAT = "User: {body}\n\nAssistant: {response}"
PROMPT_CONTINUATION_FORMAT = "{text}\n\nUser: {body}\n\nAssistant: {response}"


def _convert_anthropic(text):
    split_string = text.split("\n\nHuman: ")
    string_to_use = ""
    prompt_string_to_use = ""
    for item in split_string:
        if len(item) == 0:
            continue
        output = item.split("\n\nAssistant: ")
        if len(output) != 2:
            return None
        body, response = output
        if len(string_to_use) == 0:
            prompt_string_to_use = START_PROMPT_FORMAT.format(body=body, response="")
            string_to_use = START_PROMPT_FORMAT.format(body=body, response=response)
        else:
            prompt_string_to_use = PROMPT_CONTINUATION_FORMAT.format(text=string_to_use, body=body, response="")
            string_to_use = PROMPT_CONTINUATION_FORMAT.format(text=string_to_use, body=body, response=response)
    return string_to_use, prompt_string_to_use.rstrip()


def to_dpo(example):
    # Prefer explicit prompt/chosen/rejected if present
    if all(k in example for k in ('prompt', 'chosen', 'rejected')):
        return {
            'prompt': example['prompt'],
            'chosen': example['chosen'],
            'rejected': example['rejected'],
        }
    # Anthropic HH fallback (chosen/rejected are full conversations)
    c = _convert_anthropic(example.get('chosen', ''))
    r = _convert_anthropic(example.get('rejected', ''))
    if c is None or r is None:
        return {'prompt': None, 'chosen': None, 'rejected': None}
    chosen_text, chosen_prompt = c
    rejected_text, rejected_prompt = r
    if chosen_prompt != rejected_prompt:
        return {'prompt': None, 'chosen': None, 'rejected': None}
    return {
        'prompt': chosen_prompt,
        'chosen': chosen_text,
        'rejected': rejected_text,
    }


def _filter_none(ds):
    return ds.filter(lambda x: x['prompt'] is not None, desc='filter-none')


train_dpo = train_split.map(to_dpo, remove_columns=train_split.column_names)
val_dpo = val_split.map(to_dpo, remove_columns=val_split.column_names)
train_dpo = _filter_none(train_dpo)
val_dpo = _filter_none(val_dpo)

_UA_RE = _re.compile(r'(User|Assistant):')


def _parse_dialogue(text):
    if text is None:
        return []
    text = str(text).replace('\r\n', '\n').strip()
    if not text:
        return []
    matches = list(_UA_RE.finditer(text))
    if not matches:
        return []
    messages = []
    for idx, match in enumerate(matches):
        role = match.group(1).lower()
        start = match.end()
        end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
        content = text[start:end].strip()
        messages.append({'role': role, 'content': content})
    return messages


def _strip_empty_tail(messages):
    while messages and messages[-1]['role'] == 'assistant' and not messages[-1]['content']:
        messages = messages[:-1]
    return messages


def _extract_last_assistant(text):
    messages = _parse_dialogue(text)
    if messages:
        for msg in reversed(messages):
            if msg['role'] == 'assistant':
                return msg.get('content', '').strip()
    return str(text).strip()


def _build_prompt_messages(prompt_text):
    messages = _parse_dialogue(prompt_text)
    if messages:
        return _strip_empty_tail(messages)
    return [{'role': 'user', 'content': str(prompt_text).strip()}]


def _build_dpo_record(prompt_text, chosen_text, rejected_text):
    prompt_messages = _build_prompt_messages(prompt_text)
    chosen_resp = _extract_last_assistant(chosen_text)
    rejected_resp = _extract_last_assistant(rejected_text)
    if not chosen_resp or not rejected_resp:
        return None

    prompt_text_rendered = _apply_chat_template(prompt_messages, add_generation_prompt=True)
    prompt_ids = _tokenize(prompt_text_rendered, max_len=DPO_MAX_PROMPT_LEN, truncation_side='left')
    chosen_ids = _tokenize(chosen_resp, max_len=DPO_MAX_COMPLETION_LEN, truncation_side='right')
    rejected_ids = _tokenize(rejected_resp, max_len=DPO_MAX_COMPLETION_LEN, truncation_side='right')

    if not prompt_ids or not chosen_ids or not rejected_ids:
        return None

    return {
        'prompt_input_ids': list(prompt_ids),
        'chosen_input_ids': list(chosen_ids),
        'rejected_input_ids': list(rejected_ids),
    }


train_dpo_tok = []
for ex in train_dpo:
    rec = _build_dpo_record(ex['prompt'], ex['chosen'], ex['rejected'])
    if rec is not None:
        train_dpo_tok.append(rec)

val_dpo_tok = []
for ex in val_dpo:
    rec = _build_dpo_record(ex['prompt'], ex['chosen'], ex['rejected'])
    if rec is not None:
        val_dpo_tok.append(rec)

DPO_TRAIN_JSON = LLMFT_DPO_TRAIN_DIR / 'train.json'
DPO_VAL_JSON = LLMFT_DPO_VAL_DIR / 'val.json'

with open(DPO_TRAIN_JSON, 'w') as f:
    for ex in train_dpo_tok:
        f.write(_json.dumps(ex) + '\n')

with open(DPO_VAL_JSON, 'w') as f:
    for ex in val_dpo_tok:
        f.write(_json.dumps(ex) + '\n')

print('DPO Train dir:', LLMFT_DPO_TRAIN_DIR, 'rows:', len(train_dpo_tok))
print('DPO Val dir:', LLMFT_DPO_VAL_DIR, 'rows:', len(val_dpo_tok))
print('DPO Sample (tokenized):', train_dpo_tok[0])

# --- Raw text for checkpointless preprocessing ---
CKPTLESS_RAW_TEXT = CKPTLESS_RAW_DIR / 'train.txt'
with open(CKPTLESS_RAW_TEXT, 'w') as f:
    for ex in train_messages:
        msgs = ex['messages']
        user = msgs[0]['content'] if msgs else ''
        assistant = msgs[1]['content'] if len(msgs) > 1 else ''
        f.write(f"User: {user}\nAssistant: {assistant}\n\n")

# Mapping file (one line per worker) for custom dataprep
CKPTLESS_MAPPING = CKPTLESS_RAW_DIR / 'mapping.txt'
CKPTLESS_MAPPING.write_text(str(CKPTLESS_RAW_TEXT) + '\n')

print('Checkpointless raw text:', CKPTLESS_RAW_TEXT)
print('Checkpointless mapping file:', CKPTLESS_MAPPING)



Prepare datasets (SFT + DPO + checkpointless raw text)


SFT Train dir: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section7/data/llmft_sft/train rows: 2000
SFT Val dir: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section7/data/llmft_sft/val rows: 200
SFT Sample (tokenized): {'input_ids': [128000, 128006, 9125, 128007, 271, 38766, 1303, 33025, 2696, 25, 6790, 220, 2366, 18, 198, 15724, 2696, 25, 220, 1627, 10263, 220, 2366, 19, 271, 128009, 128006, 882, 128007, 271, 15546, 1051, 279, 2911, 315, 279, 28812, 12471, 339, 7997, 10888, 11, 279, 5234, 6342, 315, 279, 5629, 11258, 304, 279, 4101, 362, 19508, 315, 20534, 323, 6785, 30, 128009, 128006, 78191, 128007, 271, 45030, 339, 279, 23251, 804, 11, 3842, 279, 18787, 11, 46092, 315, 279, 650, 1572, 11, 32866, 315, 279, 94672, 37080, 11, 435, 3746, 279, 59979, 11, 47809, 18787, 729, 71, 823, 11, 5340, 12490, 279, 24008, 11, 6385, 86452, 315, 279, 27206, 11, 426, 1105, 279, 15996, 261, 11, 3061, 683, 82, 279,

In [19]:
# =============================================================================
# Section 7d: Configure recipes K8s settings + helpers
# =============================================================================

print_header('Configure recipes K8s settings + helpers')

import time
import shlex

cfg_script = f"""
import yaml
from pathlib import Path

k8s_cfg_path = Path(r"{RECIPES_FSX_DIR}") / 'recipes_collection' / 'cluster' / 'k8s.yaml'
with open(k8s_cfg_path, 'r') as f:
    k8s_cfg = yaml.safe_load(f)

k8s_cfg['namespace'] = r"{HYPERPOD_NAMESPACE}"
if {HAS_TASK_GOV}:
    if not isinstance(k8s_cfg.get('custom_labels'), dict):
        k8s_cfg['custom_labels'] = dict()
    k8s_cfg['custom_labels']['kueue.x-k8s.io/priority-class'] = r"{KUEUE_PRIORITY_CLASS_TRAIN}"
k8s_cfg['queue_name'] = r"{KUEUE_QUEUE_NAME}" if {HAS_TASK_GOV} else None
k8s_cfg['annotations'] = (
    {{'kueue.x-k8s.io/podset-{KUEUE_TOPOLOGY_MODE_TRAIN}-topology': r"{KUEUE_TOPOLOGY_LABEL_TRAIN}"}}
    if {HAS_TASK_GOV} and r"{KUEUE_TOPOLOGY_LABEL_TRAIN}" else None
)

k8s_cfg['persistent_volume_claims'] = [
    {{'claimName': 'fsx-claim', 'mountPath': r"{FSX_POD_PREFIX}"}}
]

k8s_cfg['label_selector'] = {{
    'required': {{
        'sagemaker.amazonaws.com/node-health-status': ['Schedulable']
    }}
}}

k8s_cfg['cleanPodPolicy'] = 'OnlyComplete'

with open(k8s_cfg_path, 'w') as f:
    yaml.safe_dump(k8s_cfg, f, sort_keys=False)

print('Updated k8s.yaml:', k8s_cfg_path)
"""

script_path = SECTION7_DIR / f'update_k8s_{RUN_ID}.py'
script_path.write_text(cfg_script)
run(f"{VENV_PY} {script_path}")

# --- Helpers for cluster jobs (inference, dataprep) ---
if HAS_TRAINING_OPERATOR:
    from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob
    from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
        ReplicaSpec, Template, Spec, Containers, Resources, RunPolicy
    )
from sagemaker.hyperpod.common.config.metadata import Metadata

FSX_PVC_NAME = os.environ.get('FSX_PVC_NAME', 'fsx-claim').strip()

def submit_simple_job(name, image, command, gpu='1', labels=None, annotations=None, env=None, use_hyperpodrun=True, nnodes=1, nproc_per_node=1):
    require(HAS_TRAINING_OPERATOR, 'Training operator not available; cannot submit job.')
    env = env or []

    # Use hyperpodrun so the elastic agent can mark the job complete
    cmd = command
    if use_hyperpodrun:
        # umask is applied in cmd_parts above
        cmd_parts = [
            'set -euo pipefail; ',
            'export HOME=/tmp; ',
            'mkdir -p /tmp/hp-rdzv; ',
            'if ! command -v hyperpodrun >/dev/null 2>&1; then ',
            '  echo "hyperpodrun not found; installing hyperpod-elastic-agent"; ',
            '  python -m pip install -q hyperpod-elastic-agent; ',
            'fi; ',
            f'hyperpodrun --nnodes {nnodes} --nproc-per-node {nproc_per_node} ',
            '--rdzv-conf resource_config_dir=/tmp/hp-rdzv ',
            '--rdzv-backend hyperpod --no-python /bin/bash -lc ',
        ]
        cmd = ''.join(cmd_parts) + shlex.quote(command)

    container = Containers(
        name=name,
        image=image,
        image_pull_policy='Always',
        command=['bash', '-lc'],
        args=[cmd],
        resources=Resources(requests={'nvidia.com/gpu': gpu}, limits={'nvidia.com/gpu': gpu}),
        volume_mounts=[
            {'name': 'fsx-claim', 'mount_path': FSX_POD_PREFIX},
            {'name': 'dshm', 'mount_path': '/dev/shm'},
        ],
        env=env,
    )

    replica_spec = ReplicaSpec(
        name='worker',
        replicas=1,
        template=Template(
            metadata={'labels': labels or None, 'annotations': annotations or None},
            spec=Spec(
                containers=[container],
                volumes=[
                    {'name': 'fsx-claim', 'persistent_volume_claim': {'claim_name': FSX_PVC_NAME}},
                    {'name': 'dshm', 'empty_dir': {'medium': 'Memory', 'size_limit': '128Gi'}},
                ],
                node_selector={'sagemaker.amazonaws.com/node-health-status': 'Schedulable'},
                    ),
        ),
    )

    job = HyperPodPytorchJob(
        metadata=Metadata(name=name, namespace=HYPERPOD_NAMESPACE, labels=labels or None, annotations=annotations or None),
        nproc_per_node=str(nproc_per_node),
        replica_specs=[replica_spec],
        run_policy=RunPolicy(clean_pod_policy='OnlyComplete'),
    )
    job.create()
    return job
# Inference script (runs inside cluster)
INFER_SCRIPT = INFER_WORK_DIR / 'run_inference.py'
INFER_SCRIPT.write_text("""import json, os
from pathlib import Path
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

BASE_MODEL = os.environ.get('BASE_MODEL')
TRAINING_DIR = os.environ.get('TRAINING_DIR', '').strip()
OUTPUT_PATH = os.environ.get('OUTPUT_PATH')
PROMPTS = json.loads(os.environ.get('PROMPTS_JSON', '[]'))
MAX_NEW_TOKENS = int(os.environ.get('MAX_NEW_TOKENS', '128'))

if not BASE_MODEL:
    raise ValueError('BASE_MODEL not set')
if not OUTPUT_PATH:
    raise ValueError('OUTPUT_PATH not set')

# Find adapter or merged model in training dir
adapter_dir = None
merged_dir = None
if TRAINING_DIR:
    root = Path(TRAINING_DIR)
    if root.exists():
        for p in root.rglob('adapter_config.json'):
            adapter_dir = p.parent
            break
        if adapter_dir is None:
            for p in root.rglob('config.json'):
                if any(p.parent.glob('*.safetensors')) or (p.parent / 'pytorch_model.bin').exists():
                    merged_dir = p.parent
                    break

if adapter_dir:
    from peft import PeftModel
    model = AutoModelForCausalLM.from_pretrained(BASE_MODEL, torch_dtype=torch.bfloat16, device_map='auto')
    model = PeftModel.from_pretrained(model, str(adapter_dir))
    tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, use_fast=True)
elif merged_dir:
    model = AutoModelForCausalLM.from_pretrained(str(merged_dir), torch_dtype=torch.bfloat16, device_map='auto')
    tokenizer = AutoTokenizer.from_pretrained(str(merged_dir), use_fast=True)
else:
    model = AutoModelForCausalLM.from_pretrained(BASE_MODEL, torch_dtype=torch.bfloat16, device_map='auto')
    tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, use_fast=True)

model.eval()

def format_prompt(text):
    if hasattr(tokenizer, 'apply_chat_template'):
        msgs = [{'role': 'user', 'content': text}]
        return tokenizer.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
    return text

results = []
for p in PROMPTS:
    prompt = format_prompt(p)
    inputs = tokenizer(prompt, return_tensors='pt')
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    with torch.no_grad():
        out = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS)
    text = tokenizer.decode(out[0], skip_special_tokens=True)
    results.append({'prompt': p, 'output': text})

Path(OUTPUT_PATH).parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_PATH, 'w') as f:
    json.dump(results, f, indent=2)

print('Wrote', OUTPUT_PATH)
""")

print('Inference script:', INFER_SCRIPT)



Configure recipes K8s settings + helpers
Inference script: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section7/inference/run_inference.py


### Section 7d.2 — Prefetch base model to FSx cache

This downloads the base model into the **FSx Hugging Face cache** so cluster pods reuse it
instead of re-downloading from the Hub.


In [20]:
print_header('Prefetch base model to FSx cache')

PREFETCH_BASE_MODEL = bool(str(os.environ.get('PREFETCH_BASE_MODEL', 'true')).lower() in {'1','true','yes'})
if PREFETCH_BASE_MODEL:
    run('python -m pip install -q huggingface_hub transformers')
    from huggingface_hub import snapshot_download

    model_id = os.environ.get('PREFETCH_MODEL_ID', BASE_MODEL_ID).strip()
    revision = os.environ.get('PREFETCH_MODEL_REVISION', '').strip() or None

    # Prefer the HF cache already configured earlier in the notebook
    cache_dir = os.environ.get('HF_HUB_CACHE', '') or str(HF_HUB_CACHE_DIR)

    require(os.environ.get('HF_TOKEN','').strip(), 'HF_TOKEN not set; required to prefetch gated model.')
    print('Prefetching model:', model_id)
    print('Cache dir:', cache_dir)

    snapshot_download(
        repo_id=model_id,
        revision=revision,
        cache_dir=cache_dir,
        token=os.environ.get('HF_TOKEN') or None,
        local_dir_use_symlinks=False,
    )
    print('Prefetch complete.')
else:
    print('Skipping prefetch (PREFETCH_BASE_MODEL=False).')



Prefetch base model to FSx cache


Prefetching model: meta-llama/Llama-3.1-8B-Instruct
Cache dir: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/hf-home/hub


Fetching 17 files:   0%|          | 0/17 [00:00<?, ?it/s]

Prefetch complete.


In [21]:
# =============================================================================
# Section 7e: Baseline inference + SFT LoRA (multi-node + elastic)
# =============================================================================

PROMPTS = [
    "Summarize the paragraph below in exactly 2 bullet points:\nHyperPod schedules GPU pods using topology hints so colocated nodes share faster links and lower latency. It also reduces noisy-neighbor traffic by avoiding oversubscribed network paths during distributed training. Operators can trade strict placement for faster queue times depending on cluster load.",
    "Extract the fields and output ONLY valid JSON with keys: name, company, role.\nText: 'Dr. Mina Park joined Acme Robotics as Head of Research in 2024.'",
    "Classify the sentiment as Positive, Neutral, or Negative. Output ONLY the label.\nText: 'The deployment finished on time, but the logs were noisy and confusing.'",
    "Rewrite as a short, professional email (3 sentences max):\n'hey team we broke the build again pls fix asap'",
    "Give a step-by-step list (numbered) to debug a failing Kubernetes job with no logs.",
]
PROMPTS_JSON = _json.dumps(PROMPTS)

# --- Baseline inference (base model) ---
if RUN_INFER_BASELINE and HAS_TRAINING_OPERATOR:
    print_header('Baseline inference (base model)')
    infer_name = f"infer-base-{RUN_ID}"
    out_path = f"{POD_INFER_DIR}/baseline-{RUN_ID}.json"

    cmd = f"python {POD_INFER_DIR}/run_inference.py"
    job = submit_simple_job(
        name=infer_name,
        image=LLMFT_IMAGE_CUSTOM,
        command=cmd,
        gpu='1',
        labels={
            'kueue.x-k8s.io/queue-name': KUEUE_QUEUE_NAME,
            'kueue.x-k8s.io/priority-class': KUEUE_PRIORITY_CLASS_INFER,
        } if HAS_TASK_GOV else None,
        annotations={
            f'kueue.x-k8s.io/podset-{KUEUE_TOPOLOGY_MODE_INFER}-topology': KUEUE_TOPOLOGY_LABEL_INFER,
        } if HAS_TASK_GOV else None,
        env=HF_ENV + [
            {'name': 'BASE_MODEL', 'value': BASE_MODEL_ID},
            {'name': 'TRAINING_DIR', 'value': ''},
            {'name': 'OUTPUT_PATH', 'value': out_path},
            {'name': 'PROMPTS_JSON', 'value': PROMPTS_JSON},
        ],
    )
    os.environ['INFER_BASE_JOB_NAME'] = infer_name
    os.environ['INFER_BASE_OUTPUT_PATH'] = out_path
    os.environ['HYP_JOB_NAME'] = infer_name
    print('Submitted baseline inference job:', infer_name)
    print('Note: first run may take ~10 minutes due to initial image pull.')
    print('Next: run the status cell below to check progress and then open the shared folder.')
else:
    if not RUN_INFER_BASELINE:
        print('Skipping baseline inference (RUN_INFER_BASELINE=False).')
    elif not HAS_TRAINING_OPERATOR:
        print('Skipping baseline inference (training operator not available).')



Baseline inference (base model)


Successfully submitted HyperPodPytorchJob 'infer-base-20260104-192842-98ffab'!


Submitted baseline inference job: infer-base-20260104-192842-98ffab
Note: first run may take ~10 minutes due to initial image pull.
Next: run the status cell below to check progress and then open the shared folder.


In [22]:
# --- Baseline inference job status (async) ---
job_name = os.environ.get('INFER_BASE_JOB_NAME', '').strip()
out_path = os.environ.get('INFER_BASE_OUTPUT_PATH', '').strip()
if not RUN_INFER_BASELINE:
    print('Baseline inference not run; no job to check.')
elif not job_name:
    print('INFER_BASE_JOB_NAME not set. Run the submission cell first.')
else:
    namespace = os.environ.get('HYPERPOD_NAMESPACE', '').strip() or 'hyperpod-ns-datascientist1'
    os.environ['HYPERPOD_NAMESPACE'] = namespace
    os.environ['INFER_BASE_JOB_NAME'] = job_name
    !hyp list hyp-pytorch-job -n ${HYPERPOD_NAMESPACE} | grep -F -- "${INFER_BASE_JOB_NAME}" || echo '(job not found)'
    list_status, list_age, _line = get_hyp_list_status(job_name, namespace)
    phase, conds = get_hyp_job_status(job_name, namespace)
    phase = phase or list_status
    print('Job:', job_name)
    print('Phase:', phase if phase is not None else '(unavailable)')
    if list_age:
        print('Age:', list_age)
    if conds:
        print('Conditions:', conds)
    if phase == 'Completed':
        if out_path:
            try:
                results_path = FSX_SPACE_ROOT / Path(out_path).relative_to(POD_FSX_ROOT)
            except Exception:
                results_path = None
            if results_path:
                print('Job completed. Check the shared folder for:', results_path)
            else:
                print('Job completed. Check the shared folder for the output path:', out_path)
        else:
            print('Job completed. Check the shared folder for the inference output JSON.')
    elif phase in {'Failed', 'Faulted'}:
        print('Job failed. Re-run this cell or use hyp describe for details.')
    elif phase is None:
        print('Status unavailable (kubectl not ready?). You can use hyp describe in a terminal if needed (may show env vars).')
    else:
        print('Job still running. Re-run this cell in a few minutes.')


infer-base-20260104-192842-98ffabhyperpod-ns-datascientist1Created        0m             
Job: infer-base-20260104-192842-98ffab
Phase: Running
Age: 0m
Job still running. Re-run this cell in a few minutes.


In [23]:
# --- SFT LoRA recipe ---
if RUN_RECIPE_SFT:
    print_header('Submit SFT LoRA recipe (multi-node + elastic)')

    SFT_RUN_NAME = f"llmft-sft-{RUN_ID}"
    SFT_MODEL_SAVE_NAME = os.environ.get('SFT_MODEL_SAVE_NAME', 'Meta-Llama-3.1-8B-Instruct-SFT').strip()
    SFT_MAX_EPOCHS = int(os.environ.get('SFT_MAX_EPOCHS', '1'))

    SFT_PRE_SCRIPT = [
        f'JOB_DATA_DIR="{POD_SFT_TRAIN_DIR}/data"',
        'mkdir -p "$JOB_DATA_DIR"',
        'ln -sfn "$JOB_DATA_DIR" /data'
    ]
    SFT_PRE_SCRIPT_ARG = "+recipes.pre_script=" + shlex.quote(json.dumps(SFT_PRE_SCRIPT))


    cmd = (
        f"{VENV_PY} -m launcher "
        f"recipes={RECIPE_SFT_ID} "
        f"base_results_dir={POD_RESULTS_DIR} "
        f"container={LLMFT_IMAGE_CUSTOM} "
        f"cluster=k8s cluster_type=k8s instance_type={INSTANCE_TYPE} "
        f"recipes.run.name={SFT_RUN_NAME} "
        f"recipes.run.hf_access_token={os.environ.get('HF_TOKEN','')} "
        f"recipes.trainer.num_nodes={LLMFT_NUM_NODES} recipes.trainer.devices={GPUS_PER_NODE} "
        f"recipes.elastic_policy.is_elastic={str(LLMFT_ELASTIC).lower()} "
        f"recipes.elastic_policy.min_nodes={LLMFT_ELASTIC_MIN} "
        f"recipes.elastic_policy.max_nodes={LLMFT_ELASTIC_MAX} "
        f"recipes.training_config.model_config.model_save_name={SFT_MODEL_SAVE_NAME} "
        f"recipes.training_config.training_args.training_dir={POD_SFT_TRAIN_DIR} "
        f"{SFT_PRE_SCRIPT_ARG} "
        f"recipes.training_config.training_args.max_epochs={SFT_MAX_EPOCHS} "
        f"recipes.training_config.datasets.train_data.name=llmft-train "
        f"recipes.training_config.datasets.train_data.file_path={POD_SFT_DATA_TRAIN_DIR} "
        f"recipes.training_config.datasets.val_data.name=llmft-val "
        f"recipes.training_config.datasets.val_data.file_path={POD_SFT_DATA_VAL_DIR} "
        f"+env_vars.HF_TOKEN={os.environ.get('HF_TOKEN','')} "
        f"+env_vars.HF_HOME={POD_HF_HOME} "
        f"+env_vars.HF_HUB_CACHE={POD_HF_HUB_CACHE} "
        f"+env_vars.HF_DATASETS_CACHE={POD_HF_DATASETS_CACHE} "
        f"+env_vars.HF_ASSETS_CACHE={POD_HF_ASSETS_CACHE} "
        f"+env_vars.TOKENIZERS_PARALLELISM=false "
    )

    print('Recipe:', RECIPE_SFT_ID)
    print('Command:', rdct_sens(cmd))
    run(cmd)
    # Persist the run name so the status cell can resolve the actual job name (suffix added by recipes)
    os.environ['SFT_RUN_NAME'] = SFT_RUN_NAME
    os.environ['SFT_JOB_NAME'] = ''  # clear any stale value so status cell can re-resolve
else:
    print('Skipping SFT recipe submission (RUN_RECIPE_SFT=False).')



Submit SFT LoRA recipe (multi-node + elastic)
Recipe: fine-tuning/llama/llmft_llama3_1_8b_instruct_seq4k_gpu_sft_lora
Command: /home/sagemaker-user/bobber/notebooks/.venv/bin/python -m launcher recipes=fine-tuning/llama/llmft_llama3_1_8b_instruct_seq4k_gpu_sft_lora base_results_dir=/fsx/fs-03b1953d09801303c/smus-nemo-smoke/section7/recipes/results container=<ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/hyperpod-recipes-llmft-custom:llmft-v1.0.0-llama-custom cluster=k8s cluster_type=k8s instance_type=p4d.24xlarge recipes.run.name=llmft-sft-20260104-192842-98ffab recipes.run.hf_access_token=<HF_TOKEN> recipes.trainer.num_nodes=1 recipes.trainer.devices=8 recipes.elastic_policy.is_elastic=true recipes.elastic_policy.min_nodes=1 recipes.elastic_policy.max_nodes=16 recipes.training_config.model_config.model_save_name=Meta-Llama-3.1-8B-Instruct-SFT recipes.training_config.training_args.training_dir=/fsx/fs-03b1953d09801303c/smus-nemo-smoke/section7/recipes/results/sft-20260104-192842-98ffab 

In [24]:
# --- SFT recipe job status (async) ---
print_header('SFT recipe status (async)')

job_name = os.environ.get('SFT_JOB_NAME', '').strip()
job_prefix = os.environ.get('SFT_RUN_NAME', '').strip()

# If SFT_RUN_NAME not set, treat SFT_JOB_NAME as a prefix
if not job_prefix and job_name:
    job_prefix = job_name
# Fallback to RUN_ID-derived prefix
if not job_prefix:
    try:
        job_prefix = f"llmft-sft-{RUN_ID}"
    except Exception:
        job_prefix = ''

def _parse_age_to_seconds(age):
    if not age:
        return None
    age = age.strip()
    total = 0
    num = ''
    for ch in age:
        if ch.isdigit():
            num += ch
            continue
        if not num:
            continue
        val = int(num)
        num = ''
        if ch == 's':
            total += val
        elif ch == 'm':
            total += val * 60
        elif ch == 'h':
            total += val * 3600
        elif ch == 'd':
            total += val * 86400
    return total if total > 0 else None

def _find_latest_hyp_job_by_prefix_with_hyp(prefix, namespace):
    if not prefix:
        return None
    rc, out_txt = run(f"hyp list hyp-pytorch-job -n {namespace}", check=False)
    if rc != 0 or not out_txt:
        return None
    candidates = []
    for line in out_txt.splitlines():
        parsed = _parse_hyp_list_line(line, namespace)
        if not parsed:
            continue
        name, _status, age, _raw = parsed
        if prefix in name:
            age_s = _parse_age_to_seconds(age)
            candidates.append((age_s, name))
    if not candidates:
        return None
    candidates.sort(key=lambda x: (x[0] is None, x[0]))
    return candidates[0][1]



SFT recipe status (async)


In [25]:
# --- SFT inference (guarded; requires completed SFT + adapter artifacts) ---
print_header('SFT inference (guarded)')

def _resolve_fsx_path(pod_path):
    candidates = []
    # Primary: map POD path to FSx space via POD_FSX_ROOT
    try:
        candidates.append(FSX_SPACE_ROOT / Path(pod_path).relative_to(POD_FSX_ROOT))
    except Exception:
        pass
    # Fallback: map using FSX_BASE_DIRNAME in the path
    try:
        if 'FSX_BASE_DIRNAME' in globals() and FSX_BASE_DIRNAME and FSX_BASE_DIRNAME in pod_path:
            rel = pod_path.split(FSX_BASE_DIRNAME, 1)[1].lstrip('/')
            candidates.append(FSX_BASE_DIR / rel)
    except Exception:
        pass
    # Return first existing candidate
    for c in candidates:
        if c and c.exists():
            return c
    return candidates[0] if candidates else None

if not RUN_INFER_POST_SFT:

    def _resolve_fsx_path(pod_path):
        candidates = []
        # Primary: map POD path to FSx space via POD_FSX_ROOT
        try:
            candidates.append(FSX_SPACE_ROOT / Path(pod_path).relative_to(POD_FSX_ROOT))
        except Exception:
            pass
        # Fallback: map using FSX_BASE_DIRNAME in the path
        try:
            if 'FSX_BASE_DIRNAME' in globals() and FSX_BASE_DIRNAME and FSX_BASE_DIRNAME in pod_path:
                rel = pod_path.split(FSX_BASE_DIRNAME, 1)[1].lstrip('/')
                candidates.append(FSX_BASE_DIR / rel)
        except Exception:
            pass
        # Return first existing candidate
        for c in candidates:
            if c and c.exists():
                return c
        return candidates[0] if candidates else None
    print('Skipping SFT inference (RUN_INFER_POST_SFT=False).')
else:
    # Resolve job name if we have it
    namespace = os.environ.get('HYPERPOD_NAMESPACE', '').strip() or 'hyperpod-ns-datascientist1'
    os.environ['HYPERPOD_NAMESPACE'] = namespace

    sft_job = os.environ.get('SFT_JOB_NAME', '').strip()
    sft_prefix = os.environ.get('SFT_RUN_NAME', '').strip()
    if not sft_job and sft_prefix:
        sft_job = _find_latest_hyp_job_by_prefix_with_hyp(sft_prefix, namespace)
    if sft_job:
        os.environ['SFT_JOB_NAME'] = sft_job

    # Check job completion if possible
    phase = None
    if sft_job:
        phase, _conds = get_hyp_job_status(sft_job, namespace)
    if phase and phase != 'Completed':
        print(f'SFT job not completed yet (phase={phase}). Skipping inference.')
    else:
        # Sense-check outputs on FSx
        out_dir = POD_SFT_TRAIN_DIR if 'POD_SFT_TRAIN_DIR' in globals() else os.environ.get('POD_SFT_TRAIN_DIR', '').strip()
        if not out_dir:
            print('Missing POD_SFT_TRAIN_DIR; cannot locate SFT outputs.')
        else:
            local_out_dir = _resolve_fsx_path(out_dir)
            if not local_out_dir or not local_out_dir.exists():
                print('Tried to resolve:', out_dir)
                print('FSX_BASE_DIR:', FSX_BASE_DIR if 'FSX_BASE_DIR' in globals() else '(unset)')
                print('POD_FSX_ROOT:', POD_FSX_ROOT if 'POD_FSX_ROOT' in globals() else '(unset)')
                print('SFT output path not found on FSx:', out_dir)
            else:
                # adapter artifacts check
                adapter_cfgs = list(local_out_dir.rglob('adapter_config.json'))
                adapter_weights = list(local_out_dir.rglob('adapter_model*.safetensors'))
                if not adapter_cfgs or not adapter_weights:
                    print('SFT outputs missing adapter artifacts. Found:')
                    print('  adapter_config.json:', len(adapter_cfgs))
                    print('  adapter_model*.safetensors:', len(adapter_weights))
                    print('Skipping inference until artifacts are present.')
                else:
                    infer_name = f"infer-sft-{RUN_ID}"
                    out_path = f"{POD_INFER_DIR}/sft-{RUN_ID}.json"
                    cmd = f"python {POD_INFER_DIR}/run_inference.py"
                    job = submit_simple_job(
                        name=infer_name,
                        image=LLMFT_IMAGE_CUSTOM,
                        command=cmd,
                        gpu='1',
                        labels={
                            'kueue.x-k8s.io/queue-name': KUEUE_QUEUE_NAME,
                            'kueue.x-k8s.io/priority-class': KUEUE_PRIORITY_CLASS_INFER,
                        } if HAS_TASK_GOV else None,
                        annotations={
                            f'kueue.x-k8s.io/podset-{KUEUE_TOPOLOGY_MODE_INFER}-topology': KUEUE_TOPOLOGY_LABEL_INFER,
                        } if HAS_TASK_GOV else None,
                        env=HF_ENV + [
                            {'name': 'BASE_MODEL', 'value': BASE_MODEL_ID},
                            {'name': 'TRAINING_DIR', 'value': out_dir},
                            {'name': 'OUTPUT_PATH', 'value': out_path},
                            {'name': 'PROMPTS_JSON', 'value': PROMPTS_JSON},
                            {'name': 'MAX_NEW_TOKENS', 'value': '128'},
                        ],
                    )
                    os.environ['INFER_SFT_JOB_NAME'] = infer_name
                    os.environ['INFER_SFT_OUTPUT_PATH'] = out_path
                    os.environ['HYP_JOB_NAME'] = infer_name
                    print('Submitted SFT inference job:', infer_name)
                    print('Output will be written to:', out_path)



SFT inference (guarded)


Tried to resolve: /fsx/fs-03b1953d09801303c/smus-nemo-smoke/section7/recipes/results/sft-20260104-192842-98ffab
FSX_BASE_DIR: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke
POD_FSX_ROOT: /fsx/fs-03b1953d09801303c/smus-nemo-smoke
SFT output path not found on FSx: /fsx/fs-03b1953d09801303c/smus-nemo-smoke/section7/recipes/results/sft-20260104-192842-98ffab


In [26]:
# --- Compare baseline vs SFT outputs (readable) ---
print_header('Compare baseline vs SFT outputs')

base_out = os.environ.get('INFER_BASE_OUTPUT_PATH', '').strip()
sft_out = os.environ.get('INFER_SFT_OUTPUT_PATH', '').strip()

if not base_out or not sft_out:
    print('Missing output paths. Run baseline + SFT inference first.')
else:
    def _local(pod_path):
        candidates = []
        try:
            candidates.append(FSX_SPACE_ROOT / Path(pod_path).relative_to(POD_FSX_ROOT))
        except Exception:
            pass
        try:
            if "FSX_BASE_DIRNAME" in globals() and FSX_BASE_DIRNAME and FSX_BASE_DIRNAME in pod_path:
                rel = pod_path.split(FSX_BASE_DIRNAME, 1)[1].lstrip("/")
                candidates.append(FSX_BASE_DIR / rel)
        except Exception:
            pass
        for c in candidates:
            if c and c.exists():
                return c
        return candidates[0] if candidates else None

    base_path = _local(base_out)
    sft_path = _local(sft_out)

    if not base_path.exists() or not sft_path.exists():
        print('Output file(s) not found:')
        print('  baseline:', base_path)
        print('  sft     :', sft_path)
    else:
        import json

        with open(base_path) as f:
            base_rows = json.load(f)
        with open(sft_path) as f:
            sft_rows = json.load(f)

        def _out(row):
            return row.get('output') or row.get('response') or row.get('text') or ''

        def _clean(text):
            if not text:
                return ''
            t = text.strip()
            low = t.lower()
            # Strip system/user preambles if they leaked into outputs
            if 'assistant' in low:
                idx = low.rfind('assistant')
                t = t[idx + len('assistant'):]
            elif 'user' in low:
                idx = low.rfind('user')
                t = t[idx + len('user'):]
            t = t.replace('Cutting Knowledge Date:', 'Knowledge Date:')
            return t.strip()

        sft_map = {r.get('prompt'): _out(r) for r in sft_rows}

        for i, r in enumerate(base_rows, 1):
            prompt = r.get('prompt', '')
            base_text = _clean(_out(r))
            sft_text = _clean(sft_map.get(prompt, ''))

            print(f"PROMPT {i}:{prompt}")
            print("BASELINE:" + (base_text or "(empty)"))
            print("SFT (LoRA):" + (sft_text or "(empty)"))
            print("-"*80)



Compare baseline vs SFT outputs
Missing output paths. Run baseline + SFT inference first.


# Section 8 — Function-Calling SFT (Qwen2.5-Coder-7B/14B-Instruct)

This section builds on Section 7 and adds a dedicated function-calling SFT stage **using NeMo 2.0**.

Goals:
- Prepare XLAM function-calling data (train/val/test)
- Prefetch the base model to FSx
- Full fine-tune Qwen2.5-Coder-7B/14B-Instruct with NeMo 2.0
- Produce splits for evaluation and error analysis

Set env vars to control behavior:
- FC_QWEN_SIZE (7B or 14B; case-insensitive)
- RUN_FC_PREP, RUN_FC_SFT, RUN_FC_PREFETCH
- RUN_FC_EVAL_BASELINE, RUN_FC_EVAL_SFT
- RUN_FC_NEMO2_PREP


In [36]:
# =============================================================================
# Section 8a: Shared config + toggles (Function-calling SFT)
# =============================================================================

print_header('Section 8 setup (Function-calling SFT)')

import math

# --- Section 8 toggles ---
RUN_FC_PREP = bool(str(os.environ.get('RUN_FC_PREP', 'true')).lower() in {'1','true','yes'})
RUN_FC_SFT = bool(str(os.environ.get('RUN_FC_SFT', 'true')).lower() in {'1','true','yes'})
RUN_FC_IMPORT = bool(str(os.environ.get('RUN_FC_IMPORT', 'true')).lower() in {'1','true','yes'})
RUN_FC_PREFETCH = bool(str(os.environ.get('RUN_FC_PREFETCH', 'true')).lower() in {'1','true','yes'})
RUN_FC_EVAL_BASELINE = bool(str(os.environ.get('RUN_FC_EVAL_BASELINE', 'true')).lower() in {'1','true','yes'})
RUN_FC_EVAL_SFT = bool(str(os.environ.get('RUN_FC_EVAL_SFT', 'true')).lower() in {'1','true','yes'})
RUN_FC_NEMO2_PREP = bool(str(os.environ.get('RUN_FC_NEMO2_PREP', 'true')).lower() in {'1','true','yes'})

# --- Dataset + split settings ---
FC_DATASET_ID = os.environ.get('FC_DATASET_ID', 'Salesforce/xlam-function-calling-60k').strip()
FC_SEED = int(os.environ.get('FC_SEED', '42'))

# Size controls (0 = use full split)
FC_TRAIN_SIZE = int(os.environ.get('FC_TRAIN_SIZE', '54000'))
FC_VAL_SIZE = int(os.environ.get('FC_VAL_SIZE', '3000'))
FC_TEST_SIZE = int(os.environ.get('FC_TEST_SIZE', '3000'))
FC_TOTAL_LIMIT = int(os.environ.get('FC_TOTAL_LIMIT', '0'))  # optional cap for quick tests

# --- Qwen size selector (7B/14B) ---
FC_QWEN_SIZE = os.environ.get('FC_QWEN_SIZE', '7B').strip()
if not FC_QWEN_SIZE:
    FC_QWEN_SIZE = '7B'
_fc_qwen_size_norm = FC_QWEN_SIZE.lower()
if _fc_qwen_size_norm in {'7', '7b'}:
    _fc_qwen_size_norm = '7b'
elif _fc_qwen_size_norm in {'14', '14b'}:
    _fc_qwen_size_norm = '14b'
else:
    raise ValueError("FC_QWEN_SIZE must be '7B' or '14B' (case-insensitive).")
FC_QWEN_SIZE = _fc_qwen_size_norm
FC_QWEN_SIZE_LABEL = FC_QWEN_SIZE.upper()

# --- Model + recipe ---
FC_DEFAULT_MODEL_ID = f"Qwen/Qwen2.5-Coder-{FC_QWEN_SIZE_LABEL}-Instruct"
FC_BASE_MODEL_ID = os.environ.get('FC_BASE_MODEL_ID', FC_DEFAULT_MODEL_ID).strip()
FC_TOKENIZER_ID = os.environ.get('FC_TOKENIZER_ID', FC_BASE_MODEL_ID).strip()
FC_RECIPE_SFT_ID = os.environ.get(
    'FC_RECIPE_SFT_ID',
    f'fine-tuning/qwen/nemo2_qwen2_5_{FC_QWEN_SIZE}_instruct_seq4k_gpu_sft_fft'
).strip()

FC_RECIPE_IMPORT_ID = os.environ.get(
    'FC_RECIPE_IMPORT_ID',
    f'fine-tuning/qwen/nemo2_qwen2_5_{FC_QWEN_SIZE}_instruct_import'
).strip()

# --- Prefetch settings ---
FC_PREFETCH_MODEL_ID = os.environ.get('FC_PREFETCH_MODEL_ID', FC_BASE_MODEL_ID).strip()
FC_PREFETCH_REVISION = os.environ.get('FC_PREFETCH_REVISION', '').strip() or None


# --- Training hyperparams (defaults tuned for longer context) ---
FC_MAX_LEN = int(os.environ.get('FC_MAX_LEN', '8192'))
FC_LR = float(os.environ.get('FC_LR', '1e-5'))
FC_MAX_EPOCHS = int(os.environ.get('FC_MAX_EPOCHS', '3'))
FC_WARMUP_RATIO = float(os.environ.get('FC_WARMUP_RATIO', '0.03'))
FC_EVAL_STEPS = int(os.environ.get('FC_EVAL_STEPS', '200'))
FC_SAVE_STEPS = int(os.environ.get('FC_SAVE_STEPS', '200'))

# Ensure eval steps are on for deliverables
if FC_EVAL_STEPS <= 0:
    FC_EVAL_STEPS = 200
    print('FC_EVAL_STEPS was <=0; forcing to 200 for validation loss.')

# --- Scale ---
FC_TRAIN_NUM_NODES = int(os.environ.get('FC_TRAIN_NUM_NODES', '2'))
FC_NUM_NODES =int(os.environ.get('FC_NUM_NODES', '4'))
FC_GPUS_PER_NODE = int(os.environ.get('FC_GPUS_PER_NODE', str(GPUS_PER_NODE)))
FC_MICRO_BS = int(os.environ.get('FC_MICRO_BS', '1'))
FC_TRAIN_BS = int(os.environ.get('FC_TRAIN_BS', str(FC_MICRO_BS * FC_TRAIN_NUM_NODES * FC_GPUS_PER_NODE * 2)))

# --- NeMo 2.0 SFT hyperparams ---
FC_NEMO2_SEQ_LEN = int(os.environ.get('FC_NEMO2_SEQ_LEN', str(FC_MAX_LEN)))
FC_NEMO2_MICRO_BS = int(os.environ.get('FC_NEMO2_MICRO_BS', str(FC_MICRO_BS)))
FC_NEMO2_GLOBAL_BS = int(os.environ.get('FC_NEMO2_GLOBAL_BS', str(FC_TRAIN_BS)))
_steps_per_epoch = math.ceil(FC_TRAIN_SIZE / max(FC_NEMO2_GLOBAL_BS, 1)) if FC_TRAIN_SIZE > 0 else 0
FC_NEMO2_MAX_STEPS = int(os.environ.get('FC_NEMO2_MAX_STEPS', str(_steps_per_epoch * FC_MAX_EPOCHS if _steps_per_epoch else 0)))
FC_NEMO2_WARMUP_STEPS = int(os.environ.get('FC_NEMO2_WARMUP_STEPS', str(max(1, int(FC_NEMO2_MAX_STEPS * FC_WARMUP_RATIO)) if FC_NEMO2_MAX_STEPS else 0)))
FC_NEMO2_LR = float(os.environ.get('FC_NEMO2_LR', str(FC_LR)))
FC_NEMO2_VAL_CHECK_INTERVAL = float(os.environ.get('FC_NEMO2_VAL_CHECK_INTERVAL', '1.0'))
FC_NEMO2_LOG_EVERY_N_STEPS = int(os.environ.get('FC_NEMO2_LOG_EVERY_N_STEPS', '1'))
FC_NEMO2_NUM_WORKERS = int(os.environ.get('FC_NEMO2_NUM_WORKERS', '2'))
FC_NEMO2_PEFT = os.environ.get('FC_NEMO2_PEFT', 'none').strip()
FC_NEMO2_SEED = int(os.environ.get('FC_NEMO2_SEED', str(FC_SEED)))
FC_NEMO2_RECOMPUTE_GRANULARITY = os.environ.get('FC_NEMO2_RECOMPUTE_GRANULARITY', 'full').strip()
FC_NEMO2_RECOMPUTE_METHOD = os.environ.get('FC_NEMO2_RECOMPUTE_METHOD', 'block').strip()
if not FC_NEMO2_RECOMPUTE_METHOD:
    FC_NEMO2_RECOMPUTE_METHOD = 'none'
FC_NEMO2_RECOMPUTE_NUM_LAYERS = int(os.environ.get('FC_NEMO2_RECOMPUTE_NUM_LAYERS', '1'))
FC_NEMO2_GRAD_ACCUM_FUSION = str(os.environ.get('FC_NEMO2_GRAD_ACCUM_FUSION', 'false')).strip().lower()
FC_NEMO2_GRAD_ACCUM_FUSION = FC_NEMO2_GRAD_ACCUM_FUSION in {'1','true','yes','on'}

# --- Elastic settings ---
FC_ELASTIC = bool(str(os.environ.get('FC_ELASTIC', 'true')).lower() in {'1','true','yes'})
FC_ELASTIC_MIN = int(os.environ.get('FC_ELASTIC_MIN', '1'))
FC_ELASTIC_MAX = int(os.environ.get('FC_ELASTIC_MAX', str(FC_NUM_NODES)))
if FC_ELASTIC_MAX < FC_ELASTIC_MIN:
    FC_ELASTIC_MAX = FC_ELASTIC_MIN
    print('FC_ELASTIC_MAX < FC_ELASTIC_MIN; forcing max = min')
FC_ELASTIC_REPLICA_INCREMENT_STEP = int(os.environ.get('FC_ELASTIC_REPLICA_INCREMENT_STEP', '1'))
if FC_ELASTIC and FC_TRAIN_NUM_NODES < FC_ELASTIC_MIN:
    FC_TRAIN_NUM_NODES = FC_ELASTIC_MIN
    print('FC_TRAIN_NUM_NODES < FC_ELASTIC_MIN; forcing nodes = min')
if FC_ELASTIC and FC_TRAIN_NUM_NODES > FC_ELASTIC_MAX:
    FC_TRAIN_NUM_NODES = FC_ELASTIC_MAX
    print('FC_TRAIN_NUM_NODES > FC_ELASTIC_MAX; forcing nodes = max')

# --- Directories (Space + Pod) ---
SECTION8_DIR = FSX_BASE_DIR / 'section8_fc'
FC_DATA_DIR = SECTION8_DIR / 'data'
FC_RESULTS_DIR = SECTION8_DIR / 'results'
FC_EVAL_DIR = SECTION8_DIR / 'eval'

FC_DATA_TRAIN_DIR = FC_DATA_DIR / 'train'
FC_DATA_VAL_DIR = FC_DATA_DIR / 'val'
FC_DATA_TEST_DIR = FC_DATA_DIR / 'test'
FC_NEMO2_DATA_DIR = SECTION8_DIR / 'data_nemo2'

FC_NEMO2_IMPORT_DIR = SECTION8_DIR / 'imports'
FC_NEMO2_IMPORT_OUTPUT = FC_NEMO2_IMPORT_DIR / f'qwen25_{FC_QWEN_SIZE}_instruct.nemo'

for d in [SECTION8_DIR, FC_DATA_DIR, FC_RESULTS_DIR, FC_EVAL_DIR, FC_DATA_TRAIN_DIR, FC_DATA_VAL_DIR, FC_DATA_TEST_DIR, FC_NEMO2_DATA_DIR, FC_NEMO2_IMPORT_DIR]:
    ensure_dir(d)

POD_FC_DATA_TRAIN_DIR = f"{POD_FSX_ROOT}/section8_fc/data/train"
POD_FC_DATA_VAL_DIR = f"{POD_FSX_ROOT}/section8_fc/data/val"
POD_FC_DATA_TEST_DIR = f"{POD_FSX_ROOT}/section8_fc/data/test"
POD_FC_NEMO2_DATA_DIR = f"{POD_FSX_ROOT}/section8_fc/data_nemo2"
POD_FC_RESULTS_DIR = f"{POD_FSX_ROOT}/section8_fc/results"

POD_FC_NEMO2_IMPORT_OUTPUT = f"{POD_FSX_ROOT}/section8_fc/imports/qwen25_{FC_QWEN_SIZE}_instruct.nemo"

FC_RUN_NAME = f"fc-sft-{RUN_ID}"
FC_IMPORT_RUN_NAME = f"fc-import-{RUN_ID}"
FC_NEMO2_RUN_NAME = FC_RUN_NAME
POD_FC_TRAIN_DIR = f"{POD_FC_RESULTS_DIR}/{FC_RUN_NAME}"
POD_FC_EVAL_DIR = f"{POD_FSX_ROOT}/section8_fc/eval"

FC_MODEL_SAVE_NAME = os.environ.get('FC_MODEL_SAVE_NAME', f'Qwen2.5-Coder-{FC_QWEN_SIZE_LABEL}-Instruct-SFT').strip()

print('Section 8 toggles:')
print({
    'RUN_FC_PREP': RUN_FC_PREP,
    'RUN_FC_SFT': RUN_FC_SFT,
    'RUN_FC_IMPORT': RUN_FC_IMPORT,
    'RUN_FC_EVAL_BASELINE': RUN_FC_EVAL_BASELINE,
    'RUN_FC_EVAL_SFT': RUN_FC_EVAL_SFT,
    'RUN_FC_NEMO2_PREP': RUN_FC_NEMO2_PREP,
})
print('Dataset:')
print({
    'FC_DATASET_ID': FC_DATASET_ID,
    'FC_TRAIN_SIZE': FC_TRAIN_SIZE,
    'FC_VAL_SIZE': FC_VAL_SIZE,
    'FC_TEST_SIZE': FC_TEST_SIZE,
    'FC_TOTAL_LIMIT': FC_TOTAL_LIMIT,
})
print('Model/recipe:')
print({
    'FC_QWEN_SIZE': FC_QWEN_SIZE_LABEL,
    'FC_BASE_MODEL_ID': FC_BASE_MODEL_ID,
    'FC_TOKENIZER_ID': FC_TOKENIZER_ID,
    'FC_RECIPE_SFT_ID': FC_RECIPE_SFT_ID,
    'FC_RECIPE_IMPORT_ID': FC_RECIPE_IMPORT_ID,
})
print('Training:')
print({
    'FC_TRAIN_NUM_NODES': FC_TRAIN_NUM_NODES,
    'FC_GPUS_PER_NODE': FC_GPUS_PER_NODE,
    'FC_MICRO_BS': FC_MICRO_BS,
    'FC_TRAIN_BS': FC_TRAIN_BS,
    'FC_LR': FC_LR,
    'FC_MAX_EPOCHS': FC_MAX_EPOCHS,
    'FC_MAX_LEN': FC_MAX_LEN,
    'FC_WARMUP_RATIO': FC_WARMUP_RATIO,
})
print('Elastic (NeMo 2.0 SFT):')
print({
    'FC_ELASTIC': FC_ELASTIC,
    'FC_ELASTIC_MIN': FC_ELASTIC_MIN,
    'FC_ELASTIC_MAX': FC_ELASTIC_MAX,
    'FC_ELASTIC_REPLICA_INCREMENT_STEP': FC_ELASTIC_REPLICA_INCREMENT_STEP,
})
print('NeMo 2.0 SFT:')
print({
    'FC_NEMO2_SEQ_LEN': FC_NEMO2_SEQ_LEN,
    'FC_NEMO2_MICRO_BS': FC_NEMO2_MICRO_BS,
    'FC_NEMO2_GLOBAL_BS': FC_NEMO2_GLOBAL_BS,
    'FC_NEMO2_MAX_STEPS': FC_NEMO2_MAX_STEPS,
    'FC_NEMO2_LR': FC_NEMO2_LR,
    'FC_NEMO2_WARMUP_STEPS': FC_NEMO2_WARMUP_STEPS,
    'FC_NEMO2_VAL_CHECK_INTERVAL': FC_NEMO2_VAL_CHECK_INTERVAL,
    'FC_NEMO2_LOG_EVERY_N_STEPS': FC_NEMO2_LOG_EVERY_N_STEPS,
    'FC_NEMO2_NUM_WORKERS': FC_NEMO2_NUM_WORKERS,
    'FC_NEMO2_PEFT': FC_NEMO2_PEFT,
    'FC_NEMO2_RECOMPUTE_GRANULARITY': FC_NEMO2_RECOMPUTE_GRANULARITY,
    'FC_NEMO2_RECOMPUTE_METHOD': FC_NEMO2_RECOMPUTE_METHOD,
    'FC_NEMO2_RECOMPUTE_NUM_LAYERS': FC_NEMO2_RECOMPUTE_NUM_LAYERS,
    'FC_NEMO2_GRAD_ACCUM_FUSION': FC_NEMO2_GRAD_ACCUM_FUSION,
})
print('Paths:')
print({
    'FC_DATA_TRAIN_DIR': str(FC_DATA_TRAIN_DIR),
    'FC_DATA_VAL_DIR': str(FC_DATA_VAL_DIR),
    'FC_DATA_TEST_DIR': str(FC_DATA_TEST_DIR),
    'FC_NEMO2_DATA_DIR': str(FC_NEMO2_DATA_DIR),
    'FC_NEMO2_IMPORT_OUTPUT': str(FC_NEMO2_IMPORT_OUTPUT),
    'POD_FC_DATA_TRAIN_DIR': POD_FC_DATA_TRAIN_DIR,
    'POD_FC_NEMO2_IMPORT_OUTPUT': POD_FC_NEMO2_IMPORT_OUTPUT,
    'POD_FC_RESULTS_DIR': POD_FC_RESULTS_DIR,
})




Section 8 setup (Function-calling SFT)
Section 8 toggles:
{'RUN_FC_PREP': True, 'RUN_FC_SFT': True, 'RUN_FC_IMPORT': True, 'RUN_FC_EVAL_BASELINE': True, 'RUN_FC_EVAL_SFT': True, 'RUN_FC_NEMO2_PREP': True}
Dataset:
{'FC_DATASET_ID': 'Salesforce/xlam-function-calling-60k', 'FC_TRAIN_SIZE': 54000, 'FC_VAL_SIZE': 3000, 'FC_TEST_SIZE': 3000, 'FC_TOTAL_LIMIT': 0}
Model/recipe:
{'FC_QWEN_SIZE': '7B', 'FC_BASE_MODEL_ID': 'Qwen/Qwen2.5-Coder-7B-Instruct', 'FC_TOKENIZER_ID': 'Qwen/Qwen2.5-Coder-7B-Instruct', 'FC_RECIPE_SFT_ID': 'fine-tuning/qwen/nemo2_qwen2_5_7b_instruct_seq4k_gpu_sft_fft', 'FC_RECIPE_IMPORT_ID': 'fine-tuning/qwen/nemo2_qwen2_5_7b_instruct_import'}
Training:
{'FC_TRAIN_NUM_NODES': 2, 'FC_GPUS_PER_NODE': 8, 'FC_MICRO_BS': 1, 'FC_TRAIN_BS': 32, 'FC_LR': 1e-05, 'FC_MAX_EPOCHS': 2, 'FC_MAX_LEN': 8192, 'FC_WARMUP_RATIO': 0.1}
Elastic (NeMo 2.0 SFT):
{'FC_ELASTIC': True, 'FC_ELASTIC_MIN': 1, 'FC_ELASTIC_MAX': 4, 'FC_ELASTIC_REPLICA_INCREMENT_STEP': 1}
NeMo 2.0 SFT:
{'FC_NEMO2_SEQ_LEN

In [28]:
# =============================================================================
# Section 8b: Prepare XLAM function-calling dataset (train/val/test)
# =============================================================================

if not RUN_FC_PREP:
    print('Skipping Section 8 data prep (RUN_FC_PREP=False).')
else:
    print_header('Section 8 data prep (XLAM function-calling)')

    from datasets import load_dataset
    from transformers import AutoTokenizer
    import json as _json

    HF_TOKEN = os.environ.get('HF_TOKEN') or None
    tokenizer = AutoTokenizer.from_pretrained(FC_TOKENIZER_ID, token=HF_TOKEN)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = 'right'

    def _apply_fc_chat_template(messages, add_generation_prompt):
        if hasattr(tokenizer, 'apply_chat_template'):
            return tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=add_generation_prompt,
            )
        parts = []
        for msg in messages:
            role = msg.get('role', 'user')
            prefix = 'User: ' if role == 'user' else 'Assistant: '
            parts.append(prefix + msg.get('content', ''))
        if add_generation_prompt:
            parts.append('Assistant: ')
        return '\n'.join(parts)

    def _tokenize(text, max_len=None, truncation_side='left'):
        tokenizer.truncation_side = truncation_side
        if max_len:
            return tokenizer(text, add_special_tokens=False, truncation=True, max_length=max_len).input_ids
        return tokenizer(text, add_special_tokens=False).input_ids

    def _format_example(example):
        tools = example.get('tools', '')
        query = example.get('query', '')
        answer = example.get('answers', '')

        system = (
            "You are a helpful assistant with access to the following functions. "
            "Use them if required.\n" + str(tools)
        )

        messages = [
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': str(query)},
            {'role': 'assistant', 'content': str(answer)},
        ]

        prompt_text = _apply_fc_chat_template(messages[:-1], add_generation_prompt=True)
        full_text = _apply_fc_chat_template(messages, add_generation_prompt=False)

        prompt_ids = _tokenize(prompt_text, max_len=FC_MAX_LEN, truncation_side='left')
        full_ids = _tokenize(full_text, max_len=FC_MAX_LEN, truncation_side='left')

        labels = list(full_ids)
        prompt_len = min(len(prompt_ids), len(labels))
        for i in range(prompt_len):
            labels[i] = -100

        rec = {
            'input_ids': list(full_ids),
            'attention_mask': [1] * len(full_ids),
            'labels': labels,
        }
        raw = {
            'query': query,
            'tools': tools,
            'answers': answer,
        }
        return rec, raw

    # Load dataset and split
    ds = load_dataset(FC_DATASET_ID, split='train')
    ds = ds.shuffle(seed=FC_SEED)
    if FC_TOTAL_LIMIT and FC_TOTAL_LIMIT > 0:
        ds = ds.select(range(min(FC_TOTAL_LIMIT, len(ds))))

    splits = ds.train_test_split(test_size=0.1, seed=FC_SEED)
    train_set = splits['train']
    temp = splits['test'].train_test_split(test_size=0.5, seed=FC_SEED)
    val_set = temp['train']
    test_set = temp['test']

    # Optional size caps
    if FC_TRAIN_SIZE > 0:
        train_set = train_set.select(range(min(FC_TRAIN_SIZE, len(train_set))))
    if FC_VAL_SIZE > 0:
        val_set = val_set.select(range(min(FC_VAL_SIZE, len(val_set))))
    if FC_TEST_SIZE > 0:
        test_set = test_set.select(range(min(FC_TEST_SIZE, len(test_set))))

    print('Split sizes:', {'train': len(train_set), 'val': len(val_set), 'test': len(test_set)})

    # Write tokenized JSONL + raw JSONL
    def _write_split(ds_split, token_path, raw_path):
        count = 0
        with open(token_path, 'w') as f_tok, open(raw_path, 'w') as f_raw:
            for ex in ds_split:
                rec, raw = _format_example(ex)
                f_tok.write(_json.dumps(rec) + '\n')
                f_raw.write(_json.dumps(raw) + '\n')
                count += 1
        return count

    train_tok = FC_DATA_TRAIN_DIR / 'train.json'
    val_tok = FC_DATA_VAL_DIR / 'val.json'
    test_tok = FC_DATA_TEST_DIR / 'test.json'  # optional; not used by trainer

    train_raw = FC_DATA_TRAIN_DIR / 'train_raw.jsonl'
    val_raw = FC_DATA_VAL_DIR / 'val_raw.jsonl'
    test_raw = FC_DATA_TEST_DIR / 'test_raw.jsonl'

    n_train = _write_split(train_set, train_tok, train_raw)
    n_val = _write_split(val_set, val_tok, val_raw)
    n_test = _write_split(test_set, test_tok, test_raw)

    print('Wrote:')
    print('  train:', train_tok, 'rows:', n_train)
    print('  val  :', val_tok, 'rows:', n_val)
    print('  test :', test_raw, 'rows:', n_test)
    print('Raw splits:', train_raw, val_raw, test_raw)

    # Quick sanity sample
    try:
        with open(train_raw, 'r') as f:
            sample = _json.loads(next(f))
        print('Sample raw:', sample)
    except Exception:
        pass



Section 8 data prep (XLAM function-calling)


Split sizes: {'train': 54000, 'val': 3000, 'test': 3000}
Wrote:
  train: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data/train/train.json rows: 54000
  val  : /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data/val/val.json rows: 3000
  test : /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data/test/test_raw.jsonl rows: 3000
Raw splits: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data/train/train_raw.jsonl /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data/val/val_raw.jsonl /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data/test/test_raw.jsonl
Sample raw: {'query': "What is 'December 31, 2022' in the 'day-month-year' format?", 'tools': '[{"name": "format_date

In [29]:
# =============================================================================
# Section 8b.2: Prepare NeMo 2.0 JSONL splits (input/output)
# =============================================================================

if not RUN_FC_NEMO2_PREP:
    print('Skipping NeMo 2.0 data prep (RUN_FC_NEMO2_PREP=False).')
else:
    print_header('Section 8 NeMo 2.0 data prep (input/output jsonl)')

    from transformers import AutoTokenizer
    import json as _json

    HF_TOKEN = os.environ.get('HF_TOKEN') or None
    tokenizer = AutoTokenizer.from_pretrained(FC_TOKENIZER_ID, token=HF_TOKEN)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = 'right'

    raw_train = FC_DATA_TRAIN_DIR / 'train_raw.jsonl'
    raw_val = FC_DATA_VAL_DIR / 'val_raw.jsonl'
    raw_test = FC_DATA_TEST_DIR / 'test_raw.jsonl'
    if not raw_train.exists() or not raw_val.exists() or not raw_test.exists():
        raise FileNotFoundError('Raw splits not found. Run Section 8b (RUN_FC_PREP=True) first.')

    def _apply_fc_chat_template(messages, add_generation_prompt):
        if hasattr(tokenizer, 'apply_chat_template'):
            return tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=add_generation_prompt,
            )
        parts = []
        for msg in messages:
            role = msg.get('role', 'user')
            prefix = 'User: ' if role == 'user' else 'Assistant: '
            parts.append(prefix + msg.get('content', ''))
        if add_generation_prompt:
            parts.append('Assistant: ')
        return '\n'.join(parts)

    def _format_nemo(raw):
        tools = raw.get('tools', '')
        query = raw.get('query', '')
        answer = raw.get('answers', '')

        system = (
            "You are a helpful assistant with access to the following functions. "
            "Use them if required.\n" + str(tools)
        )

        messages = [
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': str(query)},
            {'role': 'assistant', 'content': str(answer)},
        ]

        prompt_text = _apply_fc_chat_template(messages[:-1], add_generation_prompt=True)
        return {
            'input': prompt_text,
            'output': str(answer),
            'query': query,
            'tools': tools,
        }

    def _write_nemo(raw_path, out_path):
        count = 0
        with open(raw_path, 'r') as f_in, open(out_path, 'w') as f_out:
            for line in f_in:
                rec = _format_nemo(_json.loads(line))
                f_out.write(_json.dumps(rec) + '\n')
                count += 1
        return count

    out_train = FC_NEMO2_DATA_DIR / 'training.jsonl'
    out_val = FC_NEMO2_DATA_DIR / 'validation.jsonl'
    out_test = FC_NEMO2_DATA_DIR / 'test.jsonl'

    n_train = _write_nemo(raw_train, out_train)
    n_val = _write_nemo(raw_val, out_val)
    n_test = _write_nemo(raw_test, out_test)

    print('Wrote NeMo 2.0 JSONL:')
    print('  train:', out_train, 'rows:', n_train)
    print('  val  :', out_val, 'rows:', n_val)
    print('  test :', out_test, 'rows:', n_test)



Section 8 NeMo 2.0 data prep (input/output jsonl)


Wrote NeMo 2.0 JSONL:
  train: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data_nemo2/training.jsonl rows: 54000
  val  : /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data_nemo2/validation.jsonl rows: 3000
  test : /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/data_nemo2/test.jsonl rows: 3000


In [37]:
# =============================================================================
# Section 8b.1: Prefetch Qwen2.5-Coder model to FSx cache
# =============================================================================

if not RUN_FC_PREFETCH:
    print('Skipping model prefetch (RUN_FC_PREFETCH=False).')
else:
    print_header('Prefetch Qwen2.5-Coder base model to FSx cache')

    try:
        from huggingface_hub import snapshot_download
    except Exception:
        run('python -m pip install -q huggingface_hub')
        from huggingface_hub import snapshot_download

    model_id = FC_PREFETCH_MODEL_ID
    revision = FC_PREFETCH_REVISION

    cache_dir = os.environ.get('HF_HUB_CACHE', '') or str(HF_HUB_CACHE_DIR)
    print('Prefetching model:', model_id)
    print('Cache dir:', cache_dir)

    try:
        snapshot_download(
            repo_id=model_id,
            revision=revision,
            cache_dir=cache_dir,
            token=os.environ.get('HF_TOKEN') or None,
            local_dir_use_symlinks=False,
        )
        print('Prefetch complete.')
    except Exception as e:
        print('WARNING: Prefetch failed:', e)



Prefetch Qwen2.5-Coder base model to FSx cache
Prefetching model: Qwen/Qwen2.5-Coder-7B-Instruct
Cache dir: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/hf-home/hub


Fetching 14 files:   0%|          | 0/14 [00:00<?, ?it/s]

Prefetch complete.


In [38]:
# =============================================================================
# Section 8c: Import HF -> NeMo + Submit SFT (NeMo 2.0, Qwen2.5-Coder-7B/14B-Instruct)
# =============================================================================
SFT_PRE_SCRIPT = [
        f'JOB_DATA_DIR="{POD_FC_TRAIN_DIR}/data"',
        'mkdir -p "$JOB_DATA_DIR"',
        'ln -sfn "$JOB_DATA_DIR" /data'
    ]
SFT_PRE_SCRIPT_ARG = "+recipes.pre_script=" + shlex.quote(json.dumps(SFT_PRE_SCRIPT))
if not RUN_FC_IMPORT:
    print('Skipping HF -> NeMo import (RUN_FC_IMPORT=False).')
else:
    print_header(f'Submit NeMo 2.0 HF -> NeMo import (Qwen2.5-Coder-{FC_QWEN_SIZE_LABEL}-Instruct)')

    # Import output (shared across runs)
    os.environ['FC_NEMO2_HF_MODEL_ID'] = FC_BASE_MODEL_ID
    os.environ['FC_NEMO2_SEQ_LEN'] = str(FC_NEMO2_SEQ_LEN)
    os.environ['FC_NEMO2_IMPORT_OUTPUT'] = str(POD_FC_NEMO2_IMPORT_OUTPUT)

    import_output = Path(FC_NEMO2_IMPORT_OUTPUT).expanduser().resolve()
    if import_output.exists():
        print(f'Import artifact already exists: {import_output} (skipping)')
    else:
        cmd = (
            f"{VENV_PY} -m launcher recipes={FC_RECIPE_IMPORT_ID} "
            f"base_results_dir={POD_FC_RESULTS_DIR} "
            f"container={NEMO_IMAGE} "
            f"cluster=k8s cluster_type=k8s instance_type={INSTANCE_TYPE} "
            f"recipes.run.name={FC_IMPORT_RUN_NAME} "
            f"+env_vars.HF_TOKEN={os.environ.get('HF_TOKEN','')} "
            f"+env_vars.HF_HOME={POD_HF_HOME} "
            f"+env_vars.HF_HUB_CACHE={POD_HF_HUB_CACHE} "
            f"+env_vars.HF_DATASETS_CACHE={POD_HF_DATASETS_CACHE} "
            f"+env_vars.HF_ASSETS_CACHE={POD_HF_ASSETS_CACHE} "
            f"+env_vars.TOKENIZERS_PARALLELISM=false "
            f"{SFT_PRE_SCRIPT_ARG}"
        )
        print('Recipe:', FC_RECIPE_IMPORT_ID)
        print('Command:', rdct_sens(cmd))
        run(cmd)

        # Wait for the imported checkpoint to appear on FSx
        timeout_s = int(os.environ.get('FC_IMPORT_TIMEOUT_S', '7200'))
        poll_s = int(os.environ.get('FC_IMPORT_POLL_S', '30'))
        print(f'Waiting for import artifact (timeout={timeout_s}s): {import_output}')
        start = time.time()
        while not import_output.exists():
            if time.time() - start > timeout_s:
                raise TimeoutError(f'Timed out waiting for import artifact: {import_output}')
            time.sleep(poll_s)
        print(f'Import completed: {import_output}')

if not RUN_FC_SFT:
    print('Skipping Section 8 SFT (RUN_FC_SFT=False).')
else:
    print_header(f'Submit NeMo 2.0 SFT (Qwen2.5-Coder-{FC_QWEN_SIZE_LABEL}-Instruct)')

    

    # Populate env vars used by the NeMo 2.0 recipe
    os.environ['FC_NEMO2_DATASET_ROOT'] = str(POD_FC_NEMO2_DATA_DIR)
    os.environ['FC_NEMO2_HF_MODEL_ID'] = FC_BASE_MODEL_ID
    os.environ['FC_NEMO2_IMPORT_OUTPUT'] = str(POD_FC_NEMO2_IMPORT_OUTPUT)
    os.environ['FC_NEMO2_SEQ_LEN'] = str(FC_NEMO2_SEQ_LEN)
    os.environ['FC_NEMO2_MICRO_BS'] = str(FC_NEMO2_MICRO_BS)
    os.environ['FC_NEMO2_GLOBAL_BS'] = str(FC_NEMO2_GLOBAL_BS)
    os.environ['FC_NEMO2_MAX_STEPS'] = str(FC_NEMO2_MAX_STEPS)
    os.environ['FC_NEMO2_LR'] = str(FC_NEMO2_LR)
    os.environ['FC_NEMO2_WARMUP_STEPS'] = str(FC_NEMO2_WARMUP_STEPS)
    os.environ['FC_NEMO2_VAL_CHECK_INTERVAL'] = str(FC_NEMO2_VAL_CHECK_INTERVAL)
    os.environ['FC_NEMO2_LOG_EVERY_N_STEPS'] = str(FC_NEMO2_LOG_EVERY_N_STEPS)
    os.environ['FC_NEMO2_NUM_WORKERS'] = str(FC_NEMO2_NUM_WORKERS)
    os.environ['FC_NEMO2_PEFT'] = FC_NEMO2_PEFT
    os.environ['FC_NEMO2_SEED'] = str(FC_NEMO2_SEED)
    os.environ['FC_NEMO2_RECOMPUTE_GRANULARITY'] = str(FC_NEMO2_RECOMPUTE_GRANULARITY)
    os.environ['FC_NEMO2_RECOMPUTE_METHOD'] = str(FC_NEMO2_RECOMPUTE_METHOD)
    os.environ['FC_NEMO2_RECOMPUTE_NUM_LAYERS'] = str(FC_NEMO2_RECOMPUTE_NUM_LAYERS)
    os.environ['FC_NEMO2_GRAD_ACCUM_FUSION'] = 'true' if FC_NEMO2_GRAD_ACCUM_FUSION else 'false'

    recompute_env_args = ''
    if FC_NEMO2_RECOMPUTE_GRANULARITY:
        recompute_env_args += f"+env_vars.FC_NEMO2_RECOMPUTE_GRANULARITY={FC_NEMO2_RECOMPUTE_GRANULARITY} "
    if FC_NEMO2_RECOMPUTE_METHOD and FC_NEMO2_RECOMPUTE_METHOD.lower() != 'none':
        recompute_env_args += f"+env_vars.FC_NEMO2_RECOMPUTE_METHOD={FC_NEMO2_RECOMPUTE_METHOD} "
    if FC_NEMO2_RECOMPUTE_NUM_LAYERS > 0:
        recompute_env_args += f"+env_vars.FC_NEMO2_RECOMPUTE_NUM_LAYERS={FC_NEMO2_RECOMPUTE_NUM_LAYERS} "

    elastic_args = f"recipes.elastic_policy.is_elastic={str(FC_ELASTIC).lower()} "
    if FC_ELASTIC:
        elastic_args += f"recipes.elastic_policy.min_nodes={FC_ELASTIC_MIN} "
        elastic_args += f"recipes.elastic_policy.max_nodes={FC_ELASTIC_MAX} "
        elastic_args += f"recipes.elastic_policy.replica_increment_step={FC_ELASTIC_REPLICA_INCREMENT_STEP} "
    # Guard: import artifact must exist when skip-import is enabled in the recipe
    if not Path(FC_NEMO2_IMPORT_OUTPUT).exists():
        raise FileNotFoundError(
            f"Missing HF -> NeMo import artifact: {FC_NEMO2_IMPORT_OUTPUT}. "
            "Run the import step first or set RUN_FC_IMPORT=true."
        )

    cmd = (
        f"{VENV_PY} -m launcher recipes={FC_RECIPE_SFT_ID} "
        f"base_results_dir={POD_FC_RESULTS_DIR} "
        f"container={NEMO_IMAGE} "
        f"cluster=k8s cluster_type=k8s instance_type={INSTANCE_TYPE} "
        f"recipes.run.name={FC_RUN_NAME} "
        f"recipes.run.nodes={FC_TRAIN_NUM_NODES} recipes.trainer.num_nodes={FC_TRAIN_NUM_NODES} recipes.run.ntasks_per_node={FC_GPUS_PER_NODE} "
        f"+env_vars.HF_TOKEN={os.environ.get('HF_TOKEN','')} "
        f"+env_vars.HF_HOME={POD_HF_HOME} "
        f"+env_vars.HF_HUB_CACHE={POD_HF_HUB_CACHE} "
        f"+env_vars.HF_DATASETS_CACHE={POD_HF_DATASETS_CACHE} "
        f"+env_vars.HF_ASSETS_CACHE={POD_HF_ASSETS_CACHE} "
        f"+env_vars.TOKENIZERS_PARALLELISM=false "
        f"env_vars.NCCL_DEBUG=INFO "
        rf"+env_vars.NCCL_DEBUG_SUBSYS=INIT\\,NET\\,ENV\\,COLL "
        f"+env_vars.TORCH_NCCL_ASYNC_ERROR_HANDLING=1 "
        f"+env_vars.TORCH_DISABLE_ADDR2LINE=1 "
        f"+env_vars.CUDA_LAUNCH_BLOCKING=1 "
        f"+env_vars.TORCH_CPP_LOG_LEVEL=INFO "
        f"+env_vars.NCCL_DEBUG_FILE=/data/nccl_%h_%p.log "
        f"+env_vars.TORCH_DISTRIBUTED_DEBUG=INFO "
        f"+env_vars.TORCH_SHOW_CPP_STACKTRACES=1 "
        f"{recompute_env_args}"
        f"{elastic_args}"
        f"{SFT_PRE_SCRIPT_ARG}"
    )
    print('Recipe:', FC_RECIPE_SFT_ID)
    print('Command:', rdct_sens(cmd))
    run(cmd)

    # Reuse existing SFT env vars for downstream eval
    os.environ['FC_SFT_RUN_NAME'] = FC_RUN_NAME
    os.environ['FC_SFT_JOB_NAME'] = ''
    os.environ['FC_SFT_TRAIN_DIR'] = f"{POD_FC_RESULTS_DIR}/{FC_RUN_NAME}"



Submit NeMo 2.0 HF -> NeMo import (Qwen2.5-Coder-7B-Instruct)
Import artifact already exists: /mnt/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/imports/qwen25_7b_instruct.nemo (skipping)

Submit NeMo 2.0 SFT (Qwen2.5-Coder-7B-Instruct)
Recipe: fine-tuning/qwen/nemo2_qwen2_5_7b_instruct_seq4k_gpu_sft_fft
Command: /home/sagemaker-user/bobber/notebooks/.venv/bin/python -m launcher recipes=fine-tuning/qwen/nemo2_qwen2_5_7b_instruct_seq4k_gpu_sft_fft base_results_dir=/fsx/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/results container=<ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/nemo-framework-hyperpod:25.04-eks cluster=k8s cluster_type=k8s instance_type=p4d.24xlarge recipes.run.name=fc-sft-20260105-000949-d8a722 recipes.run.nodes=2 recipes.trainer.num_nodes=2 recipes.run.ntasks_per_node=8 +env_vars.HF_TOKEN=<HF_TOKEN> +env_vars.HF_HOME=/fsx/fs-03b1953d09801303c/smus-nemo-smoke/hf-home +env_vars.HF_HUB_CACHE=/fsx/fs-03b1953d09801303c/smus-nemo-smoke/hf

In [41]:
# --- Section 8 SFT job status (async) ---
print_header('Section 8 SFT status (async)')

job_name = os.environ.get('FC_SFT_JOB_NAME', '').strip()
job_prefix = os.environ.get('FC_SFT_RUN_NAME', '').strip()
namespace = os.environ.get('HYPERPOD_NAMESPACE', '').strip() or 'hyperpod-ns-datascientist1'

if not job_prefix and job_name:
    job_prefix = job_name
if not job_prefix:
    try:
        job_prefix = f"fc-sft-{RUN_ID}"
    except Exception:
        job_prefix = ''

def _find_latest_hyp_job_by_prefix_with_hyp(prefix, namespace):
    if not prefix:
        return None
    rc, out_txt = run(f"hyp list hyp-pytorch-job -n {namespace}", check=False)
    if rc != 0 or not out_txt:
        return None
    matches = []
    for line in out_txt.splitlines():
        parsed = _parse_hyp_list_line(line, namespace)
        if not parsed:
            continue
        name, status, age, raw = parsed
        if name.startswith(prefix):
            matches.append((name, age, line))
    if not matches:
        return None
    # Pick most recent by age (smallest age wins when parsed)
    def _age_to_seconds(age):
        if not age:
            return 10**12
        total = 0
        num = ''
        for ch in age:
            if ch.isdigit():
                num += ch
                continue
            if not num:
                continue
            val = int(num)
            num = ''
            if ch == 's':
                total += val
            elif ch == 'm':
                total += val * 60
            elif ch == 'h':
                total += val * 3600
            elif ch == 'd':
                total += val * 86400
        return total if total > 0 else 10**12
    matches.sort(key=lambda x: _age_to_seconds(x[1]))
    return matches[0][0]

if not job_name and job_prefix:
    job_name = _find_latest_hyp_job_by_prefix_with_hyp(job_prefix, namespace)
    if job_name:
        os.environ['FC_SFT_JOB_NAME'] = job_name

if not job_name:
    print('FC_SFT_JOB_NAME not set. Run the submission cell first.')
else:
    !hyp list hyp-pytorch-job -n ${HYPERPOD_NAMESPACE} | grep -F -- "${FC_SFT_JOB_NAME}" || echo '(job not found)'
    phase, conds = get_hyp_job_status(job_name, namespace)
    print('Job:', job_name)
    print('Phase:', phase if phase is not None else '(unavailable)')
    if conds:
        print('Conditions:', conds)



Section 8 SFT status (async)
fc-sft-20260105-000949-d8a722-qgjkhhyperpod-ns-datascientist1Completed      9m             
Job: fc-sft-20260105-000949-d8a722-qgjkh
Phase: (unavailable)


In [42]:
# =============================================================================
# Section 8d: Evaluation script (baseline + SFT)
# =============================================================================

print_header('Section 8 eval script setup')

# Eval params (reuse if already set)
FC_EVAL_MODE = os.environ.get('FC_EVAL_MODE', 'quick').strip().lower()
if FC_EVAL_MODE not in {'quick', 'full'}:
    FC_EVAL_MODE = 'quick'

_fc_eval_limit_env = os.environ.get('FC_EVAL_LIMIT', '').strip()
if _fc_eval_limit_env:
    FC_EVAL_LIMIT = int(_fc_eval_limit_env)
else:
    FC_EVAL_LIMIT = 10 if FC_EVAL_MODE == 'quick' else int(FC_TEST_SIZE)

FC_EVAL_MAX_NEW_TOKENS = int(os.environ.get('FC_EVAL_MAX_NEW_TOKENS', '256'))
FC_EVAL_LOG_EVERY = int(os.environ.get('FC_EVAL_LOG_EVERY', '200'))
FC_EVAL_BATCH_SIZE = int(os.environ.get('FC_EVAL_BATCH_SIZE', '4'))

from pathlib import Path

def _resolve_fc_sft_run_dir():
    run_dir_hint = os.environ.get('FC_SFT_TRAIN_DIR', '').strip()
    if run_dir_hint:
        p = Path(run_dir_hint)
        if p.exists():
            return p

    results_dir = Path(FC_RESULTS_DIR)
    if not results_dir.exists():
        return None

    candidates = []
    prefixes = []
    if os.environ.get('FC_SFT_RUN_NAME'):
        prefixes.append(os.environ.get('FC_SFT_RUN_NAME'))
    if FC_RUN_NAME:
        prefixes.append(FC_RUN_NAME)
    for prefix in prefixes:
        if prefix:
            candidates.extend([p for p in results_dir.glob(f"{prefix}*") if p.is_dir()])

    if not candidates:
        for pat in ('fc-sft-*', 'sft-*'):
            candidates.extend([p for p in results_dir.glob(pat) if p.is_dir()])

    if not candidates:
        return None

    def _score(p: Path):
        has_nemo = (p / 'nemo_experiments').exists()
        return (1 if has_nemo else 0, p.stat().st_mtime)

    return max(candidates, key=_score)


def _to_pod_path(local_path: Path) -> str:
    try:
        local = local_path.resolve()
    except Exception:
        local = local_path
    fsx_root = Path(FSX_BASE_DIR).resolve()
    try:
        rel = local.relative_to(fsx_root)
        return f"{POD_FSX_ROOT}/{rel.as_posix()}"
    except Exception:
        return str(local).replace(str(fsx_root), POD_FSX_ROOT, 1)


_sft_run_dir = _resolve_fc_sft_run_dir()
if _sft_run_dir:
    os.environ['FC_SFT_TRAIN_DIR'] = str(_sft_run_dir)
    os.environ['POD_FC_SFT_TRAIN_DIR'] = _to_pod_path(_sft_run_dir)
    print('Resolved SFT run dir:', _sft_run_dir)
else:
    print('SFT run dir not found yet; will resolve later if needed.')

FC_EVAL_SCRIPT = FC_EVAL_DIR / 'run_fc_eval.py'
FC_EVAL_SCRIPT.write_text('import json, os\nfrom pathlib import Path\nimport torch\nfrom transformers import AutoTokenizer, AutoModelForCausalLM\n\n\ndef _find_adapter_dir(root: Path):\n    for p in root.rglob(\'adapter_config.json\'):\n        return p.parent\n    return None\n\n\ndef _find_merged_dir(root: Path):\n    for p in root.rglob(\'config.json\'):\n        parent = p.parent\n        if any(parent.glob(\'*.safetensors\')) or (parent / \'pytorch_model.bin\').exists():\n            return parent\n    return None\n\n\ndef _find_dist_ckpt_dir(root: Path):\n    candidates = []\n    for p in root.rglob(\'checkpoints\'):\n        if not p.is_dir():\n            continue\n        for ckpt in p.rglob(\'*\'):\n            if not ckpt.is_dir():\n                continue\n            if (ckpt / \'weights\').is_dir() and (ckpt / \'context\').is_dir():\n                candidates.append(ckpt)\n    if not candidates:\n        return None\n    return max(candidates, key=lambda p: p.stat().st_mtime)\n\n\ndef _maybe_export_nemo_to_hf(root: Path):\n    export_dir = root / \'hf_export\'\n    if (export_dir / \'config.json\').exists():\n        return export_dir\n\n    # Prefer .nemo if present\n    nemo_ckpts = list(root.rglob(\'*.nemo\'))\n    ckpt = max(nemo_ckpts, key=lambda p: p.stat().st_mtime) if nemo_ckpts else None\n    if ckpt is None:\n        ckpt = _find_dist_ckpt_dir(root)\n\n    if ckpt is None:\n        return None\n\n    try:\n        from nemo.collections import llm\n        llm.export_ckpt(path=ckpt, target=\'hf\', output_path=export_dir)\n    except Exception as e:\n        print(f\'WARNING: NeMo export failed: {e}\')\n        return None\n    return export_dir if (export_dir / \'config.json\').exists() else None\n\n\ndef _load_model(base_model, training_dir, require_sft=False):\n    adapter_dir = None\n    merged_dir = None\n    used = \'base\'\n\n    if training_dir:\n        root = Path(training_dir)\n        if root.exists():\n            adapter_dir = _find_adapter_dir(root)\n            if adapter_dir is None:\n                merged_dir = _find_merged_dir(root)\n            if adapter_dir is None and merged_dir is None:\n                export_dir = _maybe_export_nemo_to_hf(root)\n                if export_dir is not None:\n                    merged_dir = export_dir\n\n    if adapter_dir:\n        from peft import PeftModel\n        model = AutoModelForCausalLM.from_pretrained(base_model, torch_dtype=torch.bfloat16, device_map=\'auto\')\n        model = PeftModel.from_pretrained(model, str(adapter_dir))\n        tokenizer = AutoTokenizer.from_pretrained(base_model, use_fast=True)\n        used = \'adapter\'\n    elif merged_dir:\n        model = AutoModelForCausalLM.from_pretrained(str(merged_dir), torch_dtype=torch.bfloat16, device_map=\'auto\')\n        tokenizer = AutoTokenizer.from_pretrained(str(merged_dir), use_fast=True)\n        used = \'merged\'\n    else:\n        if require_sft:\n            raise RuntimeError(\n                \'SFT eval requested but no fine-tuned weights were found. \'\n                \'Export HF weights (hf_export/) or provide TRAINING_DIR with adapters/merged weights.\'\n            )\n        model = AutoModelForCausalLM.from_pretrained(base_model, torch_dtype=torch.bfloat16, device_map=\'auto\')\n        tokenizer = AutoTokenizer.from_pretrained(base_model, use_fast=True)\n\n    if tokenizer.pad_token is None:\n        tokenizer.pad_token = tokenizer.eos_token\n    tokenizer.padding_side = \'right\'\n    model.eval()\n    return model, tokenizer, used\n\n\ndef _apply_chat_template(tokenizer, messages, add_generation_prompt=True):\n    if hasattr(tokenizer, \'apply_chat_template\'):\n        return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=add_generation_prompt)\n    parts = []\n    for msg in messages:\n        role = msg.get(\'role\', \'user\')\n        prefix = \'User: \' if role == \'user\' else \'Assistant: \'\n        parts.append(prefix + msg.get(\'content\', \'\'))\n    if add_generation_prompt:\n        parts.append(\'Assistant: \')\n    return \'\n\n\'.join(parts)\n\n\ndef _safe_json_loads(text):\n    try:\n        return json.loads(text)\n    except Exception:\n        pass\n    for start_char in [\'[\', \'{\']:\n        start = text.find(start_char)\n        if start == -1:\n            continue\n        for end_char in [\']\', \'}\']:\n            end = text.rfind(end_char)\n            if end > start:\n                frag = text[start:end+1]\n                try:\n                    return json.loads(frag)\n                except Exception:\n                    continue\n    return None\n\n\ndef _get_name_and_args(obj):\n    if isinstance(obj, list) and obj:\n        obj = obj[0]\n    if not isinstance(obj, dict):\n        return None, None\n    return obj.get(\'name\'), obj.get(\'arguments\')\n\n\ndef evaluate(data_path, base_model, training_dir, out_path, max_new_tokens, limit, err_path=None, log_every=100, batch_size=4, require_sft=False):\n    print(f"Loading model: base_model={base_model} training_dir={training_dir}")\n    model, tokenizer, used = _load_model(base_model, training_dir, require_sft=require_sft)\n    print(f"Model loaded (source={used})")\n\n    total = None\n    if log_every and log_every > 0:\n        try:\n            with open(data_path, \'r\') as _f:\n                total = sum(1 for _ in _f)\n        except Exception as e:\n            print(f"WARNING: Could not count eval lines: {e}")\n        if limit and total is not None:\n            total = min(total, limit)\n        elif limit and total is None:\n            total = limit\n    print(f"Starting eval: limit={limit} total={total} batch_size={batch_size}")\n\n    results = {"format_acc": [], "function_acc": [], "argument_acc": [], "exact_match": [], "hallucination": []}\n    errors = []\n\n    def _score_one(query, tools, answers, gen):\n        expected = _safe_json_loads(str(answers))\n        pred = _safe_json_loads(gen)\n\n        if pred is None:\n            results[\'format_acc\'].append(0)\n            results[\'function_acc\'].append(0)\n            results[\'argument_acc\'].append(0)\n            results[\'exact_match\'].append(0)\n            results[\'hallucination\'].append(0)\n            errors.append({\'query\': query, \'error\': \'invalid_json\', \'output\': gen})\n            return\n\n        results[\'format_acc\'].append(1)\n        pred_name, pred_args = _get_name_and_args(pred)\n        exp_name, exp_args = _get_name_and_args(expected)\n\n        try:\n            tool_names = [t.get(\'name\') for t in json.loads(tools)]\n        except Exception:\n            tool_names = []\n\n        is_hallucination = pred_name not in tool_names if tool_names else False\n        results[\'hallucination\'].append(1 if is_hallucination else 0)\n\n        fn_correct = pred_name == exp_name\n        results[\'function_acc\'].append(1 if fn_correct else 0)\n\n        args_correct = pred_args == exp_args\n        results[\'argument_acc\'].append(1 if args_correct else 0)\n\n        results[\'exact_match\'].append(1 if (fn_correct and args_correct) else 0)\n\n        if not fn_correct or not args_correct:\n            errors.append({\n                \'query\': query,\n                \'expected\': expected,\n                \'pred\': pred,\n                \'output\': gen,\n            })\n\n    processed = 0\n    batch_prompts = []\n    batch_meta = []\n\n    with open(data_path, \'r\') as f:\n        for idx, line in enumerate(f):\n            if limit and idx >= limit:\n                break\n            ex = json.loads(line)\n            tools = ex.get(\'tools\', \'\')\n            query = ex.get(\'query\', \'\')\n            answers = ex.get(\'answers\', \'\')\n\n            system = (\n                "You are a helpful assistant with access to the following functions. "\n                "Use them if required.\n\n" + str(tools)\n            )\n            messages = [\n                {\'role\': \'system\', \'content\': system},\n                {\'role\': \'user\', \'content\': str(query)},\n            ]\n            prompt = _apply_chat_template(tokenizer, messages, add_generation_prompt=True)\n\n            batch_prompts.append(prompt)\n            batch_meta.append((query, tools, answers))\n\n            if len(batch_prompts) >= max(1, batch_size):\n                enc = tokenizer(batch_prompts, return_tensors=\'pt\', padding=True)\n                input_ids = enc[\'input_ids\'].to(model.device)\n                attention_mask = enc[\'attention_mask\'].to(model.device)\n                with torch.no_grad():\n                    out = model.generate(\n                        input_ids=input_ids,\n                        attention_mask=attention_mask,\n                        max_new_tokens=max_new_tokens,\n                        do_sample=False,\n                        temperature=0.0,\n                    )\n                for i in range(len(batch_prompts)):\n                    prompt_len = int(attention_mask[i].sum().item())\n                    gen = tokenizer.decode(out[i][prompt_len:], skip_special_tokens=True).strip()\n                    q, t, a = batch_meta[i]\n                    _score_one(q, t, a, gen)\n                processed += len(batch_prompts)\n                if log_every and log_every > 0 and processed % log_every == 0:\n                    if total is not None:\n                        print(f"Processed {processed}/{total}")\n                    else:\n                        print(f"Processed {processed}")\n                batch_prompts = []\n                batch_meta = []\n\n    if batch_prompts:\n        enc = tokenizer(batch_prompts, return_tensors=\'pt\', padding=True)\n        input_ids = enc[\'input_ids\'].to(model.device)\n        attention_mask = enc[\'attention_mask\'].to(model.device)\n        with torch.no_grad():\n            out = model.generate(\n                input_ids=input_ids,\n                attention_mask=attention_mask,\n                max_new_tokens=max_new_tokens,\n                do_sample=False,\n                temperature=0.0,\n            )\n        for i in range(len(batch_prompts)):\n            prompt_len = int(attention_mask[i].sum().item())\n            gen = tokenizer.decode(out[i][prompt_len:], skip_special_tokens=True).strip()\n            q, t, a = batch_meta[i]\n            _score_one(q, t, a, gen)\n        processed += len(batch_prompts)\n\n    summary = {k: (sum(v) / len(v) if v else 0.0) for k, v in results.items()}\n    Path(out_path).parent.mkdir(parents=True, exist_ok=True)\n    with open(out_path, \'w\') as f:\n        json.dump(summary, f, indent=2)\n\n    if err_path:\n        with open(err_path, \'w\') as f:\n            json.dump(errors[:200], f, indent=2)\n\n    print(\'Wrote metrics to\', out_path)\n    if err_path:\n        print(\'Wrote errors to\', err_path)\n\n\nif __name__ == \'__main__\':\n    data_path = os.environ.get(\'EVAL_DATA_PATH\')\n    base_model = os.environ.get(\'BASE_MODEL\')\n    training_dir = os.environ.get(\'TRAINING_DIR\', \'\').strip()\n    out_path = os.environ.get(\'OUTPUT_PATH\')\n    err_path = os.environ.get(\'ERRORS_PATH\', \'\')\n    max_new_tokens = int(os.environ.get(\'MAX_NEW_TOKENS\', \'256\'))\n    limit = int(os.environ.get(\'EVAL_LIMIT\', \'0\'))\n    log_every = int(os.environ.get(\'EVAL_LOG_EVERY\', \'100\'))\n    batch_size = int(os.environ.get(\'EVAL_BATCH_SIZE\', \'4\'))\n    require_sft = str(os.environ.get(\'REQUIRE_SFT\', \'false\')).lower() in {\'1\',\'true\',\'yes\'}\n\n    if not data_path or not base_model or not out_path:\n        raise ValueError(\'Missing EVAL_DATA_PATH, BASE_MODEL, or OUTPUT_PATH\')\n\n    evaluate(\n        data_path,\n        base_model,\n        training_dir,\n        out_path,\n        max_new_tokens,\n        limit,\n        err_path or None,\n        log_every=log_every,\n        batch_size=batch_size,\n        require_sft=require_sft,\n    )\n')

print('Eval script:', FC_EVAL_SCRIPT)



Section 8 eval script setup
Eval script: /home/sagemaker-user/custom-file-systems/fsx_lustre/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/eval/run_fc_eval.py


In [43]:
# =============================================================================
# Section 8e: Submit baseline evaluation (optional, Qwen2.5-Coder-7B/14B)
# =============================================================================

if not RUN_FC_EVAL_BASELINE:
    print('Skipping baseline eval (RUN_FC_EVAL_BASELINE=False).')
else:
    print_header(f'Baseline eval (Qwen2.5-Coder-{FC_QWEN_SIZE_LABEL}-Instruct)')

    eval_name = f"fc-eval-base-{RUN_ID}"
    eval_out = f"{POD_FC_EVAL_DIR}/baseline-metrics-{RUN_ID}.json"
    eval_err = f"{POD_FC_EVAL_DIR}/baseline-errors-{RUN_ID}.json"
    data_path = f"{POD_FC_DATA_TEST_DIR}/test_raw.jsonl"

    cmd = f"python {POD_FC_EVAL_DIR}/run_fc_eval.py"
    job = submit_simple_job(
        name=eval_name,
        image=NEMO_IMAGE,
        command=cmd,
        gpu='1',
        labels={
            'kueue.x-k8s.io/queue-name': KUEUE_QUEUE_NAME,
            'kueue.x-k8s.io/priority-class': KUEUE_PRIORITY_CLASS_INFER,
        } if HAS_TASK_GOV else None,
        annotations={
            f'kueue.x-k8s.io/podset-{KUEUE_TOPOLOGY_MODE_INFER}-topology': KUEUE_TOPOLOGY_LABEL_INFER,
        } if HAS_TASK_GOV else None,
        env=HF_ENV + [
            {'name': 'BASE_MODEL', 'value': FC_BASE_MODEL_ID},
            {'name': 'TRAINING_DIR', 'value': ''},
            {'name': 'EVAL_DATA_PATH', 'value': data_path},
            {'name': 'OUTPUT_PATH', 'value': eval_out},
            {'name': 'ERRORS_PATH', 'value': eval_err},
            {'name': 'EVAL_LIMIT', 'value': str(FC_EVAL_LIMIT)},
            {'name': 'MAX_NEW_TOKENS', 'value': str(FC_EVAL_MAX_NEW_TOKENS)},
            {'name': 'EVAL_LOG_EVERY', 'value': str(FC_EVAL_LOG_EVERY)},
            {'name': 'EVAL_BATCH_SIZE', 'value': str(FC_EVAL_BATCH_SIZE)},
        ],
    )
    os.environ['FC_EVAL_BASE_JOB_NAME'] = eval_name
    os.environ['FC_EVAL_BASE_OUTPUT_PATH'] = eval_out
    os.environ['FC_BASE_METRICS_PATH'] = str(FC_EVAL_DIR / f'baseline-metrics-{RUN_ID}.json')
    os.environ['FC_BASE_ERRORS_PATH'] = str(FC_EVAL_DIR / f'baseline-errors-{RUN_ID}.json')
    print('Submitted baseline eval job:', eval_name)



Baseline eval (Qwen2.5-Coder-7B-Instruct)


Successfully submitted HyperPodPytorchJob 'fc-eval-base-20260105-000949-d8a722'!


Submitted baseline eval job: fc-eval-base-20260105-000949-d8a722


In [44]:
# =============================================================================
# Section 8f: Submit SFT evaluation (optional)
# =============================================================================

if not RUN_FC_EVAL_SFT:
    print('Skipping SFT eval (RUN_FC_EVAL_SFT=False).')
else:
    print_header('SFT eval (function-calling)')

    eval_name = f"fc-eval-sft-{RUN_ID}"
    eval_out = f"{POD_FC_EVAL_DIR}/sft-metrics-{RUN_ID}.json"
    eval_err = f"{POD_FC_EVAL_DIR}/sft-errors-{RUN_ID}.json"
    data_path = f"{POD_FC_DATA_TEST_DIR}/test_raw.jsonl"

    train_dir = os.environ.get('POD_FC_SFT_TRAIN_DIR', '').strip()
    if not train_dir:
        local_dir = _resolve_fc_sft_run_dir()
        if local_dir:
            train_dir = _to_pod_path(local_dir)
            os.environ['POD_FC_SFT_TRAIN_DIR'] = train_dir
            os.environ['FC_SFT_TRAIN_DIR'] = str(local_dir)

    cmd = f"python {POD_FC_EVAL_DIR}/run_fc_eval.py"
    job = submit_simple_job(
        name=eval_name,
        image=NEMO_IMAGE,
        command=cmd,
        gpu='1',
        labels={
            'kueue.x-k8s.io/queue-name': KUEUE_QUEUE_NAME,
            'kueue.x-k8s.io/priority-class': KUEUE_PRIORITY_CLASS_INFER,
        } if HAS_TASK_GOV else None,
        annotations={
            f'kueue.x-k8s.io/podset-{KUEUE_TOPOLOGY_MODE_INFER}-topology': KUEUE_TOPOLOGY_LABEL_INFER,
        } if HAS_TASK_GOV else None,
        env=HF_ENV + [
            {'name': 'BASE_MODEL', 'value': FC_BASE_MODEL_ID},
            {'name': 'TRAINING_DIR', 'value': train_dir},
            {'name': 'EVAL_DATA_PATH', 'value': data_path},
            {'name': 'OUTPUT_PATH', 'value': eval_out},
            {'name': 'ERRORS_PATH', 'value': eval_err},
            {'name': 'EVAL_LIMIT', 'value': str(FC_EVAL_LIMIT)},
            {'name': 'MAX_NEW_TOKENS', 'value': str(FC_EVAL_MAX_NEW_TOKENS)},
            {'name': 'EVAL_LOG_EVERY', 'value': str(FC_EVAL_LOG_EVERY)},
            {'name': 'EVAL_BATCH_SIZE', 'value': str(FC_EVAL_BATCH_SIZE)},
            {'name': 'REQUIRE_SFT', 'value': 'true'},
        ],
    )
    os.environ['FC_EVAL_SFT_JOB_NAME'] = eval_name
    os.environ['FC_EVAL_SFT_OUTPUT_PATH'] = eval_out
    os.environ['FC_SFT_METRICS_PATH'] = str(FC_EVAL_DIR / f'sft-metrics-{RUN_ID}.json')
    os.environ['FC_SFT_ERRORS_PATH'] = str(FC_EVAL_DIR / f'sft-errors-{RUN_ID}.json')
    os.environ['FC_ERRORS_PATH'] = os.environ['FC_SFT_ERRORS_PATH']
    print('Submitted SFT eval job:', eval_name)


Successfully submitted HyperPodPytorchJob 'fc-eval-sft-20260105-000949-d8a722'!



SFT eval (function-calling)
Submitted SFT eval job: fc-eval-sft-20260105-000949-d8a722


## Section 8g — Checkpoint + Training Curves + Eval Summary

These cells surface the remaining Stage‑1 deliverables:
- checkpoint location and key files
- training curves (train/val loss)
- evaluation table (baseline vs SFT)
- error analysis buckets


In [45]:
# =============================================================================
# Section 8g.1: Locate checkpoint + logs
# =============================================================================

print_header('Section 8 checkpoint discovery')

from pathlib import Path
import os

run_dir = _resolve_fc_sft_run_dir()
if run_dir:
    os.environ['FC_SFT_TRAIN_DIR'] = str(run_dir)
    os.environ['POD_FC_SFT_TRAIN_DIR'] = _to_pod_path(run_dir)

if not run_dir or not run_dir.exists():
    print('No SFT run directory found yet under:', FC_RESULTS_DIR)
else:
    print('Run dir:', run_dir)

    # HF export/adapter/merged checkpoints
    hf_export = run_dir / 'hf_export'
    if hf_export.exists() and (hf_export / 'config.json').exists():
        print('HF export dir:', hf_export)

    patterns = ['adapter_config.json', 'config.json', '*.safetensors', 'pytorch_model.bin']
    found = []
    for pat in patterns:
        found.extend(run_dir.rglob(pat))
    if not found:
        print('No HF/adapter checkpoint files found yet.')
    else:
        for p in sorted(found)[:20]:
            print('  ', p)
        if len(found) > 20:
            print('  ...')

    # NeMo distributed checkpoints
    ckpt_dirs = []
    for p in run_dir.rglob('checkpoints'):
        if not p.is_dir():
            continue
        for ckpt in p.rglob('*'):
            if ckpt.is_dir() and (ckpt / 'weights').is_dir() and (ckpt / 'context').is_dir():
                ckpt_dirs.append(ckpt)
    if ckpt_dirs:
        ckpt_dirs = sorted(ckpt_dirs, key=lambda p: p.stat().st_mtime, reverse=True)
        print('NeMo dist checkpoint dirs:', len(ckpt_dirs))
        for p in ckpt_dirs[:5]:
            print('  ', p)
        if len(ckpt_dirs) > 5:
            print('  ...')
    else:
        print('No NeMo dist checkpoints found yet.')

    # TensorBoard logs
    tb_files = list(run_dir.rglob('events.out.tfevents*'))
    print('TensorBoard event files:', len(tb_files))
    if tb_files:
        for p in tb_files[:5]:
            print('  ', p)
        if len(tb_files) > 5:
            print('  ...')



Section 8 checkpoint discovery
Run dir: /fsx/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/results/fc-sft-20260105-000949-d8a722
No checkpoint files found yet. Job may still be running.
TensorBoard event files: 0


In [46]:
# =============================================================================
# Section 8g.2: Plot training curves (train/val loss)
# =============================================================================

RUN_FC_PLOT_LOSS = bool(str(os.environ.get('RUN_FC_PLOT_LOSS', 'true')).lower() in {'1','true','yes'})
if not RUN_FC_PLOT_LOSS:
    print('Skipping loss plot (RUN_FC_PLOT_LOSS=False).')
else:
    print_header('Section 8 training curves')

    from pathlib import Path
    from collections import defaultdict

    try:
        from tensorboard.backend.event_processing import event_accumulator
    except Exception:
        print('Installing tensorboard...')
        run('python -m pip install -q tensorboard')
        from tensorboard.backend.event_processing import event_accumulator

    run_dir = _resolve_fc_sft_run_dir()
    if not run_dir or not run_dir.exists():
        print('No run directory found for plots.')
    else:
        tb_files = list(run_dir.rglob('events.out.tfevents*'))
        if not tb_files:
            print('No TensorBoard event files found in:', run_dir)
        else:
            series = defaultdict(list)
            for f in tb_files:
                ea = event_accumulator.EventAccumulator(str(f))
                try:
                    ea.Reload()
                except Exception:
                    continue
                tags = ea.Tags().get('scalars', [])
                for tag in tags:
                    if 'loss' in tag.lower():
                        for ev in ea.Scalars(tag):
                            series[tag].append((ev.step, ev.value))

            if not series:
                print('No loss scalars found. Available scalar tags:')
                try:
                    tags = ea.Tags().get('scalars', [])
                    print(tags)
                except Exception:
                    pass
            else:
                import matplotlib.pyplot as plt
                plt.figure(figsize=(8, 4))
                for tag, pts in series.items():
                    pts = sorted(pts, key=lambda x: x[0])
                    xs = [p[0] for p in pts]
                    ys = [p[1] for p in pts]
                    plt.plot(xs, ys, label=tag)
                plt.xlabel('step')
                plt.ylabel('loss')
                plt.title('Training/Validation Loss')
                plt.legend()
                plt.grid(True)
                plt.show()



Section 8 training curves
No TensorBoard event files found in: /fsx/fs-03b1953d09801303c/smus-nemo-smoke/section8_fc/results/fc-sft-20260105-000949-d8a722


In [47]:
# =============================================================================
# Section 8g.3: Evaluation summary table (baseline vs SFT)
# =============================================================================

print_header('Section 8 eval summary')

from pathlib import Path
import json

# Resolve latest metrics files if not explicitly set
base_metrics = os.environ.get('FC_BASE_METRICS_PATH', '').strip()
sft_metrics = os.environ.get('FC_SFT_METRICS_PATH', '').strip()

if not base_metrics:
    candidates = sorted(Path(FC_EVAL_DIR).glob('baseline-metrics-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
    base_metrics = str(candidates[0]) if candidates else ''
if not sft_metrics:
    candidates = sorted(Path(FC_EVAL_DIR).glob('sft-metrics-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
    sft_metrics = str(candidates[0]) if candidates else ''

print('Baseline metrics file:', base_metrics or '(none)')
print('SFT metrics file:', sft_metrics or '(none)')

if not base_metrics or not sft_metrics:
    print('Missing metrics files. Run Section 8e/8f to generate them.')
else:
    with open(base_metrics, 'r') as f:
        base = json.load(f)
    with open(sft_metrics, 'r') as f:
        sft = json.load(f)

    metrics = sorted(set(base.keys()) | set(sft.keys()))
    rows = []
    for m in metrics:
        b = base.get(m, 0.0)
        s = sft.get(m, 0.0)
        rows.append((m, b, s, s - b))

    try:
        import pandas as pd
        df = pd.DataFrame(rows, columns=['metric', 'baseline', 'sft', 'delta'])
        display(df)
    except Exception:
        for m, b, s, d in rows:
            print(f"{m:16s} baseline={b:.4f} sft={s:.4f} delta={d:+.4f}")



Section 8 eval summary
Baseline metrics file: (none)
SFT metrics file: (none)
Missing metrics files. Run Section 8e/8f to generate them.


In [48]:
# =============================================================================
# Section 8g.4: Error analysis buckets
# =============================================================================

print_header('Section 8 error analysis')

from pathlib import Path
import json

err_path = os.environ.get('FC_ERRORS_PATH', '').strip()
if not err_path:
    # Prefer SFT errors if available
    sft_errs = sorted(Path(FC_EVAL_DIR).glob('sft-errors-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
    base_errs = sorted(Path(FC_EVAL_DIR).glob('baseline-errors-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
    err_path = str(sft_errs[0]) if sft_errs else (str(base_errs[0]) if base_errs else '')

print('Errors file:', err_path or '(none)')

if not err_path:
    print('No error file found. Run Section 8e/8f first.')
else:
    # Build a lookup from query -> tools/answers
    lookup = {}
    test_raw = FC_DATA_TEST_DIR / 'test_raw.jsonl'
    if test_raw.exists():
        with open(test_raw, 'r') as f:
            for line in f:
                ex = json.loads(line)
                q = ex.get('query')
                if q not in lookup:
                    lookup[q] = ex
    else:
        print('Missing test_raw.jsonl at', test_raw)

    def _safe_json_loads(text):
        try:
            return json.loads(text)
        except Exception:
            return None

    def _get_name_and_args(obj):
        if isinstance(obj, list) and obj:
            obj = obj[0]
        if not isinstance(obj, dict):
            return None, None
        return obj.get('name'), obj.get('arguments')

    with open(err_path, 'r') as f:
        errs = json.load(f)

    buckets = {
        'invalid_json': [],
        'hallucination': [],
        'wrong_function': [],
        'wrong_arguments': [],
        'other': [],
    }

    for ex in errs:
        if ex.get('error') == 'invalid_json':
            buckets['invalid_json'].append(ex)
            continue

        pred = ex.get('pred')
        expected = ex.get('expected')
        if pred is None and ex.get('output'):
            pred = _safe_json_loads(ex['output'])
        if expected is None and ex.get('answers'):
            expected = _safe_json_loads(ex['answers'])

        pred_name, pred_args = _get_name_and_args(pred)
        exp_name, exp_args = _get_name_and_args(expected)

        tools = []
        if ex.get('query') in lookup:
            tools_raw = lookup[ex['query']].get('tools')
            try:
                tools = json.loads(tools_raw)
            except Exception:
                tools = []

        tool_names = [t.get('name') for t in tools] if tools else []

        if tool_names and pred_name not in tool_names:
            buckets['hallucination'].append(ex)
        elif pred_name != exp_name:
            buckets['wrong_function'].append(ex)
        elif pred_args != exp_args:
            buckets['wrong_arguments'].append(ex)
        else:
            buckets['other'].append(ex)

    print('Bucket counts:')
    for k, v in buckets.items():
        print(f"  {k:16s} {len(v)}")

    # Show a few examples per bucket
    for k, v in buckets.items():
        if not v:
            continue
        print('\n', k)
        for ex in v[:3]:
            print('  query:', ex.get('query'))
            if ex.get('output'):
                print('  output:', ex.get('output'))
            if ex.get('pred'):
                print('  pred:', ex.get('pred'))
            if ex.get('expected'):
                print('  expected:', ex.get('expected'))



Section 8 error analysis
Errors file: (none)
No error file found. Run Section 8e/8f first.
