From 14ceebfc8d6c55e6dfc137d4f48ce643b9bf2a0f Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 15:30:45 +0800 Subject: [PATCH 01/12] feat(scheduler): sending chat complete request through p2p --- src/backend/server/request_handler.py | 31 +++++++++++++------------- src/backend/server/scheduler_manage.py | 8 +++++++ src/parallax/p2p/server.py | 28 ++++++++++++++++++++++- 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index dde860d6..6355a529 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -1,4 +1,6 @@ from typing import Dict +import time +import json import aiohttp from fastapi import HTTPException @@ -98,8 +100,12 @@ async def _forward_request( content={"error": "Call url not found of peer id: " + routing_table[0]}, status_code=500, ) - + url = call_url + endpoint + + stub = self.scheduler_manage.completion_handler.get_stub(routing_table[0]) + logger.info(f"get stub for {routing_table[0]}: {dir(stub)}") + is_stream = request_data.get("stream", False) logger.debug(f"POST upstream: url={url}, stream={is_stream}") @@ -114,15 +120,9 @@ async def _process_upstream_response(response: aiohttp.ClientResponse): raise HTTPException(status_code=response.status, detail=error_msg) if is_stream: - - async def stream_generator(): - async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session: - async with session.post(url, json=request_data) as response: - await _process_upstream_response(response) - - async for chunk in response.content: - if chunk: - yield chunk + def stream_generator(): + for chunk in stub.chat_completion(request_data): + yield chunk resp = StreamingResponse( stream_generator(), @@ -135,12 +135,11 @@ async def stream_generator(): logger.debug(f"Streaming response initiated for {request_id}") return resp else: - async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session: - async with session.post(url, json=request_data) as response: - await _process_upstream_response(response) - result = await response.json() - logger.debug(f"Non-stream response completed for {request_id}") - return JSONResponse(content=result) + response = stub.chat_completion(request_data) + response = next(response).decode() + logger.info(f"Non-stream response completed for {request_id}, response: {response}") + # response is a JSON string; parse to Python object before returning + return JSONResponse(content=json.loads(response)) async def v1_completions(self, request_data: Dict, request_id: str, received_ts: int): return await self._forward_request("/v1/completions", request_data, request_id, received_ts) diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py index 56907358..a2796f37 100644 --- a/src/backend/server/scheduler_manage.py +++ b/src/backend/server/scheduler_manage.py @@ -11,6 +11,7 @@ get_model_info, get_node_join_command, ) +from parallax.p2p.server import TransformerConnectionHandler from parallax_utils.logging_config import get_logger from scheduling.node import RequestSignal from scheduling.scheduler import Scheduler @@ -63,6 +64,13 @@ def run(self, model_name, init_nodes_num, is_local_network=True): self._start_scheduler(model_name, init_nodes_num) self._start_lattica() + self.completion_handler = TransformerConnectionHandler( + lattica=self.lattica, + recv_from_peer_addr="", + send_to_peer_addr="", + block_start_index=0, + block_end_index=1, + ) def is_running(self): """ diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 9102d572..bceef8e7 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -10,14 +10,16 @@ import dataclasses import enum import logging +import json import threading import time from typing import List, Optional import dijkstar import httpx +from fastapi import HTTPException import zmq -from lattica import ConnectionHandler, Lattica, rpc_method, rpc_stream +from lattica import ConnectionHandler, Lattica, rpc_method, rpc_stream, rpc_stream_iter from backend.server.rpc_connection_handler import RPCConnectionHandler from parallax.p2p.proto import forward_pb2 @@ -153,6 +155,30 @@ def rpc_abort( logger.exception(f"Error in rpc_abort: {e}") return forward_pb2.AbortResponse() + @rpc_stream_iter + def chat_completion( + self, + request, + ): + """Handle chat completion request""" + logger.info(f"Chat completion request: {request}, type: {type(request)}") + try: + if request.get('stream', False): + logger.info("Stream request") + with httpx.Client(timeout=20 * 60 * 60) as client: + with client.stream("POST", "http://localhost:3000/v1/chat/completions", json=request) as response: + for chunk in response.iter_bytes(): + if chunk: + yield chunk + else: + logger.info("Non-stream request") + with httpx.Client(timeout=20 * 60 * 60) as client: + response = client.post("http://localhost:3000/v1/chat/completions", json=request).json() + logger.info(f"response: {response}, type: {type(response)}") + yield json.dumps(response).encode() + except Exception as e: + logger.exception(f"Error in chat completion: {e}") + class GradientServer: """ From 545d0273ff4ab438e28b4e3800a9323c92348d39 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 15:49:37 +0800 Subject: [PATCH 02/12] update --- src/backend/server/request_handler.py | 45 +++++--------------- src/backend/server/rpc_connection_handler.py | 14 ------ src/backend/server/scheduler_manage.py | 2 +- src/parallax/launch.py | 4 +- src/parallax/p2p/server.py | 8 ++-- src/parallax/server/server_args.py | 5 --- 6 files changed, 18 insertions(+), 60 deletions(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index 6355a529..f078dff8 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -28,15 +28,21 @@ class RequestHandler: def __init__(self): self.scheduler_manage = None + self.stubs = {} def set_scheduler_manage(self, scheduler_manage): self.scheduler_manage = scheduler_manage + def get_stub(self, node_id): + if node_id not in self.stubs: + self.stubs[node_id] = self.scheduler_manage.completion_handler.get_stub(node_id) + return self.stubs[node_id] + async def _forward_request( - self, endpoint: str, request_data: Dict, request_id: str, received_ts: int + self, request_data: Dict, request_id: str, received_ts: int ): logger.debug( - f"Forwarding request {request_id} to endpoint {endpoint}; stream={request_data.get('stream', False)}" + f"Forwarding request {request_id}; stream={request_data.get('stream', False)}" ) if ( self.scheduler_manage is None @@ -90,34 +96,8 @@ async def _forward_request( ) request_data["routing_table"] = routing_table - call_url = self.scheduler_manage.get_call_url_by_node_id(routing_table[0]) - logger.debug( - f"Resolved call_url for request {request_id}: node={routing_table[0]} -> {call_url}" - ) - - if not call_url: - return JSONResponse( - content={"error": "Call url not found of peer id: " + routing_table[0]}, - status_code=500, - ) - - url = call_url + endpoint - - stub = self.scheduler_manage.completion_handler.get_stub(routing_table[0]) - logger.info(f"get stub for {routing_table[0]}: {dir(stub)}") - + stub = self.get_stub(routing_table[0]) is_stream = request_data.get("stream", False) - logger.debug(f"POST upstream: url={url}, stream={is_stream}") - - async def _process_upstream_response(response: aiohttp.ClientResponse): - logger.debug(f"post: {request_id}, code: {response.status}, params: {request_data}") - if response.status != 200: - error_text = await response.text() - error_msg = ( - f"Upstream service returned status {response.status}, response: {error_text}" - ) - logger.error(f"completions error: {error_msg}, request_id: {request_id}") - raise HTTPException(status_code=response.status, detail=error_msg) if is_stream: def stream_generator(): @@ -141,10 +121,5 @@ def stream_generator(): # response is a JSON string; parse to Python object before returning return JSONResponse(content=json.loads(response)) - async def v1_completions(self, request_data: Dict, request_id: str, received_ts: int): - return await self._forward_request("/v1/completions", request_data, request_id, received_ts) - async def v1_chat_completions(self, request_data: Dict, request_id: str, received_ts: int): - return await self._forward_request( - "/v1/chat/completions", request_data, request_id, received_ts - ) + return await self._forward_request(request_data, request_id, received_ts) diff --git a/src/backend/server/rpc_connection_handler.py b/src/backend/server/rpc_connection_handler.py index ef61909c..50185d86 100644 --- a/src/backend/server/rpc_connection_handler.py +++ b/src/backend/server/rpc_connection_handler.py @@ -23,12 +23,10 @@ def __init__( # Initialize the base class super().__init__(lattica) self.scheduler = scheduler - self.call_url_map = {} @rpc_stream def node_join(self, message): # node = { - # "http_port": "8000", # "node_id": "lattica peer id", # "hardware": { # "node_id": "lattica peer id", @@ -44,14 +42,6 @@ def node_join(self, message): logger.info(f"receive node_join request: {message}") try: node = self.build_node(message) - - try: - node_ip = self.lattica_instance.get_peer_addresses(node.node_id)[0].split("/")[2] - logger.info(f"get ip for {node.node_id}: {node_ip}") - except Exception as e: - logger.warning(f"Failed to get ip for {node.node_id}: {e}, using 127.0.0.1") - node_ip = "127.0.0.1" - self.call_url_map[node.node_id] = f"http://{node_ip}:{message.get('http_port')}" self.scheduler.enqueue_join(node) response = self.wait_layer_allocation(node.node_id, wait_seconds=300) @@ -67,7 +57,6 @@ def node_leave(self, message): try: node = self.build_node(message) self.scheduler.enqueue_leave(node.node_id) - self.call_url_map.pop(node.node_id) return {} except Exception as e: logger.exception(f"node_leave error: {e}") @@ -148,6 +137,3 @@ def build_hardware(self, hardware_json): memory_gb=memory_gb, memory_bandwidth_gbps=memory_bandwidth_gbps, ) - - def get_call_url_by_node_id(self, node_id): - return self.call_url_map.get(node_id, None) diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py index a2796f37..3a59744a 100644 --- a/src/backend/server/scheduler_manage.py +++ b/src/backend/server/scheduler_manage.py @@ -69,7 +69,7 @@ def run(self, model_name, init_nodes_num, is_local_network=True): recv_from_peer_addr="", send_to_peer_addr="", block_start_index=0, - block_end_index=1, + block_end_index=1 ) def is_running(self): diff --git a/src/parallax/launch.py b/src/parallax/launch.py index 64bc9552..eb5c4743 100644 --- a/src/parallax/launch.py +++ b/src/parallax/launch.py @@ -77,7 +77,7 @@ dht_prefix=args.dht_prefix, host_maddrs=args.host_maddrs, announce_maddrs=args.announce_maddrs, - http_port=args.port if args.announce_http_port is None else args.announce_http_port, + http_port=args.port, notify_url=args.notify_url, recv_from_peer_addr=args.recv_from_peer_addr, send_to_peer_addr=args.send_to_peer_addr, @@ -97,7 +97,7 @@ dht_prefix=args.dht_prefix, host_maddrs=args.host_maddrs, announce_maddrs=args.announce_maddrs, - http_port=args.port if args.announce_http_port is None else args.announce_http_port, + http_port=args.port, notify_url=args.notify_url, recv_from_peer_addr=args.recv_from_peer_addr, send_to_peer_addr=args.send_to_peer_addr, diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index bceef8e7..9f7e14c9 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -107,6 +107,7 @@ def __init__( send_to_peer_addr: str, block_start_index: int, block_end_index: int, + http_port: Optional[int] = None, notify_url: Optional[str] = None, ): # Initialize the base class @@ -115,6 +116,7 @@ def __init__( self.send_to_peer_addr = send_to_peer_addr self.block_start_index = block_start_index self.block_end_index = block_end_index + self.http_port = http_port self.notify_url = notify_url self._recv_from_peer = None self._recv_from_peer_lock = threading.Lock() @@ -166,14 +168,14 @@ def chat_completion( if request.get('stream', False): logger.info("Stream request") with httpx.Client(timeout=20 * 60 * 60) as client: - with client.stream("POST", "http://localhost:3000/v1/chat/completions", json=request) as response: + with client.stream("POST", f"http://localhost:{self.http_port}/v1/chat/completions", json=request) as response: for chunk in response.iter_bytes(): if chunk: yield chunk else: logger.info("Non-stream request") with httpx.Client(timeout=20 * 60 * 60) as client: - response = client.post("http://localhost:3000/v1/chat/completions", json=request).json() + response = client.post(f"http://localhost:{self.http_port}/v1/chat/completions", json=request).json() logger.info(f"response: {response}, type: {type(response)}") yield json.dumps(response).encode() except Exception as e: @@ -339,6 +341,7 @@ def _publish_metrics(_snapshot): send_to_peer_addr=self.send_to_peer_addr, block_start_index=self.block_start_index, block_end_index=self.block_end_index, + http_port=self.http_port, notify_url=self.notify_url, ) # thread @@ -603,7 +606,6 @@ def get_node_info(self, is_update: bool = False): self.rtt_last_update = time.time() info = { - "http_port": f"{self.http_port}", "node_id": self.lattica.peer_id(), "hardware": detect_node_hardware(self.lattica.peer_id()), "kv_cache_ratio": 0.25, diff --git a/src/parallax/server/server_args.py b/src/parallax/server/server_args.py index 20585e1d..8d0e1839 100644 --- a/src/parallax/server/server_args.py +++ b/src/parallax/server/server_args.py @@ -27,13 +27,8 @@ def parse_args() -> argparse.Namespace: # HTTP server configuration parser.add_argument("--host", type=str, default="localhost", help="Host of the HTTP server.") - parser.add_argument("--port", type=int, default=3000, help="Port of the HTTP server") - parser.add_argument( - "--announce-http-port", type=str, default=None, help="HTTP port to announce" - ) - # P2P configuration parser.add_argument("--initial-peers", nargs="+", default=[], help="List of initial DHT peers") From 062dd1b866d7e23ccb503bd9071fcf828ad9534d Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 15:50:10 +0800 Subject: [PATCH 03/12] update --- src/backend/server/request_handler.py | 13 ++++--------- src/backend/server/scheduler_manage.py | 2 +- src/parallax/p2p/server.py | 15 ++++++++++----- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index f078dff8..6fd4309b 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -1,9 +1,7 @@ -from typing import Dict -import time import json +from typing import Dict import aiohttp -from fastapi import HTTPException from fastapi.responses import JSONResponse, StreamingResponse from backend.server.constants import NODE_STATUS_AVAILABLE @@ -38,12 +36,8 @@ def get_stub(self, node_id): self.stubs[node_id] = self.scheduler_manage.completion_handler.get_stub(node_id) return self.stubs[node_id] - async def _forward_request( - self, request_data: Dict, request_id: str, received_ts: int - ): - logger.debug( - f"Forwarding request {request_id}; stream={request_data.get('stream', False)}" - ) + async def _forward_request(self, request_data: Dict, request_id: str, received_ts: int): + logger.debug(f"Forwarding request {request_id}; stream={request_data.get('stream', False)}") if ( self.scheduler_manage is None or not self.scheduler_manage.get_schedule_status() == NODE_STATUS_AVAILABLE @@ -100,6 +94,7 @@ async def _forward_request( is_stream = request_data.get("stream", False) if is_stream: + def stream_generator(): for chunk in stub.chat_completion(request_data): yield chunk diff --git a/src/backend/server/scheduler_manage.py b/src/backend/server/scheduler_manage.py index 3a59744a..a2796f37 100644 --- a/src/backend/server/scheduler_manage.py +++ b/src/backend/server/scheduler_manage.py @@ -69,7 +69,7 @@ def run(self, model_name, init_nodes_num, is_local_network=True): recv_from_peer_addr="", send_to_peer_addr="", block_start_index=0, - block_end_index=1 + block_end_index=1, ) def is_running(self): diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 9f7e14c9..6a6c1a75 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -9,15 +9,14 @@ import dataclasses import enum -import logging import json +import logging import threading import time from typing import List, Optional import dijkstar import httpx -from fastapi import HTTPException import zmq from lattica import ConnectionHandler, Lattica, rpc_method, rpc_stream, rpc_stream_iter @@ -165,17 +164,23 @@ def chat_completion( """Handle chat completion request""" logger.info(f"Chat completion request: {request}, type: {type(request)}") try: - if request.get('stream', False): + if request.get("stream", False): logger.info("Stream request") with httpx.Client(timeout=20 * 60 * 60) as client: - with client.stream("POST", f"http://localhost:{self.http_port}/v1/chat/completions", json=request) as response: + with client.stream( + "POST", + f"http://localhost:{self.http_port}/v1/chat/completions", + json=request, + ) as response: for chunk in response.iter_bytes(): if chunk: yield chunk else: logger.info("Non-stream request") with httpx.Client(timeout=20 * 60 * 60) as client: - response = client.post(f"http://localhost:{self.http_port}/v1/chat/completions", json=request).json() + response = client.post( + f"http://localhost:{self.http_port}/v1/chat/completions", json=request + ).json() logger.info(f"response: {response}, type: {type(response)}") yield json.dumps(response).encode() except Exception as e: From 4710297425c4e77dd29fbefa4e31e5f400187c03 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 16:38:42 +0800 Subject: [PATCH 04/12] fix relay server address --- src/parallax/cli.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/parallax/cli.py b/src/parallax/cli.py index 5873d44f..d3cf105b 100644 --- a/src/parallax/cli.py +++ b/src/parallax/cli.py @@ -71,7 +71,15 @@ def run_command(args): cmd.extend( [ "--relay-servers", - "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf /dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu /ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", + # "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf /dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + ] + ) + cmd.extend( + [ + "--initial-peers", + "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb /ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", + # "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb /dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", ] ) @@ -146,16 +154,19 @@ def join_command(args): if args.use_relay or ( args.scheduler_addr != "auto" and not str(args.scheduler_addr).startswith("/") ): + logger.info("Using public relay servers") cmd.extend( [ "--relay-servers", - "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf /dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu /ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", + # "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf /dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", ] ) cmd.extend( [ "--initial-peers", - "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb /dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", + "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb /ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", + # "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb /dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", ] ) @@ -248,3 +259,4 @@ def main(): if __name__ == "__main__": main() + From 16a5cab3cf6b9972cc4d3d1462ab5b17c41dbd2b Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 17:20:28 +0800 Subject: [PATCH 05/12] update --- src/parallax/cli.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/parallax/cli.py b/src/parallax/cli.py index d3cf105b..24c93e49 100644 --- a/src/parallax/cli.py +++ b/src/parallax/cli.py @@ -71,15 +71,19 @@ def run_command(args): cmd.extend( [ "--relay-servers", - "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu /ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", - # "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf /dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + # "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", + # "/ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", + "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + "/dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", ] ) cmd.extend( [ "--initial-peers", - "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb /ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", - # "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb /dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", + # "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", + # "/ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", + "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", + "/dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", ] ) @@ -158,15 +162,19 @@ def join_command(args): cmd.extend( [ "--relay-servers", - "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu /ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", - # "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf /dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + # "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", + # "/ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", + "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + "/dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", ] ) cmd.extend( [ "--initial-peers", - "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb /ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", - # "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb /dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", + # "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", + # "/ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", + "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", + "/dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", ] ) From 54f5990dea85905aead052cee2838ad8d3ae30e9 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 17:29:22 +0800 Subject: [PATCH 06/12] update --- src/parallax/p2p/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 6a6c1a75..7b619f16 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -580,7 +580,7 @@ def get_node_info(self, is_update: bool = False): if time.time() - self.rtt_last_update > self.rtt_update_interval: self.rtts = {} all_peers = [] - for _ in range(1 if is_update else 30): + for _ in range(1 if is_update else 10): all_peers = self.lattica.get_all_peers() if len(all_peers) > 0 and self.scheduler_peer_id in all_peers: break @@ -614,7 +614,7 @@ def get_node_info(self, is_update: bool = False): "node_id": self.lattica.peer_id(), "hardware": detect_node_hardware(self.lattica.peer_id()), "kv_cache_ratio": 0.25, - "param_hosting_ratio": 0.65, + "param_hosting_ratio": 0.12, "max_concurrent_requests": self.max_batch_size, "max_sequence_length": ( 1024 if self.max_sequence_length is None else self.max_sequence_length From e1228efd5df2596e90a075f396e0927db47f699f Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 18:01:52 +0800 Subject: [PATCH 07/12] update --- src/parallax/p2p/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 7b619f16..76866391 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -614,7 +614,7 @@ def get_node_info(self, is_update: bool = False): "node_id": self.lattica.peer_id(), "hardware": detect_node_hardware(self.lattica.peer_id()), "kv_cache_ratio": 0.25, - "param_hosting_ratio": 0.12, + "param_hosting_ratio": 0.65, "max_concurrent_requests": self.max_batch_size, "max_sequence_length": ( 1024 if self.max_sequence_length is None else self.max_sequence_length From 7753167c40aa79492c542ce8486247fda54e0f80 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 18:05:57 +0800 Subject: [PATCH 08/12] update --- src/parallax/cli.py | 48 ++++++++++++--------------------------------- 1 file changed, 12 insertions(+), 36 deletions(-) diff --git a/src/parallax/cli.py b/src/parallax/cli.py index 24c93e49..39451364 100644 --- a/src/parallax/cli.py +++ b/src/parallax/cli.py @@ -41,6 +41,16 @@ def get_project_root(): return Path.cwd() +def get_relay_params(): + return [ + "--relay-servers", + "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + "/dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", + "--initial-peers", + "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", + "/dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", + ] + def run_command(args): """Run the scheduler (equivalent to scripts/start.sh).""" check_python_version() @@ -68,24 +78,7 @@ def run_command(args): if args.init_nodes_num: cmd.extend(["--init-nodes-num", str(args.init_nodes_num)]) if args.use_relay: - cmd.extend( - [ - "--relay-servers", - # "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", - # "/ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", - "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", - "/dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", - ] - ) - cmd.extend( - [ - "--initial-peers", - # "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", - # "/ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", - "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", - "/dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", - ] - ) + cmd.extend(get_relay_params()) logger.info(f"Running command: {' '.join(cmd)}") @@ -159,24 +152,7 @@ def join_command(args): args.scheduler_addr != "auto" and not str(args.scheduler_addr).startswith("/") ): logger.info("Using public relay servers") - cmd.extend( - [ - "--relay-servers", - # "/ip4/3.1.132.169/udp/18080/quic-v1/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", - # "/ip4/3.1.132.169/tcp/18080/p2p/12D3KooWLX7MWuzi1Txa5LyZS4eTQ2tPaJijheH8faHggB9SxnBu", - "/dns4/relay-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", - "/dns4/relay-lattica.gradient.network/tcp/18080/p2p/12D3KooWDaqDAsFupYvffBDxjHHuWmEAJE4sMDCXiuZiB8aG8rjf", - ] - ) - cmd.extend( - [ - "--initial-peers", - # "/ip4/3.1.132.169/udp/19090/quic-v1/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", - # "/ip4/3.1.132.169/tcp/19090/p2p/12D3KooWABDc2ksv41TG2Yj6N3gEsQepyUNitqaGgEkD9Gu5Kcwb", - "/dns4/bootstrap-lattica.gradient.network/udp/18080/quic-v1/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", - "/dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", - ] - ) + cmd.extend(get_relay_params()) logger.info(f"Running command: {' '.join(cmd)}") logger.info(f"Scheduler address: {args.scheduler_addr}") From 72a7742c7838ba0f712777ae35aeb8c200d0b6db Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 18:07:28 +0800 Subject: [PATCH 09/12] update --- src/parallax/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parallax/cli.py b/src/parallax/cli.py index 39451364..de545cb5 100644 --- a/src/parallax/cli.py +++ b/src/parallax/cli.py @@ -51,6 +51,7 @@ def get_relay_params(): "/dns4/bootstrap-lattica.gradient.network/tcp/18080/p2p/12D3KooWJHXvu8TWkFn6hmSwaxdCLy4ZzFwr4u5mvF9Fe2rMmFXb", ] + def run_command(args): """Run the scheduler (equivalent to scripts/start.sh).""" check_python_version() @@ -243,4 +244,3 @@ def main(): if __name__ == "__main__": main() - From 3139e6bfa25f445ab45e576e4f4b10271e853dfe Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 18:10:32 +0800 Subject: [PATCH 10/12] update --- src/backend/server/request_handler.py | 3 +-- src/parallax/p2p/server.py | 5 +---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index 6fd4309b..68653ebb 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -94,7 +94,6 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t is_stream = request_data.get("stream", False) if is_stream: - def stream_generator(): for chunk in stub.chat_completion(request_data): yield chunk @@ -112,7 +111,7 @@ def stream_generator(): else: response = stub.chat_completion(request_data) response = next(response).decode() - logger.info(f"Non-stream response completed for {request_id}, response: {response}") + logger.debug(f"Non-stream response completed for {request_id}") # response is a JSON string; parse to Python object before returning return JSONResponse(content=json.loads(response)) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 76866391..77d0e00b 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -162,10 +162,9 @@ def chat_completion( request, ): """Handle chat completion request""" - logger.info(f"Chat completion request: {request}, type: {type(request)}") + logger.debug(f"Chat completion request: {request}, type: {type(request)}") try: if request.get("stream", False): - logger.info("Stream request") with httpx.Client(timeout=20 * 60 * 60) as client: with client.stream( "POST", @@ -176,12 +175,10 @@ def chat_completion( if chunk: yield chunk else: - logger.info("Non-stream request") with httpx.Client(timeout=20 * 60 * 60) as client: response = client.post( f"http://localhost:{self.http_port}/v1/chat/completions", json=request ).json() - logger.info(f"response: {response}, type: {type(response)}") yield json.dumps(response).encode() except Exception as e: logger.exception(f"Error in chat completion: {e}") From cc21c3b86bc6b88a1579a6c1b41aef8ba54409c5 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 18:11:57 +0800 Subject: [PATCH 11/12] update --- src/backend/server/request_handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index 68653ebb..36354bcb 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -94,6 +94,7 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t is_stream = request_data.get("stream", False) if is_stream: + def stream_generator(): for chunk in stub.chat_completion(request_data): yield chunk From a0b9313e8fa1815a0b6ca33ebca4fd5155c28ea9 Mon Sep 17 00:00:00 2001 From: gufengc Date: Fri, 10 Oct 2025 18:45:41 +0800 Subject: [PATCH 12/12] update --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 206e8790..854edbef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "protobuf==6.31.1", "dijkstar==2.6.0", "huggingface-hub", - "lattica==1.0.2", + "lattica==1.0.3", ] [project.scripts]