4 changes: 4 additions & 0 deletions fastdeploy/cache_manager/prefix_cache_manager.py
@@ -687,6 +687,8 @@ def request_match_blocks(self, task, block_size, *args):
"cpu_cache_blocks": 0,
"gpu_match_token_num": 0,
"cpu_match_token_num": 0,
"match_gpu_block_ids": [],
"match_cpu_block_ids": [],
}
self.metrics.req_count += 1
if isinstance(task.prompt_token_ids, np.ndarray):
@@ -745,6 +747,8 @@ def request_match_blocks(self, task, block_size, *args):
hit_info["cpu_cache_blocks"] = len(match_cpu_block_ids)
hit_info["gpu_match_token_num"] = gpu_match_token_num
hit_info["cpu_match_token_num"] = cpu_match_token_num
hit_info["match_gpu_block_ids"] = match_gpu_block_ids
hit_info["match_cpu_block_ids"] = match_cpu_block_ids
self.metrics._update_history_hit_metrics()
if self.metrics.req_count % 10000 == 0:
self.metrics.reset_metrics()
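For context, the two new keys carry the matched block ids alongside the existing counters, so downstream code can attribute individual cached blocks back to the GPU or CPU cache. A rough sketch of the resulting hit_info shape (the concrete ids and counts below are made up for illustration):

```python
# Illustrative shape of the hit_info dict after this change; the ids and
# counts below are made up, not taken from a real run.
hit_info = {
    "gpu_cache_blocks": 3,                # blocks matched in the GPU cache
    "cpu_cache_blocks": 1,                # blocks matched in the CPU cache
    "gpu_match_token_num": 192,           # tokens covered by GPU-matched blocks
    "cpu_match_token_num": 64,            # tokens covered by CPU-matched blocks
    "match_gpu_block_ids": [17, 18, 19],  # new: ids of the GPU-matched blocks
    "match_cpu_block_ids": [42],          # new: ids of the CPU-matched blocks
}
```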
42 changes: 30 additions & 12 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -376,11 +376,17 @@ def revert_chunked_mm_input(self, mm_inputs, matched_token_num):
if mm_inputs is None or "mm_positions" not in mm_inputs or len(mm_inputs["mm_positions"]) == 0:
return matched_token_num

for idx in range(len(mm_inputs["mm_positions"])):
position = mm_inputs["mm_positions"][idx]
position_idx = len(mm_inputs["mm_positions"]) - 1
while matched_token_num > 0 and position_idx >= 0:
position = mm_inputs["mm_positions"][position_idx]
if position.offset < matched_token_num < position.offset + position.length:
return position.offset
matched_token_num = (
position.offset // self.config.cache_config.block_size
) * self.config.cache_config.block_size
position_idx -= 1
elif matched_token_num < position.offset:
position_idx -= 1
elif matched_token_num >= position.offset + position.length:
break
return matched_token_num
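
As a standalone illustration of the new control flow, the sketch below walks mm_positions from the last image toward the first and, whenever the matched prefix would cut through an image, snaps the match down to the block boundary at or below that image's offset. It assumes a simple ImagePosition tuple and a 64-token block size (matching the updated tests); the case where the match lands exactly on an image offset is folded into the terminating branch, so treat this as a sketch of the idea rather than a line-for-line copy of the method above.

```python
from collections import namedtuple

ImagePosition = namedtuple("ImagePosition", ["offset", "length"])
BLOCK_SIZE = 64  # assumed cache block size, matching the updated unit tests


def revert_chunked_mm_input(mm_positions, matched_token_num, block_size=BLOCK_SIZE):
    """Round the cache-matched prefix down so it never cuts through an image."""
    position_idx = len(mm_positions) - 1
    while matched_token_num > 0 and position_idx >= 0:
        pos = mm_positions[position_idx]
        if pos.offset < matched_token_num < pos.offset + pos.length:
            # The match boundary falls inside this image: pull it back to the
            # block boundary at or below the image start, then keep scanning,
            # because the new boundary may now split an earlier image.
            matched_token_num = (pos.offset // block_size) * block_size
            position_idx -= 1
        elif matched_token_num < pos.offset:
            # Boundary lies before this image; check the previous one.
            position_idx -= 1
        else:
            # Boundary sits at the image start or at/past its end: no image
            # is split, so nothing more needs to be reverted.
            break
    return matched_token_num


# Two-step revert, as in the updated "matched in second chunk" test case:
positions = [ImagePosition(5, 10), ImagePosition(20, 10),
             ImagePosition(100, 100), ImagePosition(200, 80)]
print(revert_chunked_mm_input(positions, 256))  # 256 -> 192 -> 64
```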

@@ -950,17 +956,9 @@ def get_prefix_cached_blocks(self, request: Request):
)

request.num_cached_tokens = matched_token_num
request.gpu_cache_token_num = hit_info["gpu_match_token_num"]
request.cpu_cache_token_num = hit_info["cpu_match_token_num"]
request.cache_info = (matched_block_num, no_cache_block_num)
request.block_tables = common_block_ids
request.skip_allocate = False

# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(matched_token_num)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.cpu_cache_token_num)

if self.config.cache_config.disable_chunked_mm_input:
if matched_token_num == request.need_prefill_tokens:
matched_token_num = matched_token_num - self.config.cache_config.block_size
@@ -974,7 +972,27 @@ def get_prefix_cached_blocks(self, request: Request):
request.skip_allocate = True
else:
request.num_computed_tokens = matched_token_num
llm_logger.info(f"request {request.request_id} num_computed_tokens: {request.num_computed_tokens}")

if request.num_cached_tokens != request.num_computed_tokens:
revert_tokens_num = request.num_cached_tokens - request.num_computed_tokens
llm_logger.info(
f"request {request.request_id} num_cached_tokens: {request.num_cached_tokens}, revert_tokens_num: {revert_tokens_num}"
)

revert_block_idx = revert_tokens_num // self.config.cache_config.block_size
for block_idx in range(len(common_block_ids) - 1, revert_block_idx, -1):
if common_block_ids[block_idx] in hit_info["match_gpu_block_ids"]:
hit_info["gpu_match_token_num"] -= self.config.cache_config.block_size
elif common_block_ids[block_idx] in hit_info["match_cpu_block_ids"]:
hit_info["cpu_match_token_num"] -= self.config.cache_config.block_size

request.gpu_cache_token_num = hit_info["gpu_match_token_num"]
request.cpu_cache_token_num = hit_info["cpu_match_token_num"]

# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.cpu_cache_token_num)
request.cache_prepare_time = time.time() - cache_prepare_time
return True
except Exception as e:
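The intent of the new accounting block is: when the chunked-mm-input revert pushes num_computed_tokens below num_cached_tokens, the tail blocks of common_block_ids that will no longer be reused should not be reported as prefix-cache hits, and each such block is subtracted from the GPU or CPU token counter depending on which cache it was matched in. The helper below is a simplified, self-contained rendering of that idea; the function name, the set-based lookups, and the tail slice are illustrative, not the implementation above.

```python
def adjust_hit_counters(common_block_ids, match_gpu_block_ids, match_cpu_block_ids,
                        gpu_match_token_num, cpu_match_token_num,
                        reverted_blocks, block_size=64):
    """Stop counting the reverted tail blocks as GPU/CPU prefix-cache hits."""
    if reverted_blocks <= 0:
        return gpu_match_token_num, cpu_match_token_num
    # The last `reverted_blocks` entries of the matched block list were hit in
    # the prefix cache but will not be reused after the chunked-mm-input revert.
    for block_id in common_block_ids[-reverted_blocks:]:
        if block_id in match_gpu_block_ids:
            gpu_match_token_num -= block_size
        elif block_id in match_cpu_block_ids:
            cpu_match_token_num -= block_size
    return gpu_match_token_num, cpu_match_token_num


# Hypothetical numbers: 5 matched blocks (4 from GPU, 1 from CPU), 2 reverted.
gpu_tokens, cpu_tokens = adjust_hit_counters(
    common_block_ids=[10, 11, 12, 13, 40],
    match_gpu_block_ids={10, 11, 12, 13},
    match_cpu_block_ids={40},
    gpu_match_token_num=4 * 64,
    cpu_match_token_num=1 * 64,
    reverted_blocks=2,
)
print(gpu_tokens, cpu_tokens)  # 192 0 -- one GPU block and the CPU block no longer count
```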
6 changes: 0 additions & 6 deletions fastdeploy/multimodal/hasher.py
@@ -19,17 +19,11 @@

import numpy as np

from fastdeploy.utils import data_processor_logger


class MultimodalHasher:

@classmethod
def hash_features(cls, obj: object) -> str:
if isinstance(obj, np.ndarray):
return hashlib.sha256((obj.tobytes())).hexdigest()

data_processor_logger.warning(
f"Unsupported type for hashing features: {type(obj)}" + ", use pickle for serialization"
)
return hashlib.sha256((pickle.dumps(obj))).hexdigest()
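
After this cleanup, hash_features hashes ndarrays by their raw bytes and silently falls back to pickle for any other object, leaving no logging dependency in the module. A standalone sketch of the resulting behaviour (written as a plain function rather than the classmethod, for brevity):

```python
import hashlib
import pickle

import numpy as np


def hash_features(obj: object) -> str:
    # Fast path: hash a numpy array by its raw buffer.
    if isinstance(obj, np.ndarray):
        return hashlib.sha256(obj.tobytes()).hexdigest()
    # Everything else is pickled first; the previous warning log is gone.
    return hashlib.sha256(pickle.dumps(obj)).hexdigest()


print(hash_features(np.arange(4, dtype=np.int64)))
print(hash_features({"image_id": 7, "patch_sizes": [14, 14]}))
```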
43 changes: 22 additions & 21 deletions tests/v1/test_resource_manager_v1.py
@@ -190,6 +190,7 @@ def setUp(self):
model_cfg.max_model_len = 5120
model_cfg.architectures = ["test_model"]
cache_cfg.bytes_per_layer_per_block = 1
cache_cfg.block_size = 64
parallel_cfg = ParallelConfig(args)
scheduler_cfg = SchedulerConfig(args)
graph_opt_cfg = engine_args.create_graph_optimization_config()
@@ -214,58 +215,58 @@ def setUp(self):
self.request.multimodal_inputs = {}

def test_revert_chunked_mm_input_none_input(self):
result = self.manager.revert_chunked_mm_input(None, 10)
self.assertEqual(result, 10)
result = self.manager.revert_chunked_mm_input(None, 64)
self.assertEqual(result, 64)

def test_revert_chunked_mm_input_no_mm_positions(self):
mm_inputs = {"other_field": "value"}
result = self.manager.revert_chunked_mm_input(mm_inputs, 10)
self.assertEqual(result, 10)
result = self.manager.revert_chunked_mm_input(mm_inputs, 128)
self.assertEqual(result, 128)

def test_revert_chunked_mm_input_empty_positions(self):
mm_inputs = {"mm_positions": []}
result = self.manager.revert_chunked_mm_input(mm_inputs, 10)
self.assertEqual(result, 10)
result = self.manager.revert_chunked_mm_input(mm_inputs, 128)
self.assertEqual(result, 128)

def test_revert_chunked_mm_input_matched_in_chunk(self):
mm_inputs = {
"mm_positions": [
ImagePosition(offset=5, length=10),
ImagePosition(offset=20, length=10),
ImagePosition(offset=40, length=100),
ImagePosition(offset=200, length=80),
]
}
result = self.manager.revert_chunked_mm_input(mm_inputs, 8)
self.assertEqual(result, 5)
result = self.manager.revert_chunked_mm_input(mm_inputs, 256)
self.assertEqual(result, 192)

def test_revert_chunked_mm_input_matched_in_second_chunk(self):
mm_inputs = {
"mm_positions": [
ImagePosition(offset=5, length=10),
ImagePosition(offset=20, length=10),
ImagePosition(offset=100, length=100),
ImagePosition(offset=200, length=80),
]
}
result = self.manager.revert_chunked_mm_input(mm_inputs, 25)
self.assertEqual(result, 20)
result = self.manager.revert_chunked_mm_input(mm_inputs, 256)
self.assertEqual(result, 64)

def test_revert_chunked_mm_input_before_first_chunk(self):
mm_inputs = {
"mm_positions": [
ImagePosition(offset=5, length=10),
ImagePosition(offset=20, length=10),
ImagePosition(offset=60, length=100),
ImagePosition(offset=180, length=100),
]
}
result = self.manager.revert_chunked_mm_input(mm_inputs, 3)
self.assertEqual(result, 3)
result = self.manager.revert_chunked_mm_input(mm_inputs, 256)
self.assertEqual(result, 0)

def test_revert_chunked_mm_input_after_last_chunk(self):
mm_inputs = {
"mm_positions": [
ImagePosition(offset=5, length=10),
ImagePosition(offset=20, length=10),
ImagePosition(offset=200, length=56),
]
}
result = self.manager.revert_chunked_mm_input(mm_inputs, 35)
self.assertEqual(result, 35)
result = self.manager.revert_chunked_mm_input(mm_inputs, 256)
self.assertEqual(result, 256)


if __name__ == "__main__":
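For readers checking the new expectations: with block_size = 64 the reverted value is always a block boundary, which is why the small expected values from the old tests no longer apply. In the "before first chunk" case above, for example, 256 falls inside the image at offset 180 and is pulled back to (180 // 64) * 64 = 128; 128 then falls inside the image at offset 60 and is pulled back to (60 // 64) * 64 = 0, so the test now expects 0.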