From 5ea36911e9b47fbe8fc76f4147e5dd44aad7ee1b Mon Sep 17 00:00:00 2001 From: gufengc Date: Mon, 29 Sep 2025 17:02:20 +0800 Subject: [PATCH 01/12] update --- src/backend/server/static_config.py | 2 +- src/parallax/p2p/server.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/backend/server/static_config.py b/src/backend/server/static_config.py index 85427cab..bc1ba5a3 100644 --- a/src/backend/server/static_config.py +++ b/src/backend/server/static_config.py @@ -7,7 +7,7 @@ # Supported model list MODEL_LIST = [ "Qwen/Qwen3-0.6B", - # "Qwen/Qwen3-8B", + "Qwen/Qwen3-8B", # "Qwen/Qwen3-8B-FP8", "Qwen/Qwen3-32B", "Qwen/Qwen3-32B-FP8", diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 508730ce..a4713a5c 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -180,9 +180,6 @@ def __init__( max_batch_size: Optional[int] = None, max_sequence_length: Optional[int] = None, ): - assert not ( - scheduler_addr is not None and len(initial_peers) > 0 - ), "scheduler_addr and initial_peers are not allowed at the same time" self.recv_from_peer_addr = recv_from_peer_addr self.send_to_peer_addr = send_to_peer_addr self.initial_peers = initial_peers From 646f6303a80ebca0103a2aa20213c0be863da7f4 Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 01:37:01 +0800 Subject: [PATCH 02/12] update --- src/backend/server/scheduler_manage.py | 2 +- src/parallax/p2p/server.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py index da3f3d60..6702ad11 100644 --- a/src/backend/server/scheduler_manage.py +++ b/src/backend/server/scheduler_manage.py @@ -134,7 +134,7 @@ def _start_lattica(self): if len(self.relay_servers) > 0: print(f"Using relay servers: {self.relay_servers}") - self.lattica.with_relay_servers(self.relay_servers).with_dcutr(True) + self.lattica.with_relay_servers(self.relay_servers).with_dcutr(True).with_protocol("") if len(self.announce_maddrs) > 0: print(f"Using announce maddrs: {self.announce_maddrs}") diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index a4713a5c..4a4f0486 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -217,9 +217,18 @@ def __init__( def build_lattica(self): self.lattica = Lattica.builder().with_listen_addrs(self.host_maddrs) + if self.scheduler_addr is not None and self.scheduler_addr != "auto": + if self.scheduler_addr.startswith("/"): + logger.info(f"Using scheduler addr: {self.scheduler_addr}") + self.lattica.with_bootstraps([self.scheduler_addr]) + self.scheduler_peer_id = self.scheduler_addr.split("/")[-1] + if len(self.relay_servers) > 0: logger.info(f"Using relay servers: {self.relay_servers}") self.lattica.with_relay_servers(self.relay_servers).with_dcutr(True) + if self.scheduler_peer_id is not None: + logger.info(f"Using protocol: /{self.scheduler_peer_id}") + self.lattica.with_protocol("/" + self.scheduler_peer_id) if len(self.announce_maddrs) > 0: logger.info(f"Using announce maddrs: {self.announce_maddrs}") @@ -229,11 +238,6 @@ def build_lattica(self): logger.info(f"Using initial peers: {self.initial_peers}") self.lattica.with_bootstraps(self.initial_peers) - if self.scheduler_addr is not None and self.scheduler_addr != "auto": - logger.info(f"Using scheduler addr: {self.scheduler_addr}") - self.lattica.with_bootstraps([self.scheduler_addr]) - self.scheduler_peer_id = self.scheduler_addr.split("/")[-1] - self.lattica.build() if self.scheduler_addr == "auto": From bd5be2391d05ea5975ce5d9481e2c6cd8a4a6119 Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 13:53:21 +0800 Subject: [PATCH 03/12] update --- src/backend/server/scheduler_manage.py | 6 +++--- src/parallax/p2p/server.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py index 6702ad11..413180c9 100644 --- a/src/backend/server/scheduler_manage.py +++ b/src/backend/server/scheduler_manage.py @@ -133,15 +133,15 @@ def _start_lattica(self): self.lattica = Lattica.builder().with_listen_addrs(self.host_maddrs).with_key_path(".") if len(self.relay_servers) > 0: - print(f"Using relay servers: {self.relay_servers}") + logger.info(f"Using relay servers: {self.relay_servers}") self.lattica.with_relay_servers(self.relay_servers).with_dcutr(True).with_protocol("") if len(self.announce_maddrs) > 0: - print(f"Using announce maddrs: {self.announce_maddrs}") + logger.info(f"Using announce maddrs: {self.announce_maddrs}") self.lattica.with_external_addrs(self.announce_maddrs) if len(self.initial_peers) > 0: - print(f"Using initial peers: {self.initial_peers}") + logger.info(f"Using initial peers: {self.initial_peers}") self.lattica.with_bootstraps(self.initial_peers) self.lattica.build() diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 4a4f0486..6fe33976 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -569,7 +569,8 @@ def get_node_info(self, is_update: bool = False): "node_id": self.lattica.peer_id(), "hardware": detect_node_hardware(self.lattica.peer_id()), "kv_cache_ratio": 0.25, - "param_hosting_ratio": 0.65, + # temp change for testing + "param_hosting_ratio": 0.01, "max_concurrent_requests": self.max_batch_size, "max_sequence_length": ( 1024 if self.max_sequence_length is None else self.max_sequence_length From 13d198247b48f826935dfd764172d8d4cc8f4bcd Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 14:30:13 +0800 Subject: [PATCH 04/12] update --- src/backend/server/scheduler_manage.py | 29 ++++++++++++++++++-------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py index 413180c9..408e7926 100644 --- a/src/backend/server/scheduler_manage.py +++ b/src/backend/server/scheduler_manage.py @@ -146,15 +146,26 @@ def _start_lattica(self): self.lattica.build() logger.debug("Lattica node built") - - if self.lattica.store( - "scheduler_peer_id", - self.lattica.peer_id(), - expiration_time=time.time() + 365 * 24 * 60 * 60, - ): - logger.info(f"Stored scheduler peer id: {self.lattica.peer_id()}") - else: - logger.error("Failed to store scheduler peer id") + + store_success = False + for _ in range(10): + try: + if self.lattica.store( + "scheduler_peer_id", + self.lattica.peer_id(), + expiration_time=time.time() + 365 * 24 * 60 * 60, + ): + logger.info(f"Stored scheduler peer id: {self.lattica.peer_id()}") + store_success = True + break + logger.warning("Failed to store scheduler peer id, waiting for 10 seconds") + time.sleep(10) + except Exception as e: + logger.error(f"Failed to store scheduler peer id: {e}, waiting for 10 seconds") + time.sleep(10) + + if not store_success: + logger.error("Failed to store scheduler peer id, after 10 times") exit(1) self.connection_handler = RPCConnectionHandler( From 807c7ed88485938e5688311811cbace0b68b0c02 Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 15:07:04 +0800 Subject: [PATCH 05/12] update --- src/parallax/p2p/server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 6fe33976..880ee299 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -273,6 +273,8 @@ def run(self): self.scheduler_stub = RPCConnectionHandler(self.lattica, None).get_stub( self.scheduler_peer_id ) + + time.sleep(10) response = self.scheduler_stub.node_join(self.get_node_info()) response = response.result(timeout=300) if response == {}: From 1a79c672fd1900f280b4599f6a8afaa6947d7e54 Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 15:25:07 +0800 Subject: [PATCH 06/12] update --- src/parallax/p2p/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 880ee299..3499c299 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -511,7 +511,7 @@ def _announcer_thread(): while not self.stop_event.is_set(): # Announce the range ID try: - if self.scheduler_addr is not None: + if self.scheduler_peer_id is not None: self.scheduler_stub.node_update(self.get_node_info(is_update=True)) else: self.lattica.store( @@ -572,7 +572,7 @@ def get_node_info(self, is_update: bool = False): "hardware": detect_node_hardware(self.lattica.peer_id()), "kv_cache_ratio": 0.25, # temp change for testing - "param_hosting_ratio": 0.01, + "param_hosting_ratio": 0.012, "max_concurrent_requests": self.max_batch_size, "max_sequence_length": ( 1024 if self.max_sequence_length is None else self.max_sequence_length From 6481b43d48662c89fae6f96e545e4cd6cd57ca72 Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 15:34:46 +0800 Subject: [PATCH 07/12] update --- src/parallax/p2p/server.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 3499c299..9d72ab96 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -274,7 +274,6 @@ def run(self): self.scheduler_peer_id ) - time.sleep(10) response = self.scheduler_stub.node_join(self.get_node_info()) response = response.result(timeout=300) if response == {}: @@ -543,9 +542,9 @@ def get_node_info(self, is_update: bool = False): all_peers = [] for _ in range(1 if is_update else 30): all_peers = self.lattica.get_all_peers() - if len(all_peers) > 0: + if len(all_peers) > 0 and self.scheduler_peer_id in all_peers: break - logger.warning("No peers found, waiting for 1 second.") + logger.warning("No peers found or scheduler peer id not found, waiting for 1 second.") time.sleep(1) if len(all_peers) == 0: From a7059052e81d14f6c2fa569cea856dac6af429ef Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 15:58:08 +0800 Subject: [PATCH 08/12] update --- src/parallax/p2p/server.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 9d72ab96..d731e047 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -273,8 +273,13 @@ def run(self): self.scheduler_stub = RPCConnectionHandler(self.lattica, None).get_stub( self.scheduler_peer_id ) - - response = self.scheduler_stub.node_join(self.get_node_info()) + node_info = self.get_node_info() + if node_info == {}: + logger.error("Failed to get node info, try again after 10 seconds") + self.lattica = None + time.sleep(10) + return self.run() + response = self.scheduler_stub.node_join(node_info) response = response.result(timeout=300) if response == {}: logger.error("Failed to join scheduler") @@ -547,8 +552,10 @@ def get_node_info(self, is_update: bool = False): logger.warning("No peers found or scheduler peer id not found, waiting for 1 second.") time.sleep(1) - if len(all_peers) == 0: - logger.warning("No peers found, send empty rtt_to_nodes.") + + if len(all_peers) == 0 or self.scheduler_peer_id not in all_peers: + logger.warning("No peers found or scheduler peer id not found, return empty node info.") + return {} for peer_id in all_peers: rtt = None From 30e5b0484250c1b90706657b7fb286a5da5f853e Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 16:14:13 +0800 Subject: [PATCH 09/12] update --- src/parallax/p2p/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index d731e047..8f32ff3a 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -276,6 +276,7 @@ def run(self): node_info = self.get_node_info() if node_info == {}: logger.error("Failed to get node info, try again after 10 seconds") + del self.lattica self.lattica = None time.sleep(10) return self.run() From 01056e29d0d8cb0dd3a24a13bb709c8ed2b02ff9 Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 18:28:18 +0800 Subject: [PATCH 10/12] update --- src/parallax/p2p/server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 8f32ff3a..db4440d5 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -578,8 +578,7 @@ def get_node_info(self, is_update: bool = False): "node_id": self.lattica.peer_id(), "hardware": detect_node_hardware(self.lattica.peer_id()), "kv_cache_ratio": 0.25, - # temp change for testing - "param_hosting_ratio": 0.012, + "param_hosting_ratio": 0.65, "max_concurrent_requests": self.max_batch_size, "max_sequence_length": ( 1024 if self.max_sequence_length is None else self.max_sequence_length From 3b146f462c414a97a6ea4718f73cb6d7b916b95a Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 19:42:09 +0800 Subject: [PATCH 11/12] update --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e92a16ef..0fe9a48b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "protobuf==6.31.1", "dijkstar==2.6.0", "huggingface-hub", - "lattica==1.0.0", + "lattica==1.0.1", ] [project.scripts] From 4e2fca2f0f14e5d73caf42be72916f96ae2b47a1 Mon Sep 17 00:00:00 2001 From: gufengc Date: Tue, 30 Sep 2025 19:44:53 +0800 Subject: [PATCH 12/12] update --- src/backend/server/scheduler_manage.py | 2 +- src/parallax/p2p/server.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py index 408e7926..b98f7700 100644 --- a/src/backend/server/scheduler_manage.py +++ b/src/backend/server/scheduler_manage.py @@ -146,7 +146,7 @@ def _start_lattica(self): self.lattica.build() logger.debug("Lattica node built") - + store_success = False for _ in range(10): try: diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index db4440d5..9102d572 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -550,12 +550,15 @@ def get_node_info(self, is_update: bool = False): all_peers = self.lattica.get_all_peers() if len(all_peers) > 0 and self.scheduler_peer_id in all_peers: break - logger.warning("No peers found or scheduler peer id not found, waiting for 1 second.") + logger.warning( + "No peers found or scheduler peer id not found, waiting for 1 second." + ) time.sleep(1) - if len(all_peers) == 0 or self.scheduler_peer_id not in all_peers: - logger.warning("No peers found or scheduler peer id not found, return empty node info.") + logger.warning( + "No peers found or scheduler peer id not found, return empty node info." + ) return {} for peer_id in all_peers: