From 8091069534732055c00e4fc6221c8b7de544ad35 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 17 Oct 2025 22:27:17 +0800 Subject: [PATCH 1/2] feat(lattica): specify tcp&udp port --- README.md | 2 -- src/backend/benchmark/benchmark_serving.py | 2 +- src/backend/main.py | 10 +--------- src/backend/server/server_args.py | 18 ++++-------------- src/parallax/cli.py | 2 -- src/parallax/launch.py | 15 +++++++-------- src/parallax/p2p/server.py | 17 ++++------------- src/parallax/server/server_args.py | 13 +++---------- 8 files changed, 20 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index f1a2f3e9..ee12ee12 100644 --- a/README.md +++ b/README.md @@ -208,7 +208,6 @@ An example of serving Qwen3-0.6B with 2-nodes: python3 ./parallax/src/parallax/launch.py \ --model-path Qwen/Qwen3-0.6B \ --port 3000 \ ---dht-port 5000 \ --max-batch-size 8 \ --start-layer 0 \ --end-layer 14 @@ -218,7 +217,6 @@ python3 ./parallax/src/parallax/launch.py \ python3 ./parallax/src/parallax/launch.py \ --model-path Qwen/Qwen3-0.6B \ --port 3000 \ ---dht-port 5000 \ --max-batch-size 8 \ --start-layer 14 \ --end-layer 28 diff --git a/src/backend/benchmark/benchmark_serving.py b/src/backend/benchmark/benchmark_serving.py index d905bad4..7f65e860 100644 --- a/src/backend/benchmark/benchmark_serving.py +++ b/src/backend/benchmark/benchmark_serving.py @@ -2,7 +2,7 @@ Adapted from vLLM: https://github.com/vllm-project/vllm/blob/v0.7.2/benchmarks/benchmark_serving.py On the server side (parallax scheduler with oAI API server), run - python src/backend/main.py --dht-port 31328 --port 31328 --init-nodes-num 1 + python src/backend/main.py --port 31328 --init-nodes-num 1 On the worker side (parallax worker nodes), 1. Get `scheduler-addr` get from scheduler launching output diff --git a/src/backend/main.py b/src/backend/main.py index 09702f13..c0c13e88 100644 --- a/src/backend/main.py +++ b/src/backend/main.py @@ -130,20 +130,12 @@ async def serve_index(): logger.info(f"args: {args}") if args.log_level != "DEBUG": display_parallax_run() - host_maddrs = args.host_maddrs - dht_port = args.dht_port - if args.dht_port is not None: - assert host_maddrs is None, "You can't use --dht-port and --host-maddrs at the same time" - else: - dht_port = 0 - if host_maddrs is None: - host_maddrs = [f"/ip4/0.0.0.0/tcp/{dht_port}", f"/ip6/::/tcp/{dht_port}"] scheduler_manage = SchedulerManage( initial_peers=args.initial_peers, relay_servers=args.relay_servers, dht_prefix=args.dht_prefix, - host_maddrs=host_maddrs, + host_maddrs=[f"/ip4/0.0.0.0/tcp/{args.tcp_port}", f"/ip4/0.0.0.0/udp/{args.udp_port}/quic-v1"], announce_maddrs=args.announce_maddrs, ) diff --git a/src/backend/server/server_args.py b/src/backend/server/server_args.py index 9624eb11..3117ef12 100644 --- a/src/backend/server/server_args.py +++ b/src/backend/server/server_args.py @@ -10,25 +10,18 @@ def parse_args() -> argparse.Namespace: formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - # P2P configuration + # Lattica configuration parser.add_argument("--initial-peers", nargs="+", default=[], help="List of initial DHT peers") - parser.add_argument("--relay-servers", nargs="+", default=[], help="List of relay DHT peers") - parser.add_argument( "--announce-maddrs", nargs="+", default=[], help="List of multiaddresses to announce" ) - - parser.add_argument("--dht-port", type=int, default=None, help="Port for DHT communication") - - parser.add_argument("--host-maddrs", type=str, default=None, help="Multiaddress to host") - + parser.add_argument("--tcp-port", type=int, default=0, help="Port for Lattica TCP listening") + parser.add_argument("--udp-port", type=int, default=0, help="Port for Lattica UDP listening") parser.add_argument("--dht-prefix", type=str, default="gradient", help="Prefix for DHT keys") - parser.add_argument("--public-ip", type=str, default=None, help="Public IP address to announce") - + # Scheduler configuration parser.add_argument("--port", type=int, default=5000, help="Port to listen on") - parser.add_argument( "--log-level", type=str, @@ -36,11 +29,8 @@ def parse_args() -> argparse.Namespace: choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Log level", ) - parser.add_argument("--model-name", type=str, default=None, help="Model name") - parser.add_argument("--init-nodes-num", type=int, default=None, help="Number of initial nodes") - parser.add_argument( "--is-local-network", type=bool, default=True, help="Whether to use local network" ) diff --git a/src/parallax/cli.py b/src/parallax/cli.py index 18bb719a..51ee06ef 100644 --- a/src/parallax/cli.py +++ b/src/parallax/cli.py @@ -168,8 +168,6 @@ def run_command(args, passthrough_args: list[str] | None = None): # Build the command to run the backend main.py passthrough_args = passthrough_args or [] cmd = [sys.executable, str(backend_main)] - if not _flag_present(passthrough_args, ["--dht-port"]): - cmd.extend(["--dht-port", "5001"]) if not _flag_present(passthrough_args, ["--port"]): cmd.extend(["--port", "3001"]) diff --git a/src/parallax/launch.py b/src/parallax/launch.py index e4f321a3..de793b7a 100644 --- a/src/parallax/launch.py +++ b/src/parallax/launch.py @@ -6,12 +6,11 @@ Example command: python src/parallax/launch.py \ - --model-path Qwen/Qwen3-0.6B-MLX-bf16 \ + --model-path Qwen/Qwen3-0.6B \ --max-num-tokens-per-batch 16384 \ --max-batch-size 128 \ - --start-layer 14 \ - --end-layer 28 \ - --initial-peers {peer of GPU which hold the first half model} + --start-layer 0 \ + --end-layer 28 """ import multiprocessing @@ -82,9 +81,9 @@ pp_start_layer=args.start_layer, pp_end_layer=args.end_layer, hidden_layers=executor.config.get("num_hidden_layers"), - dht_port=args.dht_port, + tcp_port=args.tcp_port, + udp_port=args.udp_port, dht_prefix=args.dht_prefix, - host_maddrs=args.host_maddrs, announce_maddrs=args.announce_maddrs, http_port=args.port, notify_url=args.notify_url, @@ -102,9 +101,9 @@ pp_start_layer=None, pp_end_layer=None, hidden_layers=None, - dht_port=args.dht_port, + tcp_port=args.tcp_port, + udp_port=args.udp_port, dht_prefix=args.dht_prefix, - host_maddrs=args.host_maddrs, announce_maddrs=args.announce_maddrs, http_port=args.port, notify_url=args.notify_url, diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 772a8a77..cc2c0f6f 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -354,8 +354,7 @@ def find_servers(self): """Find available servers in the DHT network""" # Find all announced blocks server_blocks = [] - block_announced_key = f"{self.dht_prefix}_announce" - block_servers = self.lattica.get(block_announced_key) + block_servers = self.lattica.get(self.prefix_id) if block_servers is None: return [] for peer_id, value in block_servers.value.items(): @@ -652,9 +651,9 @@ def launch_p2p_server( pp_start_layer: int, pp_end_layer: int, hidden_layers: int, - dht_port: Optional[int], + tcp_port: int, + udp_port: int, dht_prefix: str, - host_maddrs: Optional[List[str]], announce_maddrs: List[str], http_port: Optional[int], notify_url: str, @@ -664,14 +663,6 @@ def launch_p2p_server( max_batch_size: Optional[int] = None, max_sequence_length: Optional[int] = None, ): - if dht_port is not None: - assert host_maddrs is None, "You can't use --dht-port and --host-maddrs at the same time" - else: - dht_port = 0 - if host_maddrs is None: - host_maddrs = [f"/ip4/0.0.0.0/tcp/{dht_port}", f"/ip4/0.0.0.0/udp/{dht_port}/quic-v1"] - - # Run the server in a separate thread to keep the main thread free for event loop server = GradientServer( recv_from_peer_addr=recv_from_peer_addr, send_to_peer_addr=send_to_peer_addr, @@ -682,7 +673,7 @@ def launch_p2p_server( block_end_index=pp_end_layer, hidden_layers=hidden_layers, dht_prefix=dht_prefix, - host_maddrs=host_maddrs, + host_maddrs=[f"/ip4/0.0.0.0/tcp/{tcp_port}", f"/ip4/0.0.0.0/udp/{udp_port}/quic-v1"], announce_maddrs=announce_maddrs, http_port=http_port, notify_url=notify_url, diff --git a/src/parallax/server/server_args.py b/src/parallax/server/server_args.py index 7b8eead8..34c38412 100644 --- a/src/parallax/server/server_args.py +++ b/src/parallax/server/server_args.py @@ -29,23 +29,16 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--host", type=str, default="localhost", help="Host of the HTTP server.") parser.add_argument("--port", type=int, default=3000, help="Port of the HTTP server") - # P2P configuration + # Lattica configuration parser.add_argument("--initial-peers", nargs="+", default=[], help="List of initial DHT peers") - parser.add_argument("--scheduler-addr", type=str, default=None, help="Scheduler address") - parser.add_argument("--relay-servers", nargs="+", default=[], help="List of relay DHT peers") - - parser.add_argument("--dht-port", type=int, default=None, help="Port for DHT communication") - - parser.add_argument("--host-maddrs", type=str, default=None, help="Multiaddress to host") - + parser.add_argument("--tcp-port", type=int, default=0, help="Port for Lattica TCP listening") + parser.add_argument("--udp-port", type=int, default=0, help="Port for Lattica UDP listening") parser.add_argument( "--announce-maddrs", nargs="+", default=[], help="List of multiaddresses to announce" ) - parser.add_argument("--dht-prefix", type=str, default="gradient", help="Prefix for DHT keys") - parser.add_argument( "--notify-url", type=str, default=None, help="URL to notify when a request is finished" ) From 617b7a43ae8278a32325856307bf9aa62be2a843 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 17 Oct 2025 22:32:10 +0800 Subject: [PATCH 2/2] update --- src/backend/main.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/backend/main.py b/src/backend/main.py index c0c13e88..9bda30d3 100644 --- a/src/backend/main.py +++ b/src/backend/main.py @@ -135,7 +135,10 @@ async def serve_index(): initial_peers=args.initial_peers, relay_servers=args.relay_servers, dht_prefix=args.dht_prefix, - host_maddrs=[f"/ip4/0.0.0.0/tcp/{args.tcp_port}", f"/ip4/0.0.0.0/udp/{args.udp_port}/quic-v1"], + host_maddrs=[ + f"/ip4/0.0.0.0/tcp/{args.tcp_port}", + f"/ip4/0.0.0.0/udp/{args.udp_port}/quic-v1", + ], announce_maddrs=args.announce_maddrs, )