-
Notifications
You must be signed in to change notification settings - Fork 477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement direct server-to-server communication #331
Conversation
4b94939
to
9aebf0a
Compare
9aebf0a
to
234bafb
Compare
src/petals/server/handler.py
Outdated
@@ -47,6 +51,8 @@ def __init__( | |||
dht: DHT, | |||
module_backends: Dict[str, TransformerBackend], | |||
*, | |||
push_manager: SyncManager, | |||
session_pipes: Dict[str, Tuple[PipeConnection, threading.Lock]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rpc_push()
may be received by a connection handler different from the one holding the inference session, so we use some multiprocess communication here.
src/petals/server/handler.py
Outdated
@@ -92,11 +101,20 @@ def _unpack(req: runtime_pb2.ExpertRequest) -> Iterable[runtime_pb2.Tensor]: | |||
assert isinstance(block_uid, str) and isinstance(metadata, dict) | |||
return block_uid, inputs, metadata | |||
|
|||
async def rpc_push(self, request: runtime_pb2.ExpertRequest, context: P2PContext) -> runtime_pb2.ExpertResponse: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: This can be stream-to-unary handler, so that (a) the previous server doesn't have to make a new connection each time and (b) we don't have to parse metadata at this stage each time (now it's done to find session_id
). I'm not sure if it affects performance much though, so I'd postpone that to a later PR.
for attempt_no in itertools.count(): | ||
logger.debug(f"Inference: block {block_idx}, attempt {attempt_no}") | ||
span = None | ||
try: | ||
if not self._chosen_spans or not self._server_sessions or attempt_no >= 1: | ||
# If there is a failed server session, this code closes it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code was moved to a separate method InferenceSession._update_sequence()
to simplify this method.
73a0563
to
49bbce4
Compare
0e5ce37
to
2445aa5
Compare
2445aa5
to
cef5662
Compare
7d73698
to
429146e
Compare
429146e
to
9a23c1e
Compare
bccdc21
to
845c172
Compare
845c172
to
0635968
Compare
c5dfba2
to
a637fd2
Compare
a637fd2
to
7e204d0
Compare
9427cae
to
5b3f180
Compare
inputs = self.history # Pass full inputs including prefix | ||
else: | ||
inputs = inputs[:, -n_input_tokens:] # No need to pass prefix further | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored: This code was moved from InferenceSession.step()
to _ServerInferenceSession.step()
, since it's actually about one server only. The overall structure is more clear this way.
Implement #226.
TODO:
manager.Queue()
)next_servers