From 3427349cf5c7b46340710252f6383f006509e71c Mon Sep 17 00:00:00 2001 From: jason Date: Mon, 3 Nov 2025 20:58:42 +0800 Subject: [PATCH 1/7] reload layers after layer change --- src/parallax/launch.py | 55 ++++++++++++++++++++++--- src/parallax/p2p/server.py | 21 ++++++++++ src/parallax/server/executor.py | 71 ++++++++++++++++++++++++++++----- 3 files changed, 132 insertions(+), 15 deletions(-) diff --git a/src/parallax/launch.py b/src/parallax/launch.py index d12c8948..02f0fb78 100644 --- a/src/parallax/launch.py +++ b/src/parallax/launch.py @@ -18,6 +18,8 @@ import tempfile import threading +from mlx_lm.utils import get_model_path, load_config + from common.version_check import check_latest_release from parallax.p2p.server import ServerState, launch_p2p_server from parallax.server.executor import Executor @@ -57,14 +59,17 @@ # only launch http server on head node if args.start_layer == 0: http_server_process = launch_http_server(args) - executor = Executor.create_from_args(args) + # Load config to get num_hidden_layers without creating executor + model_path = get_model_path(args.model_path)[0] + config = load_config(model_path) + hidden_layers = config.get("num_hidden_layers") launch_p2p_server( initial_peers=args.initial_peers, scheduler_addr=args.scheduler_addr, relay_servers=args.relay_servers, pp_start_layer=args.start_layer, pp_end_layer=args.end_layer, - hidden_layers=executor.config.get("num_hidden_layers"), + hidden_layers=hidden_layers, tcp_port=args.tcp_port, udp_port=args.udp_port, dht_prefix=args.dht_prefix, @@ -117,11 +122,49 @@ # only launch http server on head node if args.start_layer == 0: http_server_process = launch_http_server(args) - executor = Executor.create_from_args(args) - if gradient_server is not None: - gradient_server.status = ServerState.READY - executor.run_loop() + # Main execution loop with layer reallocation support + while True: + try: + executor = Executor.create_from_args(args, gradient_server=gradient_server) + if gradient_server is not None: + gradient_server.status = ServerState.READY + + executor.run_loop() + + # Check if layer allocation changed (executor exited due to reallocation) + if gradient_server is not None and gradient_server._layer_allocation_changed: + logger.warning( + "Layer allocation changed! Reloading executor with new layers..." + ) + executor.shutdown() + + # Update args with new layer allocation + args.start_layer = gradient_server.block_start_index + args.end_layer = gradient_server.block_end_index + if gradient_server.model_name: + args.model_path = gradient_server.model_name + + logger.info( + f"Creating new executor with layers [{args.start_layer}, {args.end_layer})" + ) + gradient_server._layer_allocation_changed = False + continue # Create new executor in next iteration + else: + break # Normal exit + except KeyboardInterrupt: + logger.debug("Received interrupt signal, shutting down...") + break + except Exception as e: + logger.exception(f"Executor error: {e}") + # If layer allocation changed, try to reload + if gradient_server is not None and gradient_server._layer_allocation_changed: + logger.info("Attempting to reload executor after error...") + if executor is not None: + executor.shutdown() + continue + else: + raise except KeyboardInterrupt: logger.debug("Received interrupt signal, shutting down...") except Exception as e: diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 54533e49..4ab32647 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -247,6 +247,7 @@ def __init__( self.announcer = None self.connection_handler = None self.stop_event = threading.Event() + self._layer_allocation_changed = False def build_lattica(self): self.lattica = Lattica.builder().with_listen_addrs(self.host_maddrs) @@ -584,6 +585,26 @@ def _announcer_thread(): f"Heartbeat: Node {self.lattica.peer_id()}... " f"Model: {model_name}, Layers: [{start_layer}, {end_layer})" ) + # Check if layer allocation changed + if ( + start_layer != self.block_start_index + or end_layer != self.block_end_index + ): + logger.warning( + f"Layer allocation changed! " + f"Current: [{self.block_start_index}, {self.block_end_index}) -> " + f"New: [{start_layer}, {end_layer})" + ) + # Update layer allocation + self.block_start_index = start_layer + self.block_end_index = end_layer + if model_name: + self.model_name = model_name + # Set flag to trigger executor reload + self._layer_allocation_changed = True + logger.info( + "Layer allocation updated. Executor will reload on next check." + ) else: logger.warning(f"Heartbeat response: {response}") else: diff --git a/src/parallax/server/executor.py b/src/parallax/server/executor.py index 0819ea6d..32766a9c 100644 --- a/src/parallax/server/executor.py +++ b/src/parallax/server/executor.py @@ -97,6 +97,8 @@ def __init__( # GPU/SGLang Specialized Configs attention_backend: Optional[str] = "torch_native", moe_runner_backend: Optional[str] = "auto", + # Optional gradient server for layer reallocation detection + gradient_server: Optional[Any] = None, ): # Backend self.device = get_current_device() @@ -144,6 +146,10 @@ def __init__( self.finished_batch = [] self.start_layer = start_layer self.end_layer = end_layer + self._should_stop = False # Flag to gracefully stop the executor + self.gradient_server = ( + gradient_server # Reference to gradient server for layer reallocation detection + ) self.is_first_peer = start_layer == 0 self.is_last_peer = end_layer == self.config.get("num_hidden_layers") @@ -279,9 +285,9 @@ def __init__( ) @classmethod - def create_from_args(cls, args: argparse.Namespace): + def create_from_args(cls, args: argparse.Namespace, gradient_server=None): """Create executor from command line arguments.""" - return cls(**create_executor_config(args)) + return cls(**create_executor_config(args, gradient_server)) def recv_requests_from_http(self) -> List[Request]: """Receives requests from http frontend""" @@ -1144,7 +1150,8 @@ def run_loop(self): logger.debug( f"Executor for layers [{self.start_layer}, {self.end_layer}) starting run loop..." ) - while True: + self._should_stop = False + while not self._should_stop: # 1. Ingest new requests from the http frontend if self.is_first_peer: http_requests = self.recv_requests_from_http() @@ -1161,6 +1168,14 @@ def run_loop(self): ) self.finished_batch = [] + # Check for layer reallocation signal (before batch processing) + if self.gradient_server is not None and self.gradient_server._layer_allocation_changed: + logger.info( + "Layer reallocation detected. Stopping executor to reload with new layers." + ) + self._should_stop = True + break + # 4. Admit requests into running set up to capacity, then form batch self.scheduler.admit_requests() # 4.1 Check for request timeouts and abort timed out requests @@ -1249,15 +1264,52 @@ def run_loop_in_background(self): def shutdown(self): """Shuts down the executor.""" logger.debug("Executor shutting down...") - self.recv_from_peer_socket.close() - self.send_to_peer_socket.close() - self.recv_from_ipc_socket.close() - self.send_to_ipc_socket.close() - self.zmq_context.term() + self._should_stop = True + import time + + time.sleep(0.1) # Give run_loop a moment to exit gracefully + + # Clean up all pending requests in scheduler + if hasattr(self, "scheduler") and self.scheduler: + try: + all_requests = [req for _, _, _, req in self.scheduler._request_queue] + list( + self.scheduler._running_requests.values() + ) + for req in all_requests: + try: + self.scheduler.evict_request(req.request_id, RequestStatus.CANCELLED) + except Exception: + pass + except Exception: + pass + + # Close all sockets and terminate context + sockets = [ + getattr(self, attr, None) + for attr in [ + "recv_from_peer_socket", + "send_to_peer_socket", + "recv_from_ipc_socket", + "send_to_ipc_socket", + ] + ] + for socket in sockets: + try: + if socket: + socket.close() + except Exception: + pass + + if hasattr(self, "zmq_context") and self.zmq_context: + try: + self.zmq_context.term() + except Exception: + pass + logger.debug("Executor shutdown complete.") -def create_executor_config(args: argparse.Namespace): +def create_executor_config(args: argparse.Namespace, gradient_server=None): """Create executor configuration from command line arguments.""" config = { @@ -1280,5 +1332,6 @@ def create_executor_config(args: argparse.Namespace): "executor_output_ipc_addr": args.executor_output_ipc, "attention_backend": args.attention_backend, "moe_runner_backend": args.moe_runner_backend, + "gradient_server": gradient_server, } return config From 55a74d51f4e52dfc2d48a65d92e63b871d826fbb Mon Sep 17 00:00:00 2001 From: jason Date: Mon, 3 Nov 2025 21:04:14 +0800 Subject: [PATCH 2/7] small change --- src/parallax/server/executor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/parallax/server/executor.py b/src/parallax/server/executor.py index 32766a9c..34e93cc1 100644 --- a/src/parallax/server/executor.py +++ b/src/parallax/server/executor.py @@ -147,9 +147,8 @@ def __init__( self.start_layer = start_layer self.end_layer = end_layer self._should_stop = False # Flag to gracefully stop the executor - self.gradient_server = ( - gradient_server # Reference to gradient server for layer reallocation detection - ) + # Reference to gradient server for layer reallocation detection + self.gradient_server = gradient_server self.is_first_peer = start_layer == 0 self.is_last_peer = end_layer == self.config.get("num_hidden_layers") From 384c52f1d636c01ad836f775961b0a6bbe46f883 Mon Sep 17 00:00:00 2001 From: jason Date: Mon, 3 Nov 2025 22:41:07 +0800 Subject: [PATCH 3/7] shutdown more readable --- src/parallax/server/executor.py | 54 ++++++++++++--------------------- 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/src/parallax/server/executor.py b/src/parallax/server/executor.py index 34e93cc1..dead8b84 100644 --- a/src/parallax/server/executor.py +++ b/src/parallax/server/executor.py @@ -1268,42 +1268,26 @@ def shutdown(self): time.sleep(0.1) # Give run_loop a moment to exit gracefully - # Clean up all pending requests in scheduler - if hasattr(self, "scheduler") and self.scheduler: - try: - all_requests = [req for _, _, _, req in self.scheduler._request_queue] + list( - self.scheduler._running_requests.values() - ) - for req in all_requests: - try: - self.scheduler.evict_request(req.request_id, RequestStatus.CANCELLED) - except Exception: - pass - except Exception: - pass - - # Close all sockets and terminate context - sockets = [ - getattr(self, attr, None) - for attr in [ - "recv_from_peer_socket", - "send_to_peer_socket", - "recv_from_ipc_socket", - "send_to_ipc_socket", - ] - ] - for socket in sockets: - try: - if socket: - socket.close() - except Exception: - pass + try: + all_requests = [req for _, _, _, req in self.scheduler._request_queue] + list( + self.scheduler._running_requests.values() + ) + for req in all_requests: + try: + self.scheduler.evict_request(req.request_id, RequestStatus.CANCELLED) + except Exception: + pass + except Exception: + pass - if hasattr(self, "zmq_context") and self.zmq_context: - try: - self.zmq_context.term() - except Exception: - pass + try: + self.recv_from_peer_socket.close() + self.send_to_peer_socket.close() + self.recv_from_ipc_socket.close() + self.send_to_ipc_socket.close() + self.zmq_context.term() + except Exception as e: + logger.debug(f"Error closing sockets (may already be closed): {e}") logger.debug("Executor shutdown complete.") From a92f3b8b0c35a22509b80a26e78bf9bac88e73fd Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 4 Nov 2025 20:10:46 +0800 Subject: [PATCH 4/7] add http_server stop or restart --- src/parallax/launch.py | 23 ++++++++--------------- src/parallax/server/http_server.py | 25 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/parallax/launch.py b/src/parallax/launch.py index 02f0fb78..8de110f5 100644 --- a/src/parallax/launch.py +++ b/src/parallax/launch.py @@ -23,7 +23,7 @@ from common.version_check import check_latest_release from parallax.p2p.server import ServerState, launch_p2p_server from parallax.server.executor import Executor -from parallax.server.http_server import launch_http_server +from parallax.server.http_server import launch_http_server, stop_http_server from parallax.server.server_args import parse_args from parallax_utils.ascii_anime import display_parallax_join from parallax_utils.logging_config import get_logger, set_log_level @@ -139,6 +139,11 @@ ) executor.shutdown() + if args.start_layer == 0: + stop_http_server(http_server_process) + if gradient_server.block_start_index == 0: + http_server_process = launch_http_server(args) + # Update args with new layer allocation args.start_layer = gradient_server.block_start_index args.end_layer = gradient_server.block_end_index @@ -172,20 +177,8 @@ finally: t = None if http_server_process is not None: - - def terminate_http_server_process(process): - logger.debug("Terminating HTTP server process...") - try: - process.kill() - process.join() - except Exception as e: - logger.error(f"Failed to terminate HTTP server process: {e}") - - if http_server_process is not None: - t = threading.Thread( - target=terminate_http_server_process, args=(http_server_process,) - ) - t.start() + t = threading.Thread(target=stop_http_server, args=(http_server_process,)) + t.start() if gradient_server is not None: gradient_server.shutdown() if executor is not None: diff --git a/src/parallax/server/http_server.py b/src/parallax/server/http_server.py index fd815688..b8d354c7 100644 --- a/src/parallax/server/http_server.py +++ b/src/parallax/server/http_server.py @@ -472,3 +472,28 @@ def launch_http_server(args): process = mp.Process(target=http_server.run) process.start() return process + + +def stop_http_server(http_server_process): + """ + Stop HTTP server process if it exists. + """ + if http_server_process is not None: + logger.info("Stopping HTTP server process...") + try: + http_server_process.kill() + http_server_process.join() + except Exception as e: + logger.error(f"Failed to terminate HTTP server process: {e}") + return None + return http_server_process + + +def restart_http_server(args, http_server_process): + """ + Restart HTTP server with new args. + Stops the old server if it exists and starts a new one. + """ + http_server_process = stop_http_server(http_server_process) + logger.info("Restarting HTTP server...") + return launch_http_server(args) From 023bf56428dd87fa5f52957570f8a68e25819e08 Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 4 Nov 2025 20:19:49 +0800 Subject: [PATCH 5/7] not receive request when layer changed --- src/parallax/p2p/server.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 4ab32647..56e7babc 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -602,8 +602,12 @@ def _announcer_thread(): self.model_name = model_name # Set flag to trigger executor reload self._layer_allocation_changed = True + # Set status to INITIALIZING to prevent scheduler from sending requests + # during rebalancing + self.status = ServerState.INITIALIZING logger.info( - "Layer allocation updated. Executor will reload on next check." + "Layer allocation updated. Executor will reload on next check. " + "Status set to INITIALIZING to prevent new requests." ) else: logger.warning(f"Heartbeat response: {response}") From bf537cdd358a7e53cdd5ca61ac1211948a889d84 Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 4 Nov 2025 20:50:41 +0800 Subject: [PATCH 6/7] back code , if no scheduler --- src/parallax/launch.py | 108 +++++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 48 deletions(-) diff --git a/src/parallax/launch.py b/src/parallax/launch.py index 8de110f5..a26e9ebb 100644 --- a/src/parallax/launch.py +++ b/src/parallax/launch.py @@ -18,8 +18,6 @@ import tempfile import threading -from mlx_lm.utils import get_model_path, load_config - from common.version_check import check_latest_release from parallax.p2p.server import ServerState, launch_p2p_server from parallax.server.executor import Executor @@ -59,17 +57,14 @@ # only launch http server on head node if args.start_layer == 0: http_server_process = launch_http_server(args) - # Load config to get num_hidden_layers without creating executor - model_path = get_model_path(args.model_path)[0] - config = load_config(model_path) - hidden_layers = config.get("num_hidden_layers") + executor = Executor.create_from_args(args) launch_p2p_server( initial_peers=args.initial_peers, scheduler_addr=args.scheduler_addr, relay_servers=args.relay_servers, pp_start_layer=args.start_layer, pp_end_layer=args.end_layer, - hidden_layers=hidden_layers, + hidden_layers=executor.config.get("num_hidden_layers"), tcp_port=args.tcp_port, udp_port=args.udp_port, dht_prefix=args.dht_prefix, @@ -84,6 +79,9 @@ param_hosting_ratio=args.param_hosting_ratio, kv_cache_ratio=args.kv_cache_ratio, ) + if gradient_server is not None: + gradient_server.status = ServerState.READY + executor.run_loop() else: gradient_server = launch_p2p_server( initial_peers=args.initial_peers, @@ -123,53 +121,67 @@ if args.start_layer == 0: http_server_process = launch_http_server(args) - # Main execution loop with layer reallocation support - while True: - try: - executor = Executor.create_from_args(args, gradient_server=gradient_server) - if gradient_server is not None: - gradient_server.status = ServerState.READY + # Main execution loop with layer reallocation support + while True: + try: + executor = Executor.create_from_args(args, gradient_server=gradient_server) + if gradient_server is not None: + gradient_server.status = ServerState.READY - executor.run_loop() + executor.run_loop() - # Check if layer allocation changed (executor exited due to reallocation) - if gradient_server is not None and gradient_server._layer_allocation_changed: - logger.warning( - "Layer allocation changed! Reloading executor with new layers..." - ) - executor.shutdown() + # Check if layer allocation changed (executor exited due to reallocation) + if gradient_server is not None and gradient_server._layer_allocation_changed: + logger.warning( + "Layer allocation changed! Reloading executor with new layers..." + ) + executor.shutdown() - if args.start_layer == 0: - stop_http_server(http_server_process) + if args.start_layer == 0: + stop_http_server(http_server_process) + http_server_process = None if gradient_server.block_start_index == 0: http_server_process = launch_http_server(args) - # Update args with new layer allocation - args.start_layer = gradient_server.block_start_index - args.end_layer = gradient_server.block_end_index - if gradient_server.model_name: - args.model_path = gradient_server.model_name - - logger.info( - f"Creating new executor with layers [{args.start_layer}, {args.end_layer})" - ) - gradient_server._layer_allocation_changed = False - continue # Create new executor in next iteration - else: - break # Normal exit - except KeyboardInterrupt: - logger.debug("Received interrupt signal, shutting down...") - break - except Exception as e: - logger.exception(f"Executor error: {e}") - # If layer allocation changed, try to reload - if gradient_server is not None and gradient_server._layer_allocation_changed: - logger.info("Attempting to reload executor after error...") - if executor is not None: - executor.shutdown() - continue - else: - raise + # Update args with new layer allocation + args.start_layer = gradient_server.block_start_index + args.end_layer = gradient_server.block_end_index + if gradient_server.model_name: + args.model_path = gradient_server.model_name + + logger.info( + f"Creating new executor with layers [{args.start_layer}, {args.end_layer})" + ) + + # Restart http_server if restarting worker includes layer 0 + if args.start_layer == 0: + # Stop old http_server if exists + if old_start_layer == 0: + stop_http_server(http_server_process) + # Start new http_server + http_server_process = launch_http_server(args) + elif old_start_layer == 0 and args.start_layer != 0: + # Stop http_server if we no longer have layer 0 + stop_http_server(http_server_process) + http_server_process = None + + gradient_server._layer_allocation_changed = False + continue # Create new executor in next iteration + else: + break # Normal exit + except KeyboardInterrupt: + logger.debug("Received interrupt signal, shutting down...") + break + except Exception as e: + logger.exception(f"Executor error: {e}") + # If layer allocation changed, try to reload + if gradient_server is not None and gradient_server._layer_allocation_changed: + logger.info("Attempting to reload executor after error...") + if executor is not None: + executor.shutdown() + continue + else: + raise except KeyboardInterrupt: logger.debug("Received interrupt signal, shutting down...") except Exception as e: From fa29a1edbdbf601d599dfbbbe99685d5ebcd03dc Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 4 Nov 2025 21:11:23 +0800 Subject: [PATCH 7/7] bug fix --- src/parallax/launch.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/parallax/launch.py b/src/parallax/launch.py index a26e9ebb..f41f5133 100644 --- a/src/parallax/launch.py +++ b/src/parallax/launch.py @@ -138,8 +138,7 @@ executor.shutdown() if args.start_layer == 0: - stop_http_server(http_server_process) - http_server_process = None + http_server_process = stop_http_server(http_server_process) if gradient_server.block_start_index == 0: http_server_process = launch_http_server(args) @@ -153,18 +152,6 @@ f"Creating new executor with layers [{args.start_layer}, {args.end_layer})" ) - # Restart http_server if restarting worker includes layer 0 - if args.start_layer == 0: - # Stop old http_server if exists - if old_start_layer == 0: - stop_http_server(http_server_process) - # Start new http_server - http_server_process = launch_http_server(args) - elif old_start_layer == 0 and args.start_layer != 0: - # Stop http_server if we no longer have layer 0 - stop_http_server(http_server_process) - http_server_process = None - gradient_server._layer_allocation_changed = False continue # Create new executor in next iteration else: