From 4dbb9062b7ed5a4ca21c58975a274948f4e2c2ad Mon Sep 17 00:00:00 2001 From: Chenhan Yu Date: Tue, 7 Apr 2026 20:42:11 -0700 Subject: [PATCH] feat(speculative): add vLLM data synthesis pipeline and Nemotron dataset preparation scripts - Use slurm_config.user for SSHTunnel user when set - Add assets field to SandboxPipeline for pre-submission asset checks - Add Nemotron specdec dataset preparation scripts - Add vLLM container support for data synthesis - Add Qwen3.5-4B vLLM specdec data synthesis YAML - Move dataset prep scripts to examples/dataset/ - Update spec dec conftest to use examples/dataset/make_dataset.py Signed-off-by: Chenhan D. Yu <5185878+ChenhanYu@users.noreply.github.com> Signed-off-by: Chenhan Yu --- examples/dataset/README.md | 128 ++++++++ .../add_nemotron_chat.py | 0 examples/dataset/augmentations.yaml | 93 ++++++ examples/dataset/conversation_utils.py | 200 ++++++++++++ .../example_data_config.yaml | 0 .../make_dataset.py | 0 .../dataset/make_nemotron_ptv2_dataset.py | 248 ++++++++++++++ .../dataset/make_nemotron_ptv3_dataset.py | 304 ++++++++++++++++++ examples/dataset/nemotron_ptv3_datasets.yaml | 193 +++++++++++ .../utils.py | 0 examples/speculative_decoding/README.md | 47 ++- .../prepare_input_conversations/__init__.py | 16 - .../examples/speculative_decoding/conftest.py | 2 +- tools/launcher/common/query.py | 90 +++++- tools/launcher/common/vllm/query.sh | 6 +- tools/launcher/core.py | 6 +- .../Qwen3.5-4B/query_specdec_dataset.yaml | 43 +++ 17 files changed, 1337 insertions(+), 39 deletions(-) create mode 100644 examples/dataset/README.md rename examples/{speculative_decoding/prepare_input_conversations => dataset}/add_nemotron_chat.py (100%) create mode 100644 examples/dataset/augmentations.yaml create mode 100644 examples/dataset/conversation_utils.py rename examples/{speculative_decoding/prepare_input_conversations => dataset}/example_data_config.yaml (100%) rename examples/{speculative_decoding/prepare_input_conversations => 
dataset}/make_dataset.py (100%) create mode 100644 examples/dataset/make_nemotron_ptv2_dataset.py create mode 100644 examples/dataset/make_nemotron_ptv3_dataset.py create mode 100644 examples/dataset/nemotron_ptv3_datasets.yaml rename examples/{speculative_decoding/prepare_input_conversations => dataset}/utils.py (100%) delete mode 100644 examples/speculative_decoding/prepare_input_conversations/__init__.py create mode 100644 tools/launcher/examples/Qwen/Qwen3.5-4B/query_specdec_dataset.yaml diff --git a/examples/dataset/README.md b/examples/dataset/README.md new file mode 100644 index 0000000000..34afcc74f6 --- /dev/null +++ b/examples/dataset/README.md @@ -0,0 +1,128 @@ +# Dataset Preparation Scripts + +Utilities for building conversation datasets from NVIDIA Nemotron Post-Training +collections and other HuggingFace sources. These scripts produce datasets in +**standard OpenAI chat format** (`{"messages": [{"role": ..., "content": ...}]}`) +and can be used for any downstream fine-tuning task — SFT, distillation, +speculative decoding draft-model training, etc. + +## Files + +| File | Description | +|---|---| +| `make_nemotron_ptv3_dataset.py` | Build a dataset from the [Nemotron PT v3 collection](https://huggingface.co/collections/nvidia/nemotron-post-training-v3) using a configurable YAML mix | +| `make_nemotron_ptv2_dataset.py` | Build a dataset from [Nemotron-Post-Training-Dataset-v2](https://huggingface.co/datasets/nvidia/Nemotron-Post-Training-Dataset-v2) | +| `make_dataset.py` | General-purpose mixer for arbitrary HuggingFace datasets (mtbench, sharegpt, ultrachat, magpie, etc.) 
| +| `conversation_utils.py` | Shared utilities: augmentation, role normalization, assistant-turn stripping | +| `add_nemotron_chat.py` | Add Nemotron v2 chat conversations to an existing dataset | +| `augmentations.yaml` | Augmentation variants (language redirects, style hints) for `make_nemotron_pt*.py` | +| `nemotron_ptv3_datasets.yaml` | Dataset mix config for `make_nemotron_ptv3_dataset.py` | +| `example_data_config.yaml` | Example YAML config for `make_dataset.py` | + +## Quick Start + +### Install dependencies + +```bash +pip install datasets huggingface_hub pyyaml +huggingface-cli login # required for gated datasets +```text + +### Build a Nemotron PT v3 dataset + +```bash +# Synthetic data generation inputs (strips last assistant turn so a model can regenerate it) +python make_nemotron_ptv3_dataset.py --output-dir /tmp/ptv3_gen + +# Full conversations for direct SFT training +python make_nemotron_ptv3_dataset.py --mode train --output-dir /tmp/ptv3_train + +# Use a custom dataset mix +python make_nemotron_ptv3_dataset.py --config my_mix.yaml --output-dir /tmp/ptv3_custom +```text + +### Build a Nemotron PT v2 dataset + +```bash +python make_nemotron_ptv2_dataset.py --output-dir /tmp/ptv2_gen +python make_nemotron_ptv2_dataset.py --mode train --output-dir /tmp/ptv2_train +```text + +### Build a general-purpose mixed dataset + +```bash +python make_dataset.py --config example_data_config.yaml --output-dir /tmp/mixed +```text + +## Dataset Modes + +Both `make_nemotron_pt*.py` scripts support two modes: + +| Mode | Description | Use case | +|---|---|---| +| `generate` (default) | Strips assistant turns, optionally augments prompts | Input data for synthetic generation (query a target model to produce training responses) | +| `train` | Keeps all turns, normalizes to clean OpenAI format | Direct SFT / distillation training | + +## Synthetic Generation Pipeline + +The `generate` mode produces conversation skeletons that are fed to a target model +via 
`tools/launcher/common/query.py` (vLLM or TRT-LLM). The output becomes training +data for a draft model (e.g. EAGLE3 speculative decoding) or a distilled student: + +```text +make_nemotron_ptv3_dataset.py --mode generate → skeleton.jsonl + ↓ +query.py (target model generates responses turn-by-turn) + ↓ +training data for draft model / student +```text + +## Augmentations + +`augmentations.yaml` defines language-redirect and style-hint variants that are +applied cyclically across the dataset. Each enabled entry produces one augmented +copy of the source rows. + +To customize augmentations: +- **Disable** a variant: add `enabled: false` +- **Add** a language redirect: append a `user_suffix` entry +- **Add** a system prompt: append a `system_prompt` entry + +```yaml +augmentations: + - type: user_suffix + text: " Please reply in French instead of English." + - type: system_prompt + content: "You are a helpful assistant." + enabled: false # disable without deleting +```text + +## Dataset Mix Config (`nemotron_ptv3_datasets.yaml`) + +Edit this file to add, remove, or re-weight datasets without touching the script: + +```yaml +datasets: + - repo_id: nvidia/Nemotron-Math-v2 + splits: [high_part00, high_part01] + cap_per_split: 200000 + augment: true + + - repo_id: nvidia/OpenMathReasoning-mini + splits: [train] + augment: false # multilingual — skip language-redirect augmentation +```text + +## Output Format + +Every output row is a JSONL object with a single `messages` key: + +```json +{"messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "What is 2+2?"}, + {"role": "assistant", "content": "4"} +]} +```text + +In `generate` mode, assistant turns are stripped so the row ends with a user turn. 
diff --git a/examples/speculative_decoding/prepare_input_conversations/add_nemotron_chat.py b/examples/dataset/add_nemotron_chat.py similarity index 100% rename from examples/speculative_decoding/prepare_input_conversations/add_nemotron_chat.py rename to examples/dataset/add_nemotron_chat.py diff --git a/examples/dataset/augmentations.yaml b/examples/dataset/augmentations.yaml new file mode 100644 index 0000000000..19ca785934 --- /dev/null +++ b/examples/dataset/augmentations.yaml @@ -0,0 +1,93 @@ +# Augmentation specs for make_nemotron_ptv2_dataset.py and make_nemotron_ptv3_dataset.py +# +# Each entry defines one augmentation variant applied cyclically across the dataset. +# The augmented copy is the same size as the source — each row gets exactly one variant. +# +# Supported types: +# +# user_suffix +# Appends `text` to the content of every user message in the conversation. +# Example use: language-redirect instructions, style/length hints. +# +# system_prompt +# Prepends a {"role": "system", "content": } message to the conversation. +# Use this for model-specific flags (e.g. /no_think) or persona instructions. +# Set `enabled: false` for variants that are not supported by your target model. +# +# To disable an entry without deleting it, add `enabled: false`. +# To add a new variant, append a new entry following the same schema. + +augmentations: + + # --- Language redirects (user_suffix) ------------------------------------ + + - type: user_suffix + text: " Please reply in French instead of English." + + - type: user_suffix + text: " Please reply in Italian instead of English." + + - type: user_suffix + text: " Please reply in German instead of English." + + - type: user_suffix + text: " Please reply in Spanish instead of English." + + - type: user_suffix + text: " Please reply in Mandarin Chinese instead of English." + + - type: user_suffix + text: " Please reply in Japanese instead of English." 
+ + - type: user_suffix + text: " Please reply in Korean instead of English." + + - type: user_suffix + text: " Please reply in Turkish instead of English." + + - type: user_suffix + text: " Please reply in Modern Standard Arabic instead of English." + + - type: user_suffix + text: " Please reply in Russian instead of English." + + - type: user_suffix + text: " Please reply in Brazilian Portuguese instead of English." + + - type: user_suffix + text: " Please reply in Vietnamese instead of English." + + # --- Style / format hints (user_suffix) ---------------------------------- + + - type: user_suffix + text: " Be concise and answer in as few words as possible." + + - type: user_suffix + text: " Provide a detailed, step-by-step explanation." + + - type: user_suffix + text: " Format your response using Markdown (headers, bullet points, code blocks where appropriate)." + + - type: user_suffix + text: " Do not use Markdown formatting; reply in plain text only." + + - type: user_suffix + text: " Explain your answer as if I am a complete beginner with no prior knowledge." + + - type: user_suffix + text: " Assume I am an expert; skip basic explanations and go straight to the details." + + - type: user_suffix + text: " Think step by step before giving your final answer." + + # --- System-prompt variants (system_prompt) ------------------------------ + + # /no_think: suppresses chain-of-thought in models that support it (e.g. Qwen3). + # Set enabled: false if your target model does not support this flag. + - type: system_prompt + content: "You are a helpful assistant. /no_think" + enabled: false + + # Generic helpful-assistant system prompt (no special flags). + - type: system_prompt + content: "You are a helpful, respectful, and honest assistant." 
diff --git a/examples/dataset/conversation_utils.py b/examples/dataset/conversation_utils.py new file mode 100644 index 0000000000..1b80d59258 --- /dev/null +++ b/examples/dataset/conversation_utils.py @@ -0,0 +1,200 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Shared conversation manipulation and augmentation utilities for dataset preparation. + +Imported by make_nemotron_ptv2_dataset.py and make_nemotron_ptv3_dataset.py. + +These scripts produce *input conversations* for synthetic data generation: the +conversations are fed to a target model which generates responses, producing +training data for the speculative-decoding draft model. + +Conversation format +------------------- +Each conversation is stripped down to a skeleton of system + user turns only — all +assistant turns are removed. The downstream generation pipeline (query.py) feeds this +skeleton to the target model turn-by-turn, appending each generated response before +sending the next user turn, so the model produces coherent multi-turn continuations. + +Augmentations (``user_suffix``) are applied to *all* user messages so that the +language or style instruction is present at every turn — important for multi-turn +synthetic generation where the model must maintain the requested style throughout. 
+""" + +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import yaml + +logger = logging.getLogger(__name__) + + +@dataclass +class AugmentationSpec: + """One augmentation variant. + + type: "user_suffix" — appends ``text`` to the last user message. + "system_prompt" — prepends a system message with ``content``. + """ + + type: str + text: str = "" + content: str = "" + enabled: bool = True + + def __post_init__(self): + if self.type not in ("user_suffix", "system_prompt"): + raise ValueError( + f"Unknown augmentation type '{self.type}'. " + "Expected 'user_suffix' or 'system_prompt'." + ) + if self.type == "user_suffix" and not self.text: + raise ValueError("user_suffix augmentation requires a non-empty 'text' field.") + if self.type == "system_prompt" and not self.content: + raise ValueError("system_prompt augmentation requires a non-empty 'content' field.") + + +def load_augmentations(config_path: Path) -> list[AugmentationSpec]: + """Load and validate augmentation specs from a YAML file, returning only enabled ones.""" + with config_path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + specs = [AugmentationSpec(**entry) for entry in data.get("augmentations", [])] + enabled = [s for s in specs if s.enabled] + + if not enabled: + raise ValueError( + f"No enabled augmentations found in {config_path}. " + "Enable at least one entry or pass --no-augmentation." + ) + + logger.info( + "Loaded %d augmentation spec(s) from %s (%d disabled).", + len(enabled), + config_path, + len(specs) - len(enabled), + ) + for spec in enabled: + if spec.type == "user_suffix": + logger.info(" [user_suffix] %r", spec.text.strip()) + else: + logger.info(" [system_prompt] %r", spec.content.strip()) + + return enabled + + +def make_augment_fn(specs: list[AugmentationSpec]): + """Return a datasets.map-compatible function that cycles through *specs* by row index. 
+ + user_suffix: appended to ALL user messages so the language/style instruction is + present at every turn — important for multi-turn synthetic generation where the + model must maintain the requested language or style throughout the conversation. + + system_prompt: prepended only when the conversation has no existing system + message. If the dataset already provides a system prompt it is kept as-is and + this augmentation variant is skipped for that row (returning it unchanged). + """ + + def _augment(example: dict[str, Any], idx: int) -> dict[str, Any]: + spec = specs[idx % len(specs)] + messages = [dict(m) for m in example["messages"]] # shallow copy per row + + if spec.type == "user_suffix": + for msg in messages: + if msg["role"] == "user": + msg["content"] = msg["content"] + spec.text + else: # system_prompt + has_system = messages and messages[0]["role"] == "system" + if has_system: + # Conflict: dataset already has a system prompt — skip this augmentation. + pass + else: + messages = [{"role": "system", "content": spec.content}, *messages] + + return {"messages": messages} + + return _augment + + +def has_tool_turns(example: dict[str, Any]) -> bool: + """Return True if the conversation contains any ``tool`` role message.""" + return any(m.get("role") == "tool" for m in example["messages"]) + + +def strip_assistant_turns(example: dict[str, Any], idx: int) -> dict[str, Any]: + """Keep only the system prompt and user turns; remove all assistant turns. + + This produces a conversation skeleton for synthetic data generation. 
+ The downstream generation pipeline feeds this skeleton to the target model + turn-by-turn, appending each generated response before sending the next user + turn: + + dataset: [system, user1, user2, user3] + + step 1: feed [system, user1] → gen_asst1 + step 2: feed [system, user1, gen_asst1, user2] → gen_asst2 + step 3: feed [system, user1, gen_asst1, user2, gen_asst2, user3] → gen_asst3 + + Rows with no user turns are returned empty and filtered out by the caller. + """ + messages = [m for m in example["messages"] if m["role"] in ("system", "user")] + if not any(m["role"] == "user" for m in messages): + return {"messages": []} + return {"messages": messages} + + +def normalize_messages(example: dict[str, Any], idx: int) -> dict[str, Any]: + """Normalize to clean OpenAI message format for SFT training. + + Drops dataset-specific extra fields (``reasoning_content``, etc.) while + preserving the fields required by the OpenAI chat format for each role: + + system / user → {role, content} + assistant → {role, content} + tool_calls if present + tool → {role, content, tool_call_id} + + ``tool`` turns are kept because dropping them breaks agentic conversations: + an assistant message that issued a tool_call must be followed by the tool + result before the next assistant message, or the training signal is corrupted. + + Prompt-only rows (no assistant turn) are returned with their messages intact; + callers should filter them out for training use. 
+ """ + normalized = [] + for m in example["messages"]: + role = m.get("role") + if role in ("system", "user"): + normalized.append({"role": role, "content": m.get("content") or ""}) + elif role == "assistant": + msg: dict[str, Any] = {"role": "assistant", "content": m.get("content") or ""} + if m.get("tool_calls"): + msg["tool_calls"] = m["tool_calls"] + normalized.append(msg) + elif role == "tool": + normalized.append( + { + "role": "tool", + "content": m.get("content") or "", + "tool_call_id": m.get("tool_call_id", ""), + } + ) + elif role == "developer": + # Map developer-role messages to system per OpenAI schema conventions. + normalized.append({"role": "system", "content": m.get("content") or ""}) + # other roles (e.g. function, unknown) are dropped + return {"messages": normalized} diff --git a/examples/speculative_decoding/prepare_input_conversations/example_data_config.yaml b/examples/dataset/example_data_config.yaml similarity index 100% rename from examples/speculative_decoding/prepare_input_conversations/example_data_config.yaml rename to examples/dataset/example_data_config.yaml diff --git a/examples/speculative_decoding/prepare_input_conversations/make_dataset.py b/examples/dataset/make_dataset.py similarity index 100% rename from examples/speculative_decoding/prepare_input_conversations/make_dataset.py rename to examples/dataset/make_dataset.py diff --git a/examples/dataset/make_nemotron_ptv2_dataset.py b/examples/dataset/make_nemotron_ptv2_dataset.py new file mode 100644 index 0000000000..786e753477 --- /dev/null +++ b/examples/dataset/make_nemotron_ptv2_dataset.py @@ -0,0 +1,248 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Prepare a dataset from nvidia/Nemotron-Post-Training-Dataset-v2 for two purposes: + + generate (default) + Input conversations for synthetic data generation. The last assistant turn is + stripped so a target model can generate a fresh response. An augmented copy of + the major splits is appended (language-redirect and style variants) to diversify + the prompts seen during generation. Multilingual splits are included but not + augmented (they are already non-English). + + train + Full conversations for direct SFT training. All turns are kept and normalized + to clean OpenAI message format (role + content only). No augmentation is applied. + +Output format +------------- +Every output row is a JSON object with a single ``messages`` key whose value is a list +of ``{"role": ..., "content": ...}`` dicts — standard OpenAI chat format. 
+ +Expected output size (approximate, default settings) +---------------------------------------------------- +Split sizes (Nemotron-Post-Training-Dataset-v2): + stem 355K | chat 628K | math 239K | code 175K → major total ~1.40M + multilingual ×5 splits, each capped at 100K → 500K + + generate mode: major ~1.40M + augmented ~1.40M + multilingual 500K ≈ 3.3M rows + train mode: major ~1.40M + multilingual 500K ≈ 1.9M rows + +Usage +----- + # Synthetic data generation (default): + python make_nemotron_ptv2_dataset.py --output-dir /tmp/ptv2_gen + + # Direct SFT training mix: + python make_nemotron_ptv2_dataset.py --mode train --output-dir /tmp/ptv2_train + + # Custom augmentation config: + python make_nemotron_ptv2_dataset.py --augmentations-config my_augs.yaml \\ + --output-dir /tmp/ptv2_gen +""" + +import argparse +import logging +import os +from pathlib import Path + +from conversation_utils import ( + has_tool_turns, + load_augmentations, + make_augment_fn, + normalize_messages, + strip_assistant_turns, +) +from datasets import concatenate_datasets, load_dataset + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" +) +logger = logging.getLogger(__name__) + +MAJOR_SPLITS = ["stem", "chat", "math", "code"] + +MULTILINGUAL_SPLITS = [ + "multilingual_ja", + "multilingual_de", + "multilingual_it", + "multilingual_es", + "multilingual_fr", +] + +_DEFAULT_AUGMENTATIONS_CONFIG = Path(__file__).parent / "augmentations.yaml" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Build a dataset from Nemotron-Post-Training-Dataset-v2.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--dataset", + default="nvidia/Nemotron-Post-Training-Dataset-v2", + help="HuggingFace Hub repo ID or local path to the dataset.", + ) + parser.add_argument( + "--mode", + choices=["generate", "train"], + default="generate", + help=( + "generate: strip 
last assistant turn and apply augmentations — " + "produces multi-turn input conversations for synthetic data generation. " + "train: keep all turns in clean OpenAI format — " + "produces a dataset ready for direct SFT training." + ), + ) + parser.add_argument( + "--output-dir", + default="/tmp/ptv2_gen", + help="Directory where output JSONL files will be written.", + ) + parser.add_argument( + "--multilingual-cap", + type=int, + default=100_000, + help="Maximum number of rows to take from each multilingual split.", + ) + parser.add_argument( + "--seed", + type=int, + default=42, + help="Random seed for shuffling.", + ) + parser.add_argument( + "--num-proc", + type=int, + default=min(8, os.cpu_count() or 1), + help="Number of parallel workers for dataset.map().", + ) + parser.add_argument( + "--augmentations-config", + type=Path, + default=_DEFAULT_AUGMENTATIONS_CONFIG, + help="Path to a YAML file listing augmentation specs (generate mode only).", + ) + parser.add_argument( + "--no-augmentation", + action="store_true", + help="Skip augmentation even in generate mode.", + ) + parser.add_argument( + "--no-subsets", + action="store_true", + help="Skip writing the 1K / 10K / 100K prefix subsets.", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + logger.info("Mode: %s", args.mode) + + # ------------------------------------------------------------------ + # Augmentation (generate mode only) + # ------------------------------------------------------------------ + aug_fn = None + if args.mode == "generate" and not args.no_augmentation: + aug_specs = load_augmentations(args.augmentations_config) + aug_fn = make_augment_fn(aug_specs) + + combined = None + + # ------------------------------------------------------------------ + # Major splits + # ------------------------------------------------------------------ + logger.info("Loading major splits: %s", 
MAJOR_SPLITS) + for split in MAJOR_SPLITS: + logger.info(" loading split '%s'", split) + ds = load_dataset(args.dataset, split=split) + if args.mode == "generate": + ds = ds.filter(lambda ex: not has_tool_turns(ex), num_proc=args.num_proc) + ds = ds.map(strip_assistant_turns, with_indices=True, num_proc=args.num_proc) + ds = ds.filter(lambda ex: len(ex["messages"]) > 0, num_proc=args.num_proc) + else: + ds = ds.map(normalize_messages, with_indices=True, num_proc=args.num_proc) + # Train mode: drop prompt-only rows (no assistant turn = nothing to train on). + ds = ds.filter( + lambda ex: any(m["role"] == "assistant" for m in ex["messages"]), + num_proc=args.num_proc, + ) + logger.info(" %d rows", len(ds)) + combined = ds if combined is None else concatenate_datasets([combined, ds]) + + assert combined is not None + logger.info("Major splits total: %d rows", len(combined)) + + # ------------------------------------------------------------------ + # Augmented copy (generate mode only, applied to major splits) + # ------------------------------------------------------------------ + if aug_fn is not None: + logger.info("Augmenting with %d variant(s)...", len(aug_specs)) + augmented = combined.map(aug_fn, with_indices=True, num_proc=args.num_proc) + combined = concatenate_datasets([combined, augmented]) + logger.info("After major + augmented: %d rows", len(combined)) + + # ------------------------------------------------------------------ + # Multilingual splits (not augmented — already non-English) + # ------------------------------------------------------------------ + logger.info("Loading multilingual splits: %s", MULTILINGUAL_SPLITS) + for split in MULTILINGUAL_SPLITS: + logger.info(" loading split '%s'", split) + ds = load_dataset(args.dataset, split=split) + if len(ds) > args.multilingual_cap: + ds = ds.select(range(args.multilingual_cap)) + if args.mode == "generate": + ds = ds.filter(lambda ex: not has_tool_turns(ex), num_proc=args.num_proc) + ds = 
ds.map(strip_assistant_turns, with_indices=True, num_proc=args.num_proc) + ds = ds.filter(lambda ex: len(ex["messages"]) > 0, num_proc=args.num_proc) + else: + ds = ds.map(normalize_messages, with_indices=True, num_proc=args.num_proc) + ds = ds.filter( + lambda ex: any(m["role"] == "assistant" for m in ex["messages"]), + num_proc=args.num_proc, + ) + logger.info(" %d rows", len(ds)) + combined = concatenate_datasets([combined, ds]) + + logger.info("Combined (pre-shuffle): %d rows", len(combined)) + + # ------------------------------------------------------------------ + # Shuffle and save + # ------------------------------------------------------------------ + combined = combined.shuffle(seed=args.seed) + logger.info("Shuffled with seed=%d", args.seed) + + full_path = output_dir / "default.jsonl" + logger.info("Writing %d rows to %s", len(combined), full_path) + combined.to_json(str(full_path), num_proc=args.num_proc) + + if not args.no_subsets: + for n, name in [(1_000, "1K"), (10_000, "10K"), (100_000, "100K")]: + if len(combined) < n: + logger.warning("Fewer than %d rows — skipping %s subset.", n, name) + continue + subset_path = output_dir / f"sample-{name}.jsonl" + logger.info("Writing %s subset to %s", name, subset_path) + combined.select(range(n)).to_json(str(subset_path), num_proc=args.num_proc) + + logger.info("Done. Output files are in %s", output_dir) + + +if __name__ == "__main__": + main() diff --git a/examples/dataset/make_nemotron_ptv3_dataset.py b/examples/dataset/make_nemotron_ptv3_dataset.py new file mode 100644 index 0000000000..bb013e3e86 --- /dev/null +++ b/examples/dataset/make_nemotron_ptv3_dataset.py @@ -0,0 +1,304 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Prepare a dataset from the Nemotron Post-Training V3 collection for two purposes: +https://huggingface.co/collections/nvidia/nemotron-post-training-v3 + + generate (default) + Input conversations for synthetic data generation. The last assistant turn is + stripped so a target model can generate a fresh response. Datasets marked + ``augment: true`` in the config get an augmented copy appended (language-redirect + and style variants). Multilingual datasets (``augment: false``) are included + as-is without augmentation. + + train + Full conversations for direct SFT training. All turns are kept and normalized + to clean OpenAI message format (role + content only). No augmentation is applied. + The ``augment`` flag in the dataset config is ignored. + +Output format +------------- +Every output row is a JSON object with a single ``messages`` key whose value is a list +of ``{"role": ..., "content": ...}`` dicts — standard OpenAI chat format. + +Dataset config +-------------- +``nemotron_ptv3_datasets.yaml`` lists every dataset in the mix with: + repo_id — HuggingFace repo ID or local path + splits — list of splits to load + cap_per_split — max rows per split (null = no cap) + augment — false for multilingual splits (generate mode only) + +Edit the YAML to add, remove, or re-weight datasets without touching this script. 
+ +Expected output size (approximate, default settings from nemotron_ptv3_datasets.yaml) +------------------------------------------------------------------------------------ +Datasets without tool turns (included in both modes): + Math-v2 200K | SFT-Math-v3 200K | Math-Proofs 50K → 450K + Comp-Prog-v1 capped 300K | SFT-Comp-Prog-v2 capped 200K → 500K + Science 226K | IF-Chat-v1 288K | IF-Chat-v2 ~2K → ~516K + Safety 45K | Finance capped 100K → 145K + Multilingual ×18 splits, each capped at 10K → 180K + Subtotal (base) → ~1.79M + +Datasets with tool turns (generate mode: excluded; train mode: included): + SWE-v1 51K | SFT-SWE-v2 256K | OpenCode ~459K → ~766K + Agentic-v1 335K | SFT-Agentic-v2 ~992K → ~1.33M + Tool-turn subtotal → ~2.09M + + generate mode: base ~1.61M (excl. multilingual) ×2 aug + multilingual 180K ≈ 3.4M rows + train mode: base ~1.61M + tool datasets ~2.09M + multilingual 180K ≈ 3.9M rows + +Usage +----- + # Synthetic data generation (default): + python make_nemotron_ptv3_dataset.py --output-dir /tmp/ptv3_gen + + # Direct SFT training mix: + python make_nemotron_ptv3_dataset.py --mode train --output-dir /tmp/ptv3_train + + # Custom dataset list: + python make_nemotron_ptv3_dataset.py --datasets-config my_datasets.yaml \\ + --output-dir /tmp/ptv3_gen +""" + +import argparse +import logging +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import yaml +from conversation_utils import ( + has_tool_turns, + load_augmentations, + make_augment_fn, + normalize_messages, + strip_assistant_turns, +) +from datasets import concatenate_datasets, load_dataset + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" +) +logger = logging.getLogger(__name__) + +_DEFAULT_DATASETS_CONFIG = Path(__file__).parent / "nemotron_ptv3_datasets.yaml" +_DEFAULT_AUGMENTATIONS_CONFIG = Path(__file__).parent / "augmentations.yaml" + + +# 
------------------------------------------------------------------- +# Dataset spec +# ------------------------------------------------------------------- + + +@dataclass +class DatasetSpec: + repo_id: str + splits: list[str] + cap_per_split: int | None = None + augment: bool = True + + +def load_dataset_specs(config_path: Path) -> list[DatasetSpec]: + with config_path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) + return [DatasetSpec(**entry) for entry in data.get("datasets", [])] + + +# ------------------------------------------------------------------- +# Loading +# ------------------------------------------------------------------- + + +def load_split(repo_id: str, split: str, cap: int | None, num_proc: int, mode: str): + """Load one split, apply cap, normalize for the requested mode.""" + logger.info(" split '%s' ...", split) + ds = load_dataset(repo_id, split=split) + if cap is not None and len(ds) > cap: + ds = ds.select(range(cap)) + logger.info(" capped to %d rows", cap) + if mode == "generate": + ds = ds.filter(lambda ex: not has_tool_turns(ex), num_proc=num_proc) + ds = ds.map(strip_assistant_turns, with_indices=True, num_proc=num_proc) + ds = ds.filter(lambda ex: len(ex["messages"]) > 0, num_proc=num_proc) + else: + ds = ds.map(normalize_messages, with_indices=True, num_proc=num_proc) + # Train mode: drop prompt-only rows (no assistant turn = nothing to train on). 
+ ds = ds.filter( + lambda ex: any(m["role"] == "assistant" for m in ex["messages"]), + num_proc=num_proc, + ) + logger.info(" %d rows", len(ds)) + return ds + + +# ------------------------------------------------------------------- +# CLI +# ------------------------------------------------------------------- + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Build a dataset from the Nemotron Post-Training V3 collection.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--mode", + choices=["generate", "train"], + default="generate", + help=( + "generate: strip last assistant turn and apply augmentations — " + "produces multi-turn input conversations for synthetic data generation. " + "train: keep all turns in clean OpenAI format — " + "produces a dataset ready for direct SFT training." + ), + ) + parser.add_argument( + "--datasets-config", + type=Path, + default=_DEFAULT_DATASETS_CONFIG, + help="YAML file listing datasets, splits, and caps.", + ) + parser.add_argument( + "--augmentations-config", + type=Path, + default=_DEFAULT_AUGMENTATIONS_CONFIG, + help="YAML file listing augmentation specs (generate mode only).", + ) + parser.add_argument( + "--output-dir", + default="/tmp/ptv3_train", + help="Directory where output JSONL files will be written.", + ) + parser.add_argument( + "--seed", + type=int, + default=42, + help="Random seed for shuffling.", + ) + parser.add_argument( + "--num-proc", + type=int, + default=min(8, os.cpu_count() or 1), + help="Number of parallel workers for dataset.map().", + ) + parser.add_argument( + "--no-augmentation", + action="store_true", + help="Skip augmentation even in generate mode.", + ) + parser.add_argument( + "--no-subsets", + action="store_true", + help="Skip writing the 1K / 10K / 100K prefix subsets.", + ) + return parser.parse_args() + + +# ------------------------------------------------------------------- +# Main +# 
------------------------------------------------------------------- + + +def main() -> None: + args = parse_args() + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + logger.info("Mode: %s", args.mode) + + # ------------------------------------------------------------------ + # Load configs + # ------------------------------------------------------------------ + dataset_specs = load_dataset_specs(args.datasets_config) + logger.info("Loaded %d dataset spec(s) from %s.", len(dataset_specs), args.datasets_config) + + aug_fn = None + if args.mode == "generate" and not args.no_augmentation: + aug_specs = load_augmentations(args.augmentations_config) + aug_fn = make_augment_fn(aug_specs) + + # ------------------------------------------------------------------ + # Load all datasets + # In generate mode, partition into augmentable vs. non-augmentable. + # In train mode, all datasets are treated identically. + # ------------------------------------------------------------------ + augmentable_parts: list[Any] = [] + non_augmentable_parts: list[Any] = [] + + for spec in dataset_specs: + logger.info("Loading %s (augment=%s)", spec.repo_id, spec.augment) + for split in spec.splits: + ds = load_split(spec.repo_id, split, spec.cap_per_split, args.num_proc, args.mode) + if args.mode == "generate" and not spec.augment: + non_augmentable_parts.append(ds) + else: + augmentable_parts.append(ds) + + augmentable = concatenate_datasets(augmentable_parts) if augmentable_parts else None + non_augmentable = concatenate_datasets(non_augmentable_parts) if non_augmentable_parts else None + if augmentable is not None: + logger.info("Augmentable rows: %d", len(augmentable)) + if non_augmentable is not None: + logger.info("Non-augmentable (multilingual) rows: %d", len(non_augmentable)) + + # ------------------------------------------------------------------ + # Augmentation (generate mode only) + # 
------------------------------------------------------------------ + parts_to_combine: list[Any] = [] + if augmentable is not None: + parts_to_combine.append(augmentable) + + if aug_fn is not None and augmentable is not None: + logger.info("Augmenting %d rows with %d variant(s)...", len(augmentable), len(aug_specs)) + augmented = augmentable.map(aug_fn, with_indices=True, num_proc=args.num_proc) + logger.info("Augmented dataset: %d rows", len(augmented)) + parts_to_combine.append(augmented) + + if non_augmentable is not None: + parts_to_combine.append(non_augmentable) + + if not parts_to_combine: + raise ValueError("No data to combine — all rows were filtered out.") + + combined = concatenate_datasets(parts_to_combine) + logger.info("Combined (pre-shuffle): %d rows", len(combined)) + + # ------------------------------------------------------------------ + # Shuffle and save + # ------------------------------------------------------------------ + combined = combined.shuffle(seed=args.seed) + logger.info("Shuffled with seed=%d", args.seed) + + full_path = output_dir / "default.jsonl" + logger.info("Writing %d rows to %s", len(combined), full_path) + combined.to_json(str(full_path), num_proc=args.num_proc) + + if not args.no_subsets: + for n, name in [(1_000, "1K"), (10_000, "10K"), (100_000, "100K")]: + if len(combined) < n: + logger.warning("Fewer than %d rows — skipping %s subset.", n, name) + continue + subset_path = output_dir / f"sample-{name}.jsonl" + logger.info("Writing %s subset to %s", name, subset_path) + combined.select(range(n)).to_json(str(subset_path), num_proc=args.num_proc) + + logger.info("Done. 
Output files are in %s", output_dir) + + +if __name__ == "__main__": + main() diff --git a/examples/dataset/nemotron_ptv3_datasets.yaml b/examples/dataset/nemotron_ptv3_datasets.yaml new file mode 100644 index 0000000000..882016148c --- /dev/null +++ b/examples/dataset/nemotron_ptv3_datasets.yaml @@ -0,0 +1,193 @@ +# Dataset mix for make_nemotron_ptv3_dataset.py +# +# Each entry defines one HuggingFace dataset to include in the training mix. Fields: +# +# repo_id — HuggingFace repo ID or local path. +# splits — List of splits to load. All splits are concatenated before capping. +# cap_per_split — Maximum rows taken from each split (omit or leave empty = no cap). +# augment — Whether to include this dataset in the augmented copy. +# Set to false for already-multilingual splits so language-redirect +# augmentations are not applied on top of existing non-English content. +# +# Edit this file to add, remove, or re-weight datasets without touching the script. +# After editing, re-run make_nemotron_ptv3_dataset.py to regenerate the dataset. + +datasets: + + # --------------------------------------------------------------------------- + # Math + # --------------------------------------------------------------------------- + + # Nemotron-Math-v2: 7 M rows across quality tiers. Cap each split to avoid + # math dominating the mix. + - repo_id: nvidia/Nemotron-Math-v2 + splits: + - high_part00 + - high_part01 + - high_part02 + - medium + - low + cap_per_split: 40000 # 200 K total across 5 splits + augment: true + + # Nemotron-SFT-Math-v3: 1.24 M rows, curated SFT math with tool use. + - repo_id: nvidia/Nemotron-SFT-Math-v3 + splits: + - train + cap_per_split: 200000 + augment: true + + # Nemotron-Math-Proofs-v1: formal Lean 4 proofs — specialized but useful for + # diversity. Cap to avoid over-representing this narrow domain. 
+ - repo_id: nvidia/Nemotron-Math-Proofs-v1 + splits: + - lean + cap_per_split: 50000 + augment: true + + # --------------------------------------------------------------------------- + # Code / Software Engineering + # --------------------------------------------------------------------------- + + - repo_id: nvidia/Nemotron-SWE-v1 + splits: + - r2e_gym + cap_per_split: + augment: true + + - repo_id: nvidia/Nemotron-SFT-SWE-v2 + splits: + - agentless + - openhands_swe + cap_per_split: + augment: true + + # Competitive-Programming-v1 has ~3.93M rows across 6 splits (cpp ×2, python ×2, + # infinibyte ×2). Capped to prevent code from dominating the mix. + - repo_id: nvidia/Nemotron-Competitive-Programming-v1 + splits: + - competitive_coding_cpp_part00 + - competitive_coding_cpp_part01 + - competitive_coding_python_part00 + - competitive_coding_python_part01 + - infinibyte_part00 + - infinibyte_part01 + cap_per_split: 50000 # 300K total across 6 splits + augment: true + + # SFT-Competitive-Programming-v2 has ~845K rows across 4 splits. Also capped. 
+ - repo_id: nvidia/Nemotron-SFT-Competitive-Programming-v2 + splits: + - exercism + - competitive_coding_python + - competitive_coding_cpp + - text_to_sql + cap_per_split: 50000 # 200K total across 4 splits + augment: true + + - repo_id: nvidia/Nemotron-SFT-OpenCode-v1 + splits: + - general + - bash_only_tool + - bash_only_tool_skills + - question_tool + - agent_skills + - agent_skills_question_tool + cap_per_split: + augment: true + + # --------------------------------------------------------------------------- + # Science + # --------------------------------------------------------------------------- + + - repo_id: nvidia/Nemotron-Science-v1 + splits: + - MCQ + - RQA + cap_per_split: # ~226 K total + augment: true + + # --------------------------------------------------------------------------- + # Chat / Instruction Following + # --------------------------------------------------------------------------- + + - repo_id: nvidia/Nemotron-Instruction-Following-Chat-v1 + splits: + - chat_if + - structured_outputs + cap_per_split: # ~288 K total + augment: true + + - repo_id: nvidia/Nemotron-SFT-Instruction-Following-Chat-v2 + splits: + - reasoning_off + - reasoning_on + cap_per_split: + augment: true + + # --------------------------------------------------------------------------- + # Agentic + # --------------------------------------------------------------------------- + + - repo_id: nvidia/Nemotron-Agentic-v1 + splits: + - interactive_agent + - tool_calling + cap_per_split: + augment: true + + - repo_id: nvidia/Nemotron-SFT-Agentic-v2 + splits: + - interactive_agent + - search + - tool_calling + cap_per_split: + augment: true + + # --------------------------------------------------------------------------- + # Safety + # --------------------------------------------------------------------------- + + - repo_id: nvidia/Nemotron-SFT-Safety-v1 + splits: + - train + cap_per_split: # ~45 K + augment: true + + # 
--------------------------------------------------------------------------- + # Finance (specialized domain) + # --------------------------------------------------------------------------- + + - repo_id: nvidia/Nemotron-SpecializedDomains-Finance-v1 + splits: + - train + cap_per_split: 100000 + augment: true + + # --------------------------------------------------------------------------- + # Multilingual — augment: false because content is already non-English. + # Language-redirect augmentations would be meaningless or contradictory. + # --------------------------------------------------------------------------- + + - repo_id: nvidia/Nemotron-SFT-Multilingual-v1 + splits: + - code_de + - code_es + - code_fr + - code_it + - code_ja + - code_zh + - math_de + - math_es + - math_fr + - math_it + - math_ja + - math_zh + - stem_de + - stem_es + - stem_fr + - stem_it + - stem_ja + - stem_zh + cap_per_split: 10000 # 180 K total across 18 splits + augment: false diff --git a/examples/speculative_decoding/prepare_input_conversations/utils.py b/examples/dataset/utils.py similarity index 100% rename from examples/speculative_decoding/prepare_input_conversations/utils.py rename to examples/dataset/utils.py diff --git a/examples/speculative_decoding/README.md b/examples/speculative_decoding/README.md index 2a29f644e6..aa377cb83d 100644 --- a/examples/speculative_decoding/README.md +++ b/examples/speculative_decoding/README.md @@ -48,13 +48,15 @@ pip install -r requirements.txt We support a range of input datasets. In this example, we will use the [UltraChat-200k](https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k) dataset. ```bash -python prepare_input_conversations/make_dataset.py -f prepare_input_conversations/example_data_config.yaml --full-conversations +python ../dataset/make_dataset.py -f ../dataset/example_data_config.yaml --full-conversations ``` See [other-datasets](#other-datasets) section for other dataset options and instruction for user-provided data. 
Omit `--full-conversations` if you plan to run synthetic data generation (see [data-synthesis](#data-synthesis)). +For large-scale training with NVIDIA's Nemotron datasets, use the dedicated scripts described in [Nemotron Datasets](#nemotron-datasets). + ## Getting Started: Simplified Workflow ```bash @@ -201,7 +203,7 @@ See more details on deployment of quantized model to TRTLLM [here](../llm_ptq/RE ### Other Datasets -In addition to the default dataset, we support adding several other commonly used datasets in `prepare_input_conversations/make_dataset.py`: +In addition to the default dataset, we support adding several other commonly used datasets in `../dataset/make_dataset.py`: - MTBench (for debugging) - ShareGPT @@ -219,11 +221,50 @@ To use your own datasets, please preprocess your data into a `.jsonl` file with } ``` +### Nemotron Datasets + +For large-scale training we provide dedicated scripts for NVIDIA's Nemotron Post-Training dataset collections. Both scripts support two modes: + +- **`generate` (default)** — strips all assistant turns, producing a conversation skeleton (`system` + `user` turns only) for synthetic data generation. The downstream pipeline feeds these to the target model turn-by-turn, appending each generated response before sending the next user turn. Optional augmentation adds language-redirect and style-hint variants to diversify prompts. +- **`train`** — keeps all turns in clean OpenAI message format (`role` + `content`) for direct SFT training. Prompt-only rows are dropped. Tool-call context (`tool_calls`, `tool_call_id`) is preserved for agentic datasets. 
+ +**Nemotron Post-Training Dataset V2** ([`nvidia/Nemotron-Post-Training-Dataset-v2`](https://huggingface.co/datasets/nvidia/Nemotron-Post-Training-Dataset-v2)): + +```bash +# Synthetic data generation (~3.3M rows): +python ../dataset/make_nemotron_ptv2_dataset.py --output-dir /tmp/ptv2_gen + +# Direct SFT training mix (~1.9M rows): +python ../dataset/make_nemotron_ptv2_dataset.py --mode train --output-dir /tmp/ptv2_train +``` + +Covers: `stem`, `chat`, `math`, `code` + 5 multilingual splits (ja/de/it/es/fr, capped at 100K each). + +**Nemotron Post-Training V3 collection** ([16 datasets](https://huggingface.co/collections/nvidia/nemotron-post-training-v3)): + +```bash +# Synthetic data generation (~3.4M rows): +python ../dataset/make_nemotron_ptv3_dataset.py --output-dir /tmp/ptv3_gen + +# Direct SFT training mix (~3.9M rows, includes agentic/tool-use datasets): +python ../dataset/make_nemotron_ptv3_dataset.py --mode train --output-dir /tmp/ptv3_train +``` + +Covers: math, code, science, instruction-following, agentic/tool-use, safety, finance, and multilingual data. The dataset mix and per-split row caps are configurable via `../dataset/nemotron_ptv3_datasets.yaml`. + +**Augmentation** (generate mode only) is controlled by `../dataset/augmentations.yaml`. By default it includes 12 language-redirect variants and several style/format hints. The `/no_think` system-prompt variant is disabled by default (enable it for models that support it, e.g. Qwen3): + +```bash +# Custom augmentation config: +python ../dataset/make_nemotron_ptv2_dataset.py \ + --augmentations-config my_augs.yaml --output-dir /tmp/ptv2_gen +``` + ### Data Synthesis To achieve higher acceptance rates during speculative decoding, it is beneficial to use conversations generated by the base model as training data. This ensures that the draft model's output distribution closely aligns with that of the base model. 
-To prepare such data, we launch an inference server with the base model: +First, prepare input conversation skeletons using `--mode generate` (default) from the Nemotron scripts above, or with `make_dataset.py` (omitting `--full-conversations`). Then launch an inference server with the base model: ```bash pip install vllm diff --git a/examples/speculative_decoding/prepare_input_conversations/__init__.py b/examples/speculative_decoding/prepare_input_conversations/__init__.py deleted file mode 100644 index 926974f550..0000000000 --- a/examples/speculative_decoding/prepare_input_conversations/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Scripts to add various datasets to a prompt dataset file.""" diff --git a/tests/examples/speculative_decoding/conftest.py b/tests/examples/speculative_decoding/conftest.py index 34ab4e4741..f382341175 100644 --- a/tests/examples/speculative_decoding/conftest.py +++ b/tests/examples/speculative_decoding/conftest.py @@ -36,7 +36,7 @@ def tiny_daring_anteater_path(tmp_path_factory): config_path.write_text(yaml.dump(config)) run_example_command( - ["python", "prepare_input_conversations/make_dataset.py", "-f", str(config_path), "--full"], + ["python", "../dataset/make_dataset.py", "-f", str(config_path), "--full-conversations"], "speculative_decoding", ) diff --git a/tools/launcher/common/query.py b/tools/launcher/common/query.py index 99f46efbcb..9b8973322f 100644 --- a/tools/launcher/common/query.py +++ b/tools/launcher/common/query.py @@ -20,9 +20,10 @@ (e.g., EAGLE3 data synthesis). """ -# ruff: noqa: D101, D102, D103, D107, F841, PLR1722 +# ruff: noqa: D101, D102, D103, D107, PLR1722 import argparse import os +import re from datasets import load_dataset from openai import OpenAI @@ -30,18 +31,43 @@ early_termination = False +def _strip_thinking(content: str) -> str: + """Strip ... blocks from assistant message content. + + Used to clean intermediate assistant turns before they are appended to the + context for the next generation step. Only the final assistant turn in a + multi-turn conversation should retain the full reasoning trace. + """ + return re.sub(r".*?", "", content, flags=re.DOTALL).strip() + + class LLM: def __init__(self, args): self.args = args + self._pid = os.getpid() self.client = OpenAI(base_url=args.base_url) self.generate(messages=[{"role": "user", "content": "Hello! /no_think"}], verbose=True) + def _ensure_client(self): + """Reinitialize the HTTP client if we've been forked into a new process. + + datasets.map(num_proc>1) forks worker processes that inherit the parent's + connection pool. 
Reusing inherited sockets across processes causes + "Invalid HTTP request" errors. Creating a fresh client per-process avoids this. + """ + if os.getpid() != self._pid: + self._pid = os.getpid() + self.client = OpenAI(base_url=self.args.base_url) + def generate(self, messages, verbose=False, **chat_template_kwargs): + global early_termination + self._ensure_client() try: completion = self.client.chat.completions.create( model=self.args.model, messages=messages, temperature=self.args.temperature, + max_tokens=self.args.max_tokens, ) new_message = completion.choices[0].message.content if verbose: @@ -52,11 +78,9 @@ def generate(self, messages, verbose=False, **chat_template_kwargs): new_message = {"role": "assistant", "content": new_message} except Exception as e: print(e) - if "Connection error" in str(e): early_termination = True - - new_message = None + raise # always propagate so datasets.map() halts the shard return new_message @@ -76,6 +100,9 @@ def generate(self, messages, verbose=False, **chat_template_kwargs): ) parser.add_argument("--num-proc", type=int, default=32, help="number of processes (concurrency).") parser.add_argument("--temperature", type=float, default=0.0, help="temperature.") +parser.add_argument( + "--max-tokens", type=int, default=None, help="maximum tokens to generate per response." +) args = parser.parse_args() llm = LLM(args) @@ -90,42 +117,77 @@ def disable_thinking_column(data): def synthesize(data): - messages = data.get("conversations", None) - if messages is None: - messages = data.get("messages", None) + messages = data.get("conversations") or data.get("messages") if messages is None: raise ValueError( - "No conversations of messages in the data. Only OAI chat data is supported." + "No conversations or messages in the data. Only OAI chat data is supported." ) # Handle generation specific kwargs. 
enable_thinking = data.get("enable_thinking", True)
 
     current_messages = []
+    last_full_message = None  # tracks the most recent generated response (unstripped)
     for msg in messages:
-        if msg["role"] == "system":
+        role = msg["role"]
+        if role == "system":
             current_messages.append(msg)
-        elif msg["role"] == "user":
+        elif role == "user":
             if not enable_thinking:
+                # Copy to avoid mutating the original dataset row.
+                msg = dict(msg)
                 msg["content"] = msg["content"] + " /no_think"
             current_messages.append(msg)
             new_message = llm.generate(current_messages, verbose=False)
             if new_message is None:
                 break
+
+            last_full_message = new_message
+
+            if enable_thinking:
+                # Append a thinking-stripped copy as context for the next turn.
+                # Multi-turn reasoning: only the *last* assistant turn should
+                # retain the full <think>...</think> trace; prior turns are
+                # already resolved and the trace would distract the model.
+                # The full trace is restored to the last turn after the loop.
+                stripped = {
+                    "role": "assistant",
+                    "content": _strip_thinking(new_message["content"]),
+                }
+                current_messages.append(stripped)
+            else:
                 current_messages.append(new_message)
-        elif msg["role"] == "assistant":
-            # Original assistant messages are not used
+        elif role == "developer":
+            # Map developer-role messages to system per OpenAI schema conventions.
+            current_messages.append({"role": "system", "content": msg["content"]})
+        elif role == "assistant":
+            # Original assistant messages are not used — the model generates fresh responses.
+            pass
+        elif role == "tool":
+            # Tool turns are not sent to the generation model — skip them.
             pass
         else:
-            raise ValueError("unknown role: {}".format(msg["role"]))
+            raise ValueError(f"Unexpected message role {role!r} in conversation.")
+
+    # Restore the full reasoning trace for the last generated assistant turn.
+ if enable_thinking and last_full_message is not None: + for i in range(len(current_messages) - 1, -1, -1): + if current_messages[i]["role"] == "assistant": + current_messages[i] = last_full_message + break return {"conversations": current_messages} -dataset = load_dataset(args.data, split=args.data_split) +# Support both HF Hub repo IDs and local file paths (.jsonl, .json, .parquet, etc.) +if os.path.isfile(args.data): + ext = os.path.splitext(args.data)[1].lower() + fmt = "parquet" if ext == ".parquet" else "json" + dataset = load_dataset(fmt, data_files={"train": args.data}, split=args.data_split) +else: + dataset = load_dataset(args.data, split=args.data_split) if args.num_shards * 100 > len(dataset): args.num_shards = min(16, len(dataset) // 100) diff --git a/tools/launcher/common/vllm/query.sh b/tools/launcher/common/vllm/query.sh index d203e8994e..4ce6ded196 100755 --- a/tools/launcher/common/vllm/query.sh +++ b/tools/launcher/common/vllm/query.sh @@ -118,9 +118,9 @@ while true; do sleep 10 done -cmd="python common/query.py http://localhost:8000/v1 ${MODEL} ${QUERY_ARGS[*]}" -echo "Running command: $cmd" -eval $cmd +pip3 install -q datasets openai 2>/dev/null || true +echo "Running: python3 common/query.py http://localhost:8000/v1 ${MODEL} ${QUERY_ARGS[*]}" +python3 common/query.py http://localhost:8000/v1 "${MODEL}" "${QUERY_ARGS[@]}" echo "Main process exit" kill $SERVER_PID diff --git a/tools/launcher/core.py b/tools/launcher/core.py index 7004f7e662..7721c2c35d 100644 --- a/tools/launcher/core.py +++ b/tools/launcher/core.py @@ -157,6 +157,8 @@ class SandboxPipeline: task_4: SandboxTask4 = None tasks: list[SandboxTask] = None + assets: list[str] = None # HF repo paths (relative to hf_local) to verify before submission + test_level: int = 0 allow_to_fail: bool = False skip: bool = False @@ -252,7 +254,7 @@ def build_slurm_executor( tunnel = run.SSHTunnel( host=slurm_config.host, - user=getpass.getuser() if user is None else user, + user=user or 
getattr(slurm_config, "user", None) or getpass.getuser(),
             port=slurm_config.port,
             job_dir=job_dir,
             identity=identity,
@@ -320,7 +322,7 @@ def build_docker_executor(
         ipc_mode="host",
         container_image=slurm_config.container,
         volumes=container_mounts,
-        additional_kwargs={"user": f"{os.getuid()}:{os.getgid()}"},
+        additional_kwargs={"user": f"{os.getuid()}:{os.getgid()}", "entrypoint": ""},
         packager=packager,
     )
     return executor
diff --git a/tools/launcher/examples/Qwen/Qwen3.5-4B/query_specdec_dataset.yaml b/tools/launcher/examples/Qwen/Qwen3.5-4B/query_specdec_dataset.yaml
new file mode 100644
index 0000000000..f87b81a94f
--- /dev/null
+++ b/tools/launcher/examples/Qwen/Qwen3.5-4B/query_specdec_dataset.yaml
@@ -0,0 +1,43 @@
+# Data synthesis for Qwen3.5-4B using the Speculative-Decoding-Multilingual-Prompt-v2 dataset.
+#
+# Starts a vLLM server with Qwen3.5-4B, then runs query.py against it to generate
+# synthetic assistant responses for EAGLE3 draft model training.
+#
+# Local run (requires GPU + Docker):
+#   uv run launch.py --yaml examples/Qwen/Qwen3.5-4B/query_specdec_dataset.yaml \
+#     hf_local=/home/omniml_data_3/hf-local --yes
+#
+# Slurm run:
+#   uv run slurm.py --yaml modules/Model-Optimizer/tools/launcher/examples/Qwen/Qwen3.5-4B/query_specdec_dataset.yaml --yes
+
+job_name: Qwen3.5-4B_specdec_query
+
+pipeline:
+  global_vars:
+    hf_model: /hf-local/Qwen/Qwen3.5-4B
+
+  task_0:
+    script: common/vllm/query.sh
+    args:
+      - --model <<hf_model>>
+      - --tensor-parallel-size 1
+      - --max-num-seqs 32
+      - --trust-remote-code
+      - --gpu-memory-utilization 0.87
+      - --
+      - --data /hf-local/nvidia/Speculative-Decoding-Multilingual-Prompt-v2/sample-1K.jsonl
+      - --save /scratchspace/data
+      - --num-shards 10
+      - --num-proc 4
+      - --max-tokens 4096
+    environment:
+      - HF_LOCAL: /hf-local
+      - LOGNAME: chenhany
+      - USER: chenhany
+      - HOME: /tmp
+    slurm_config:
+      _factory_: "slurm_factory"
+      nodes: 1
+      ntasks_per_node: 1
+      gpus_per_node: 1
+      container: vllm/vllm-openai:qwen3_5-cu130