-
Notifications
You must be signed in to change notification settings - Fork 477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix handler memory leak, get rid of mp.Manager #373
Conversation
src/petals/server/handler.py
Outdated
@@ -146,6 +136,8 @@ async def rpc_inference( | |||
active_adapter = self._get_active_adapter(metadata) | |||
points = metadata.get("points", 0) | |||
session_id = metadata.get("session_id") | |||
todo_use_async_with = self._managed_session(session_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make with
in self._iterate_inference_steps()
?
Co-authored-by: Alexander Borzunov <borzunov.alexander@gmail.com>
src/petals/server/handler.py
Outdated
else: | ||
assert code == "PUSH", f"unexpected code: {code}" | ||
maybe_session_queue = self._session_queues.get(session_id) | ||
if maybe_session_queue is not None: | ||
maybe_session_queue.put_nowait(payload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Please do it like this, but with enums
else: | |
assert code == "PUSH", f"unexpected code: {code}" | |
maybe_session_queue = self._session_queues.get(session_id) | |
if maybe_session_queue is not None: | |
maybe_session_queue.put_nowait(payload) | |
elif code == "PUSH": | |
maybe_session_queue = self._session_queues.get(session_id) | |
if maybe_session_queue is not None: | |
maybe_session_queue.put_nowait(payload) | |
else: | |
raise RuntimeError(f"Unexpected code: {code}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applied
# Conflicts: # src/petals/server/handler.py
src/petals/server/handler.py
Outdated
self._handler_queues = handler_queues | ||
self._handler_index = handler_index | ||
self._own_queue = handler_queues[handler_index] | ||
self._listener_task: Optional[asyncio.Task] = None | ||
self._session_queues: Dict[str, asyncio.Queue] = dict() | ||
self._session_handlers: Dict[str, int] = dict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._handler_queues = handler_queues | |
self._handler_index = handler_index | |
self._own_queue = handler_queues[handler_index] | |
self._listener_task: Optional[asyncio.Task] = None | |
self._session_queues: Dict[str, asyncio.Queue] = dict() | |
self._session_handlers: Dict[str, int] = dict() | |
self._handler_event_queues = handler_event_queues | |
self._handler_index = handler_index | |
self._own_event_queue = handler_event_queues[handler_index] | |
self._listener_task: Optional[asyncio.Task] = None | |
self._session_queues: Dict[str, asyncio.Queue] = dict() | |
self._session_handlers: Dict[str, int] = dict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed
src/petals/server/handler.py
Outdated
if handler_index is None: | ||
logger.debug(f"Ignored rpc_push to unknown session ID: {session_id}") | ||
elif handler_index == self._handler_index: | ||
await self._session_queues[session_id].put(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
await self._session_queues[session_id].put(request) | |
self._session_queues[session_id].put_nowait(request) |
Thus, this functon doesn't need to be async
- please make it a usual func
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/petals/server/handler.py
Outdated
self._session_queues: Dict[str, asyncio.Queue] = dict() | ||
self._session_handlers: Dict[str, int] = dict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._session_queues: Dict[str, asyncio.Queue] = dict() | |
self._session_handlers: Dict[str, int] = dict() | |
self._session_queues: Dict[str, asyncio.Queue] = {} | |
self._session_handlers: Dict[str, int] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/petals/server/handler.py
Outdated
other_queue.put_nowait((Event.NEW_SESSION, session_id, self._handler_index)) | ||
yield | ||
finally: | ||
await self._session_queues.pop(session_id).put(None) # put None so that the get task will not hang |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
await self._session_queues.pop(session_id).put(None) # put None so that the get task will not hang | |
self._session_queues.pop(session_id).put_nowait(None) # put None so that the get task will not hang |
The contextmanager doesn't have to be async, please make it a usual contextmanager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've valdated this on local and geo-distributed swarms, seems to work.
This code removes the memory leak from somewhere within handler.py that has something to do with mp SyncManger.
Tested properties:
testcorrect inference outputs
test that pushes are processed by the correct handler
test against memory leaks with the leak notebook
overnight memory leak test
make sure rebalancing is handled correctly (needs @borzunov )
After review: