Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ loss_fn:

checkpointing:
enabled: true
checkpoint_dir: "results/grpo"
metric_name: "val:accuracy"
higher_is_better: true
keep_top_k: 3
Expand Down Expand Up @@ -229,13 +230,23 @@ policy:
num_nodes: null # Decides number of nodes to be dedicated to generation

data:
# Using the prepared train and validation datasets (downloaded from HuggingFace and split 90/10)
# Train: 1129 samples, Validation: 126 samples
train_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl
validation_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl
max_input_seq_length: null # nemogym dataset doesn't use this parameter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bxyu-nvidia @yuki-97 where does the truncation happen in this case? vllm will have max length which should prevent generating beyond that max_length set in the generation config, but does gym know to respect a max seqlength if tacking on an environment or tool output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious as well; I guess the truncation should happen in gym, or perhaps the max length simply isn't measured (I saw many warnings like the one below when using the gym env).

ERROR 02-03 00:50:30 [serving_chat.py:257] ValueError: This model's maximum context length is 8192 tokens. However, your request has 9691 input tokens. Please reduce the length of the input messages.

since gym needs to directly pass the raw data (string) to gym env to let it handle all things now, so there's no way for nemorl to get its token_ids and handle the max_length.

shuffle: true
num_workers: 0

# Using the prepared train and validation datasets (downloaded from HuggingFace and split 90/10)
# Train: 1129 samples, Validation: 126 samples
train:
data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl
validation:
data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl
default:
dataset_name: NemoGymDataset
env_name: "nemo_gym"
prompt_file: null # nemogym dataset doesn't use this parameter
system_prompt_file: null # nemogym dataset doesn't use this parameter
processor: "nemo_gym_data_processor"

env:
should_use_nemo_gym: true
should_log_nemo_gym_responses: true # If you have low logging storage, set this to false
Expand Down
70 changes: 11 additions & 59 deletions examples/nemo_gym/run_grpo_nemo_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
# limitations under the License.

import argparse
import json
import os
import pprint
from itertools import chain, repeat
from typing import Optional

# Increase the W&B single object size warning threshold. Initially 100_000 (100 KB) -> 10_000_000 (10 MB)
import wandb.util
Expand All @@ -42,18 +39,13 @@
setup,
)
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.datasets import AllTaskProcessedDataset
from nemo_rl.data.interfaces import DatumSpec
from nemo_rl.distributed.ray_actor_environment_registry import (
get_actor_python_env,
)
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.environments.nemo_gym import (
NemoGym,
NemoGymConfig,
nemo_gym_example_to_nemo_rl_datum_spec,
setup_nemo_gym_config,
)
from nemo_rl.environments.utils import create_env
from nemo_rl.experience.rollouts import run_async_nemo_gym_rollout
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand All @@ -75,40 +67,6 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]:
return args, overrides


def setup_single_nemo_gym_dataset(
    jsonl_fpath: str, tokenizer, num_repeats: Optional[int] = None
):
    """Load a NeMo-Gym JSONL file and wrap it in an ``AllTaskProcessedDataset``.

    Args:
        jsonl_fpath: Path to a JSONL file with one NeMo-Gym example per line.
        tokenizer: Tokenizer forwarded to ``AllTaskProcessedDataset``.
        num_repeats: If truthy, repeat each example this many times
            back-to-back (abc -> aabbcc). ``None``/0 keeps the data as-is.

    Returns:
        An ``AllTaskProcessedDataset`` whose entries were converted via
        ``nemo_gym_example_to_nemo_rl_datum_spec``; the per-task processor is
        a passthrough because the conversion already happened here.
    """
    with open(jsonl_fpath) as f:
        nemo_gym_examples = list(map(json.loads, f))

    print(f"Loaded data at {jsonl_fpath}. Found {len(nemo_gym_examples)} examples")

    if num_repeats:
        previous_length = len(nemo_gym_examples)
        # Repeat each example consecutively so copies of the same prompt stay
        # grouped (abc -> aabbcc, not abcabc).
        nemo_gym_examples = [
            nemo_gym_example
            for nemo_gym_example in nemo_gym_examples
            for _ in range(num_repeats)
        ]
        print(
            f"Repeating examples (in a pattern of abc to aabbcc) for {jsonl_fpath} from {previous_length} to {len(nemo_gym_examples)}!"
        )

    nemo_rl_compatible_examples: list[DatumSpec] = [
        nemo_gym_example_to_nemo_rl_datum_spec(nemo_gym_example, idx)
        for idx, nemo_gym_example in enumerate(nemo_gym_examples)
    ]

    # Examples are already DatumSpecs, so the task processor is a no-op.
    # A def instead of an assigned lambda per PEP 8 (E731).
    def passthrough_task_processor(datum_dict, *args, **kwargs):
        return datum_dict

    return AllTaskProcessedDataset(
        nemo_rl_compatible_examples,
        tokenizer,
        None,
        passthrough_task_processor,
    )


# These types are directly imported from grpo_train since if something about the architecture changes we want to immediately fail.
def collect_trajectories(
policy: ColocatablePolicyInterface,
Expand Down Expand Up @@ -165,7 +123,7 @@ def main() -> None:
if not args.config:
args.config = os.path.join(
os.path.dirname(__file__),
"grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml",
"grpo_workplace_assistant_nemotron_nano_v2_9b.yaml",
)

config = load_config(args.config)
Expand Down Expand Up @@ -201,14 +159,10 @@ def main() -> None:
# We assert here since this is right after the final config has been materialized.
assert _should_use_nemo_gym(config)

# NeMo-Gym environment needs to get dp_openai_server_base_urls from policy_generation, so we don't setup env here.
print("\n▶ Setting up data...")
train_dataset = setup_single_nemo_gym_dataset(
jsonl_fpath=config["data"]["train_jsonl_fpath"],
tokenizer=tokenizer,
)
val_dataset = setup_single_nemo_gym_dataset(
jsonl_fpath=config["data"]["validation_jsonl_fpath"],
tokenizer=tokenizer,
train_dataset, val_dataset = setup_response_data(
tokenizer, config["data"], env_configs=None
)

# Validation dataset config setup.
Expand Down Expand Up @@ -254,15 +208,13 @@ def main() -> None:
base_urls=policy_generation.dp_openai_server_base_urls,
initial_global_config_dict=config["env"]["nemo_gym"],
)
nemo_gym = NemoGym.options(
runtime_env={
"py_executable": get_actor_python_env(
"nemo_rl.environments.nemo_gym.NemoGym"
),
}
).remote(nemo_gym_config)
# Default nemo_gym env is used for trajectory collection
nemo_gym = create_env(env_name="nemo_gym", env_config=nemo_gym_config)
# Blocking wait for NeMo-Gym to spin up
ray.get(nemo_gym.health_check.remote())

# Bind task_to_env and val_task_to_env for nemo_gym env
# Hardcode here to match `run_async_nemo_gym_rollout`
task_to_env = {"nemo_gym": nemo_gym}
val_task_to_env = task_to_env

Expand Down
4 changes: 2 additions & 2 deletions examples/run_distillation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from nemo_rl.algorithms.distillation import MasterConfig, distillation_train, setup
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.utils import setup_data_with_envs
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand Down Expand Up @@ -79,7 +79,7 @@ def main() -> None:
val_dataset,
task_to_env,
val_task_to_env,
) = setup_data_with_envs(tokenizer, config["data"], config["env"])
) = setup_response_data(tokenizer, config["data"], config["env"])

(
student_policy,
Expand Down
4 changes: 2 additions & 2 deletions examples/run_grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from nemo_rl.algorithms.grpo import MasterConfig, grpo_train, setup
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.utils import setup_data_with_envs
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand Down Expand Up @@ -91,7 +91,7 @@ def main() -> None:
val_dataset,
task_to_env,
val_task_to_env,
) = setup_data_with_envs(tokenizer, config["data"], config["env"])
) = setup_response_data(tokenizer, config["data"], config["env"])

(
policy,
Expand Down
4 changes: 2 additions & 2 deletions examples/run_vlm_grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from nemo_rl.algorithms.grpo import MasterConfig, grpo_train, setup
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.utils import setup_data_with_envs
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand Down Expand Up @@ -97,7 +97,7 @@ def main() -> None:
val_dataset,
task_to_env,
val_task_to_env,
) = setup_data_with_envs(processor, config["data"], config["env"], is_vlm=True)
) = setup_response_data(processor, config["data"], config["env"], is_vlm=True)

(
policy,
Expand Down
2 changes: 1 addition & 1 deletion nemo_rl/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class PreferenceDatasetConfig(TypedDict):


class DataConfig(TypedDict):
max_input_seq_length: int
max_input_seq_length: int | None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's find the file and inspect the TypedDict definition
find . -name "__init__.py" -path "*/nemo_rl/data/*" -type f

# Then search for AllTaskProcessedDataset class and max_seq_length/max_input_seq_length usage
rg -n "class AllTaskProcessedDataset|max_seq_length|max_input_seq_length" -g'*.py' -C 3

Repository: NVIDIA-NeMo/RL

Length of output: 28206


🏁 Script executed:

# Check AllTaskProcessedDataset.__init__ for validation or safeguards
sed -n '46,70p' nemo_rl/data/datasets/processed_dataset.py

# Check example config files to see how max_input_seq_length is set
find . -name "*.yaml" -path "*/examples/configs/*" -type f | head -10
rg "max_input_seq_length" -g'*.yaml' -A 2 -B 2

Repository: NVIDIA-NeMo/RL

Length of output: 17681


Add None checks in task data processors or validate max_seq_length at dataset initialization.

The type definitions allow max_seq_length: int | None, but processor functions perform unsafe comparisons and arithmetic operations (e.g., if length > max_seq_length, max_seq_length // len(...)) without None guards. At least one example config (nemogym) explicitly sets max_input_seq_length: null, confirming that None can flow to AllTaskProcessedDataset. Either add explicit None checks in all processor functions before using max_seq_length, or validate and reject None during AllTaskProcessedDataset.__init__ if this field is required.

🤖 Prompt for AI Agents
In `@nemo_rl/data/__init__.py` at line 47, The code permits max_input_seq_length
(alias max_seq_length) to be None but downstream processors perform unsafe
comparisons/arithmetic with it; either add explicit None checks in every
processor function that uses max_input_seq_length (e.g., guard patterns in
token/window truncate logic before any "if length > max_input_seq_length" or
"max_input_seq_length // ..." operations) or enforce non-None at dataset
construction by validating in AllTaskProcessedDataset.__init__ (raise/config
error if max_input_seq_length is None) so processors can assume an int. Update
references to max_input_seq_length/max_seq_length in the processor functions and
AllTaskProcessedDataset to implement the chosen approach and add a clear error
message when rejecting None.

add_bos: NotRequired[bool]
add_eos: NotRequired[bool]
add_generation_prompt: NotRequired[bool]
Expand Down
3 changes: 3 additions & 0 deletions nemo_rl/data/datasets/response_datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from nemo_rl.data.datasets.response_datasets.deepscaler import DeepScalerDataset
from nemo_rl.data.datasets.response_datasets.geometry3k import Geometry3KDataset
from nemo_rl.data.datasets.response_datasets.helpsteer3 import HelpSteer3Dataset
from nemo_rl.data.datasets.response_datasets.nemogym_dataset import NemoGymDataset
from nemo_rl.data.datasets.response_datasets.oai_format_dataset import (
OpenAIFormatDataset,
)
Expand Down Expand Up @@ -50,6 +51,7 @@
"tulu3_sft_mixture": Tulu3SftMixtureDataset,
# load from local JSONL file or HuggingFace
"openai_format": OpenAIFormatDataset,
"NemoGymDataset": NemoGymDataset,
"ResponseDataset": ResponseDataset,
}

Expand Down Expand Up @@ -87,6 +89,7 @@ def load_response_dataset(data_config: ResponseDatasetConfig):
"DeepScalerDataset",
"Geometry3KDataset",
"HelpSteer3Dataset",
"NemoGymDataset",
"OasstDataset",
"OpenAIFormatDataset",
"OpenMathInstruct2Dataset",
Expand Down
43 changes: 43 additions & 0 deletions nemo_rl/data/datasets/response_datasets/nemogym_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datasets import Dataset

from nemo_rl.data.datasets.raw_dataset import RawDataset


class NemoGymDataset(RawDataset):
    """Simple wrapper around the Nemo Gym dataset."""

    def __init__(self, data_path: str, repeat: int = 1, **kwargs) -> None:
        """Initialize the Nemo Gym dataset.

        Args:
            data_path: Path to the JSONL data file.
            repeat: Number of times to repeat the dataset.
            **kwargs: Unused extra args accepted for RawDataset compatibility.
        """
        _ = kwargs  # intentionally unused; silences ARG002

        # Derive a task name like "<parent_dir>-<file_stem>" from the path.
        self.task_name = "-".join(data_path.split("/")[-2:]).split(".")[0]
        # Strip a single leading "-" (occurs when the path has no parent
        # directory component). removeprefix is also safe on an empty string,
        # unlike indexing the first character.
        self.task_name = self.task_name.removeprefix("-")

        # load raw line from jsonl
        # will use `json.loads` to load to dict format at `nemo_gym_data_processor` later since `Dataset` cannot handle nested structure well
        with open(data_path) as f:
            raw_lines = list(f)

        # format the dataset
        self.dataset = Dataset.from_dict(
            {
                "extra_env_info": raw_lines,
                "task_name": [self.task_name] * len(raw_lines),
            }
        )

        # repeat the dataset
        if repeat > 1:
            self.dataset = self.dataset.repeat(repeat)
23 changes: 23 additions & 0 deletions nemo_rl/data/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Contains data processors for evaluation."""

import json
import logging
from typing import Any, Dict, cast

Expand Down Expand Up @@ -663,6 +664,27 @@ def multichoice_qa_processor(
return output


def nemo_gym_data_processor(
    datum_dict: dict[str, Any],
    task_data_spec: TaskDataSpec,
    tokenizer: TokenizerType,
    max_seq_length: int | None,
    idx: int,
) -> DatumSpec:
    """Process a datum dictionary (directly loaded from dataset) into a DatumSpec for Nemo Gym.

    Args:
        datum_dict: Row from ``NemoGymDataset`` holding the raw JSONL line
            under ``"extra_env_info"`` plus its ``"task_name"``.
        task_data_spec: Unused; present to match the processor-registry
            callable signature.
        tokenizer: Unused; NeMo-Gym consumes the raw string data directly.
        max_seq_length: Unused; the nemogym config sets this to null, so it
            may be ``None`` here — no truncation is applied by this processor.
        idx: Index of this datum within the dataset.

    Returns:
        A ``DatumSpec`` whose ``message_log``/``length`` are empty
        placeholders kept only for compatibility with the current GRPO
        implementation.
    """
    output: DatumSpec = {
        # load to dict format here since `Dataset` cannot handle nested structure well in `NemoGymDataset`
        "extra_env_info": json.loads(datum_dict["extra_env_info"]),
        "loss_multiplier": 1.0,
        "idx": idx,
        "task_name": datum_dict["task_name"],
        # fake keys for compatibility with the current GRPO implementation
        "message_log": [{"role": "user", "content": "", "token_ids": torch.tensor([])}],
        "length": 0,
    }
    return output


# Processor registry. Key is the processor name, value is the processor function.
# Note: We cast the literal dict to Dict[str, TaskDataProcessFnCallable] because
# type checkers see each concrete function's signature as a distinct callable type.
Expand All @@ -679,6 +701,7 @@ def multichoice_qa_processor(
"multichoice_qa_processor": multichoice_qa_processor,
"sft_processor": sft_processor,
"vlm_hf_data_processor": vlm_hf_data_processor,
"nemo_gym_data_processor": nemo_gym_data_processor,
},
)

Expand Down
Loading
Loading