From 7683b27f88f68e90d626003ce4285239b6feccf5 Mon Sep 17 00:00:00 2001
From: sunlei1024
Date: Wed, 3 Sep 2025 22:13:34 +0800
Subject: [PATCH 1/2] Support for async processor added.

---
 fastdeploy/entrypoints/engine_client.py       | 12 ++++++++----
 fastdeploy/entrypoints/openai/api_server.py   | 15 +++++++++++++++
 fastdeploy/entrypoints/openai/serving_chat.py |  2 +-
 .../entrypoints/openai/serving_completion.py  |  2 +-
 tests/utils/test_custom_chat_template.py      |  4 ++--
 5 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index aeb99f33fda..fa23aaaee7f 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 """
 
+import inspect
 import os
 import time
 import traceback
@@ -112,7 +113,7 @@ def create_zmq_client(self, model, mode):
         self.zmq_client = ZmqClient(model, mode)
         self.zmq_client.connect()
 
-    def format_and_add_data(self, prompts: dict):
+    async def format_and_add_data(self, prompts: dict):
         """
         Format the request data and send the request to the server.
         """
@@ -123,10 +124,10 @@
         if "max_tokens" not in prompts:
             prompts["max_tokens"] = self.max_model_len - 1
 
-        self.add_requests(prompts)
+        await self.add_requests(prompts)
         return prompts["prompt_token_ids"]
 
-    def add_requests(self, task):
+    async def add_requests(self, task):
         """
         Add a new request to the queue.
 
@@ -140,7 +141,10 @@
 
         task["preprocess_start_time"] = time.time()
         try:
-            self.data_processor.process_request_dict(task, self.max_model_len)
+            if inspect.iscoroutinefunction(self.data_processor.process_request_dict):
+                await self.data_processor.process_request_dict(task, self.max_model_len)
+            else:
+                self.data_processor.process_request_dict(task, self.max_model_len)
 
             task["prompt_token_ids_len"] = len(task["prompt_token_ids"])
             input_ids_len = task["prompt_token_ids_len"]
diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index db78ec6c296..752ce6848e7 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -310,6 +310,13 @@ async def wrapped_generator():
     return wrapped_generator
 
 
+import yappi
+
+yappi.set_clock_type("cpu")  # CPU time mode; can be switched to wall time
+yappi.start()
+yappi_req_id = 0
+
+
 @app.post("/v1/chat/completions")
 async def create_chat_completion(request: ChatCompletionRequest):
     """
@@ -322,6 +329,7 @@ async def create_chat_completion(request: ChatCompletionRequest):
         return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
     try:
         async with connection_manager():
+            yappi.clear_stats()
             inject_to_metadata(request)
             generator = await app.state.chat_handler.create_chat_completion(request)
             if isinstance(generator, ErrorResponse):
@@ -339,6 +347,13 @@ async def create_chat_completion(request: ChatCompletionRequest):
     except HTTPException as e:
         api_server_logger.error(f"Error in chat completion: {str(e)}")
         return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
+    finally:
+        # Collect function statistics
+        global yappi_req_id
+        yappi_req_id = yappi_req_id + 1
+        func_stats = yappi.get_func_stats()
+        func_stats.sort("ttot")  # sort by total elapsed time
+        func_stats.save(f"profile_{yappi_req_id}.callgrind", type="callgrind")
 
 
 @app.post("/v1/completions")
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index 16f5f78a0ff..cddfef63471 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -119,7 +119,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
             if "chat_template" not in current_req_dict:
                 current_req_dict["chat_template"] = self.chat_template
             current_req_dict["arrival_time"] = time.time()
-            prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
+            prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)
             text_after_process = current_req_dict.get("text_after_process")
             if isinstance(prompt_token_ids, np.ndarray):
                 prompt_token_ids = prompt_token_ids.tolist()
diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py
index aa5d5f3c56e..3df22de9c5c 100644
--- a/fastdeploy/entrypoints/openai/serving_completion.py
+++ b/fastdeploy/entrypoints/openai/serving_completion.py
@@ -146,7 +146,7 @@ async def create_completion(self, request: CompletionRequest):
                 request_id_idx = f"{request_id}-{idx}"
                 current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
                 current_req_dict["arrival_time"] = time.time()
-                prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)  # tokenize
+                prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)  # tokenize
                 if isinstance(prompt_token_ids, np.ndarray):
                     prompt_token_ids = prompt_token_ids.tolist()
                 text_after_process_list.append(current_req_dict.get("text_after_process"))
diff --git a/tests/utils/test_custom_chat_template.py b/tests/utils/test_custom_chat_template.py
index acb6be96079..71a617044b4 100644
--- a/tests/utils/test_custom_chat_template.py
+++ b/tests/utils/test_custom_chat_template.py
@@ -70,7 +70,7 @@ async def mock_chat_completion_full_generator(
         ):
             return prompt_token_ids
 
-        def mock_format_and_add_data(current_req_dict):
+        async def mock_format_and_add_data(current_req_dict):
             return current_req_dict
 
         self.chat_completion_handler.chat_completion_full_generator = mock_chat_completion_full_generator
@@ -97,7 +97,7 @@ async def mock_chat_completion_full_generator(
         ):
             return prompt_token_ids
 
-        def mock_format_and_add_data(current_req_dict):
+        async def mock_format_and_add_data(current_req_dict):
             return current_req_dict
 
         self.chat_completion_handler.chat_completion_full_generator = mock_chat_completion_full_generator

From a737fb861c430689f5e2b5847e59d6f547f7f1dd Mon Sep 17 00:00:00 2001
From: sunlei1024
Date: Wed, 3 Sep 2025 22:22:54 +0800
Subject: [PATCH 2/2] remove yappi code

---
 fastdeploy/entrypoints/openai/api_server.py | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index 752ce6848e7..db78ec6c296 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -310,13 +310,6 @@ async def wrapped_generator():
     return wrapped_generator
 
 
-import yappi
-
-yappi.set_clock_type("cpu")  # CPU time mode; can be switched to wall time
-yappi.start()
-yappi_req_id = 0
-
-
 @app.post("/v1/chat/completions")
 async def create_chat_completion(request: ChatCompletionRequest):
     """
@@ -329,7 +322,6 @@ async def create_chat_completion(request: ChatCompletionRequest):
         return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
     try:
         async with connection_manager():
-            yappi.clear_stats()
             inject_to_metadata(request)
             generator = await app.state.chat_handler.create_chat_completion(request)
             if isinstance(generator, ErrorResponse):
@@ -347,13 +339,6 @@ async def create_chat_completion(request: ChatCompletionRequest):
     except HTTPException as e:
         api_server_logger.error(f"Error in chat completion: {str(e)}")
         return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
-    finally:
-        # Collect function statistics
-        global yappi_req_id
-        yappi_req_id = yappi_req_id + 1
-        func_stats = yappi.get_func_stats()
-        func_stats.sort("ttot")  # sort by total elapsed time
-        func_stats.save(f"profile_{yappi_req_id}.callgrind", type="callgrind")
 
 
 @app.post("/v1/completions")
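
Note on the pattern introduced in PATCH 1/2: EngineClient.add_requests() now
probes the data processor with inspect.iscoroutinefunction() and awaits
process_request_dict() only when it is a coroutine function, so a single call
site serves both synchronous and asynchronous processors; the await is then
propagated up through format_and_add_data() to the chat and completion
handlers. Below is a minimal, self-contained Python sketch of that dispatch
pattern. The SyncProcessor and AsyncProcessor classes are hypothetical
stand-ins, not FastDeploy APIs; only the iscoroutinefunction() check mirrors
the actual change.

    import asyncio
    import inspect

    class SyncProcessor:
        def process_request_dict(self, task: dict, max_model_len: int) -> None:
            task["prompt_token_ids"] = [1, 2, 3]  # stand-in for real tokenization

    class AsyncProcessor:
        async def process_request_dict(self, task: dict, max_model_len: int) -> None:
            await asyncio.sleep(0)  # e.g. awaiting a remote tokenizer
            task["prompt_token_ids"] = [1, 2, 3]

    async def add_requests(processor, task: dict, max_model_len: int) -> None:
        # Await coroutine functions; call plain functions directly.
        if inspect.iscoroutinefunction(processor.process_request_dict):
            await processor.process_request_dict(task, max_model_len)
        else:
            processor.process_request_dict(task, max_model_len)
        task["prompt_token_ids_len"] = len(task["prompt_token_ids"])

    for proc in (SyncProcessor(), AsyncProcessor()):
        task = {"prompt": "hello"}
        asyncio.run(add_requests(proc, task, max_model_len=1024))
        assert task["prompt_token_ids_len"] == 3

One caveat of this approach: inspect.iscoroutinefunction() returns False for a
synchronous callable that merely returns an awaitable, so a processor written
that way would still need its result awaited explicitly.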