[fix,refactor] Isolate notify_data_update ZMQ I/O into a dedicated background asyncio loop#117
Conversation
…background asyncio loop Root cause: `notify_data_update` ran on the caller's asyncio loop. Under heavy compute workloads (PyTorch, Ray), the loop would stall, causing `asyncio.wait_for` timers to fire before the controller ACK was ever read. Changes: - Spin up a dedicated daemon thread (`_notify_thread`) running an isolated `_notify_loop` per StorageManager instance, keeping notify I/O immune to caller-side event loop starvation - Pre-connect a single reusable async DEALER socket (`_notify_sock`) in `_init_notify_zmq`; bridge calls via `run_coroutine_threadsafe` + `wrap_future` so the caller-side `await` remains non-blocking - Add `_notify_lock` to serialize concurrent `notify_data_update` calls on the shared socket, preventing send/recv interleaving across coroutines - Guard `close()` with `hasattr(_notify_loop)` to avoid `AttributeError` in `__del__` when `__init__` fails before the loop is created Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
There was a problem hiding this comment.
Pull request overview
This PR refactors StorageManager.notify_data_update() to move ZMQ notify send/recv off the caller’s asyncio event loop by introducing a dedicated background thread with its own asyncio loop and a pre-connected async DEALER socket, aiming to avoid event-loop starvation under heavy compute workloads.
Changes:
- Added a per-
StorageManagerdaemon thread running a dedicated asyncio loop for notify I/O. - Pre-initialized and reused a single async DEALER socket for notify traffic, bridging calls via
run_coroutine_threadsafe+wrap_future. - Added locking around notify send/recv to prevent concurrent interleaving on the shared socket.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._notify_zmq_ctx: zmq.asyncio.Context | None = None | ||
| self._notify_sock: zmq.asyncio.Socket | None = None | ||
| self._notify_lock = asyncio.Lock() | ||
| notify_sock_ready = threading.Event() | ||
| asyncio.run_coroutine_threadsafe(self._init_notify_zmq(notify_sock_ready), self._notify_loop) |
| notify_sock_ready = threading.Event() | ||
| asyncio.run_coroutine_threadsafe(self._init_notify_zmq(notify_sock_ready), self._notify_loop) | ||
| notify_sock_ready.wait() |
| async def _notify_and_wait(self, request_msg: list) -> None: | ||
| """Send a data status notification to the controller and block until ACK is received.""" | ||
| assert self._notify_sock is not None | ||
|
|
||
| async with self._notify_lock: | ||
| await self._notify_sock.send_multipart(request_msg) |
| self._notify_zmq_ctx: zmq.asyncio.Context | None = None | ||
| self._notify_sock: zmq.asyncio.Socket | None = None | ||
| self._notify_lock = asyncio.Lock() | ||
| notify_sock_ready = threading.Event() | ||
| asyncio.run_coroutine_threadsafe(self._init_notify_zmq(notify_sock_ready), self._notify_loop) |
There was a problem hiding this comment.
zmq.Context() is thread safe. We don't need to introduce a _init_notify_zmq function here.
| try: | ||
| self._notify_zmq_ctx = zmq.asyncio.Context() | ||
| identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() | ||
| self._notify_sock = create_zmq_socket( |
There was a problem hiding this comment.
We can preserve the previous dynamic socket design during the run
| """Send a data status notification to the controller and block until ACK is received.""" | ||
| assert self._notify_sock is not None | ||
|
|
||
| async with self._notify_lock: |
There was a problem hiding this comment.
Using the dynamic socket we can delete this lock
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
|
|
||
| async def _notify_and_wait(self, request_msg: list) -> None: | ||
| """Send a data status notification to the controller and block until ACK is received.""" | ||
| zmq_ctx = zmq.asyncio.Context() |
There was a problem hiding this comment.
Put this line during init and revert the _connect_to_controller
| except Exception: | ||
| pass | ||
| sock.close() | ||
| zmq_ctx.term() |
There was a problem hiding this comment.
do not term this during each run
| await sock.send_multipart(error_msg) | ||
| except Exception: | ||
| pass | ||
| raise TimeoutError( |
There was a problem hiding this comment.
I suggest we still use logger.error rather than raise error
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
| thread_future = asyncio.run_coroutine_threadsafe( | ||
| self._notify_and_wait(request_msg), | ||
| self._notify_loop, | ||
| ) | ||
| await asyncio.wrap_future(thread_future) |
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Root cause:
notify_data_updateran on the caller's asyncio loop. Under heavy compute workloads (PyTorch, Ray), the loop would stall, causingasyncio.wait_fortimers to fire before the controller ACK was ever read.Changes:
_notify_thread) running an isolated_notify_loopper StorageManager instance, keeping notify I/O immune to caller-side event loop starvationrun_coroutine_threadsafe+wrap_futureso the caller-sideawaitremains non-blocking_notify_lockto serialize concurrentnotify_data_updatecalls on the dynamic socket, preventing send/recv interleaving across coroutinesclose()withhasattr(_notify_loop)to avoidAttributeErrorin__del__when__init__fails before the loop is created