
Improve Megatron Tokenization: streaming, reasoning_content support, HF in-memory tokenization, etc#1221

Merged
kevalmorabia97 merged 5 commits into main from kmorabia/megatron-preprocess-data-improvements on Apr 9, 2026

Conversation

@kevalmorabia97
Collaborator

@kevalmorabia97 kevalmorabia97 commented Apr 9, 2026

What does this PR do?

Type of change: New feature

Improvements to megatron_preprocess_data for Nemotron v3 post-training datasets and Megatron-Bridge distillation workflows:

  • --reasoning_content flag (strip / inline / native) to handle the reasoning_content field in Nemotron Post-Training v3 assistant messages
  • No intermediate JSONL for HuggingFace datasets — load directly from Arrow cache via _iter_hf_as_json + process_hf_split
  • Return output prefixes (list[str]) from the Python API so callers can build --data_paths without hardcoding paths; also printed at end of run
  • Gzip input support: `.jsonl.gz` files accepted directly; --input_dir globs both *.jsonl and *.jsonl.gz
  • --strip_newlines flag (opt-in) to replace newlines with spaces in plain-text values; default preserves newlines (no breaking change for code/structured-text datasets)
  • --hf_streaming flag for very large datasets — only consumed rows are downloaded; automatically falls back to non-streaming (with a warning) if --hf_max_samples_per_split is not set, since streaming without a cap is slower than cached non-streaming
  • Auto-shuffle when --hf_max_samples_per_split is set — reservoir sampling (buffer=10,000, seed=42) applied before capping to avoid biased prefix sampling
  • Remove _document suffix from output filenames (_text.bin instead of _text_document.bin)
  • Fix duplicate BOS token for chat-template data (add_special_tokens=False)
  • Fix TypeError in process_hf_split (sum(list) not sum(int))
  • Suppress duplicate prints across pool workers via _is_main_or_first_worker()
  • Raise KeyError instead of warning for missing JSON keys
  • Default hf_split=None (all splits) instead of "train"
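
The auto-shuffle described above can be sketched as standard reservoir sampling (Algorithm R). The buffer size and seed match the PR description, but the function below is an illustrative sketch, not the actual modelopt implementation:

```python
import random

def reservoir_sample(rows, buffer_size=10_000, seed=42):
    """Hypothetical sketch of the auto-shuffle: keep a uniform random sample
    of up to `buffer_size` rows from a (possibly streaming) iterable, then
    shuffle the buffer before capping. Names are illustrative only."""
    rng = random.Random(seed)
    buffer = []
    for i, row in enumerate(rows):
        if i < buffer_size:
            buffer.append(row)
        else:
            # Replace a random slot with decreasing probability so that every
            # row seen so far ends up in the buffer with equal probability.
            j = rng.randint(0, i)
            if j < buffer_size:
                buffer[j] = row
    rng.shuffle(buffer)
    return buffer
```

This avoids the biased "first N rows" prefix you would get by simply truncating a streamed dataset at `--hf_max_samples_per_split`.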

Usage

from modelopt.torch.utils.plugins.megatron_preprocess_data import megatron_preprocess_data

# Nemotron v3 with reasoning content preserved inline as <think>...</think>
prefixes = megatron_preprocess_data(
    hf_dataset="nvidia/Nemotron-Post-Training-Dataset-v3",
    json_keys=["messages"],
    tokenizer_name_or_path="Qwen/Qwen3-0.6B",
    output_dir="tokenized/",
    workers=32,
    reasoning_content="inline",
)
# prefixes == ["tokenized/nvidia--Nemotron-Post-Training-Dataset-v3_..._messages"]
data_paths = [x for p in prefixes for x in ("1.0", p)]  # interleave weights: ["1.0", p1, "1.0", p2, ...]

# Large pretraining dataset — stream + cap (auto-shuffled before capping)
prefixes = megatron_preprocess_data(
    hf_dataset="nvidia/Nemotron-CC-v2.1",
    hf_name="High-Quality",
    hf_max_samples_per_split=5_000_000,
    hf_streaming=True,
    json_keys=["text"],
    tokenizer_name_or_path="Qwen/Qwen3-0.6B",
    output_dir="tokenized/",
    workers=32,
    append_eod=True,
    strip_newlines=True,
)
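
For intuition, the "inline" mode used above could look roughly like the following. The helper name and the `<think>...</think>` delimiters follow the PR description, but this is a hypothetical sketch, not the actual modelopt code:

```python
def inline_reasoning_content(messages):
    """Illustrative sketch: fold each assistant message's `reasoning_content`
    field into its `content` as a <think>...</think> prefix, per the PR
    description. Not the actual modelopt implementation."""
    out = []
    for msg in messages:
        msg = dict(msg)  # shallow copy so the caller's data is not mutated
        reasoning = msg.pop("reasoning_content", None)
        if msg.get("role") == "assistant" and reasoning:
            msg["content"] = f"<think>{reasoning}</think>{msg['content']}"
        out.append(msg)
    return out
```

The "strip" mode would instead drop the field entirely, and "native" would pass messages through unchanged for tokenizers whose chat template understands `reasoning_content` itself.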

Testing

  • New unit tests
  • Tested tokenization on Nemotron Pretraining and Post-training v3 datasets

Before your PR is "Ready for review"

Make sure you read and follow Contributor guidelines and your commits are signed (git commit -s -S).

Make sure you read and follow the Security Best Practices (e.g. avoiding hardcoded trust_remote_code=True, torch.load(..., weights_only=False), pickle, etc.).

  • Is this change backward compatible?: ✅ (output filename changed: `_text_document` → `_text`; existing callers need to re-tokenize or rename files)
  • If you copied code from any other sources or added a new PIP dependency, did you follow guidance in CONTRIBUTING.md: N/A
  • Did you write any new necessary tests?: ✅
  • Did you update Changelog?: ✅

Summary by CodeRabbit

  • New Features

    • Preprocessing tool: configurable reasoning-content modes (strip|inline|native), optional newline stripping, gzip (.jsonl.gz) input support, HF streaming mode, auto-shuffle when per-split max samples set, returns output-file prefixes, and processes all HF splits by default without writing intermediate JSONL.
  • Documentation

    • Consolidated and reformatted dataset preparation and tokenization guidance; updated install/auth instructions and examples.
  • Tests

    • Tests updated to validate returned prefixes, reasoning-content behaviors, gzip input handling, HF streaming warnings, and HF output assertions.

@kevalmorabia97 kevalmorabia97 requested review from a team as code owners April 9, 2026 12:35
@coderabbitai
Contributor

coderabbitai bot commented Apr 9, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds direct Hugging Face dataset streaming to megatron_preprocess_data (removes HF JSONL intermediates), introduces reasoning_content modes and --strip_newlines for Nemotron v3 messages, changes HF defaults to process all splits, updates API to return output file prefixes, and updates docs and tests accordingly.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Documentation: CHANGELOG.rst, examples/dataset/README.md, examples/megatron_bridge/README.md | Changelog and README edits: reorganized dataset preparation docs, added "Tokenizing for Megatron Frameworks", updated install/auth commands, documented reasoning_content, --strip_newlines, --hf_streaming, and removed duplicated tokenization instructions. |
| Dataset utilities: modelopt/torch/utils/dataset_utils.py | download_hf_dataset_as_jsonl default split changed from `"train"` to `None`; missing json_keys now raise KeyError (fail-fast) instead of warning and skipping. |
| Preprocessing plugin: modelopt/torch/utils/plugins/megatron_preprocess_data.py | HF ingestion switched to direct dataset loading/streaming (no JSONL intermediates); added reasoning_content modes (`strip`/`inline`/`native`), --strip_newlines, gzip input support, and output-prefix return values. |
| Tests: tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py | Tests updated to use returned prefixes, assert no HF JSONL intermediates, validate .bin/.idx per returned prefix, add reasoning_content and gzip-input tests, and add an hf_streaming warning test. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Preprocessor as megatron_preprocess_data
    participant HFDataset as HF Dataset Loader
    participant Tokenizer
    participant FileSystem

    rect rgba(100,150,200,0.5)
    Note over Client,FileSystem: Direct Hugging Face streaming + tokenization
    Client->>Preprocessor: invoke with --hf_dataset, --hf_split=None, --reasoning_content
    Preprocessor->>HFDataset: enumerate splits / load_dataset(split[, streaming])
    HFDataset-->>Preprocessor: stream rows as JSON
    Preprocessor->>Preprocessor: apply reasoning_content + strip_newlines
    Preprocessor->>Tokenizer: format messages + tokenize (chat-template: add_special_tokens=false)
    Tokenizer-->>Preprocessor: token ids
    Preprocessor->>FileSystem: write .bin and .idx files, collect prefixes
    end
    Preprocessor-->>Client: return list of output prefixes

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 47.06%, below the required 80.00% threshold. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The pull request title clearly summarizes the primary improvements in the changeset: streaming support, reasoning_content support for Nemotron v3, and in-memory HuggingFace dataset tokenization. |
| Security Anti-Patterns | ✅ Passed | The PR contains no security anti-patterns: no unsafe torch.load(), numpy.load(), eval()/exec() on user input, or hardcoded trust_remote_code=True found. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@kevalmorabia97 kevalmorabia97 requested review from ChenhanYu and removed request for Edwardf0t1 April 9, 2026 12:36
@github-actions
Contributor

github-actions bot commented Apr 9, 2026

PR Preview Action v1.8.1
Preview removed because the pull request was closed.
2026-04-09 19:38 UTC

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
modelopt/torch/utils/plugins/megatron_preprocess_data.py (1)

226-264: ⚠️ Potential issue | 🟡 Minor

Resource leak: multiprocessing.Pool not closed in process_json_file.

The pool created at line 229 is never closed or joined, unlike in process_hf_split (lines 330-331). This can lead to resource leaks if the function is called multiple times.

🐛 Proposed fix
         for key in builders:
             builders[key].finalize(output_idx_files[key])
 
+        pool.close()
+        pool.join()
         return final_enc_len, prefixes
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 226 -
264, In process_json_file the multiprocessing.Pool instance (pool) created for
encoder.imap is never closed/joined, causing resource leaks; ensure the pool is
properly shut down after use—either wrap pool creation/usage in a context
manager (with multiprocessing.Pool(...) as pool:) or call pool.close() and
pool.join() after consuming encoded_docs (and before any return or when skipping
due to existing builders). Locate the pool variable and encoded_docs usage in
process_json_file and add the close/join (or convert to a with block) so the
worker processes are cleaned up in all code paths.
🧹 Nitpick comments (2)
tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py (1)

88-127: Consider adding test for native mode.

The test covers strip and inline but not native. While native passes messages unchanged (requiring tokenizer support), a basic test ensuring it doesn't crash would improve coverage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py`
around lines 88 - 127, Add a subtest for the "native" reasoning_content mode to
ensure megatron_preprocess_data doesn’t crash and produces expected
prefix/index/bin files: call megatron_preprocess_data with
reasoning_content="native" (same jsonl input and args used in
test_megatron_preprocess_data_reasoning_content), assert it returns the same
single prefix (matching out_dir/"test_reasoning_messages"), and assert the .bin
and .idx files exist and the .bin size is > 0; reference
megatron_preprocess_data and reuse the existing tmp_path/jsonl setup and
assertion pattern from the "strip"/"inline" blocks.
modelopt/torch/utils/plugins/megatron_preprocess_data.py (1)

88-96: Accessing private _identity attribute.

multiprocessing.current_process()._identity is a private implementation detail that may change across Python versions. Consider using multiprocessing.current_process().name which is public, or check if we're in the main process using multiprocessing.parent_process() is None.

♻️ Alternative using public API
 def _is_main_or_first_worker() -> bool:
-    """Return True only for the main process or the first pool worker.
-
-    ``multiprocessing.current_process()._identity`` is ``()`` in the main process
-    and ``(N,)`` in the N-th pool worker.  Gating noisy prints on this prevents
-    the same message from appearing once per worker when using many workers.
-    """
-    identity = multiprocessing.current_process()._identity
-    return not identity or identity[0] == 1
+    """Return True only for the main process or the first pool worker."""
+    proc = multiprocessing.current_process()
+    # Main process has no parent; pool workers are named like "ForkPoolWorker-N"
+    if multiprocessing.parent_process() is None:
+        return True
+    # For pool workers, check if this is worker 1
+    name = proc.name
+    if name.startswith("ForkPoolWorker-") or name.startswith("SpawnPoolWorker-"):
+        return name.rsplit("-", 1)[-1] == "1"
+    return True
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 88 -
96, The function _is_main_or_first_worker currently reads the private attribute
multiprocessing.current_process()._identity; replace that with a public-API
check: use multiprocessing.parent_process() is None to detect the main process,
and for pool workers use multiprocessing.current_process().name (e.g.,
"ForkPoolWorker-1") to detect the first worker by parsing the trailing worker
index (treat name ending with "-1" or containing "1" as first worker). Update
_is_main_or_first_worker to return True when parent_process() is None or when
the parsed worker index equals 1, and avoid accessing any private attributes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@examples/dataset/README.md`:
- Line 143: Fix the typo "differnt" to "different" in the README sentence
describing data_paths for Megatron training; update the sentence that references
megatron_preprocess_data and the data_paths argument so it reads "...with
relative weights on different files) in megatron training scripts." to ensure
clarity when mentioning megatron_preprocess_data and data_paths.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 346-351: The bare except around
get_dataset_config_names(dataset_name) is too broad; replace it by catching the
actual failure cases (e.g., FileNotFoundError, ConnectionError,
DatasetNotFoundError, DataFilesNotFoundError) and fall back to configs = [None]
for those specific exceptions, and/or log the caught exception for debugging
(use the module logger or logger.exception) so failures are visible while
preserving the intended fallback behavior for configs.

---

Outside diff comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 226-264: In process_json_file the multiprocessing.Pool instance
(pool) created for encoder.imap is never closed/joined, causing resource leaks;
ensure the pool is properly shut down after use—either wrap pool creation/usage
in a context manager (with multiprocessing.Pool(...) as pool:) or call
pool.close() and pool.join() after consuming encoded_docs (and before any return
or when skipping due to existing builders). Locate the pool variable and
encoded_docs usage in process_json_file and add the close/join (or convert to a
with block) so the worker processes are cleaned up in all code paths.

---

Nitpick comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 88-96: The function _is_main_or_first_worker currently reads the
private attribute multiprocessing.current_process()._identity; replace that with
a public-API check: use multiprocessing.parent_process() is None to detect the
main process, and for pool workers use multiprocessing.current_process().name
(e.g., "ForkPoolWorker-1") to detect the first worker by parsing the trailing
worker index (treat name ending with "-1" or containing "1" as first worker).
Update _is_main_or_first_worker to return True when parent_process() is None or
when the parsed worker index equals 1, and avoid accessing any private
attributes.

In `@tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py`:
- Around line 88-127: Add a subtest for the "native" reasoning_content mode to
ensure megatron_preprocess_data doesn’t crash and produces expected
prefix/index/bin files: call megatron_preprocess_data with
reasoning_content="native" (same jsonl input and args used in
test_megatron_preprocess_data_reasoning_content), assert it returns the same
single prefix (matching out_dir/"test_reasoning_messages"), and assert the .bin
and .idx files exist and the .bin size is > 0; reference
megatron_preprocess_data and reuse the existing tmp_path/jsonl setup and
assertion pattern from the "strip"/"inline" blocks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 0f217cc4-46cb-4e75-9352-767705eeedbe

📥 Commits

Reviewing files that changed from the base of the PR and between 24b8dcf and 9ae6ec6.

📒 Files selected for processing (6)
  • CHANGELOG.rst
  • examples/dataset/README.md
  • examples/megatron_bridge/README.md
  • modelopt/torch/utils/dataset_utils.py
  • modelopt/torch/utils/plugins/megatron_preprocess_data.py
  • tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py

Contributor

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
modelopt/torch/utils/plugins/megatron_preprocess_data.py (2)

237-272: ⚠️ Potential issue | 🟠 Major

Multiprocessing pool is never closed or joined.

The pool created at line 237 is never terminated. Compare to process_hf_split which properly calls pool.close() and pool.join() at lines 338-339. This can leave zombie worker processes.

🐛 Proposed fix
         fin.close()
+        pool.close()
+        pool.join()
         for key in builders:
             builders[key].finalize(output_idx_files[key])
 
         return final_enc_len, prefixes
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 237 -
272, The multiprocessing.Pool created with multiprocessing.Pool(self.workers,
initializer=encoder.initializer) is never closed/joined, which can leave worker
processes running; update the block that consumes pool.imap (encoded_docs) to
always call pool.close() and pool.join() after iteration and ensure you also
close/join before any early return (e.g., when builders is empty), mirroring the
pattern used in process_hf_split (call pool.close() then pool.join()) so the
pool is properly terminated in all code paths.

262-262: ⚠️ Potential issue | 🟠 Major

Bug: final_enc_len only counts tokens for the last json_key.

The variable key here refers to the last key from the loop at lines 244-252, not all keys. When multiple json_keys are provided, this only counts tokens for one of them.

Compare to the correct implementation in process_hf_split at line 330:

final_enc_len += sum(sum(sentence_lens[key]) for key in sentence_lens)
🐛 Proposed fix
-            final_enc_len += sum(sentence_lens[key])
+            final_enc_len += sum(sum(sentence_lens[k]) for k in sentence_lens)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` at line 262, The
accumulation of final_enc_len only adds tokens for the last loop variable `key`;
update the logic in the function (the loop handling `sentence_lens`/`json_keys`,
same area as `final_enc_len += sum(sentence_lens[key])`) to aggregate over all
keys by summing every entry in `sentence_lens` (e.g., final_enc_len +=
sum(sum(sentence_lens[k]) for k in sentence_lens)) so `final_enc_len`,
`sentence_lens`, and `key` correctly reflect tokens from all json_keys rather
than just the last one.
♻️ Duplicate comments (1)
examples/dataset/README.md (1)

143-143: ⚠️ Potential issue | 🟡 Minor

Fix typos and grammar in this line.

This line has multiple issues:

  1. "differnt" should be "different"
  2. "Below tokenization scripts prints" should be "The below tokenization scripts print" (subject-verb agreement)
📝 Suggested fix
-The distillation and pre-training scripts in Megatron-Bridge or Megatron-LM expect data pre-tokenized in Megatron's binary indexed format (`.bin` / `.idx`).  Use the `megatron_preprocess_data` utility to tokenize any JSONL or Hugging Face dataset. Below tokenization scripts prints the list of output prefixes (e.g. `tokenized_qwen3/data1_text`) that you can use for `data_paths` argument (with relative weights on differnt files) in megatron training scripts.
+The distillation and pre-training scripts in Megatron-Bridge or Megatron-LM expect data pre-tokenized in Megatron's binary indexed format (`.bin` / `.idx`).  Use the `megatron_preprocess_data` utility to tokenize any JSONL or Hugging Face dataset. The below tokenization scripts print the list of output prefixes (e.g. `tokenized_qwen3/data1_text`) that you can use for `data_paths` argument (with relative weights on different files) in megatron training scripts.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/dataset/README.md` at line 143, The sentence contains typos and a
subject-verb agreement error: change "differnt" to "different" and "Below
tokenization scripts prints" to "The below tokenization scripts print"; ensure
references like megatron_preprocess_data, data_paths, and the example output
prefix tokenized_qwen3/data1_text remain unchanged while updating the sentence
accordingly.
🧹 Nitpick comments (3)
modelopt/torch/utils/plugins/megatron_preprocess_data.py (2)

89-98: Reliance on private _identity attribute.

multiprocessing.current_process()._identity is an undocumented private attribute. While this pattern is commonly used and works reliably, it could break in future Python versions without warning. Consider adding a brief inline comment noting this dependency, or wrapping in a try/except with a fallback.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 89 -
98, The function _is_main_or_first_worker relies on the private attribute
multiprocessing.current_process()._identity which is fragile; wrap access to
_identity in a try/except that handles AttributeError (and any other unexpected
exceptions) and provide a safe fallback (e.g., treat as main process when
attribute missing or not a sequence), and add a brief inline comment that
documents the reliance on a private implementation detail and why the fallback
is used; ensure the logic still returns True for the main process or first pool
worker when _identity is present and falls back safely when it is not.

122-124: Consider gating init prints with _is_main_or_first_worker().

These prints at initialization will appear for every worker process when using multiprocessing. For consistency with the approach used elsewhere in this file, consider gating them:

if _is_main_or_first_worker():
    print(f"Setting max document length: {self.max_document_length}")
    print(f"reasoning_content mode: {self.reasoning_content}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 122 -
124, Wrap the two init prints that currently unconditionally call
print(f"Setting max document length: {self.max_document_length}") and
print(f"reasoning_content mode: {self.reasoning_content}") with a guard using
_is_main_or_first_worker() so only the main/first worker logs them; locate these
prints (e.g., in the constructor where self.max_document_length and
self.reasoning_content are set) and change the logic to call the prints only
inside an if _is_main_or_first_worker(): block.
examples/dataset/README.md (1)

175-175: Consider consistent spelling of "pretraining" vs "pre-training".

The document uses "pre-training" elsewhere (e.g., line 141, 147) but "pretraining" twice on this line. Consider using consistent spelling throughout for readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/dataset/README.md` at line 175, The README line uses "pretraining"
whereas other lines use "pre-training"; make the spelling consistent across the
document by choosing one form and updating this line to match (e.g., change
"pretraining" to "pre-training" on the sentence mentioning --strip_newlines) so
all occurrences of the term in the README.md use the same hyphenation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 237-272: The multiprocessing.Pool created with
multiprocessing.Pool(self.workers, initializer=encoder.initializer) is never
closed/joined, which can leave worker processes running; update the block that
consumes pool.imap (encoded_docs) to always call pool.close() and pool.join()
after iteration and ensure you also close/join before any early return (e.g.,
when builders is empty), mirroring the pattern used in process_hf_split (call
pool.close() then pool.join()) so the pool is properly terminated in all code
paths.
- Line 262: The accumulation of final_enc_len only adds tokens for the last loop
variable `key`; update the logic in the function (the loop handling
`sentence_lens`/`json_keys`, same area as `final_enc_len +=
sum(sentence_lens[key])`) to aggregate over all keys by summing every entry in
`sentence_lens` (e.g., final_enc_len += sum(sum(sentence_lens[k]) for k in
sentence_lens)) so `final_enc_len`, `sentence_lens`, and `key` correctly reflect
tokens from all json_keys rather than just the last one.

---

Duplicate comments:
In `@examples/dataset/README.md`:
- Line 143: The sentence contains typos and a subject-verb agreement error:
change "differnt" to "different" and "Below tokenization scripts prints" to "The
below tokenization scripts print"; ensure references like
megatron_preprocess_data, data_paths, and the example output prefix
tokenized_qwen3/data1_text remain unchanged while updating the sentence
accordingly.

---

Nitpick comments:
In `@examples/dataset/README.md`:
- Line 175: The README line uses "pretraining" whereas other lines use
"pre-training"; make the spelling consistent across the document by choosing one
form and updating this line to match (e.g., change "pretraining" to
"pre-training" on the sentence mentioning --strip_newlines) so all occurrences
of the term in the README.md use the same hyphenation.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 89-98: The function _is_main_or_first_worker relies on the private
attribute multiprocessing.current_process()._identity which is fragile; wrap
access to _identity in a try/except that handles AttributeError (and any other
unexpected exceptions) and provide a safe fallback (e.g., treat as main process
when attribute missing or not a sequence), and add a brief inline comment that
documents the reliance on a private implementation detail and why the fallback
is used; ensure the logic still returns True for the main process or first pool
worker when _identity is present and falls back safely when it is not.
- Around line 122-124: Wrap the two init prints that currently unconditionally
call print(f"Setting max document length: {self.max_document_length}") and
print(f"reasoning_content mode: {self.reasoning_content}") with a guard using
_is_main_or_first_worker() so only the main/first worker logs them; locate these
prints (e.g., in the constructor where self.max_document_length and
self.reasoning_content are set) and change the logic to call the prints only
inside an if _is_main_or_first_worker(): block.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 7d030134-ea73-4cf6-a4f2-9a3403aed08b

📥 Commits

Reviewing files that changed from the base of the PR and between 9ae6ec6 and a472d0b.

📒 Files selected for processing (3)
  • CHANGELOG.rst
  • examples/dataset/README.md
  • modelopt/torch/utils/plugins/megatron_preprocess_data.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • CHANGELOG.rst

kevalmorabia97 and others added 2 commits April 9, 2026 05:57
…y tokenization, and return output prefixes

Key changes to `megatron_preprocess_data`:
- Add `reasoning_content` param (strip/inline/native) to handle Nemotron v3
  `reasoning_content` field in assistant messages
- Eliminate intermediate JSONL for HF datasets: load Arrow cache directly via
  `_iter_hf_as_json` + `process_hf_split`, avoiding a disk round-trip
- Return `list[str]` of output file prefixes so callers can build `--data_paths`
  without hardcoding paths; also printed at end of run
- Remove `_document` suffix from output filenames (`_text.bin` not `_text_document.bin`)
- Fix `add_special_tokens=False` for chat-template data to prevent duplicate BOS
- Fix TypeError in `process_hf_split` (`sum(list)` not `sum(int)`)
- Suppress duplicate worker prints via `_is_main_or_first_worker()`
- Raise `KeyError` instead of warning for missing json keys
- Default `hf_split=None` (all splits) instead of `"train"`
- Add `_enumerate_hf_splits()` using datasets API (no HTTP dependency)

Tests:
- Update `test_megatron_preprocess_data.py` to assert on returned prefixes
- Merge two reasoning_content tests into one
- Drop unused `os` import

Docs:
- `examples/dataset/README.md`: add full "Tokenizing for Megatron Frameworks"
  section with JSONL, HF Hub, reasoning_content, and data_paths examples;
  restructure with two H2 sections and nav table
- `examples/megatron_bridge/README.md`: collapse Data Preparation to a pointer
  to the dataset README; fix output filename references

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Keval Morabia <28916987+kevalmorabia97@users.noreply.github.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Keval Morabia <28916987+kevalmorabia97@users.noreply.github.com>
@kevalmorabia97 kevalmorabia97 force-pushed the kmorabia/megatron-preprocess-data-improvements branch from a472d0b to 7eea5b9 on April 9, 2026 12:57
…s flag

- Support .jsonl.gz input files directly (auto-detected by extension)
- Glob *.jsonl.gz in addition to *.jsonl when using --input_dir
- Strip double extension for output prefix (foo.jsonl.gz -> foo_text)
- Add --strip_newlines flag (opt-in) to replace newlines with spaces in
  plain-text values; default preserves newlines to avoid breaking change
  for code/structured-text datasets

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Keval Morabia <28916987+kevalmorabia97@users.noreply.github.com>
@kevalmorabia97 kevalmorabia97 force-pushed the kmorabia/megatron-preprocess-data-improvements branch from 7eea5b9 to cdf7209 Compare April 9, 2026 13:00
@kevalmorabia97 kevalmorabia97 changed the title feat(megatron-preprocess): reasoning_content support, HF in-memory tokenization, return output prefixes Improve Megatron Tokenization: reasoning_content support, HF in-memory tokenization, skip newlines, return output prefixes Apr 9, 2026
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
modelopt/torch/utils/plugins/megatron_preprocess_data.py (1)

258-266: ⚠️ Potential issue | 🟠 Major

Fix the JSONL token-count accumulation and empty-file path.

final_enc_len += sum(sentence_lens[key]) only counts whichever key was left over from the builder setup loop, so multi-key inputs under-report tokens. Also, i is undefined when the file has zero rows, so the final stats print raises UnboundLocalError.

♻️ Proposed fix
-        total_doc_len, total_enc_len, final_enc_len = 0, 0, 0
-        for i, (doc, sentence_lens, (doc_len, enc_len)) in enumerate(encoded_docs, start=1):
+        total_doc_len, total_enc_len, final_enc_len = 0, 0, 0
+        i = 0
+        for i, (doc, sentence_lens, (doc_len, enc_len)) in enumerate(encoded_docs, start=1):
             total_doc_len += doc_len
             total_enc_len += enc_len
-            final_enc_len += sum(sentence_lens[key])
+            final_enc_len += sum(sum(lengths) for lengths in sentence_lens.values())
             for key in doc:
                 builders[key].add_document(doc[key], sentence_lens[key])
             self._print_processing_stats(i, total_doc_len, total_enc_len)
-        self._print_processing_stats(i, total_doc_len, total_enc_len, force_print=True)
+        if i:
+            self._print_processing_stats(i, total_doc_len, total_enc_len, force_print=True)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 258 -
266, The loop under encoded_docs mis-accumulates token counts and can use an
undefined i when encoded_docs is empty; change final_enc_len accumulation to sum
tokens across all keys each iteration (e.g., add sum(...) over sentence_lens
entries rather than using a single leftover key) and ensure the processed-count
variable is always defined before/after the loop (initialize i or use a separate
counter like processed_count = 0 and increment inside the loop) so
self._print_processing_stats(i, ...) cannot raise UnboundLocalError when there
are zero rows; update references to final_enc_len, sentence_lens, encoded_docs,
builders, and _print_processing_stats accordingly.
♻️ Duplicate comments (2)
examples/dataset/README.md (1)

143-143: ⚠️ Potential issue | 🟡 Minor

Polish this intro sentence before merging.

Line 143 still has prints/differnt. Suggested wording: “The script prints the list of output prefixes … with relative weights on different files …”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/dataset/README.md` at line 143, Update the intro sentence to fix
grammar and clarity: replace the current sentence that starts "Below
tokenization scripts prints the list of output prefixes..." with the suggested
wording "The script prints the list of output prefixes … with relative weights
on different files …", ensuring "prints" is singular ("script prints") and
"different" is spelled correctly; locate and update that sentence in the README
section describing the megatron_preprocess_data utility so it reads the
suggested phrasing exactly.
modelopt/torch/utils/plugins/megatron_preprocess_data.py (1)

356-359: ⚠️ Potential issue | 🟡 Minor

Don't swallow every dataset-discovery failure here.

except Exception hides auth, network, and repo-resolution errors and makes them look like “single unnamed config” datasets. Catch the expected datasets discovery exceptions and re-raise unexpected ones.

What exceptions can Hugging Face `datasets.get_dataset_config_names()` raise for missing, private, or offline datasets?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 356 -
359, The current blanket except hides unexpected failures from
get_dataset_config_names(dataset_name); change it to only catch known
dataset-discovery errors (e.g., datasets' DatasetNotFoundError plus common I/O
errors like ValueError and OSError) and re-raise any other exceptions so
auth/network/repo-resolution errors surface. Concretely, import the specific
exceptions from the datasets package, replace "except Exception:" with "except
(datasets.builder.DatasetNotFoundError, ValueError, OSError) as e:" (or the
exact DatasetNotFoundError/Offline-errors provided by the datasets lib), set
configs = [None] for those cases, and use "raise" for any other exceptions so
unexpected failures aren’t swallowed in the
get_dataset_config_names(dataset_name) call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 231-238: process_json_file() currently opens fin and creates
multiprocessing.Pool(self.workers, initializer=encoder.initializer) then starts
pool.imap(encoder.encode, fin, 32) but returns early when outputs exist or if
encoder.encode() raises, leaking file descriptors and worker processes; fix by
ensuring fin and pool are always cleaned up: use context managers (with
open(...) as fin and with multiprocessing.Pool(...) as pool) or a try/finally
that closes fin and calls pool.close()/pool.join() on normal exit and
pool.terminate()/pool.join() on exception, and ensure any early-return path
closes both resources before returning (refer to fin, pool, encoded_docs,
process_json_file, encoder.encode).
- Around line 226-229: The current normalization of input_path -> stem ->
output_prefix can collapse distinct files (e.g., foo.jsonl and foo.jsonl.gz)
into the same output_prefix; update the logic that computes stem/output_prefix
(variables input_path, stem, output_prefix, prefixes and uses self.json_keys) to
detect or avoid collisions before writing: either preserve a differentiating
token (e.g., include the original suffix or full input_path.name in the prefix
when input_path.suffix == ".gz") or maintain a global set of generated
output_prefixes and raise a clear error if a duplicate is about to be created so
the caller can resolve the conflict; implement the chosen approach in the same
block that computes stem/output_prefix so collisions are prevented before any
files are written.

---

Outside diff comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 258-266: The loop under encoded_docs mis-accumulates token counts
and can use an undefined i when encoded_docs is empty; change final_enc_len
accumulation to sum tokens across all keys each iteration (e.g., add sum(...)
over sentence_lens entries rather than using a single leftover key) and ensure
the processed-count variable is always defined before/after the loop (initialize
i or use a separate counter like processed_count = 0 and increment inside the
loop) so self._print_processing_stats(i, ...) cannot raise UnboundLocalError
when there are zero rows; update references to final_enc_len, sentence_lens,
encoded_docs, builders, and _print_processing_stats accordingly.

---

Duplicate comments:
In `@examples/dataset/README.md`:
- Line 143: Update the intro sentence to fix grammar and clarity: replace the
current sentence that starts "Below tokenization scripts prints the list of
output prefixes..." with the suggested wording "The script prints the list of
output prefixes … with relative weights on different files …", ensuring "prints"
is singular ("script prints") and "different" is spelled correctly; locate and
update that sentence in the README section describing the
megatron_preprocess_data utility so it reads the suggested phrasing exactly.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 356-359: The current blanket except hides unexpected failures from
get_dataset_config_names(dataset_name); change it to only catch known
dataset-discovery errors (e.g., datasets' DatasetNotFoundError plus common I/O
errors like ValueError and OSError) and re-raise any other exceptions so
auth/network/repo-resolution errors surface. Concretely, import the specific
exceptions from the datasets package, replace "except Exception:" with "except
(datasets.builder.DatasetNotFoundError, ValueError, OSError) as e:" (or the
exact DatasetNotFoundError/Offline-errors provided by the datasets lib), set
configs = [None] for those cases, and use "raise" for any other exceptions so
unexpected failures aren’t swallowed in the
get_dataset_config_names(dataset_name) call.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 73239c9d-f24c-4138-9d8c-e605acb858ea

📥 Commits

Reviewing files that changed from the base of the PR and between a472d0b and 7eea5b9.

📒 Files selected for processing (6)
  • CHANGELOG.rst
  • examples/dataset/README.md
  • examples/megatron_bridge/README.md
  • modelopt/torch/utils/dataset_utils.py
  • modelopt/torch/utils/plugins/megatron_preprocess_data.py
  • tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py
✅ Files skipped from review due to trivial changes (1)
  • examples/megatron_bridge/README.md
🚧 Files skipped from review as they are similar to previous changes (2)
  • CHANGELOG.rst
  • tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py

@codecov

codecov bot commented Apr 9, 2026

Codecov Report

❌ Patch coverage is 77.70270% with 33 lines in your changes missing coverage. Please review.
✅ Project coverage is 76.46%. Comparing base (14e9bea) to head (6b00285).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...pt/torch/utils/plugins/megatron_preprocess_data.py 77.70% 33 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1221      +/-   ##
==========================================
+ Coverage   75.68%   76.46%   +0.78%     
==========================================
  Files         353      353              
  Lines       40491    40603     +112     
==========================================
+ Hits        30644    31046     +402     
+ Misses       9847     9557     -290     
Flag Coverage Δ
examples 42.81% <6.75%> (+1.08%) ⬆️
gpu 56.89% <77.70%> (-0.22%) ⬇️
unit 55.11% <2.70%> (-0.15%) ⬇️



@kevalmorabia97 kevalmorabia97 changed the title Improve Megatron Tokenization: reasoning_content support, HF in-memory tokenization, skip newlines, return output prefixes Improve Megatron Tokenization: streaming, reasoning_content support, HF in-memory tokenization, etc Apr 9, 2026
@kevalmorabia97 kevalmorabia97 force-pushed the kmorabia/megatron-preprocess-data-improvements branch from 47d0e7b to b56ee29 Compare April 9, 2026 13:42
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
modelopt/torch/utils/plugins/megatron_preprocess_data.py (1)

273-273: ⚠️ Potential issue | 🔴 Critical

Bug: key refers to last key from previous loop, causing incorrect token count with multiple json_keys.

Variable key on line 273 is bound to the last value from the for key in self.json_keys: loop (line 255), not the current document's keys. When multiple json_keys are specified, only the last key's tokens are counted in final_enc_len.

Compare with process_hf_split (line 359) which correctly sums over all keys:

final_enc_len += sum(sum(sentence_lens[key]) for key in sentence_lens)
🐛 Proposed fix
-            final_enc_len += sum(sentence_lens[key])
+            final_enc_len += sum(sum(sentence_lens[key]) for key in sentence_lens)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` at line 273, The
bug is that final_enc_len += sum(sentence_lens[key]) uses the loop variable key
from an outer loop (self.json_keys) so when multiple json_keys exist it only
sums the last key's tokens; fix by iterating the keys present in sentence_lens
instead of reusing key — replace that line with a summed comprehension over
sentence_lens (e.g., final_enc_len += sum(sum(sentence_lens[k]) for k in
sentence_lens) or equivalent) so final_enc_len accumulates tokens for all keys;
update the code in megatron_preprocess_data.py where sentence_lens and
final_enc_len are used (reference symbols: sentence_lens, final_enc_len,
self.json_keys).
♻️ Duplicate comments (2)
modelopt/torch/utils/plugins/megatron_preprocess_data.py (2)

237-240: ⚠️ Potential issue | 🟠 Major

Stem collision for .jsonl and .jsonl.gz files with same base name.

If --input_dir contains both foo.jsonl and foo.jsonl.gz, both normalize to stem foo, producing identical output prefixes. One file will silently skip (if outputs exist from the first) or overwrite the other's output.

Consider detecting collisions upfront or preserving a distinguishing suffix.
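The upfront collision check suggested here could look like this (a sketch; `compute_prefixes` is a hypothetical helper, not the PR's code):

```python
from pathlib import Path

def compute_prefixes(input_paths: list[Path]) -> dict[Path, str]:
    """Map each input file to an output stem, failing fast on collisions."""
    seen: dict[str, Path] = {}
    prefixes: dict[Path, str] = {}
    for path in input_paths:
        # foo.jsonl and foo.jsonl.gz both normalize to "foo"
        stem = path.name.removesuffix(".gz").removesuffix(".jsonl")
        if stem in seen:
            raise ValueError(
                f"Output prefix {stem!r} for {path} collides with {seen[stem]}"
            )
        seen[stem] = path
        prefixes[path] = stem
    return prefixes
```

Raising before any output is written lets the caller rename one of the inputs instead of discovering a silent skip or overwrite later.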


242-267: ⚠️ Potential issue | 🟠 Major

Resource leak: fin and pool not closed on early return or exception.

When outputs already exist (line 267), the function returns without closing the file handle or terminating the pool. Additionally, even on the normal path, pool.close() / pool.join() is never called (compare with process_hf_split which does this correctly).

♻️ Suggested refactor using context managers
-        print(f"\nOpening {input_file_name}")
-        if input_path.suffix == ".gz":
-            fin = gzip.open(input_path, "rt", encoding="utf-8")
-        else:
-            fin = open(input_path, encoding="utf-8")
-
-        pool = multiprocessing.Pool(self.workers, initializer=encoder.initializer)
-        encoded_docs = pool.imap(encoder.encode, fin, 32)
-
         output_bin_files = {}
         output_idx_files = {}
         builders = {}

         for key in self.json_keys:
             ...
             builders[key] = ...

         if not builders:
             print(f"\t[SKIP] Output files corresponding to {input_file_name} already exist")
             return 0, prefixes

+        print(f"\nOpening {input_file_name}")
+        open_fn = gzip.open if input_path.suffix == ".gz" else open
+        with open_fn(input_path, "rt", encoding="utf-8") as fin, \
+             multiprocessing.Pool(self.workers, initializer=encoder.initializer) as pool:
+            encoded_docs = pool.imap(encoder.encode, fin, 32)
+            total_doc_len, total_enc_len, final_enc_len = 0, 0, 0
+            ...
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 242 -
267, The code leaks the file handle fin and the multiprocessing.Pool pool on the
early return and on exceptions; wrap resource usage in context managers or
ensure cleanup: open fin using a with (use gzip.open or open) and create the
pool with "with multiprocessing.Pool(self.workers,
initializer=encoder.initializer) as pool:" so pool is closed/joined
automatically, or alternatively ensure a try/finally that calls
pool.close()/pool.terminate() and pool.join() and fin.close() before any return
(including the branch that checks builders and returns) — make these changes
around fin, pool, encoded_docs and the early-return that currently skips when
builders is empty (similar to process_hf_split).
🧹 Nitpick comments (1)
modelopt/torch/utils/plugins/megatron_preprocess_data.py (1)

100-108: Acceptable but note: relies on private _identity attribute.

Using multiprocessing.current_process()._identity is a common pattern for this purpose, but it's undocumented and could change in future Python versions. For now this is fine given the use case (suppressing duplicate log messages).
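A sketch of the public process-name alternative (illustrative; relies on the default pool worker naming such as `ForkPoolWorker-1`):

```python
import multiprocessing

def is_main_or_first_worker() -> bool:
    # Pool workers are named like "ForkPoolWorker-1"; the parent process
    # is "MainProcess". This avoids the private _identity attribute.
    name = multiprocessing.current_process().name
    return name == "MainProcess" or name.endswith("-1")

print(is_main_or_first_worker())  # True in the main process
```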

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py` around lines 100 -
108, The implementation of _is_main_or_first_worker uses the private attribute
multiprocessing.current_process()._identity; replace this with a check on the
public process name instead: get multiprocessing.current_process().name and
return True if it equals "MainProcess" or if it identifies as the first pool
worker by matching the worker-name pattern (e.g., ends with "-1" or matches a
regex like r".*-1$"); update _is_main_or_first_worker to use
getattr(current_process(), "name", "") and perform these public-name checks so
we avoid relying on the private _identity attribute.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@examples/dataset/README.md`:
- Around line 199-200: The sentence after the `--hf_max_samples_per_split` flag
is missing a word; update the README sentence to read "the dataset is shuffled
so the sampled rows are representative rather than biased toward the dataset
prefix" (i.e., insert "shuffled") so the meaning is complete and clear.
- Around line 148-152: The bullet list in the README is malformed because the
fragment "appends EOS at the end of each conversation:" is separated from the
preceding bullet; fix it by moving that fragment to directly follow the bullet
"For Post-training chat data (`messages` key) — omit `--append_eod`; the chat
template already" so the two lines become one coherent bullet, and ensure the
surrounding bullets still read: "For Pretraining / raw-text data (`text` key) —
use `--append_eod` ..." and "Set `--max_sequence_length 256_000` ..." so the
list items are properly ordered and the chat-note belongs to the post-training
bullet.

---

Outside diff comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Line 273: The bug is that final_enc_len += sum(sentence_lens[key]) uses the
loop variable key from an outer loop (self.json_keys) so when multiple json_keys
exist it only sums the last key's tokens; fix by iterating the keys present in
sentence_lens instead of reusing key — replace that line with a summed
comprehension over sentence_lens (e.g., final_enc_len +=
sum(sum(sentence_lens[k]) for k in sentence_lens) or equivalent) so
final_enc_len accumulates tokens for all keys; update the code in
megatron_preprocess_data.py where sentence_lens and final_enc_len are used
(reference symbols: sentence_lens, final_enc_len, self.json_keys).

---

Duplicate comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 242-267: The code leaks the file handle fin and the
multiprocessing.Pool pool on the early return and on exceptions; wrap resource
usage in context managers or ensure cleanup: open fin using a with (use
gzip.open or open) and create the pool with "with
multiprocessing.Pool(self.workers, initializer=encoder.initializer) as pool:" so
pool is closed/joined automatically, or alternatively ensure a try/finally that
calls pool.close()/pool.terminate() and pool.join() and fin.close() before any
return (including the branch that checks builders and returns) — make these
changes around fin, pool, encoded_docs and the early-return that currently skips
when builders is empty (similar to process_hf_split).

---

Nitpick comments:
In `@modelopt/torch/utils/plugins/megatron_preprocess_data.py`:
- Around line 100-108: The implementation of _is_main_or_first_worker uses the
private attribute multiprocessing.current_process()._identity; replace this with
a check on the public process name instead: get
multiprocessing.current_process().name and return True if it equals
"MainProcess" or if it identifies as the first pool worker by matching the
worker-name pattern (e.g., ends with "-1" or matches a regex like r".*-1$");
update _is_main_or_first_worker to use getattr(current_process(), "name", "")
and perform these public-name checks so we avoid relying on the private
_identity attribute.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 9ba5fc32-577a-4a96-b583-b11b0df793c1

📥 Commits

Reviewing files that changed from the base of the PR and between 7eea5b9 and 47d0e7b.

📒 Files selected for processing (4)
  • CHANGELOG.rst
  • examples/dataset/README.md
  • modelopt/torch/utils/plugins/megatron_preprocess_data.py
  • tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.rst
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
examples/dataset/README.md (2)

148-152: ⚠️ Potential issue | 🟡 Minor

Bullet list is still malformed; attach the EOS clause to the chat-data bullet.

The “appends EOS at the end of each conversation” fragment is still detached, which breaks list structure and readability.

📝 Proposed fix
-- For Pretraining / raw-text data (`text` key) — use `--append_eod` so Megatron can tell where
-documents end when concatenating them into long sequences:
-- For Post-training chat data (`messages` key) — omit `--append_eod`; the chat template already
-- Set `--max_sequence_length 256_000` to avoid rare OOM errors if some text is very long.
-appends EOS at the end of each conversation:
+- For Pretraining / raw-text data (`text` key) — use `--append_eod` so Megatron can tell where
+  documents end when concatenating them into long sequences.
+- For Post-training chat data (`messages` key) — omit `--append_eod`; the chat template already
+  appends EOS at the end of each conversation.
+- Set `--max_sequence_length 256_000` to avoid rare OOM errors if some text is very long.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/dataset/README.md` around lines 148 - 152, The bullet list in
README.md is malformed because the clause "appends EOS at the end of each
conversation" is detached; edit the three bullets so the second bullet for
Post-training chat data (key `messages`) includes the EOS clause (i.e., omit
`--append_eod` because the chat template already appends EOS at the end of each
conversation), keep the first bullet for Pretraining (key `text`) recommending
`--append_eod`, and keep the third bullet about setting `--max_sequence_length
256_000`; reference the `text` and `messages` keys and the `--append_eod` and
`--max_sequence_length 256_000` flags when fixing the list.

199-200: ⚠️ Potential issue | 🟡 Minor

Complete the streaming sentence (“dataset is shuffled”).

Line 199 currently reads “the dataset is so…”, which is incomplete and unclear.

📝 Proposed fix
-`--hf_max_samples_per_split`, the dataset is so the sampled rows are representative rather than
+`--hf_max_samples_per_split`, the dataset is shuffled so the sampled rows are representative rather than
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/dataset/README.md` around lines 199 - 200, Fix the incomplete
sentence in the README by inserting the missing word so it reads clearly;
replace the fragment starting with "the dataset is so the sampled rows are
representative rather than biased toward the dataset prefix:" with a complete
phrase such as "the dataset is shuffled so the sampled rows are representative
rather than biased toward the dataset prefix." Ensure the updated sentence flows
with surrounding text and preserves the intended meaning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@examples/dataset/README.md`:
- Line 143: The sentence "Below tokenization scripts prints the list of output
prefixes..." has a subject-verb agreement error; change it to a correct phrasing
such as "The tokenization scripts below print the list of output prefixes..." or
"The following tokenization scripts print the list of output prefixes..." so the
plural subject "tokenization scripts" matches the plural verb "print"; update
that sentence in the README content accordingly.

---

Duplicate comments:
In `@examples/dataset/README.md`:
- Around line 148-152: The bullet list in README.md is malformed because the
clause "appends EOS at the end of each conversation" is detached; edit the three
bullets so the second bullet for Post-training chat data (key `messages`)
includes the EOS clause (i.e., omit `--append_eod` because the chat template
already appends EOS at the end of each conversation), keep the first bullet for
Pretraining (key `text`) recommending `--append_eod`, and keep the third bullet
about setting `--max_sequence_length 256_000`; reference the `text` and
`messages` keys and the `--append_eod` and `--max_sequence_length 256_000` flags
when fixing the list.
- Around line 199-200: Fix the incomplete sentence in the README by inserting
the missing word so it reads clearly; replace the fragment starting with "the
dataset is so the sampled rows are representative rather than biased toward the
dataset prefix:" with a complete phrase such as "the dataset is shuffled so the
sampled rows are representative rather than biased toward the dataset prefix."
Ensure the updated sentence flows with surrounding text and preserves the
intended meaning.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 551f0eb2-7ccd-4f42-a84d-d90a63899aea

📥 Commits

Reviewing files that changed from the base of the PR and between 47d0e7b and b56ee29.

📒 Files selected for processing (4)
  • CHANGELOG.rst
  • examples/dataset/README.md
  • modelopt/torch/utils/plugins/megatron_preprocess_data.py
  • tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.rst
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/gpu_megatron/torch/utils/plugins/test_megatron_preprocess_data.py
  • modelopt/torch/utils/plugins/megatron_preprocess_data.py

@kevalmorabia97 kevalmorabia97 force-pushed the kmorabia/megatron-preprocess-data-improvements branch from b56ee29 to b93e487 Compare April 9, 2026 14:07
Signed-off-by: Keval Morabia <28916987+kevalmorabia97@users.noreply.github.com>
@kevalmorabia97 kevalmorabia97 force-pushed the kmorabia/megatron-preprocess-data-improvements branch from b93e487 to ab916fd Compare April 9, 2026 14:31
Collaborator

@ChenhanYu ChenhanYu left a comment


PR Review: Improve Megatron Tokenization

Well-structured feature PR. Streaming mode, reasoning_content handling, direct HF tokenization, gzip support, and return prefixes are all clean additions. Documentation and test coverage are solid.

Comments

1. Output filename format changed — silent breaking change

Old: {stem}_{key}_document.{bin,idx} → New: {stem}_{key}.{bin,idx}

The _document suffix is dropped. Any downstream scripts or configs that hardcode the old naming pattern will break. Consider noting this in the CHANGELOG entry.

2. process_hf_split pool not closed on exception (megatron_preprocess_data.py:~413)

If an exception occurs between Pool() and pool.close(), the pool leaks. Consider try/finally or a context manager. Non-blocking.
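The context-manager cleanup suggested here can be sketched as follows (a minimal illustration, not the PR's actual encoding code):

```python
import multiprocessing

def map_with_pool(func, items, workers=2):
    # `with` guarantees close/join on normal exit and terminate/join on
    # exception, so worker processes cannot leak on an early raise.
    with multiprocessing.Pool(processes=workers) as pool:
        return pool.map(func, items)

print(map_with_pool(len, ["ab", "cde"]))  # [2, 3]
```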

3. Streaming vs non-streaming shuffle semantics differ

Non-streaming: split[:{max_samples}] takes first N rows then shuffles them. Streaming: shuffles the stream then take(N). Both randomize, but may select different subsets. Fine in practice, worth a brief comment if reproducibility matters.
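For reference, the buffered shuffle used in streaming mode is essentially reservoir-style sampling; a minimal pure-Python sketch of the idea (the PR itself goes through the `datasets` library's shuffle/take API, not this code):

```python
import random

def reservoir_sample(stream, k, seed=42):
    """Uniformly sample k items from an iterable of unknown length."""
    rng = random.Random(seed)
    sample = []
    for i, item in enumerate(stream):
        if i < k:
            sample.append(item)
        else:
            # Replace an existing element with probability k / (i + 1),
            # keeping every element seen so far equally likely.
            j = rng.randint(0, i)
            if j < k:
                sample[j] = item
    return sample

print(reservoir_sample(range(1_000_000), 5))
```

A fixed seed makes the selection reproducible across runs, which matters if the sampled subset feeds a training pipeline.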

4. final_enc_len computation differs between file and HF paths

process_json_file: sum(sentence_lens[key]) (one key). process_hf_split: sum(sum(sentence_lens[key]) for key in sentence_lens) (all keys). For multi-key scenarios, the file path undercounts. Minor.

5. _is_main_or_first_worker uses private _identity

Pragmatic choice for gating log noise but _identity is a CPython implementation detail. Worth a brief comment.

LGTM — the filename format change is the only item worth documenting before merge.

This is an AI-assisted review — human sign-off required before merging.

@kevalmorabia97 kevalmorabia97 force-pushed the kmorabia/megatron-preprocess-data-improvements branch from 1578503 to 2a1609e Compare April 9, 2026 16:27
Signed-off-by: Keval Morabia <28916987+kevalmorabia97@users.noreply.github.com>
@kevalmorabia97 kevalmorabia97 force-pushed the kmorabia/megatron-preprocess-data-improvements branch from 2a1609e to 6b00285 Compare April 9, 2026 16:42
yeyu-nvidia added a commit that referenced this pull request Apr 9, 2026
…_template (#1225)

## Summary

- `apply_chat_template(..., return_tensors="pt")` returns a
`BatchEncoding` in transformers 4.46+, which no longer subclasses `dict`
- The old guard `isinstance(tokenized, dict)` evaluates to `False` for
`BatchEncoding`, so `input_ids` was set to the whole `BatchEncoding`
object
- Calling `.shape[1]` on a `BatchEncoding` triggers
`__getattr__("shape")` → `AttributeError`
- Fix: check `isinstance(tokenized, torch.Tensor)` instead, which
correctly handles both old transformers (plain tensor) and new
transformers (BatchEncoding)

This is causing `test_collect_hidden_states` to fail in the speculative
decoding CI for all open PRs (#1207, #1210, #1221).

## Test plan

- [ ] `torch-pr (speculative_decoding, 26.01)` CI passes
- [ ] Verify fix handles both `torch.Tensor` return (old transformers)
and `BatchEncoding` return (new transformers 4.46+)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Signed-off-by: Ye Yu <yeyu@nvidia.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
@kevalmorabia97 kevalmorabia97 merged commit 9b4f43a into main Apr 9, 2026
42 of 44 checks passed
@kevalmorabia97 kevalmorabia97 deleted the kmorabia/megatron-preprocess-data-improvements branch April 9, 2026 19:38
Edwardf0t1 pushed a commit that referenced this pull request Apr 9, 2026
…_template (#1225)

Edwardf0t1 pushed a commit that referenced this pull request Apr 9, 2026
…HF in-memory tokenization, etc (#1221)

### What does this PR do?

Type of change: New feature

Improvements to `megatron_preprocess_data` for Nemotron v3 post-training
datasets and Megatron-Bridge distillation workflows:

- **`--reasoning_content`** flag (`strip` / `inline` / `native`) to
handle the `reasoning_content` field in Nemotron Post-Training v3
assistant messages
- **No intermediate JSONL** for HuggingFace datasets — load directly
from Arrow cache via `_iter_hf_as_json` + `process_hf_split`
- **Return output prefixes** (`list[str]`) from the Python API so
callers can build `--data_paths` without hardcoding paths; also printed
at end of run
- **Gzip input support** — `.jsonl.gz` files accepted directly;
`--input_dir` globs both `*.jsonl` and `*.jsonl.gz`
- **`--strip_newlines`** flag (opt-in) to replace newlines with spaces
in plain-text values; default preserves newlines (no breaking change for
code/structured-text datasets)
- **`--hf_streaming`** flag for very large datasets — only consumed rows
are downloaded; automatically falls back to non-streaming (with a
warning) if `--hf_max_samples_per_split` is not set, since streaming
without a cap is slower than cached non-streaming
- **Auto-shuffle** when `--hf_max_samples_per_split` is set — reservoir
sampling (buffer=10,000, seed=42) applied before capping to avoid biased
prefix sampling
- Remove `_document` suffix from output filenames (`_text.bin` instead
of `_text_document.bin`)
- Fix duplicate BOS token for chat-template data
(`add_special_tokens=False`)
- Fix `TypeError` in `process_hf_split` (`sum(list)` not `sum(int)`)
- Suppress duplicate prints across pool workers via
`_is_main_or_first_worker()`
- Raise `KeyError` instead of warning for missing JSON keys
- Default `hf_split=None` (all splits) instead of `"train"`
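As a rough sketch of what the three `--reasoning_content` modes do to an assistant message — field names come from the dataset, but the exact transformation is assumed:

```python
def handle_reasoning_content(message: dict, mode: str) -> dict:
    """Sketch only: strip/inline drop or fold the field; native leaves it."""
    if mode == "native" or "reasoning_content" not in message:
        return message  # native: let the chat template render the field itself
    msg = dict(message)
    reasoning = msg.pop("reasoning_content")
    if mode == "inline":
        # inline: fold reasoning into the visible content as <think>...</think>
        msg["content"] = f"<think>{reasoning}</think>{msg['content']}"
    # strip: reasoning is simply discarded
    return msg

m = {"role": "assistant", "content": "42.", "reasoning_content": "6 x 7 = 42"}
assert handle_reasoning_content(m, "strip") == {"role": "assistant", "content": "42."}
assert handle_reasoning_content(m, "inline")["content"].startswith("<think>")
```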

### Usage

```python
from modelopt.torch.utils.plugins.megatron_preprocess_data import megatron_preprocess_data

# Nemotron v3 with reasoning content preserved inline as <think>...</think>
prefixes = megatron_preprocess_data(
    hf_dataset="nvidia/Nemotron-Post-Training-Dataset-v3",
    json_keys=["messages"],
    tokenizer_name_or_path="Qwen/Qwen3-0.6B",
    output_dir="tokenized/",
    workers=32,
    reasoning_content="inline",
)
# prefixes == ["tokenized/nvidia--Nemotron-Post-Training-Dataset-v3_..._messages"]
data_paths = [x for p in prefixes for x in ("1.0", p)]

# Large pretraining dataset — stream + cap (auto-shuffled before capping)
prefixes = megatron_preprocess_data(
    hf_dataset="nvidia/Nemotron-CC-v2.1",
    hf_name="High-Quality",
    hf_max_samples_per_split=5_000_000,
    hf_streaming=True,
    json_keys=["text"],
    tokenizer_name_or_path="Qwen/Qwen3-0.6B",
    output_dir="tokenized/",
    workers=32,
    append_eod=True,
    strip_newlines=True,
)
```
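The auto-shuffle in the second example boils down to reservoir sampling over the stream. A stand-alone sketch — the real code presumably relies on the `datasets` buffered shuffle, so this is illustrative only:

```python
import random

def reservoir_sample(stream, k, seed=42):
    """Uniform sample of k items from an iterable of unknown length."""
    rng = random.Random(seed)
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)
        else:
            j = rng.randrange(i + 1)  # replace with decreasing probability
            if j < k:
                reservoir[j] = item
    return reservoir

# Capping to k this way avoids the bias of just taking the first k rows.
sample = reservoir_sample(range(100_000), k=5)
assert len(sample) == 5 and all(0 <= x < 100_000 for x in sample)
```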

### Testing

- New unit tests
- Tested tokenization on Nemotron Pretraining and Post-training v3
datasets
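The `.jsonl.gz` support from the feature list comes down to choosing the opener by extension; a minimal sketch (the helper name is hypothetical):

```python
import gzip
import json
import tempfile
from pathlib import Path

def iter_jsonl(path):
    """Yield parsed rows from a .jsonl or .jsonl.gz file."""
    p = Path(path)
    opener = gzip.open if p.suffix == ".gz" else open
    with opener(p, "rt", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

# Round-trip check on a temporary gzipped file
with tempfile.TemporaryDirectory() as d:
    path = Path(d) / "sample.jsonl.gz"
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write('{"text": "hello"}\n')
    assert list(iter_jsonl(path)) == [{"text": "hello"}]
```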

### Before your PR is "*Ready for review*"

Make sure you read and follow [Contributor
guidelines](https://github.com/NVIDIA/Model-Optimizer/blob/main/CONTRIBUTING.md)
and your commits are signed (`git commit -s -S`).

Make sure you read and follow the [Security Best
Practices](https://github.com/NVIDIA/Model-Optimizer/blob/main/SECURITY.md#security-coding-practices-for-contributors)
(e.g. avoiding hardcoded `trust_remote_code=True`, `torch.load(...,
weights_only=False)`, `pickle`, etc.).

- Is this change backward compatible?: ⚠️ Mostly — output filenames changed
(`_text_document` → `_text`); existing callers need to re-tokenize or
rename files
- If you copied code from any other sources or added a new PIP
dependency, did you follow guidance in `CONTRIBUTING.md`: N/A
- Did you write any new necessary tests?: ✅
- Did you update
[Changelog](https://github.com/NVIDIA/Model-Optimizer/blob/main/CHANGELOG.rst)?:
✅

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Preprocessing tool: configurable reasoning-content modes
(strip|inline|native), optional newline stripping, gzip (.jsonl.gz)
input support, HF streaming mode, auto-shuffle when per-split max
samples set, returns output-file prefixes, and processes all HF splits
by default without writing intermediate JSONL.

* **Documentation**
* Consolidated and reformatted dataset preparation and tokenization
guidance; updated install/auth instructions and examples.

* **Tests**
* Tests updated to validate returned prefixes, reasoning-content
behaviors, gzip input handling, HF streaming warnings, and HF output
assertions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Keval Morabia <28916987+kevalmorabia97@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>