From fdb35281762add00ec1801be2f356a6972ae506e Mon Sep 17 00:00:00 2001 From: jason Date: Fri, 31 Oct 2025 20:16:06 +0800 Subject: [PATCH 1/4] Heart add layers return --- src/backend/server/rpc_connection_handler.py | 4 +++- src/parallax/p2p/server.py | 19 ++++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/backend/server/rpc_connection_handler.py b/src/backend/server/rpc_connection_handler.py index 0288f26c..10353537 100644 --- a/src/backend/server/rpc_connection_handler.py +++ b/src/backend/server/rpc_connection_handler.py @@ -80,7 +80,9 @@ def node_update(self, message): new_rtt_to_nodes=node.rtt_to_nodes, is_active=node.is_active, ) - return {} + # Return current layer allocation to node + layer_allocation = self.get_layer_allocation(node.node_id) + return layer_allocation if layer_allocation else {} except Exception as e: logger.exception(f"node_update error: {e}") return {} diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index d373820e..c78add4d 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -564,7 +564,24 @@ def _announcer_thread(): # Announce the range ID try: if self.scheduler_peer_id is not None: - self.scheduler_stub.node_update(self.get_node_info(is_update=True)) + response_future = self.scheduler_stub.node_update(self.get_node_info(is_update=True)) + # Get the response result + response = response_future.result(timeout=30) if hasattr(response_future, 'result') else response_future + + # Print layer allocation information + if response and isinstance(response, dict): + start_layer = response.get("start_layer") + end_layer = response.get("end_layer") + model_name = response.get("model_name") + if start_layer is not None and end_layer is not None: + logger.info( + f"Heartbeat: Node {self.lattica.peer_id()}... " + f"Model: {model_name}, Layers: [{start_layer}, {end_layer})" + ) + else: + logger.debug(f"Heartbeat response: {response}") + else: + logger.debug(f"Heartbeat: No layer allocation received yet") else: self.lattica.store( key=self.prefix_id, From 5ec876301058b365b7e98456276d3d7b763b0f00 Mon Sep 17 00:00:00 2001 From: jason Date: Fri, 31 Oct 2025 20:16:06 +0800 Subject: [PATCH 2/4] feat(p2p): add layer allocation logging in heartbeat - Return layer allocation in node_update RPC response - Log layer information (start_layer, end_layer, model_name) in worker heartbeat - Helps with debugging and monitoring node layer assignments --- src/backend/server/rpc_connection_handler.py | 4 +++- src/parallax/p2p/server.py | 19 ++++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/backend/server/rpc_connection_handler.py b/src/backend/server/rpc_connection_handler.py index 0288f26c..10353537 100644 --- a/src/backend/server/rpc_connection_handler.py +++ b/src/backend/server/rpc_connection_handler.py @@ -80,7 +80,9 @@ def node_update(self, message): new_rtt_to_nodes=node.rtt_to_nodes, is_active=node.is_active, ) - return {} + # Return current layer allocation to node + layer_allocation = self.get_layer_allocation(node.node_id) + return layer_allocation if layer_allocation else {} except Exception as e: logger.exception(f"node_update error: {e}") return {} diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index d373820e..c78add4d 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -564,7 +564,24 @@ def _announcer_thread(): # Announce the range ID try: if self.scheduler_peer_id is not None: - self.scheduler_stub.node_update(self.get_node_info(is_update=True)) + response_future = self.scheduler_stub.node_update(self.get_node_info(is_update=True)) + # Get the response result + response = response_future.result(timeout=30) if hasattr(response_future, 'result') else response_future + + # Print layer allocation information + if response and isinstance(response, dict): + start_layer = response.get("start_layer") + end_layer = response.get("end_layer") + model_name = response.get("model_name") + if start_layer is not None and end_layer is not None: + logger.info( + f"Heartbeat: Node {self.lattica.peer_id()}... " + f"Model: {model_name}, Layers: [{start_layer}, {end_layer})" + ) + else: + logger.debug(f"Heartbeat response: {response}") + else: + logger.debug(f"Heartbeat: No layer allocation received yet") else: self.lattica.store( key=self.prefix_id, From 43478bb6822f9856d9a0ff6a394c2334001c5844 Mon Sep 17 00:00:00 2001 From: jason Date: Fri, 31 Oct 2025 22:09:47 +0800 Subject: [PATCH 3/4] feat(p2p): add layer allocation logging in heartbeat --- src/parallax/p2p/server.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index c78add4d..807286f5 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -564,10 +564,16 @@ def _announcer_thread(): # Announce the range ID try: if self.scheduler_peer_id is not None: - response_future = self.scheduler_stub.node_update(self.get_node_info(is_update=True)) + response_future = self.scheduler_stub.node_update( + self.get_node_info(is_update=True) + ) # Get the response result - response = response_future.result(timeout=30) if hasattr(response_future, 'result') else response_future - + response = ( + response_future.result(timeout=30) + if hasattr(response_future, "result") + else response_future + ) + # Print layer allocation information if response and isinstance(response, dict): start_layer = response.get("start_layer") From df009e5933d49489f4c582b684586651abbbc089 Mon Sep 17 00:00:00 2001 From: jason Date: Mon, 3 Nov 2025 13:04:20 +0800 Subject: [PATCH 4/4] change after gufeng review --- src/backend/server/rpc_connection_handler.py | 2 +- src/parallax/p2p/server.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/backend/server/rpc_connection_handler.py b/src/backend/server/rpc_connection_handler.py index 10353537..9c7801a8 100644 --- a/src/backend/server/rpc_connection_handler.py +++ b/src/backend/server/rpc_connection_handler.py @@ -82,7 +82,7 @@ def node_update(self, message): ) # Return current layer allocation to node layer_allocation = self.get_layer_allocation(node.node_id) - return layer_allocation if layer_allocation else {} + return layer_allocation except Exception as e: logger.exception(f"node_update error: {e}") return {} diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 807286f5..54533e49 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -580,14 +580,16 @@ def _announcer_thread(): end_layer = response.get("end_layer") model_name = response.get("model_name") if start_layer is not None and end_layer is not None: - logger.info( + logger.debug( f"Heartbeat: Node {self.lattica.peer_id()}... " f"Model: {model_name}, Layers: [{start_layer}, {end_layer})" ) else: - logger.debug(f"Heartbeat response: {response}") + logger.warning(f"Heartbeat response: {response}") else: - logger.debug(f"Heartbeat: No layer allocation received yet") + logger.warning( + f"Heartbeat: No layer allocation received yet, response: {response}" + ) else: self.lattica.store( key=self.prefix_id,