In [None]:
!pip install --upgrade pip
# !pip install --no-deps --upgrade --force-reinstall --no-cache-dir "git+https://github.com/briangallagher/sdk@training-hub"
# !pip install --upgrade --force-reinstall --no-cache-dir "git+https://github.com/briangallagher/sdk@training-hub"
!pip install datasets

In [None]:
import os
import json
from datasets import load_dataset

# Prepare workspace directories: data/, outputs/ and checkpoints/ under base_dir.
base_dir = "/opt/app-root/src"
data_dir = os.path.join(base_dir, "data")
outputs_dir = os.path.join(base_dir, "outputs")
ckpt_dir = os.path.join(base_dir, "checkpoints")

for d in [data_dir, outputs_dir, ckpt_dir]:
    # exist_ok=True makes this cell safe to re-run.
    os.makedirs(d, exist_ok=True)
    # 0o777 grants read/write/execute to every user.
    # NOTE(review): presumably needed so training pods running under a different
    # UID can write to the shared volume -- confirm, and tighten if not required.
    os.chmod(d, 0o777)
    print(f"[PY] Ensured directory exists and writable: {d}")

# JSONL file that the converted training samples are written to.
dataset_path = os.path.join(data_dir, "dataset.jsonl")

# Load the "train" split of the Open-Orca/OpenOrca dataset.
# NOTE(review): this loads the whole split even though only a small sample is
# used afterwards; presumably it is fetched from the Hugging Face Hub on first
# use -- confirm network access and disk space in the workbench.
ds = load_dataset("Open-Orca/OpenOrca", split="train")

def convert(example):
    """Turn one dataset record into a chat-style ``messages`` record.

    Args:
        example: Mapping with the keys ``system_prompt``, ``question`` and
            ``response``.

    Returns:
        ``{"messages": [...]}`` holding, in order: a system turn (always
        present, even when the prompt is empty), a user turn (only when
        ``question`` is truthy) and an assistant turn.
    """
    system_turn = {"role": "system", "content": example['system_prompt']}

    # Records with an empty/None question contribute no user turn.
    question = example["question"]
    user_turns = [{"role": "user", "content": question}] if question else []

    assistant_turn = {"role": "assistant", "content": example["response"]}
    return {"messages": [system_turn, *user_turns, assistant_turn]}

# Take a reproducible 1000-example sample: shuffle with a fixed seed, keep the first 1000.
sam = ds.shuffle(seed=42).select(range(1000))

# Add a chat-style "messages" column to every sampled record.
alp = sam.map(convert)

# Write one JSON object per line (JSONL) to dataset_path.
alp.to_json(dataset_path, orient="records", lines=True)

# Report the number of records written. The previous message interpolated `d`,
# the leftover loop variable from the directory-setup loop, and therefore
# printed the checkpoints directory instead of anything about the dataset.
print(f"[PY] Finished writing dataset: {len(alp)} records")
print(f"[PY] Created dataset file: {dataset_path}")

In [None]:
from kubeflow.trainer import TrainerClient, TrainingHubTrainer, TrainingHubAlgorithms
from kubeflow_trainer_api import models

# Create the trainer client with its default arguments; later cells use it to
# list runtimes and to submit the training job.
# NOTE(review): presumably the default constructor picks up the cluster
# connection from the workbench environment -- confirm.
client = TrainerClient()
# Print the client as a quick check that construction succeeded.
print(client)

In [None]:
# NOTE: Needed to add cluster role binding to read the cluster scoped runtimes

# Look up the runtime named "training-hub-sft" among the runtimes visible to the client.
# Reset first so a stale value from an earlier run of this cell cannot be
# silently reused when the runtime is no longer present.
sft_runtime = None
available_runtimes = list(client.list_runtimes())

for runtime in available_runtimes:
    if runtime.name == "training-hub-sft":
        sft_runtime = runtime
        print("Found runtime: " + str(sft_runtime))
        break

# Fail here, with a useful message, instead of letting a later cell hit a
# NameError (or pass None) when the runtime is missing.
if sft_runtime is None:
    visible = ", ".join(str(r.name) for r in available_runtimes) or "<none>"
    raise RuntimeError(
        'Runtime "training-hub-sft" was not found. '
        f"Runtimes visible to this client: {visible}"
    )

In [None]:
# Arguments handed to the SFT algorithm through `func_args`.
# Every path sits under /opt/app-root/src, where the PVC declared below is mounted.
args = {
    # --- model and data locations ---
    "model_path": "Qwen/Qwen2.5-0.5B",
    "data_path": "/opt/app-root/src/data/dataset.jsonl",
    "ckpt_output_dir": "/opt/app-root/src/checkpoints",
    "data_output_dir": "/opt/app-root/src/outputs",

    # --- optimisation schedule and sequence limits ---
    "num_epochs": 1,
    "effective_batch_size": 128,
    "max_tokens_per_gpu": 2048,
    "learning_rate": 1e-05,
    "max_seq_len": 512,
    "max_batch_len": 512,
    "save_samples": 0,
    # NOTE(review): if effective_batch_size counts samples, the 1000-example
    # dataset written earlier in this notebook gives only ~8 optimizer steps
    # per epoch, so a 100-step warmup would never finish -- confirm intent.
    "warmup_steps": 100,

    # --- checkpointing ---
    "checkpoint_at_epoch": True,
    "accelerate_full_state_at_epoch": True,

    # --- rendezvous, attention and packing ---
    "rdzv_id": 1,
    "disable_flash_attn": True,
    "packing": False,
    # NOTE(review): "enable_multipack" here and "disable_multipack" further
    # down appear to express the same choice -- confirm which key is honoured.
    "enable_multipack": False,

    # --- precision and memory ---
    "fp16": True,
    "bf16": False,
    "gradient_checkpointing": True,

    # --- distributed setup ---
    "distributed_training_framework": "fsdp",
    "fsdp_sharding_strategy": "SHARD_GRAD_OP",
    "disable_multipack": True,
    "dtype": "float16",
    "nproc_per_node": 1,
    "nnodes": 2,
}

# Kubernetes volumes and mounts (PVC example)
# The volume and its mount are tied together by the shared name "example".
pvc_source = models.IoK8sApiCoreV1PersistentVolumeClaimVolumeSource(
    claim_name="example"
)
pvc_volume = models.IoK8sApiCoreV1Volume(
    name="example",
    persistent_volume_claim=pvc_source,
)
pvc_mount = models.IoK8sApiCoreV1VolumeMount(
    name="example",
    mount_path="/opt/app-root/src/",
    read_only=False,
)
volumes = [pvc_volume]
volume_mounts = [pvc_mount]

# Describe the SFT trainer, then submit it against the runtime found earlier.
sft_trainer = TrainingHubTrainer(
    algorithm=TrainingHubAlgorithms.SFT,
    func_args=args,
    # packages_to_install=["training-hub"],
    volumes=volumes,
    volume_mounts=volume_mounts,
)
job_name = client.train(trainer=sft_trainer, runtime=sft_runtime)