From a4873aa876189462ddb022b061ef041b75d07d0c Mon Sep 17 00:00:00 2001 From: gufengc Date: Thu, 16 Oct 2025 12:03:15 +0800 Subject: [PATCH 1/6] update --- src/backend/server/request_handler.py | 61 +++++++++++++++++---------- src/parallax/p2p/server.py | 1 + src/parallax/sglang/model_runner.py | 2 +- src/parallax_utils/logging_config.py | 2 +- 4 files changed, 42 insertions(+), 24 deletions(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index 36354bcb..5b413514 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -1,8 +1,10 @@ import json from typing import Dict +import asyncio import aiohttp from fastapi.responses import JSONResponse, StreamingResponse +from starlette.concurrency import iterate_in_threadpool from backend.server.constants import NODE_STATUS_AVAILABLE from parallax_utils.logging_config import get_logger @@ -92,29 +94,44 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t request_data["routing_table"] = routing_table stub = self.get_stub(routing_table[0]) is_stream = request_data.get("stream", False) - - if is_stream: - - def stream_generator(): - for chunk in stub.chat_completion(request_data): - yield chunk - - resp = StreamingResponse( - stream_generator(), - media_type="text/event-stream", - headers={ - "X-Content-Type-Options": "nosniff", - "Cache-Control": "no-cache", - }, + try: + if is_stream: + + async def stream_generator(): + response = stub.chat_completion(request_data) + try: + iterator = iterate_in_threadpool(response) + async for chunk in iterator: + logger.warning( + f"request id: {request_id}, scheduler got chunk from node: {chunk}" + ) + yield chunk + finally: + logger.warning(f"client disconnected for {request_id}") + # response.cancel() + + resp = StreamingResponse( + stream_generator(), + media_type="text/event-stream", + headers={ + "X-Content-Type-Options": "nosniff", + "Cache-Control": "no-cache", + }, + ) + logger.debug(f"Streaming response initiated for {request_id}") + return resp + else: + response = stub.chat_completion(request_data) + response = next(response).decode() + logger.debug(f"Non-stream response completed for {request_id}") + # response is a JSON string; parse to Python object before returning + return JSONResponse(content=json.loads(response)) + except Exception as e: + logger.exception(f"Error in _forward_request: {e}") + return JSONResponse( + content={"error": "Internal server error"}, + status_code=500, ) - logger.debug(f"Streaming response initiated for {request_id}") - return resp - else: - response = stub.chat_completion(request_data) - response = next(response).decode() - logger.debug(f"Non-stream response completed for {request_id}") - # response is a JSON string; parse to Python object before returning - return JSONResponse(content=json.loads(response)) async def v1_chat_completions(self, request_data: Dict, request_id: str, received_ts: int): return await self._forward_request(request_data, request_id, received_ts) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index 772a8a77..eb24e8e4 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -173,6 +173,7 @@ def chat_completion( ) as response: for chunk in response.iter_bytes(): if chunk: + logger.warning(f"node got chunk from inference: {chunk}") yield chunk else: response = client.post( diff --git a/src/parallax/sglang/model_runner.py b/src/parallax/sglang/model_runner.py index 1e0b4d9f..4b79c05a 100755 --- a/src/parallax/sglang/model_runner.py +++ b/src/parallax/sglang/model_runner.py @@ -496,7 +496,7 @@ def form_sgl_server_args( ): """Creates a SGL ServerArgs object""" sgl_server_args = ServerArgs( - model_path=model_path, + model_path=model_path dtype=dtype, attention_backend=attention_backend, page_size=kv_block_size, diff --git a/src/parallax_utils/logging_config.py b/src/parallax_utils/logging_config.py index 6dba33bb..0fde5eb5 100644 --- a/src/parallax_utils/logging_config.py +++ b/src/parallax_utils/logging_config.py @@ -90,7 +90,7 @@ def _initialize_if_necessary(): _default_handler.setFormatter(formatter) # root level from env or INFO - level_name = os.getenv("PARALLAX_LOGLEVEL", "INFO").upper() + level_name = os.getenv("PARALLAX_LOGLEVEL", "DEBUG").upper() logging.getLogger().setLevel(level_name) # Allow logs from our main packages by default From 81279bac0a6e74d8e09d01a3bfebfcb649002b02 Mon Sep 17 00:00:00 2001 From: gufengc Date: Thu, 16 Oct 2025 18:46:39 +0800 Subject: [PATCH 2/6] update --- src/backend/server/request_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index 5b413514..d2571ffb 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -108,7 +108,7 @@ async def stream_generator(): yield chunk finally: logger.warning(f"client disconnected for {request_id}") - # response.cancel() + response.cancel() resp = StreamingResponse( stream_generator(), From f74524d2fa636146649911a0c37602da23847fd4 Mon Sep 17 00:00:00 2001 From: gufengc Date: Thu, 16 Oct 2025 18:49:49 +0800 Subject: [PATCH 3/6] update --- src/parallax/p2p/server.py | 1 - src/parallax/sglang/model_runner.py | 2 +- src/parallax_utils/logging_config.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/parallax/p2p/server.py b/src/parallax/p2p/server.py index eb24e8e4..772a8a77 100644 --- a/src/parallax/p2p/server.py +++ b/src/parallax/p2p/server.py @@ -173,7 +173,6 @@ def chat_completion( ) as response: for chunk in response.iter_bytes(): if chunk: - logger.warning(f"node got chunk from inference: {chunk}") yield chunk else: response = client.post( diff --git a/src/parallax/sglang/model_runner.py b/src/parallax/sglang/model_runner.py index 4b79c05a..1e0b4d9f 100755 --- a/src/parallax/sglang/model_runner.py +++ b/src/parallax/sglang/model_runner.py @@ -496,7 +496,7 @@ def form_sgl_server_args( ): """Creates a SGL ServerArgs object""" sgl_server_args = ServerArgs( - model_path=model_path + model_path=model_path, dtype=dtype, attention_backend=attention_backend, page_size=kv_block_size, diff --git a/src/parallax_utils/logging_config.py b/src/parallax_utils/logging_config.py index 0fde5eb5..6dba33bb 100644 --- a/src/parallax_utils/logging_config.py +++ b/src/parallax_utils/logging_config.py @@ -90,7 +90,7 @@ def _initialize_if_necessary(): _default_handler.setFormatter(formatter) # root level from env or INFO - level_name = os.getenv("PARALLAX_LOGLEVEL", "DEBUG").upper() + level_name = os.getenv("PARALLAX_LOGLEVEL", "INFO").upper() logging.getLogger().setLevel(level_name) # Allow logs from our main packages by default From 6417822b06e761b84a71731f2d6ed887b6abaf4b Mon Sep 17 00:00:00 2001 From: gufengc Date: Thu, 16 Oct 2025 18:50:13 +0800 Subject: [PATCH 4/6] update --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index fdc6d91f..a53887e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "pydantic", "protobuf==6.31.1", "dijkstar==2.6.0", - "lattica==1.0.4", + "lattica==1.0.6", ] [project.scripts] From 7c4e2a2cd5789e6c809562956fecf973f140273b Mon Sep 17 00:00:00 2001 From: gufengc Date: Thu, 16 Oct 2025 18:51:25 +0800 Subject: [PATCH 5/6] update --- src/backend/server/request_handler.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index d2571ffb..cc43d5c9 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -102,12 +102,9 @@ async def stream_generator(): try: iterator = iterate_in_threadpool(response) async for chunk in iterator: - logger.warning( - f"request id: {request_id}, scheduler got chunk from node: {chunk}" - ) yield chunk finally: - logger.warning(f"client disconnected for {request_id}") + logger.debug(f"client disconnected for {request_id}") response.cancel() resp = StreamingResponse( From d89d38fe16346c3851db4c05dcf5dcd8330d6225 Mon Sep 17 00:00:00 2001 From: gufengc Date: Thu, 16 Oct 2025 19:15:06 +0800 Subject: [PATCH 6/6] update --- src/backend/server/request_handler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/server/request_handler.py b/src/backend/server/request_handler.py index cc43d5c9..b32b188a 100644 --- a/src/backend/server/request_handler.py +++ b/src/backend/server/request_handler.py @@ -1,6 +1,5 @@ import json from typing import Dict -import asyncio import aiohttp from fastapi.responses import JSONResponse, StreamingResponse @@ -123,7 +122,7 @@ async def stream_generator(): logger.debug(f"Non-stream response completed for {request_id}") # response is a JSON string; parse to Python object before returning return JSONResponse(content=json.loads(response)) - except Exception as e: + except Exception as e: logger.exception(f"Error in _forward_request: {e}") return JSONResponse( content={"error": "Internal server error"},