Conversation
Greptile Summary

This PR introduces a porcelain API (dimos/porcelain.py).

Confidence Score: 4/5

Safe to merge with awareness of the lock-contention hazard in __getattr__ when the RPyC server has not yet been started. All findings are P2, but holding the lock during rpyc.connect() can block stop() for up to 30 seconds on the first app.SomeModule access — a real reliability concern for concurrent usage. The global config mutation and stale skill-cache issues are edge cases. Score is 4 rather than 5 to prompt at least the lock fix before shipping.

Important Files Changed

dimos/porcelain.py — specifically _get_rpyc_proxy (lock contention), run() (global_config side effect), and _SkillsProxy._build_cache() (silent error suppression + stale cache after restart).
Sequence Diagram

```mermaid
sequenceDiagram
    participant U as User
    participant D as Dimos
    participant C as ModuleCoordinator
    participant R as RPCClient
    participant A as Actor
    participant W as WorkerProcess
    participant P as RPyCServer
    U->>D: run(unitree-go2-agentic)
    D->>C: build(blueprint)
    C->>W: DeployModuleRequest
    W-->>C: WorkerResponse(module_id)
    C-->>D: coordinator ready
    U->>D: app.skills.relative_move(forward=2.0)
    D->>R: get_skills() via LCM RPC
    R-->>D: list[SkillInfo]
    D->>R: relative_move via RpcCall
    R-->>U: result
    U->>D: app.SomeModule
    D->>A: start_rpyc()
    A->>W: StartRpycRequest via pipe
    W->>P: _start_rpyc_server()
    P-->>W: port bound
    W-->>A: WorkerResponse(port)
    D->>P: rpyc.connect(localhost, port)
    P-->>D: module_proxy
    D-->>U: RPyC module proxy
    U->>D: stop()
    D->>P: conn.close()
    D->>C: stop()
    C->>W: ShutdownRequest
```
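For orientation, here is a minimal Python sketch of the flow in the diagram (the import path, constructor shape, and blueprint-name argument are assumptions read off the diagram and the review comments below, not the confirmed porcelain API):

```python
# Hypothetical usage sketch of the porcelain flow shown above.
from dimos.porcelain import Dimos  # import path is an assumption

app = Dimos()                      # config overrides, if any, would be passed here
app.run("unitree-go2-agentic")     # deploys modules via ModuleCoordinator.build(blueprint)

# Skill call: resolved through the skills proxy and dispatched over LCM RPC.
result = app.skills.relative_move(forward=2.0)

# Attribute access on a module name lazily starts an RPyC server in the worker
# process and returns a live RPyC module proxy.
module = app.SomeModule

app.stop()                         # closes RPyC connections and shuts down workers
```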
Reviews (1): Last reviewed commit: "feat(api): add porcelain api"
```python
def _get_rpyc_proxy(self, module_class: type[ModuleBase], proxy: RPCClient) -> Any:
    """Get or create an RPyC proxy to a remote module instance."""
    if module_class in self._rpyc_cache:
        conn, module_proxy = self._rpyc_cache[module_class]
        if not conn.closed:
            return module_proxy

    actor = proxy.actor_instance
    port = actor.start_rpyc()
    conn = rpyc.connect("localhost", port, config={"sync_request_timeout": 30})
    module_proxy = conn.root.get_module(actor._module_id)
    self._rpyc_cache[module_class] = (conn, module_proxy)
    return module_proxy
```
RPyC connect blocks self._lock for up to 30 s

_get_rpyc_proxy is always called while self._lock is held (inside the with self._lock: block in __getattr__). The rpyc.connect() call on line 145 has sync_request_timeout=30, meaning any thread that calls stop(), run(), restart(), or skills will block for up to 30 seconds while a new RPyC connection is being established. Consider releasing the lock before the network call:
```python
def _get_rpyc_proxy(self, module_class: type[ModuleBase], proxy: RPCClient) -> Any:
    with self._lock:
        if module_class in self._rpyc_cache:
            conn, module_proxy = self._rpyc_cache[module_class]
            if not conn.closed:
                return module_proxy
    # Establish connection outside the global lock
    actor = proxy.actor_instance
    port = actor.start_rpyc()
    conn = rpyc.connect("localhost", port, config={"sync_request_timeout": 30})
    module_proxy = conn.root.get_module(actor._module_id)
    with self._lock:
        self._rpyc_cache[module_class] = (conn, module_proxy)
    return module_proxy
```
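One caveat with this shape (a note on the sketch above, not on the shipped code): two threads can miss the cache at the same time, each open a connection, and the second to re-acquire the lock then overwrites the first cache entry and leaks a connection. A small check when re-acquiring the lock avoids that:

```python
# Sketch: keep whichever connection was cached first and close the duplicate.
with self._lock:
    cached = self._rpyc_cache.get(module_class)
    if cached is not None and not cached[0].closed:
        conn.close()          # another thread won the race; reuse its proxy
        return cached[1]
    self._rpyc_cache[module_class] = (conn, module_proxy)
return module_proxy
```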
```python
if self._coordinator is None:
    from dimos.core.coordination.module_coordinator import ModuleCoordinator
    from dimos.core.global_config import global_config

    if self._config_overrides:
        global_config.update(**self._config_overrides)
    self._coordinator = ModuleCoordinator.build(blueprint)
```
global_config.update() is last-writer-wins across instances
global_config is a module-level singleton. If two Dimos instances are created with different overrides and both call run(), the second call overwrites the first's config. This affects any module that reads global_config after both coordinators are built (e.g., newly spawned workers). In test suites that create multiple Dimos instances this can cause subtle ordering-dependent failures.
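To make the ordering hazard concrete, a sketch (the override keyword below is hypothetical; only the last-writer-wins behaviour of the shared singleton is the point):

```python
# Hypothetical override name; global_config is the module-level singleton.
app_a = Dimos(log_level="DEBUG")
app_b = Dimos(log_level="INFO")

app_a.run("blueprint-a")   # global_config.update(log_level="DEBUG")
app_b.run("blueprint-b")   # global_config.update(log_level="INFO")

# Any worker app_a spawns or restarts after this point reads log_level="INFO"
# from global_config, silently inheriting app_b's overrides.
```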
```python
for cls, proxy in self._coordinator._deployed_modules.items():
    try:
        skills: list[SkillInfo] = proxy.get_skills()  # type: ignore[attr-defined]
    except Exception:
        continue
```
Silent except Exception: continue hides module connectivity errors

If proxy.get_skills() fails (e.g. the module process crashed, the RPC channel is down, or the method simply doesn't exist), the module is silently skipped. The caller then sees AttributeError: No skill named 'foo' with no indication that some modules failed to report their skills. At minimum, log the error at WARNING level:
```python
try:
    skills: list[SkillInfo] = proxy.get_skills()  # type: ignore[attr-defined]
except Exception as exc:
    logger.warning("Failed to get skills from module", module=cls.__name__, error=str(exc))
    continue
```
```python
def _build_cache(self) -> None:
    modules_key = frozenset(self._coordinator._deployed_modules.keys())
    if self._cache_key == modules_key and self._cache is not None:
        return

    skill_map: dict[str, list[tuple[type[ModuleBase], RPCClient, SkillInfo]]] = {}
    for cls, proxy in self._coordinator._deployed_modules.items():
        try:
            skills: list[SkillInfo] = proxy.get_skills()  # type: ignore[attr-defined]
        except Exception:
            continue
        for info in skills:
            skill_map.setdefault(info.func_name, []).append((cls, proxy, info))  # type: ignore[arg-type]

    self._cache = skill_map
    self._cache_key = modules_key
```
_SkillsProxy cache not invalidated after restart()
_build_cache() keys its short-circuit on frozenset(_coordinator._deployed_modules.keys()). A restart() call replaces a module in-place (same class key, potentially new RPCClient proxy), so the frozenset is unchanged and the cache is NOT rebuilt. A stored _SkillsProxy will keep calling the old proxy, which may be connected to a dead worker. Because each app.skills access creates a fresh _SkillsProxy this only bites users who store the proxy (skills = app.skills; app.restart(...); skills.foo()). Consider keying the cache on a monotonic generation counter bumped on each restart.
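A minimal sketch of the suggested generation counter (attribute and method names here are illustrative, not the existing coordinator/proxy API):

```python
class ModuleCoordinator:
    def __init__(self) -> None:
        self._deployed_modules: dict = {}
        self._generation = 0                  # bumped whenever a module is replaced

    def restart(self, module_class) -> None:
        # ... tear down the old worker and redeploy the module ...
        self._generation += 1


class _SkillsProxy:
    def _build_cache(self) -> None:
        cache_key = (
            self._coordinator._generation,
            frozenset(self._coordinator._deployed_modules.keys()),
        )
        if self._cache_key == cache_key and self._cache is not None:
            return
        # ... rebuild skill_map exactly as before ...
        self._cache_key = cache_key
```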
Closed in favor of #1779
Problem
Closes DIM-XXX
Solution
Breaking Changes
How to Test
Contributor License Agreement