26 changes: 17 additions & 9 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -728,14 +728,7 @@ def _allocate_decode_and_extend():
if scheduled_reqs:
llm_logger.debug(f"scheduled_reqs: {scheduled_reqs}")

# Update metrics
num_tasks = sum([1 if task else 0 for task in self.tasks_list])
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.num_requests_running.set(len(self.running))
main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))
self.update_metrics()

return scheduled_reqs
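
Note: the hunk above replaces nine lines of inline gauge updates with a single call to the new update_metrics() helper (added further down in this file), so the scheduling path and the finish path share one metrics implementation. For readers unfamiliar with the gauge API, here is a minimal sketch of the .set() pattern these counters rely on, assuming prometheus_client-style gauges; the real main_process_metrics wiring is not part of this diff and may differ:

from prometheus_client import Gauge

num_requests_running = Gauge("num_requests_running", "Requests currently being decoded")
num_requests_waiting = Gauge("num_requests_waiting", "Requests admitted but not yet running")

def refresh(running: int, total: int) -> None:
    # Gauges hold absolute values: each refresh overwrites the previous
    # sample, which is why update_metrics() is safe to call from any path.
    num_requests_running.set(running)
    num_requests_waiting.set(total - running)

refresh(running=3, total=8)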

@@ -962,7 +955,10 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
if request in self.running: # normally run and finished
self.running.remove(request)
request.status = RequestStatus.FINISHED
self._free_blocks(request)
try:
self._free_blocks(request)
except Exception as e:
llm_logger.warning(f"release blocks failed for request {req_id}: {e}")
if (
request.request_id in self.to_be_rescheduled_request_id_set
): # finished after preempted, blocks have been recycled.
@@ -981,7 +977,19 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
del self.req_dict[req_id]
except Exception as e:
llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}")
finally:
self.update_metrics()

def clear_data(self):
self.waiting: deque[Request] = deque()
self.to_be_rescheduled_request_id_set = set()

def update_metrics(self):
# Update metrics
num_tasks = sum([1 if task else 0 for task in self.tasks_list])
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.num_requests_running.set(len(self.running))
main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))
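
Taken together, the finish_requests changes isolate per-request cleanup failures (the try/except around _free_blocks) and guarantee the gauges are refreshed on every exit path (the finally clause). A minimal sketch of that error-isolation shape, with hypothetical free_blocks/update_metrics callables standing in for the real methods:

import logging

logger = logging.getLogger("sketch")

def finish_requests_sketch(request_ids, free_blocks, update_metrics):
    try:
        for req_id in request_ids:
            try:
                free_blocks(req_id)  # one bad request must not abort the rest
            except Exception as exc:
                logger.warning("release blocks failed for %s: %s", req_id, exc)
    finally:
        update_metrics()  # runs whether cleanup succeeded or raised

One design consequence worth noting: because update_metrics() recomputes every gauge from tasks_list rather than applying deltas, a request whose blocks failed to free still produces numbers consistent with whatever state the task list is actually in at the next refresh.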
2 changes: 1 addition & 1 deletion fastdeploy/output/token_processor.py
@@ -847,7 +847,7 @@ def _record_speculative_decoding_mertics(self, accept_num):
def clear_data(self):
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.clear_data()
for i in range(self.cfg.max_num_seqs):
for i in range(self.resource_manager.max_num_seqs):
if self.resource_manager.stop_flags[i]:
continue
task = self.resource_manager.tasks_list[i]
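
The one-line token_processor.py fix takes the loop bound from the resource manager rather than from self.cfg, so the range always matches the stop_flags and tasks_list arrays the manager itself allocated. An illustrative sketch of why the bound and the arrays should come from the same owner (class and values here are hypothetical, not FastDeploy's API):

class ManagerSketch:
    def __init__(self, max_num_seqs: int):
        self.max_num_seqs = max_num_seqs
        self.stop_flags = [True] * max_num_seqs
        self.tasks_list = [None] * max_num_seqs

rm = ManagerSketch(max_num_seqs=8)
for i in range(rm.max_num_seqs):  # in lockstep with the arrays above
    if rm.stop_flags[i]:
        continue
    # a bound taken from elsewhere (e.g. a stale config value) could
    # over-run these lists and raise IndexError, or silently skip slots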