From 83e19f705de05b24d698ee5aa42d8fb9d8f625cd Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Wed, 22 Apr 2026 07:11:13 +0300 Subject: [PATCH] fix(tests): fix flakey porcelain test --- dimos/porcelain/remote_module_source.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/dimos/porcelain/remote_module_source.py b/dimos/porcelain/remote_module_source.py index bbf08186e0..7a33a4271e 100644 --- a/dimos/porcelain/remote_module_source.py +++ b/dimos/porcelain/remote_module_source.py @@ -17,6 +17,7 @@ import copyreg import pickle import threading +import time from types import MappingProxyType from typing import TYPE_CHECKING, Any @@ -25,6 +26,8 @@ from dimos.porcelain.module_source import ModuleSource from dimos.utils.logging_config import setup_logger +_CONNECT_RETRY_DEADLINE_S = 2.0 + if TYPE_CHECKING: from dimos.core.coordination.blueprints import Blueprint @@ -53,7 +56,7 @@ class RemoteModuleSource(ModuleSource): is_remote = True def __init__(self, host: str, port: int) -> None: - self._coord_conn = rpyc.connect(host, port, config={"sync_request_timeout": 30}) + self._coord_conn = _rpyc_connect(host, port, config={"sync_request_timeout": 30}) self._cache: dict[str, tuple[rpyc.Connection, Any]] = {} self._lock = threading.RLock() @@ -68,7 +71,7 @@ def get_rpyc_module(self, name: str) -> Any: endpoint = self._coord_conn.root.get_module_endpoint(name) host, port, module_id = endpoint[0], int(endpoint[1]), int(endpoint[2]) - conn = rpyc.connect(host, port, config={"sync_request_timeout": 30}) + conn = _rpyc_connect(host, port, config={"sync_request_timeout": 30}) module = conn.root.get_module(module_id) self._cache[name] = (conn, module) return module @@ -105,3 +108,16 @@ def close(self) -> None: self._coord_conn.close() except Exception: pass + + +def _rpyc_connect(host: str, port: int, **kwargs: Any) -> rpyc.Connection: + deadline = time.monotonic() + _CONNECT_RETRY_DEADLINE_S + delay = 0.010 + while True: + try: + return rpyc.connect(host, port, **kwargs) + except ConnectionRefusedError: + if time.monotonic() >= deadline: + raise + time.sleep(delay) + delay = min(delay * 2, 0.200)