run each agent in its own subprocess #4381
base: main
💡 Codex Review
Here are some automated review suggestions for this pull request.
policy_class = load_symbol(resolve_policy_class_path(policy_spec.class_path))
policy = policy_class(policy_env_info, **(policy_spec.init_kwargs or {}))  # type: ignore[call-arg]

agent = policy.agent_policy(agent_id=agent_id)
Load policy data in subprocess execution path
When run_agents_in_subprocesses is enabled (the new default), policies are launched through run_one_agent.py, but the subprocess path instantiates the policy and immediately grabs agent_policy without invoking policy.load_policy_data even if PolicySpec.data_path is provided (lines 61‑64). The in-process path used to load the checkpoint right after construction, so any policy that depends on saved weights will now run with default/uninitialized parameters whenever subprocess mode is used, yielding incorrect actions for checkpointed evaluations.
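A minimal sketch of the ordering this comment asks for, so the checkpoint is loaded before any agent policy is handed out. `load_policy_data` and `data_path` are the names used in the comment; `SpecSketch`, `DummyPolicy`, `build_policy`, and the JSON checkpoint format are hypothetical stand-ins for illustration, not the project's actual classes.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SpecSketch:
    """Hypothetical stand-in for PolicySpec; only the fields used here."""

    init_kwargs: dict[str, Any] = field(default_factory=dict)
    data_path: Optional[str] = None


class DummyPolicy:
    """Hypothetical policy whose weights stay at defaults until loaded."""

    def __init__(self, **kwargs: Any) -> None:
        self.weights = {"bias": 0.0}

    def load_policy_data(self, path: str) -> None:
        # Assumed checkpoint format for this sketch: a small JSON file.
        with open(path) as f:
            self.weights = json.load(f)


def build_policy(spec: SpecSketch) -> DummyPolicy:
    policy = DummyPolicy(**spec.init_kwargs)
    # Load saved weights right after construction, as the in-process path did.
    if spec.data_path:
        policy.load_policy_data(spec.data_path)
    return policy
```

Without the `data_path` branch, the policy would run with its default weights, which is the failure mode the comment describes.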
a721a8d to e32ad14
action = self._process.stdout.readline().strip()
self._step += 1
return Action(name=action)
No error handling if subprocess has crashed or closed stdout. readline() will return empty string if the subprocess is dead, resulting in Action(name="") being returned instead of detecting the failure.
Fix:
action = self._process.stdout.readline().strip()
if not action:
    raise RuntimeError(f"Subprocess for agent {self._agent_id} crashed or closed connection")
self._step += 1
return Action(name=action)
Spotted by Graphite Agent
What would we even do in response to a failure? We'd replace the actions with "noop", and maybe report it somewhere? There's no place to report. And invalid commands will be replaced with noop anyway.
parts = observation_line.split(" ")
raw_tokens = parts[2:]  # Skip agent ID and step number
No validation of observation line format. If the line has fewer than 2 space-separated parts, parts[2:] will return an empty list silently. The agent_id and step number from the observation line are extracted but never validated against expected values.
Fix:
parts = observation_line.split(" ")
if len(parts) < 2:
    raise RuntimeError(f"Malformed observation line: {observation_line}")
obs_agent_id = int(parts[0], 16)
obs_step = int(parts[1], 16)
if obs_agent_id != agent_id:
    raise RuntimeError(f"Agent ID mismatch: expected {agent_id}, got {obs_agent_id}")
raw_tokens = parts[2:]
Spotted by Graphite Agent
The line-based protocol is a dirty hack. That change won't make it meaningfully less of a hack.
85242c4 to 3c1e175
worker = os.path.join(os.path.dirname(__file__), "run_one_agent.py")
self._socket, child = socket.socketpair()
child.set_inheritable(True)
self._file = self._socket.makefile(mode="rw")
Using mode="rw" with socket.makefile() is invalid. Python's socket.makefile() doesn't support "rw" mode - it only accepts "r", "w", "rb", "wb" and similar. This will raise a ValueError at runtime. For bidirectional communication, either use separate file objects for reading and writing, or use "r+b" for binary mode (though text mode doesn't support "+").
# Fix: Use separate file objects or handle buffering carefully
self._read_file = self._socket.makefile(mode="r")
self._write_file = self._socket.makefile(mode="w")
# Then use _read_file for readline() and _write_file for write()
Spotted by Graphite Agent
No ValueError is raised. Passing "rw" works in practice.
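A small, self-contained check of this reply, using only the standard library. It opens both ends of a `socket.socketpair()` with `makefile(mode="rw")` and sends one line each way. The function name `roundtrip` and the upper-casing "protocol" are made up for the demonstration.

```python
import socket


def roundtrip(message: str) -> str:
    """Send one line across a socketpair and return the peer's reply."""
    parent, child = socket.socketpair()
    # mode="rw" yields a text wrapper that can both read and write.
    parent_file = parent.makefile(mode="rw")
    child_file = child.makefile(mode="rw")
    try:
        parent_file.write(message + "\n")
        parent_file.flush()  # text wrappers buffer, so flush before the peer reads
        line = child_file.readline().strip()
        child_file.write(line.upper() + "\n")
        child_file.flush()
        return parent_file.readline().strip()
    finally:
        parent_file.close()
        child_file.close()
        parent.close()
        child.close()
```

Here `roundtrip("ready")` returns `"READY"` without raising; the modes accepted by `socket.makefile()` are combinations of `"r"`, `"w"`, and `"b"`.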
3c1e175 to c1117d1
def _destroy(self):
    if self._process is not None:
        self._process.kill()
    if self._socket is not None:
        self._socket.close()
Resource leak: self._file is not closed before closing the socket. The file wrapper should be closed first to ensure buffers are flushed and the file descriptor is properly released.
def _destroy(self):
    if self._file is not None:
        self._file.close()
    if self._process is not None:
        self._process.kill()
    if self._socket is not None:
        self._socket.close()
Spotted by Graphite Agent
The docs say that closing the wrapper won't close the underlying fd. It seems that the fd is the only thing that really needs to be closed.
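The first half of this reply can be checked with a short standard-library sketch. The function name `close_order_demo` is hypothetical; it relies on `socket.fileno()` returning `-1` once a socket has really been closed.

```python
import socket


def close_order_demo() -> tuple[bool, bool]:
    """Report whether the socket's fd survives closing the makefile() wrapper."""
    parent, child = socket.socketpair()
    wrapper = parent.makefile(mode="rw")

    wrapper.close()
    # Closing the wrapper alone leaves the socket's descriptor open.
    open_after_wrapper_close = parent.fileno() != -1

    parent.close()
    # With no wrappers left, closing the socket releases the descriptor.
    closed_after_socket_close = parent.fileno() == -1

    child.close()
    return open_after_wrapper_close, closed_after_socket_close
```

One caveat worth verifying against the `socket.makefile()` documentation: it describes the socket as staying open until `socket.close()` has been called and all of its file objects are closed, so the order in `_destroy` may matter less than whether both are eventually closed.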
ready = self._file.readline().strip()
if ready != "READY":
    self._destroy()
    raise RuntimeError("Failed to start agent subprocess")
The readline() call could hang indefinitely if the subprocess crashes during initialization but doesn't close the socket. Should add a timeout or check if the subprocess is still alive. Additionally, if readline() returns empty string (subprocess died), the error message won't capture any stderr output from the subprocess that could explain what went wrong.
ready = self._file.readline().strip()
if ready != "READY":
    # Check if process died and try to get error info
    if self._process.poll() is not None:
        raise RuntimeError(f"Agent subprocess died during initialization (exit code: {self._process.returncode})")
    self._destroy()
    raise RuntimeError("Failed to start agent subprocess")
Spotted by Graphite Agent
> if the subprocess crashes during initialization but doesn't close the socket
The parent closes its copy of the fd after passing it to the child. The child has the last remaining copy of the fd, so the OS will close it if the child dies. And with no mechanism for bubbling errors upwards anyway, I'm not sure it's worthwhile to collect more detail at this level.
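The fd argument above can be illustrated with a POSIX-only sketch. It is not the PR's code: the child here is a placeholder that exits immediately with status 3, `pass_fds` is used instead of `set_inheritable`, and the function name is hypothetical. The point is that once the parent drops its copy of the child's end, the child's exit produces EOF on the parent's side rather than a hang.

```python
import socket
import subprocess
import sys


def ready_line_after_child_exit() -> tuple[str, int]:
    """Spawn a child that dies at startup and read what the parent sees."""
    parent, child = socket.socketpair()
    proc = subprocess.Popen(
        [sys.executable, "-c", "raise SystemExit(3)"],  # placeholder worker
        pass_fds=(child.fileno(),),  # POSIX only: hand the child its end
    )
    child.close()  # parent drops its copy; the child now holds the last one
    with parent, parent.makefile(mode="rw") as f:
        line = f.readline()  # returns "" (EOF) once the child's copy is closed
    return line, proc.wait()
```

The empty string is what the `ready != "READY"` check would see, and `proc.wait()` still yields the exit code if more detail is ever wanted.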
c1117d1 to 31768c5
31768c5 to b7a5e5a
init_kwargs: dict[str, Any] = Field(default_factory=dict)

python_path: list[str] = Field(default=[], description="Optional PYTHONPATH entries to add when loading the policy")
I like this change
Not a bad time to change the docstring to say something like "Specification for a locally initializable policy"
policy_spec.init_kwargs = kwargs

if sandbox:
    policy = PipedPolicyWrapper(policy_spec, policy_env_info, **kwargs)  # type: ignore[call-arg]
I wonder about having callers of initialize_or_load_policy(..., sandbox=True) instead pass a PolicySpec(policy_class=path.to.PipedPolicyWrapper). The upside is keeping initialize_or_load_policy simple, which may not be worth it; in the current format, we'd then need to specify the substantive policy spec as init_kwargs to the PipedPolicyWrapper, which seems awful.
My guess is that what you have written is the better option, and unless my rambling comment makes you feel otherwise, ignore it
"""

policy_class = load_symbol(resolve_policy_class_path(policy_spec.class_path))
policy_spec = policy_spec.model_copy(deep=True)
thanks for this
if sandbox:
    policy = PipedPolicyWrapper(policy_spec, policy_env_info, **kwargs)  # type: ignore[call-arg]
else:
    for path in reversed(policy_spec.python_path):
I think moving this to be within this function, instead of its caller, makes sense; thanks for the change
kwargs["device"] = device_override
policy_spec.init_kwargs = kwargs

if sandbox:
I wonder if we should call this subprocess_piped or piped or something instead of sandbox.
    # TODO(rhys): we'll want to hear about the end of the simulation so we know when to kill
    self._destroy()  # This is still a single-use wrapper, don't reset
else:
    worker = os.path.join(os.path.dirname(__file__), "run_one_agent.py")
I don't feel strongly, but run_one_agent.py could define SELF_PATH = __file__ and this could import it, so that if it moves this impl doesn't break
# That's our cue to create the subprocess.
if self._process is not None:
    # TODO(rhys): we'll want to hear about the end of the simulation so we know when to kill
    self._destroy()  # This is still a single-use wrapper, don't reset
If we want to support reset getting called more than once (after init), then maybe _destroy should set self._process etc. to None. If we don't, then maybe this if suite should error instead of calling ._destroy().
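A rough sketch of the first option, where `_destroy` clears its handles so that both `_destroy` and `reset` can be called repeatedly. `SingleUseWrapperSketch` and `_spawn` are hypothetical, and the child is a placeholder that just sleeps rather than the real run_one_agent.py worker.

```python
import subprocess
import sys
from typing import Optional


class SingleUseWrapperSketch:
    """Hypothetical stand-in for the wrapper; only lifecycle handling is shown."""

    def __init__(self) -> None:
        self._process: Optional[subprocess.Popen] = None

    def _spawn(self) -> subprocess.Popen:
        # Placeholder child that idles; the real worker would be launched here.
        return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

    def _destroy(self) -> None:
        if self._process is not None:
            self._process.kill()
            self._process.wait()  # reap the child so it does not linger
            self._process = None  # a second _destroy() becomes a no-op

    def reset(self) -> None:
        # Tear down any previous child, then start a fresh one.
        self._destroy()
        self._process = self._spawn()
```

The second option would replace the `self._destroy()` call in `reset` with a `raise RuntimeError(...)` when `self._process` is already set, which makes the single-use contract explicit.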
def step(self, obs: AgentObservation) -> Action:
    if self._process is None or self._file is None:
        return Action(name="noop")
should this raise an error instead?
if len(setup_lines) < 3:
    raise RuntimeError("Insufficient setup lines received")

agent_id = int(setup_lines[0], 16)
I have a slight preference for the setup step sending one line as json with agent_id, policy_spec, and policy_env_info
then this side doesn't have to wait until READY; it just responds READY after the first line. and we remove one instance of str-to-hex and hex-str-to-int
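One way the single-line setup could look, as a sketch only. `encode_setup` and `handle_setup` are hypothetical names, and plain dicts stand in for PolicySpec and PolicyEnvInfo, whose real serialization is not shown in this thread.

```python
import json
from typing import Any


def encode_setup(agent_id: int, policy_spec: dict[str, Any], policy_env_info: dict[str, Any]) -> str:
    """Parent side: pack the whole setup into one newline-terminated JSON line."""
    payload = {"agent_id": agent_id, "policy_spec": policy_spec, "policy_env_info": policy_env_info}
    # json.dumps escapes newlines inside strings, so the payload stays on one line.
    return json.dumps(payload) + "\n"


def handle_setup(stream) -> dict[str, Any]:
    """Worker side: read the single setup line, then acknowledge with READY."""
    setup = json.loads(stream.readline())
    stream.write("READY\n")
    stream.flush()
    return setup
```

The agent ID travels as a JSON integer, so the hex encode and decode steps drop out, and the worker no longer has to count setup lines.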
agent_id = int(setup_lines[0], 16)
policy_spec = PolicySpec.model_validate_json(setup_lines[1])

for path in reversed(policy_spec.python_path):
should this just call initialize_or_load_policy?
agent = policy.agent_policy(agent_id=agent_id)

assembler_protocols: list[ProtocolConfig] = []
I don't totally follow these lines. Can we change PolicyEnvInfo's serialize/deserialize to handle this itself?
This PR has been marked as stale due to 10 days of inactivity.
nishu-builder left a comment
requesting changes to get this out of my graphite inbox; i figure you've got a round of merge conflict addressing + testing before it's ready for review, but let me know if not and if it's ready now
We'll land this as three separate commits. But here's the whole stack for CI and review.