34 changes: 30 additions & 4 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -331,6 +331,26 @@ def _update_mm_hashes(self, request):
             inputs["mm_positions"] = []
             inputs["mm_hashes"] = []

+    def _is_mm_request(self, request):
+        inputs = request.multimodal_inputs
+        if inputs is None or len(inputs) == 0:
+            return False
+
+        if (
+            (inputs.get("video_feature_urls") is not None and len(inputs["video_feature_urls"]) > 0)
+            or (inputs.get("image_feature_urls") is not None and len(inputs["image_feature_urls"]) > 0)
+            or (inputs.get("audio_feature_urls") is not None and len(inputs["audio_feature_urls"]) > 0)
+        ):
+            return True
+        elif (
+            inputs.get("images", None) is not None
+            and inputs.get("image_patch_id", None) is not None
+            and inputs.get("grid_thw", None) is not None
+        ):
+            return True
+
+        return False
+
     def _get_num_new_tokens(self, request, token_budget):
         # TODO: set condition to new _get_num_new_tokens
         num_new_tokens = request.need_prefill_tokens - request.num_computed_tokens
@@ -465,6 +485,12 @@ def _get_num_new_tokens(self, request, token_budget):
         # Compatible with scenarios without images and videos.
         return num_new_tokens

+    def exist_mm_prefill(self, scheduled_reqs):
+        for request in scheduled_reqs:
+            if request.task_type == RequestType.PREFILL and self._is_mm_request(request):
+                return True
+        return False
+
     def exist_prefill(self, scheduled_reqs):
         for request in scheduled_reqs:
             if request.task_type == RequestType.PREFILL:
@@ -610,12 +636,12 @@ def _allocate_decode_and_extend():
         while self.waiting and token_budget > 0:
             if len(self.running) == self.max_num_seqs:
                 break
-            if not self.enable_max_prefill and (
-                (self.config.model_config.enable_mm or paddle.is_compiled_with_xpu())
-                and self.exist_prefill(scheduled_reqs)
+            request = self.waiting[0]
+            if (self._is_mm_request(request) and self.exist_mm_prefill(scheduled_reqs)) or (
+                paddle.is_compiled_with_xpu() and self.exist_prefill(scheduled_reqs)
             ):
                 break
-            request = self.waiting[0]
             if request.status == RequestStatus.WAITING:
                 self._update_mm_hashes(request)
                 # Enable prefix caching
3 changes: 1 addition & 2 deletions fastdeploy/worker/gpu_model_runner.py
@@ -470,8 +470,6 @@ def _apply_mm_inputs(self, request: Request, multi_vision_inputs: dict, rope_3d_
             else:
                 raise ValueError(f"multiple modalities model {self.model_config.model_type} is not supported")
             self.share_inputs["image_features"] = image_features[-actual_image_token_num:]
-        else:
-            self.share_inputs["image_features"] = None

         position_ids = request.multimodal_inputs["position_ids"]
         rope_3d_position_ids["position_ids_idx"].append(request.idx)
@@ -494,6 +492,7 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int =
         req_len = len(req_dicts)
         has_prefill_task = False
         has_decode_task = False
+        self.share_inputs["image_features"] = None
         multi_vision_inputs = {"images_lst": [], "grid_thw_lst": [], "vit_position_ids_lst": [], "cu_seqlens": [0]}
         rope_3d_position_ids = {
             "position_ids_idx": [],
1 change: 0 additions & 1 deletion fastdeploy/worker/worker_process.py
@@ -283,7 +283,6 @@ def event_loop_normal(self) -> None:
             # The first worker detects whether there are tasks in the task queue
             if local_rank == 0:
                 if self.task_queue.num_tasks() > 0:
-                    # VL only support 1 batch to prefill
                     if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
                         self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
                     ):