From 7d3d23d7bd197a9d18161b93d2e2178d17ea8d55 Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Mon, 10 Nov 2025 16:49:43 +0800
Subject: [PATCH 1/2] [BugFix] fix num_requests_running after clear_data

---
 .../engine/sched/resource_manager_v1.py | 21 ++++++++++++-------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index 0ffbc3aa7c8..6ca099b6942 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -728,14 +728,7 @@ def _allocate_decode_and_extend():
 
         if scheduled_reqs:
             llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")
-        # Update metrics
-        num_tasks = sum([1 if task else 0 for task in self.tasks_list])
-        num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-        main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
-        main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
-        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
-        main_process_metrics.num_requests_running.set(len(self.running))
-        main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))
+        self.update_metrics()
 
         return scheduled_reqs
 
@@ -981,7 +974,19 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
                 del self.req_dict[req_id]
         except Exception as e:
             llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}")
+        finally:
+            self.update_metrics()
 
     def clear_data(self):
         self.waiting: deque[Request] = deque()
         self.to_be_rescheduled_request_id_set = set()
+
+    def update_metrics(self):
+        # Update metrics
+        num_tasks = sum([1 if task else 0 for task in self.tasks_list])
+        num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+        main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
+        main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
+        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+        main_process_metrics.num_requests_running.set(len(self.running))
+        main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))
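
Note for readers outside the FastDeploy codebase: patch 1 routes every metrics refresh through a single update_metrics() helper and invokes it from a finally block, so gauges such as num_requests_running are recomputed on every exit path of finish_requests, including the error path. Below is a minimal, self-contained sketch of that pattern; Gauge, ToyScheduler, and all variable names are illustrative stand-ins, not FastDeploy's real classes.

    class Gauge:
        """Minimal stand-in for a Prometheus-style gauge."""

        def __init__(self):
            self.value = 0.0

        def set(self, value):
            self.value = value


    num_requests_running = Gauge()
    num_requests_waiting = Gauge()


    class ToyScheduler:
        def __init__(self, max_num_seqs):
            self.tasks_list = [None] * max_num_seqs  # occupied slots hold a request id
            self.running = []

        def update_metrics(self):
            # Single owner of all gauge updates, as in the patch.
            num_tasks = sum(1 for task in self.tasks_list if task)
            num_requests_running.set(len(self.running))
            num_requests_waiting.set(num_tasks - len(self.running))

        def finish_request(self, req_id):
            try:
                self.running.remove(req_id)
                self.tasks_list[self.tasks_list.index(req_id)] = None
            finally:
                # Runs even if recycling raised, so the gauges cannot go stale.
                self.update_metrics()


    sched = ToyScheduler(max_num_seqs=4)
    sched.tasks_list[0] = "req-0"
    sched.running.append("req-0")
    sched.finish_request("req-0")
    assert num_requests_running.value == 0 and num_requests_waiting.value == 0

The same helper is what schedule() now calls in place of the previously inlined block, keeping both call sites consistent.
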
From 30e21fc864e70a41f378d1ccf07c26098c6c7221 Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Tue, 11 Nov 2025 16:31:16 +0800
Subject: [PATCH 2/2] [fix] fix tasks_list and stop_flags not cleared when
 _free_blocks fails

---
 fastdeploy/engine/sched/resource_manager_v1.py | 5 ++++-
 fastdeploy/output/token_processor.py           | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index 6ca099b6942..e7da5422f02 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -955,7 +955,10 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
                 if request in self.running:  # normally run and finished
                     self.running.remove(request)
                     request.status = RequestStatus.FINISHED
-                    self._free_blocks(request)
+                    try:
+                        self._free_blocks(request)
+                    except Exception as e:
+                        llm_logger.warning(f"release block failed {req_id}: {e}")
                 if (
                     request.request_id in self.to_be_rescheduled_request_id_set
                 ):  # finished after preempted, blocks have been recycled.
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index e6efcc33e09..dc8effb816a 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -835,7 +835,7 @@ def _record_speculative_decoding_mertics(self, accept_num):
     def clear_data(self):
         if envs.ENABLE_V1_KVCACHE_SCHEDULER:
             self.resource_manager.clear_data()
-        for i in range(self.cfg.max_num_seqs):
+        for i in range(self.resource_manager.max_num_seqs):
             if self.resource_manager.stop_flags[i]:
                 continue
             task = self.resource_manager.tasks_list[i]
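
Again in isolation: patch 2 makes the fallible block-release step non-fatal so the surrounding bookkeeping always completes, and sizes the clear_data loop by the resource manager's own capacity rather than a config field that may disagree with it. The sketch below shows the combined effect; free_blocks, tasks_list, and stop_flags are illustrative stand-ins for the real attributes, and the try/except is merged into clear_data here only so both halves of the fix appear in one place (in the real patch it lives in finish_requests).

    import logging

    logger = logging.getLogger("sketch")


    def free_blocks(req_id):
        # Stand-in for the real block-release call; fails for one request.
        if req_id == "req-1":
            raise RuntimeError("cache connection lost")


    def clear_data(tasks_list, stop_flags):
        # Iterate over the manager's own capacity (len(tasks_list)), not a
        # possibly different config value (the token_processor.py half of the fix).
        for i in range(len(tasks_list)):
            if stop_flags[i]:
                continue
            try:
                free_blocks(tasks_list[i])
            except Exception as e:
                # Log and keep going: one failed release must not leave the
                # remaining task slots and stop flags uncleared.
                logger.warning("release block failed %s: %s", tasks_list[i], e)
            tasks_list[i] = None
            stop_flags[i] = True


    tasks = ["req-0", "req-1", "req-2"]
    flags = [False, False, False]
    clear_data(tasks, flags)
    assert all(flags) and all(task is None for task in tasks)
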