From 337d91ba88d6636e727c42cdb6bcc550e5048827 Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Mon, 13 Apr 2026 18:47:39 +0800 Subject: [PATCH 01/12] draft --- fastdeploy/model_executor/forward_meta.py | 2 +- .../layers/attention/dsa_attention_backend.py | 9 +----- .../model_executor/models/deepseek_v3.py | 32 ++++++++++++------- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index 44cf528bed3..bb7c935c5f5 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -161,7 +161,7 @@ class ForwardMeta: # for mla & dsa position_ids: Optional[paddle.Tensor] = None mask_encoder_batch: Optional[paddle.Tensor] = None - + slot_mapping: Optional[paddle.Tensor] = None real_bsz: int = 0 def clear_caches(self): diff --git a/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py b/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py index aaaf7018dbe..525e35fe820 100644 --- a/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py +++ b/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py @@ -346,18 +346,11 @@ def forward_mixed( scale = paddle.abs(compressed_kv).max() / 200.0 - slot_mapping = compute_slot_mapping( - forward_meta.block_tables, - forward_meta.position_ids, - forward_meta.batch_id_per_token, - self.block_size, - ) - dsk_attn_write_cache( compressed_kv, k_pe, latent_cache, - slot_mapping, + forward_meta.slot_mapping, scale.cast(paddle.float32), "fp8_ds_mla", ) diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py index 29df0342a6a..343d7627f8e 100644 --- a/fastdeploy/model_executor/models/deepseek_v3.py +++ b/fastdeploy/model_executor/models/deepseek_v3.py @@ -661,17 +661,12 @@ def forward( weights = weights.unsqueeze(-1) * q_scale * self.softmax_scale * self.index_n_heads**-0.5 weights = 
weights.squeeze(-1) - slot_mapping = compute_slot_mapping( - forward_meta.block_tables, - forward_meta.position_ids, - forward_meta.batch_id_per_token, - 64, - ) - indexer_top_k = paddle.full([q_fp8.shape[0], self.index_topk], -1, dtype="int32") # indexer write_cache - indexer_k_quant_and_cache(k, self.indexer_cache, slot_mapping, self.quant_block_size, self.scale_fmt) + indexer_k_quant_and_cache( + k, self.indexer_cache, forward_meta.slot_mapping, self.quant_block_size, self.scale_fmt + ) import deep_gemm @@ -1152,6 +1147,9 @@ def __init__(self, fd_config: FDConfig): self.mask_encoder_batch_buffer = paddle.empty( [fd_config.scheduler_config.max_num_batched_tokens, 1], dtype=paddle.int32 ) + self.slot_mapping_buffer = paddle.empty( + [fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int64 + ) @classmethod def name(cls): @@ -1248,7 +1246,7 @@ def compute_logits(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta logits[:, self.ori_vocab_size :] = -float("inf") return logits - def pre_process(self, forward_meta): + def pre_process(self, forward_meta) -> None: """ """ seq_lens_encoder = forward_meta.seq_lens_encoder seq_lens_decoder = forward_meta.seq_lens_decoder @@ -1265,7 +1263,19 @@ def pre_process(self, forward_meta): position_ids, mask_encoder_batch, ) - return position_ids, mask_encoder_batch + + block_size = self.fd_config.cache_config.block_size + block_idx = position_ids // block_size # [num_tokens] + block_ids = forward_meta.block_tables[forward_meta.batch_id_per_token, block_idx] # [num_tokens] + block_offset = position_ids % block_size # [num_tokens] + slot_mapping = (block_ids * block_size + block_offset).cast(paddle.int64) + + forward_meta.position_ids = position_ids + paddle.assign( + slot_mapping, + self.slot_mapping_buffer[:current_total_tokens], + ) + forward_meta.slot_mapping = self.slot_mapping_buffer[:current_total_tokens] def empty_input_forward(self, forward_meta): """ @@ -1287,7 +1297,7 @@ def forward( forward_meta: 
ForwardMeta, ): ids_remove_padding = inputs["ids_remove_padding"] - forward_meta.position_ids, forward_meta.mask_encoder_batch = self.pre_process(forward_meta) + self.pre_process(forward_meta) hidden_states = self.model( ids_remove_padding=ids_remove_padding, forward_meta=forward_meta, From fa5947c42fb22add95ff3fdc5529323bcfca660b Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Mon, 13 Apr 2026 19:54:55 +0800 Subject: [PATCH 02/12] After accurate test, delete compute function. --- .../layers/attention/dsa_attention_backend.py | 27 ------------------- .../model_executor/models/deepseek_v3.py | 27 ------------------- 2 files changed, 54 deletions(-) diff --git a/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py b/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py index 525e35fe820..61974bcba04 100644 --- a/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py +++ b/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py @@ -54,33 +54,6 @@ def yarn_get_mscale(scale=1, mscale=1): return 0.1 * mscale * math.log(scale) + 1.0 -def compute_slot_mapping( - block_tables: paddle.Tensor, # [num_reqs, max_blocks_per_req] - positions: paddle.Tensor, # [num_tokens] 每个token的位置 - batch_id_per_token: paddle.Tensor, # [num_tokens] 每个token属于哪个请求 - block_size: int, -) -> paddle.Tensor: - """ - 计算 slot_mapping - - 公式: slot = block_id * block_size + offset_in_block - """ - # 1. 计算每个 token 对应的 block 索引 - block_idx = positions // block_size # [num_tokens] - - # 2. 从 block_tables 中查表获取 block_id - # block_tables[batch_id_per_token, block_idx] - block_ids = block_tables[batch_id_per_token, block_idx] # [num_tokens] - - # 3. 计算在 block 内的偏移 - block_offset = positions % block_size # [num_tokens] - - # 4. 
计算 slot_mapping - slot_mapping = block_ids * block_size + block_offset - - return slot_mapping.cast(paddle.int64) - - @dataclass class DSAAttentionMetadata(AttentionMetadata): """ diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py index 343d7627f8e..ed867bf2e06 100644 --- a/fastdeploy/model_executor/models/deepseek_v3.py +++ b/fastdeploy/model_executor/models/deepseek_v3.py @@ -451,33 +451,6 @@ def forward( return output -def compute_slot_mapping( - block_tables: paddle.Tensor, # [num_reqs, max_blocks_per_req] - positions: paddle.Tensor, # [num_tokens] 每个token的位置 - batch_id_per_token: paddle.Tensor, # [num_tokens] 每个token属于哪个请求 - block_size: int, -) -> paddle.Tensor: - """ - 计算 slot_mapping - - 公式: slot = block_id * block_size + offset_in_block - """ - # 1. 计算每个 token 对应的 block 索引 - block_idx = positions // block_size # [num_tokens] - - # 2. 从 block_tables 中查表获取 block_id - # block_tables[batch_id_per_token, block_idx] - block_ids = block_tables[batch_id_per_token, block_idx] # [num_tokens] - - # 3. 计算在 block 内的偏移 - block_offset = positions % block_size # [num_tokens] - - # 4. 
计算 slot_mapping - slot_mapping = block_ids * block_size + block_offset - - return slot_mapping.cast(paddle.int64) - - import triton import triton.language as tl From 6ed9a77da23f62fddeced0800fc951836fcd6dce Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Tue, 14 Apr 2026 17:56:25 +0800 Subject: [PATCH 03/12] Enable pre_process func to be captured --- .../model_executor/models/deepseek_v3.py | 122 +++++++++++------- 1 file changed, 77 insertions(+), 45 deletions(-) diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py index ed867bf2e06..829b28f4769 100644 --- a/fastdeploy/model_executor/models/deepseek_v3.py +++ b/fastdeploy/model_executor/models/deepseek_v3.py @@ -1066,12 +1066,28 @@ def __init__( prefix="deepseek_v3.norm", ) + # self.position_ids_buffer = paddle.empty( + # [fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int32 + # ) + # self.mask_encoder_batch_buffer = paddle.empty( + # [fd_config.scheduler_config.max_num_batched_tokens, 1], dtype=paddle.int32 + # ) + # self.slot_mapping_buffer = paddle.empty( + # [fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int64 + # ) + self.fd_config = fd_config + def forward( self, ids_remove_padding: paddle.Tensor, forward_meta: ForwardMeta, ): """ """ + import nvtx + + with nvtx.annotate("pre_process", color="green"): + self.pre_process(forward_meta) + hidden_states = self.embed_tokens(ids_remove_padding=ids_remove_padding, forward_meta=forward_meta) residual = None @@ -1088,6 +1104,60 @@ def forward( return out + # def pre_process(self, forward_meta) -> None: + # """ """ + + # seq_lens_encoder = forward_meta.seq_lens_encoder + # seq_lens_decoder = forward_meta.seq_lens_decoder + # seq_lens_this_time = forward_meta.seq_lens_this_time + + # current_total_tokens = forward_meta.ids_remove_padding.shape[0] + # position_ids = self.position_ids_buffer[:current_total_tokens] + # mask_encoder_batch = 
self.mask_encoder_batch_buffer[:current_total_tokens] + + # get_position_ids_and_mask_encoder_batch( + # seq_lens_encoder, + # seq_lens_decoder, + # seq_lens_this_time, + # position_ids, + # mask_encoder_batch, + # ) + + # block_size = self.fd_config.cache_config.block_size + # block_idx = position_ids // block_size # [num_tokens] + # block_ids = forward_meta.block_tables[forward_meta.batch_id_per_token, block_idx] # [num_tokens] + # block_offset = position_ids % block_size # [num_tokens] + # slot_mapping = (block_ids * block_size + block_offset).cast(paddle.int64) + + # forward_meta.position_ids = position_ids + # paddle.assign( + # slot_mapping, + # self.slot_mapping_buffer[:current_total_tokens], + # ) + # forward_meta.slot_mapping = self.slot_mapping_buffer[:current_total_tokens] + def pre_process(self, forward_meta) -> None: + """ """ + current_total_tokens = forward_meta.ids_remove_padding.shape[0] + position_ids = paddle.empty([current_total_tokens], dtype=paddle.int32) + mask_encoder_batch = paddle.empty([current_total_tokens, 1], dtype=paddle.int32) + + get_position_ids_and_mask_encoder_batch( + forward_meta.seq_lens_encoder, + forward_meta.seq_lens_decoder, + forward_meta.seq_lens_this_time, + position_ids, + mask_encoder_batch, + ) + + block_size = self.fd_config.cache_config.block_size + block_idx = position_ids // block_size # [num_tokens] + block_ids = forward_meta.block_tables[forward_meta.batch_id_per_token, block_idx] # [num_tokens] + block_offset = position_ids % block_size # [num_tokens] + slot_mapping = (block_ids * block_size + block_offset).cast(paddle.int64) + + forward_meta.position_ids = position_ids + forward_meta.slot_mapping = slot_mapping + @ModelRegistry.register_model_class( architecture="DeepseekV3ForCausalLM", @@ -1114,15 +1184,6 @@ def __init__(self, fd_config: FDConfig): num_embeddings=fd_config.model_config.vocab_size, prefix="lm_head", ) - self.position_ids_buffer = paddle.empty( - 
[fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int32 - ) - self.mask_encoder_batch_buffer = paddle.empty( - [fd_config.scheduler_config.max_num_batched_tokens, 1], dtype=paddle.int32 - ) - self.slot_mapping_buffer = paddle.empty( - [fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int64 - ) @classmethod def name(cls): @@ -1219,37 +1280,6 @@ def compute_logits(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta logits[:, self.ori_vocab_size :] = -float("inf") return logits - def pre_process(self, forward_meta) -> None: - """ """ - seq_lens_encoder = forward_meta.seq_lens_encoder - seq_lens_decoder = forward_meta.seq_lens_decoder - seq_lens_this_time = forward_meta.seq_lens_this_time - - current_total_tokens = forward_meta.ids_remove_padding.shape[0] - position_ids = self.position_ids_buffer[:current_total_tokens] - mask_encoder_batch = self.mask_encoder_batch_buffer[:current_total_tokens] - - get_position_ids_and_mask_encoder_batch( - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - position_ids, - mask_encoder_batch, - ) - - block_size = self.fd_config.cache_config.block_size - block_idx = position_ids // block_size # [num_tokens] - block_ids = forward_meta.block_tables[forward_meta.batch_id_per_token, block_idx] # [num_tokens] - block_offset = position_ids % block_size # [num_tokens] - slot_mapping = (block_ids * block_size + block_offset).cast(paddle.int64) - - forward_meta.position_ids = position_ids - paddle.assign( - slot_mapping, - self.slot_mapping_buffer[:current_total_tokens], - ) - forward_meta.slot_mapping = self.slot_mapping_buffer[:current_total_tokens] - def empty_input_forward(self, forward_meta): """ empty_input_forward @@ -1270,11 +1300,13 @@ def forward( forward_meta: ForwardMeta, ): ids_remove_padding = inputs["ids_remove_padding"] - self.pre_process(forward_meta) - hidden_states = self.model( - ids_remove_padding=ids_remove_padding, - forward_meta=forward_meta, - ) + import nvtx + + with 
nvtx.annotate("model_forward", color="red"): + hidden_states = self.model( + ids_remove_padding=ids_remove_padding, + forward_meta=forward_meta, + ) return hidden_states def clear_grpah_opt_backend(self): From 5f2e485166b7c5a236595b0617457bcec1d2871b Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Tue, 14 Apr 2026 20:35:56 +0800 Subject: [PATCH 04/12] [Refactor]Move the preprocessing logic to the and adapted it to support CUDA Graphs. --- fastdeploy/model_executor/forward_meta.py | 2 + .../model_executor/models/deepseek_v3.py | 72 +------------------ fastdeploy/worker/gpu_model_runner.py | 30 ++++++++ fastdeploy/worker/input_batch.py | 6 ++ 4 files changed, 41 insertions(+), 69 deletions(-) diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index bb7c935c5f5..fa0dadb0cfc 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -161,7 +161,9 @@ class ForwardMeta: # for mla & dsa position_ids: Optional[paddle.Tensor] = None mask_encoder_batch: Optional[paddle.Tensor] = None + # for kvcache slot slot_mapping: Optional[paddle.Tensor] = None + real_bsz: int = 0 def clear_caches(self): diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py index 829b28f4769..4464474acf8 100644 --- a/fastdeploy/model_executor/models/deepseek_v3.py +++ b/fastdeploy/model_executor/models/deepseek_v3.py @@ -45,6 +45,9 @@ from fastdeploy.model_executor.layers.lm_head import ParallelLMHead from fastdeploy.model_executor.layers.moe.moe import FusedMoE from fastdeploy.model_executor.layers.normalization import RMSNorm +from fastdeploy.model_executor.layers.quantization.fp8_utils import ( + per_token_group_quant_fp8, +) from fastdeploy.model_executor.layers.rotary_embedding import ( DeepseekScalingRotaryEmbedding, ) @@ -58,16 +61,6 @@ ) from fastdeploy.platforms import current_platform -if current_platform.is_cuda() or 
current_platform.is_maca(): - from fastdeploy.model_executor.ops.gpu import ( - get_position_ids_and_mask_encoder_batch, - ) - -from fastdeploy.model_executor.layers.quantization.fp8_utils import ( - per_token_group_quant_fp8, -) -from fastdeploy.platforms import current_platform - if current_platform.is_cuda(): from fastdeploy.model_executor.ops.gpu import ( cp_gather_indexer_k_quant_cache, @@ -1083,11 +1076,6 @@ def forward( forward_meta: ForwardMeta, ): """ """ - import nvtx - - with nvtx.annotate("pre_process", color="green"): - self.pre_process(forward_meta) - hidden_states = self.embed_tokens(ids_remove_padding=ids_remove_padding, forward_meta=forward_meta) residual = None @@ -1104,60 +1092,6 @@ def forward( return out - # def pre_process(self, forward_meta) -> None: - # """ """ - - # seq_lens_encoder = forward_meta.seq_lens_encoder - # seq_lens_decoder = forward_meta.seq_lens_decoder - # seq_lens_this_time = forward_meta.seq_lens_this_time - - # current_total_tokens = forward_meta.ids_remove_padding.shape[0] - # position_ids = self.position_ids_buffer[:current_total_tokens] - # mask_encoder_batch = self.mask_encoder_batch_buffer[:current_total_tokens] - - # get_position_ids_and_mask_encoder_batch( - # seq_lens_encoder, - # seq_lens_decoder, - # seq_lens_this_time, - # position_ids, - # mask_encoder_batch, - # ) - - # block_size = self.fd_config.cache_config.block_size - # block_idx = position_ids // block_size # [num_tokens] - # block_ids = forward_meta.block_tables[forward_meta.batch_id_per_token, block_idx] # [num_tokens] - # block_offset = position_ids % block_size # [num_tokens] - # slot_mapping = (block_ids * block_size + block_offset).cast(paddle.int64) - - # forward_meta.position_ids = position_ids - # paddle.assign( - # slot_mapping, - # self.slot_mapping_buffer[:current_total_tokens], - # ) - # forward_meta.slot_mapping = self.slot_mapping_buffer[:current_total_tokens] - def pre_process(self, forward_meta) -> None: - """ """ - current_total_tokens = 
forward_meta.ids_remove_padding.shape[0] - position_ids = paddle.empty([current_total_tokens], dtype=paddle.int32) - mask_encoder_batch = paddle.empty([current_total_tokens, 1], dtype=paddle.int32) - - get_position_ids_and_mask_encoder_batch( - forward_meta.seq_lens_encoder, - forward_meta.seq_lens_decoder, - forward_meta.seq_lens_this_time, - position_ids, - mask_encoder_batch, - ) - - block_size = self.fd_config.cache_config.block_size - block_idx = position_ids // block_size # [num_tokens] - block_ids = forward_meta.block_tables[forward_meta.batch_id_per_token, block_idx] # [num_tokens] - block_offset = position_ids % block_size # [num_tokens] - slot_mapping = (block_ids * block_size + block_offset).cast(paddle.int64) - - forward_meta.position_ids = position_ids - forward_meta.slot_mapping = slot_mapping - @ModelRegistry.register_model_class( architecture="DeepseekV3ForCausalLM", diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 7253c6f2cf0..1b6502dede0 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -79,6 +79,7 @@ speculate_schedule_cache, set_data_ipc, unset_data_ipc, + get_position_ids_and_mask_encoder_batch, ) import zmq @@ -1237,6 +1238,31 @@ def _prepare_inputs(self, cached_token_num=-1, cached_real_bsz=-1, is_dummy_or_p ) return token_num, token_num_event + def _compute_position_ids_and_slot_mapping(self) -> None: + """Compute position_ids and slot_mapping for KV cache addressing. + This is a general computation based on sequence length info and block tables, + applicable to all models that need per-token KV cache physical slot addresses. + Results are stored in self.forward_meta. 
+ """ + current_total_tokens = self.forward_meta.ids_remove_padding.shape[0] + position_ids = self.share_inputs["position_ids_buffer"][:current_total_tokens] + mask_encoder_batch = self.share_inputs["mask_encoder_batch_buffer"][:current_total_tokens] + get_position_ids_and_mask_encoder_batch( + self.forward_meta.seq_lens_encoder, + self.forward_meta.seq_lens_decoder, + self.forward_meta.seq_lens_this_time, + position_ids, + mask_encoder_batch, + ) + block_size = self.cache_config.block_size + block_idx = position_ids // block_size # [num_tokens] + block_ids = self.forward_meta.block_tables[self.forward_meta.batch_id_per_token, block_idx] # [num_tokens] + block_offset = position_ids % block_size # [num_tokens] + slot_mapping = self.share_inputs["slot_mapping_buffer"][:current_total_tokens] + paddle.assign((block_ids * block_size + block_offset).cast(paddle.int64), slot_mapping) + self.forward_meta.position_ids = position_ids + self.forward_meta.slot_mapping = slot_mapping + def _process_reorder(self) -> None: if self.attn_backends and getattr(self.attn_backends[0], "enable_ids_reorder", False): self.share_inputs.enable_pd_reorder = True @@ -1830,6 +1856,8 @@ def _dummy_run( # 2. 
Padding inputs for cuda graph self.forward_meta.step_use_cudagraph = in_capturing and self.forward_meta.step_use_cudagraph self.padding_cudagraph_inputs() + # Compute position_ids and slot_mapping for KV cache addressing + self._compute_position_ids_and_slot_mapping() model_inputs = {} model_inputs["ids_remove_padding"] = self.share_inputs["ids_remove_padding"] @@ -2167,6 +2195,8 @@ def _preprocess( # Padding inputs for cuda graph self.padding_cudagraph_inputs() + # Compute position_ids and slot_mapping for KV cache addressing + self._compute_position_ids_and_slot_mapping() model_inputs = {} model_inputs["ids_remove_padding"] = self.share_inputs["ids_remove_padding"] diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index 55a3f39a2ee..d80763f43c0 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -188,6 +188,12 @@ def init_share_inputs(self): self.cu_seqlens_q = paddle.full([max_num_seqs + 1], 0, dtype="int32") self.cu_seqlens_k = paddle.full([max_num_seqs + 1], 0, dtype="int32") + # Initialize addressing buffers + _max_batched_tokens = self.scheduler_config.max_num_batched_tokens + self.position_ids_buffer = paddle.zeros([_max_batched_tokens], dtype=paddle.int32) + self.slot_mapping_buffer = paddle.zeros([_max_batched_tokens], dtype=paddle.int64) + self.mask_encoder_batch_buffer = paddle.empty([_max_batched_tokens, 1], dtype=paddle.int32) + # Declare AttentionBackend buffers self.decoder_batch_ids = None self.decoder_tile_ids_per_batch = None From fd9b282e7d85809c69beeec5b9aaf6a5b79cbbd9 Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Tue, 14 Apr 2026 20:42:23 +0800 Subject: [PATCH 05/12] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model_executor/models/deepseek_v3.py | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git 
a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py index 4464474acf8..ab1010ade77 100644 --- a/fastdeploy/model_executor/models/deepseek_v3.py +++ b/fastdeploy/model_executor/models/deepseek_v3.py @@ -1059,17 +1059,6 @@ def __init__( prefix="deepseek_v3.norm", ) - # self.position_ids_buffer = paddle.empty( - # [fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int32 - # ) - # self.mask_encoder_batch_buffer = paddle.empty( - # [fd_config.scheduler_config.max_num_batched_tokens, 1], dtype=paddle.int32 - # ) - # self.slot_mapping_buffer = paddle.empty( - # [fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int64 - # ) - self.fd_config = fd_config - def forward( self, ids_remove_padding: paddle.Tensor, @@ -1234,13 +1223,10 @@ def forward( forward_meta: ForwardMeta, ): ids_remove_padding = inputs["ids_remove_padding"] - import nvtx - - with nvtx.annotate("model_forward", color="red"): - hidden_states = self.model( - ids_remove_padding=ids_remove_padding, - forward_meta=forward_meta, - ) + hidden_states = self.model( + ids_remove_padding=ids_remove_padding, + forward_meta=forward_meta, + ) return hidden_states def clear_grpah_opt_backend(self): From db33441e011624079f02f88d2d33b4a8bd9a858b Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Wed, 15 Apr 2026 17:48:32 +0800 Subject: [PATCH 06/12] Restrict usage to DeepSeek models only, and clean up the code. 
--- custom_ops/gpu_ops/cpp_extensions.cc | 10 +++--- ...get_position_ids_and_mask_encoder_batch.cu | 32 ++++++++----------- fastdeploy/worker/gpu_model_runner.py | 9 +++--- fastdeploy/worker/input_batch.py | 1 - 4 files changed, 22 insertions(+), 30 deletions(-) diff --git a/custom_ops/gpu_ops/cpp_extensions.cc b/custom_ops/gpu_ops/cpp_extensions.cc index 8e9cf6a3ddc..eecc3590595 100644 --- a/custom_ops/gpu_ops/cpp_extensions.cc +++ b/custom_ops/gpu_ops/cpp_extensions.cc @@ -540,12 +540,10 @@ std::vector count_tokens_per_expert_func( const paddle::Tensor& topk_ids, int64_t num_experts, bool compute_padded_cumsum = false); -void GetPositionIdsAndMaskEncoderBatch( - const paddle::Tensor& seq_lens_encoder, - const paddle::Tensor& seq_lens_decoder, - const paddle::Tensor& seq_lens_this_time, - const paddle::Tensor& position_ids, - const paddle::Tensor& mask_encoder_batch); +void GetPositionIdsAndMaskEncoderBatch(const paddle::Tensor& seq_lens_encoder, + const paddle::Tensor& seq_lens_decoder, + const paddle::Tensor& seq_lens_this_time, + const paddle::Tensor& position_ids); std::vector DecodeMLAWriteCacheKernel( const paddle::Tensor& kv_nope, diff --git a/custom_ops/gpu_ops/get_position_ids_and_mask_encoder_batch.cu b/custom_ops/gpu_ops/get_position_ids_and_mask_encoder_batch.cu index 946c9754072..63bc77c9afc 100644 --- a/custom_ops/gpu_ops/get_position_ids_and_mask_encoder_batch.cu +++ b/custom_ops/gpu_ops/get_position_ids_and_mask_encoder_batch.cu @@ -20,8 +20,7 @@ __global__ void GetPositionIdsAndMaskEncoderBatchKernel( const int* seq_lens_decoder, // [bsz] 每个批次的 decoder 长度 const int* seq_lens_this_time, int* position_ids, // 输出的一维 position_ids - int* mask_encoder_batch, - const int bsz) { // 批次大小 + const int bsz) { // 批次大小 // 当前线程索引(每个线程对应一个批次) int tid = threadIdx.x; if (tid >= bsz) return; @@ -43,7 +42,6 @@ __global__ void GetPositionIdsAndMaskEncoderBatchKernel( // 写入 encoder 的 position_ids for (int i = 0; i < encoder_len; i++) { position_ids[offset + i] = i; - 
mask_encoder_batch[offset + i] = 1; } offset += encoder_len; @@ -51,17 +49,14 @@ __global__ void GetPositionIdsAndMaskEncoderBatchKernel( if (decoder_len > 0) { for (int i = 0; i < seq_len_this_time; i++) { position_ids[offset + i] = decoder_len + i; // 使用 decoder 长度本身 - mask_encoder_batch[offset + i] = 0; } } } -void GetPositionIdsAndMaskEncoderBatch( - const paddle::Tensor& seq_lens_encoder, - const paddle::Tensor& seq_lens_decoder, - const paddle::Tensor& seq_lens_this_time, - const paddle::Tensor& position_ids, - const paddle::Tensor& mask_encoder_batch) { +void GetPositionIdsAndMaskEncoderBatch(const paddle::Tensor& seq_lens_encoder, + const paddle::Tensor& seq_lens_decoder, + const paddle::Tensor& seq_lens_this_time, + const paddle::Tensor& position_ids) { const int bsz = seq_lens_this_time.shape()[0]; GetPositionIdsAndMaskEncoderBatchKernel<<<1, bsz, 0, position_ids.stream()>>>( @@ -69,17 +64,16 @@ void GetPositionIdsAndMaskEncoderBatch( seq_lens_decoder.data(), seq_lens_this_time.data(), const_cast(position_ids.data()), - const_cast(mask_encoder_batch.data()), bsz); } PD_BUILD_STATIC_OP(get_position_ids_and_mask_encoder_batch) - .Inputs({"seq_lens_encoder", - "seq_lens_decoder", - "seq_lens_this_time", - "position_ids", - "mask_encoder_batch"}) - .Outputs({"position_ids_out", "mask_encoder_batch_out"}) - .SetInplaceMap({{"position_ids", "position_ids_out"}, - {"mask_encoder_batch", "mask_encoder_batch_out"}}) + .Inputs({ + "seq_lens_encoder", + "seq_lens_decoder", + "seq_lens_this_time", + "position_ids", + }) + .Outputs({"position_ids_out"}) + .SetInplaceMap({{"position_ids", "position_ids_out"}}) .SetKernelFn(PD_KERNEL(GetPositionIdsAndMaskEncoderBatch)); diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 1b6502dede0..6f2e48a1f73 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1244,15 +1244,16 @@ def _compute_position_ids_and_slot_mapping(self) -> None: 
applicable to all models that need per-token KV cache physical slot addresses. Results are stored in self.forward_meta. """ + # NOTE(zhushengguang): Currently, only DeepSeek is supported. + if "Deepseek" not in str(self.model_config.architectures): + return current_total_tokens = self.forward_meta.ids_remove_padding.shape[0] position_ids = self.share_inputs["position_ids_buffer"][:current_total_tokens] - mask_encoder_batch = self.share_inputs["mask_encoder_batch_buffer"][:current_total_tokens] get_position_ids_and_mask_encoder_batch( self.forward_meta.seq_lens_encoder, self.forward_meta.seq_lens_decoder, self.forward_meta.seq_lens_this_time, position_ids, - mask_encoder_batch, ) block_size = self.cache_config.block_size block_idx = position_ids // block_size # [num_tokens] @@ -1856,7 +1857,7 @@ def _dummy_run( # 2. Padding inputs for cuda graph self.forward_meta.step_use_cudagraph = in_capturing and self.forward_meta.step_use_cudagraph self.padding_cudagraph_inputs() - # Compute position_ids and slot_mapping for KV cache addressing + # Compute position_ids and slot_mapping self._compute_position_ids_and_slot_mapping() model_inputs = {} @@ -2195,7 +2196,7 @@ def _preprocess( # Padding inputs for cuda graph self.padding_cudagraph_inputs() - # Compute position_ids and slot_mapping for KV cache addressing + # Compute position_ids and slot_mapping self._compute_position_ids_and_slot_mapping() model_inputs = {} diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index d80763f43c0..f47c7bccc6d 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -192,7 +192,6 @@ def init_share_inputs(self): _max_batched_tokens = self.scheduler_config.max_num_batched_tokens self.position_ids_buffer = paddle.zeros([_max_batched_tokens], dtype=paddle.int32) self.slot_mapping_buffer = paddle.zeros([_max_batched_tokens], dtype=paddle.int64) - self.mask_encoder_batch_buffer = paddle.empty([_max_batched_tokens, 1], 
dtype=paddle.int32) # Declare AttentionBackend buffers self.decoder_batch_ids = None From 90ee366b2849c5a1adeac6dd784a5c3117cce6ba Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Wed, 15 Apr 2026 18:25:27 +0800 Subject: [PATCH 07/12] clean code --- fastdeploy/model_executor/forward_meta.py | 1 - fastdeploy/worker/gpu_model_runner.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index fa0dadb0cfc..4efbc06ecc0 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -160,7 +160,6 @@ class ForwardMeta: # for mla & dsa position_ids: Optional[paddle.Tensor] = None - mask_encoder_batch: Optional[paddle.Tensor] = None # for kvcache slot slot_mapping: Optional[paddle.Tensor] = None diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 36b2f791218..dac930d2581 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1287,6 +1287,7 @@ def _compute_position_ids_and_slot_mapping(self) -> None: ) block_size = self.cache_config.block_size block_idx = position_ids // block_size # [num_tokens] + assert self.forward_meta.batch_id_per_token.shape == block_idx.shape block_ids = self.forward_meta.block_tables[self.forward_meta.batch_id_per_token, block_idx] # [num_tokens] block_offset = position_ids % block_size # [num_tokens] slot_mapping = self.share_inputs["slot_mapping_buffer"][:current_total_tokens] From be243e5d2dcadbfeceeb5d8097cdfc46a3be5401 Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Wed, 15 Apr 2026 20:07:46 +0800 Subject: [PATCH 08/12] Compatible with backends utilizing DSA and MLA, such as GLM. 
--- fastdeploy/worker/gpu_model_runner.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index dac930d2581..f7490c63f63 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -45,6 +45,12 @@ from fastdeploy.model_executor.layers.attention.base_attention_backend import ( AttentionBackend, ) +from fastdeploy.model_executor.layers.attention.dsa_attention_backend import ( + DSAAttentionBackend, +) +from fastdeploy.model_executor.layers.attention.mla_attention_backend import ( + MLAAttentionBackend, +) from fastdeploy.model_executor.layers.moe.routing_indices_cache import ( RoutingReplayManager, ) @@ -1274,8 +1280,8 @@ def _compute_position_ids_and_slot_mapping(self) -> None: applicable to all models that need per-token KV cache physical slot addresses. Results are stored in self.forward_meta. """ - # NOTE(zhushengguang): Currently, only DeepSeek is supported. - if "Deepseek" not in str(self.model_config.architectures): + # NOTE(zhushengguang): Only support MLAAttentionBackend and DSAAttentionBackend currently. 
+ if not isinstance(self.attn_backends[0], (MLAAttentionBackend, DSAAttentionBackend)): return current_total_tokens = self.forward_meta.ids_remove_padding.shape[0] position_ids = self.share_inputs["position_ids_buffer"][:current_total_tokens] From 20cfaa4c40789a9a2afd8476768f72de34d99ba8 Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Thu, 16 Apr 2026 14:23:04 +0800 Subject: [PATCH 09/12] Fix test file about 'get_position_ids_and_mask_encoder_batch' ops --- .../test_pre_process_slot_mapping.py | 324 ++++++++++++++++++ 1 file changed, 324 insertions(+) create mode 100644 tests/operators/test_pre_process_slot_mapping.py diff --git a/tests/operators/test_pre_process_slot_mapping.py b/tests/operators/test_pre_process_slot_mapping.py new file mode 100644 index 00000000000..c837b563716 --- /dev/null +++ b/tests/operators/test_pre_process_slot_mapping.py @@ -0,0 +1,324 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +""" +验证 DeepseekV3ForCausalLM.pre_process 中计算的 slot_mapping +与独立辅助函数 compute_slot_mapping 的结果一致。 + +测试策略: + pre_process 的 slot_mapping 计算逻辑等价于: + block_idx = position_ids // block_size + block_ids = block_tables[batch_id_per_token, block_idx] + block_offset = position_ids % block_size + slot_mapping = (block_ids * block_size + block_offset).cast(int64) + + compute_slot_mapping 封装了完全相同的公式。 + 因此,只需用相同的 position_ids / block_tables / batch_id_per_token 分别调用 + 两段逻辑,断言结果相等即可。 + + 为了获得与 pre_process 完全一致的 position_ids,测试直接调用 + get_position_ids_and_mask_encoder_batch(pre_process 内部也调用它), + 而不需要实例化整个模型。 +""" + +import unittest + +import numpy as np +import paddle + +from fastdeploy.model_executor.ops.gpu import get_position_ids_and_mask_encoder_batch + +# --------------------------------------------------------------------------- +# 被测辅助函数(与用户提供的代码完全一致) +# --------------------------------------------------------------------------- + + +def compute_slot_mapping( + block_tables: paddle.Tensor, # [num_reqs, max_blocks_per_req] + positions: paddle.Tensor, # [num_tokens] + batch_id_per_token: paddle.Tensor, # [num_tokens] + block_size: int, +) -> paddle.Tensor: + """ + 计算 slot_mapping + + 公式: slot = block_id * block_size + offset_in_block + """ + block_idx = positions // block_size + block_ids = block_tables[batch_id_per_token, block_idx] + block_offset = positions % block_size + slot_mapping = block_ids * block_size + block_offset + return slot_mapping.cast(paddle.int64) + + +# --------------------------------------------------------------------------- +# 与 pre_process 中相同的内联 slot_mapping 计算逻辑(抽成函数便于复用) +# --------------------------------------------------------------------------- + + +def _pre_process_slot_mapping( + block_tables: paddle.Tensor, + batch_id_per_token: paddle.Tensor, + position_ids: paddle.Tensor, + block_size: int, +) -> paddle.Tensor: + """复刻 pre_process 中的 slot_mapping 计算,不依赖模型对象。""" + block_idx = position_ids // block_size + block_ids = 
block_tables[batch_id_per_token, block_idx] + block_offset = position_ids % block_size + return (block_ids * block_size + block_offset).cast(paddle.int64) + + +# --------------------------------------------------------------------------- +# 测试工具函数 +# --------------------------------------------------------------------------- + + +def _build_position_ids( + seq_lens_encoder: paddle.Tensor, + seq_lens_decoder: paddle.Tensor, + seq_lens_this_time: paddle.Tensor, +) -> paddle.Tensor: + """调用 custom op 得到 position_ids(与 pre_process 完全一致的路径)。""" + total_len = int(seq_lens_encoder.numpy().sum() + seq_lens_this_time.numpy().sum()) + position_ids = paddle.zeros([total_len], dtype="int32") + get_position_ids_and_mask_encoder_batch( + seq_lens_encoder, + seq_lens_decoder, + seq_lens_this_time, + position_ids, + ) + return position_ids + + +def _build_batch_id_per_token( + seq_lens_encoder: paddle.Tensor, + seq_lens_this_time: paddle.Tensor, +) -> paddle.Tensor: + """根据 encoder 序列长度和本次处理长度构建 batch_id_per_token。""" + enc = seq_lens_encoder.numpy().tolist() + dec_this = seq_lens_this_time.numpy().tolist() + batch_ids = [] + for bid, (e, d) in enumerate(zip(enc, dec_this)): + batch_ids.extend([bid] * (e + d)) + return paddle.to_tensor(batch_ids, dtype="int32") + + +# --------------------------------------------------------------------------- +# 单测类 +# --------------------------------------------------------------------------- + + +class TestPreProcessSlotMappingConsistency(unittest.TestCase): + """验证 pre_process 内联逻辑与 compute_slot_mapping 的一致性。""" + + def setUp(self): + paddle.set_device("gpu") + + def _assert_slot_mapping_equal( + self, + seq_lens_encoder, + seq_lens_decoder, + seq_lens_this_time, + block_tables, + block_size, + test_name="", + ): + """通用断言:两种计算路径的 slot_mapping 完全相等。""" + position_ids = _build_position_ids(seq_lens_encoder, seq_lens_decoder, seq_lens_this_time) + batch_id_per_token = _build_batch_id_per_token(seq_lens_encoder, seq_lens_this_time) + + # 
pre_process 内联逻辑 + ref = _pre_process_slot_mapping(block_tables, batch_id_per_token, position_ids, block_size) + + # compute_slot_mapping + got = compute_slot_mapping(block_tables, position_ids, batch_id_per_token, block_size) + + np.testing.assert_array_equal( + ref.numpy(), + got.numpy(), + err_msg=f"[{test_name}] slot_mapping mismatch", + ) + + # ------------------------------------------------------------------ + # case 1: 纯 prefill(batch_size=2,无 decode) + # ------------------------------------------------------------------ + def test_pure_prefill(self): + """两条请求均处于 prefill 阶段。""" + block_size = 4 + seq_lens_encoder = paddle.to_tensor([0, 0], dtype="int32") + seq_lens_decoder = paddle.to_tensor([0, 0], dtype="int32") + seq_lens_this_time = paddle.to_tensor([3, 5], dtype="int32") + + # block_tables: [2 reqs, 4 blocks each],随机填充合法 block id + block_tables = paddle.to_tensor([[10, 11, 12, 13], [20, 21, 22, 23]], dtype="int32") + + self._assert_slot_mapping_equal( + seq_lens_encoder, + seq_lens_decoder, + seq_lens_this_time, + block_tables, + block_size, + test_name="pure_prefill", + ) + + # ------------------------------------------------------------------ + # case 2: 纯 decode(每条请求本次只处理 1 个 token) + # ------------------------------------------------------------------ + def test_pure_decode(self): + """两条请求均处于 decode 阶段,seq_lens_decoder 反映历史已填充长度。""" + block_size = 8 + seq_lens_encoder = paddle.to_tensor([0, 0], dtype="int32") + # decode 历史已填充:req0=7 tokens,req1=15 tokens + seq_lens_decoder = paddle.to_tensor([7, 15], dtype="int32") + seq_lens_this_time = paddle.to_tensor([1, 1], dtype="int32") + + block_tables = paddle.to_tensor([[5, 6, 0, 0], [30, 31, 0, 0]], dtype="int32") + + self._assert_slot_mapping_equal( + seq_lens_encoder, + seq_lens_decoder, + seq_lens_this_time, + block_tables, + block_size, + test_name="pure_decode", + ) + + # ------------------------------------------------------------------ + # case 3: 混合(prefill + decode)batch + # 
------------------------------------------------------------------ + def test_mixed_prefill_decode(self): + """batch 中同时包含 prefill 和 decode 请求(MIXED 模式)。""" + block_size = 4 + # req0: prefill(encoder=3, this_time=3) + # req1: decode(decoder=5, this_time=1) + seq_lens_encoder = paddle.to_tensor([3, 0], dtype="int32") + seq_lens_decoder = paddle.to_tensor([1, 5], dtype="int32") + seq_lens_this_time = paddle.to_tensor([1, 1], dtype="int32") + + block_tables = paddle.to_tensor([[100, 101, 102, 103], [200, 201, 202, 203]], dtype="int32") + + self._assert_slot_mapping_equal( + seq_lens_encoder, + seq_lens_decoder, + seq_lens_this_time, + block_tables, + block_size, + test_name="mixed_prefill_decode", + ) + + # ------------------------------------------------------------------ + # case 4: block_size=1(每个 block 仅容纳 1 个 token,边界条件) + # ------------------------------------------------------------------ + def test_block_size_one(self): + """block_size=1 时 block_offset 恒为 0,slot == block_id。""" + block_size = 1 + seq_lens_encoder = paddle.to_tensor([0, 0], dtype="int32") + seq_lens_decoder = paddle.to_tensor([0, 0], dtype="int32") + seq_lens_this_time = paddle.to_tensor([4, 3], dtype="int32") + + # 每个 token 占一个 block + block_tables = paddle.to_tensor( + [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], + dtype="int32", + ) + + self._assert_slot_mapping_equal( + seq_lens_encoder, + seq_lens_decoder, + seq_lens_this_time, + block_tables, + block_size, + test_name="block_size_one", + ) + + # ------------------------------------------------------------------ + # case 5: 较大 batch_size,验证批量正确性 + # ------------------------------------------------------------------ + def test_large_batch(self): + """batch_size=8,混合 prefill 和 decode,较多 token。""" + block_size = 16 + bsz = 8 + np.random.seed(0) + + enc_lens = np.random.randint(0, 32, size=bsz).astype(np.int32) + dec_lens = np.random.randint(0, 16, size=bsz).astype(np.int32) + this_lens = np.random.randint(1, 8, 
size=bsz).astype(np.int32) + + # 确保 decoder 历史不超过 block_tables 能索引的范围 + max_blocks = 8 + block_tables_np = np.random.randint(0, 1024, size=(bsz, max_blocks), dtype=np.int32) + + # 检查 this_lens 对应的最大位置不超过 max_blocks * block_size + total_pos = enc_lens + dec_lens + this_lens - 1 + cap = max_blocks * block_size - 1 + # 超出容量的请求截断到安全范围 + this_lens = np.where(total_pos > cap, np.maximum(1, cap - enc_lens - dec_lens + 1), this_lens) + this_lens = np.maximum(this_lens, 1).astype(np.int32) + + seq_lens_encoder = paddle.to_tensor(enc_lens) + seq_lens_decoder = paddle.to_tensor(dec_lens) + seq_lens_this_time = paddle.to_tensor(this_lens) + block_tables = paddle.to_tensor(block_tables_np) + + self._assert_slot_mapping_equal( + seq_lens_encoder, + seq_lens_decoder, + seq_lens_this_time, + block_tables, + block_size, + test_name="large_batch", + ) + + # ------------------------------------------------------------------ + # case 6: 单请求,精确验证数值 + # ------------------------------------------------------------------ + def test_single_request_exact_values(self): + """ + 单请求 prefill,block_size=4,seq_len=6。 + 手工推导期望值并验证。 + + seq_lens_encoder=0, seq_lens_decoder=0, seq_lens_this_time=6 + => position_ids = [0,1,2,3,4,5] + block_tables[0] = [10, 20, 30, ...] 
+ + slot[i] = block_tables[0, pos//4] * 4 + pos%4 + pos=0 -> block=10, off=0 -> slot=40 + pos=1 -> block=10, off=1 -> slot=41 + pos=2 -> block=10, off=2 -> slot=42 + pos=3 -> block=10, off=3 -> slot=43 + pos=4 -> block=20, off=0 -> slot=80 + pos=5 -> block=20, off=1 -> slot=81 + """ + block_size = 4 + seq_lens_encoder = paddle.to_tensor([0], dtype="int32") + seq_lens_decoder = paddle.to_tensor([0], dtype="int32") + seq_lens_this_time = paddle.to_tensor([6], dtype="int32") + block_tables = paddle.to_tensor([[10, 20, 30, 40]], dtype="int32") + + position_ids = _build_position_ids(seq_lens_encoder, seq_lens_decoder, seq_lens_this_time) + batch_id_per_token = _build_batch_id_per_token(seq_lens_encoder, seq_lens_this_time) + + expected = np.array([40, 41, 42, 43, 80, 81], dtype=np.int64) + + ref = _pre_process_slot_mapping(block_tables, batch_id_per_token, position_ids, block_size) + got = compute_slot_mapping(block_tables, position_ids, batch_id_per_token, block_size) + + np.testing.assert_array_equal(ref.numpy(), expected, err_msg="pre_process mismatch expected") + np.testing.assert_array_equal(got.numpy(), expected, err_msg="compute_slot_mapping mismatch expected") + np.testing.assert_array_equal(ref.numpy(), got.numpy(), err_msg="two paths mismatch") + + +if __name__ == "__main__": + unittest.main() From 76de51a66f119bf28f86d74907fb8383efba74dd Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Thu, 16 Apr 2026 14:26:42 +0800 Subject: [PATCH 10/12] Revert "Fix test file about 'ge_postion_ids_and_mask_encoder_batch" This reverts commit 20cfaa4c40789a9a2afd8476768f72de34d99ba8. 
--- .../test_pre_process_slot_mapping.py | 324 ------------------ 1 file changed, 324 deletions(-) delete mode 100644 tests/operators/test_pre_process_slot_mapping.py diff --git a/tests/operators/test_pre_process_slot_mapping.py b/tests/operators/test_pre_process_slot_mapping.py deleted file mode 100644 index c837b563716..00000000000 --- a/tests/operators/test_pre_process_slot_mapping.py +++ /dev/null @@ -1,324 +0,0 @@ -# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -验证 DeepseekV3ForCausalLM.pre_process 中计算的 slot_mapping -与独立辅助函数 compute_slot_mapping 的结果一致。 - -测试策略: - pre_process 的 slot_mapping 计算逻辑等价于: - block_idx = position_ids // block_size - block_ids = block_tables[batch_id_per_token, block_idx] - block_offset = position_ids % block_size - slot_mapping = (block_ids * block_size + block_offset).cast(int64) - - compute_slot_mapping 封装了完全相同的公式。 - 因此,只需用相同的 position_ids / block_tables / batch_id_per_token 分别调用 - 两段逻辑,断言结果相等即可。 - - 为了获得与 pre_process 完全一致的 position_ids,测试直接调用 - get_position_ids_and_mask_encoder_batch(pre_process 内部也调用它), - 而不需要实例化整个模型。 -""" - -import unittest - -import numpy as np -import paddle - -from fastdeploy.model_executor.ops.gpu import get_position_ids_and_mask_encoder_batch - -# --------------------------------------------------------------------------- -# 被测辅助函数(与用户提供的代码完全一致) -# --------------------------------------------------------------------------- - - -def compute_slot_mapping( - block_tables: paddle.Tensor, # [num_reqs, max_blocks_per_req] - positions: paddle.Tensor, # [num_tokens] - batch_id_per_token: paddle.Tensor, # [num_tokens] - block_size: int, -) -> paddle.Tensor: - """ - 计算 slot_mapping - - 公式: slot = block_id * block_size + offset_in_block - """ - block_idx = positions // block_size - block_ids = block_tables[batch_id_per_token, block_idx] - block_offset = positions % block_size - slot_mapping = block_ids * block_size + block_offset - return slot_mapping.cast(paddle.int64) - - -# --------------------------------------------------------------------------- -# 与 pre_process 中相同的内联 slot_mapping 计算逻辑(抽成函数便于复用) -# --------------------------------------------------------------------------- - - -def _pre_process_slot_mapping( - block_tables: paddle.Tensor, - batch_id_per_token: paddle.Tensor, - position_ids: paddle.Tensor, - block_size: int, -) -> paddle.Tensor: - """复刻 pre_process 中的 slot_mapping 计算,不依赖模型对象。""" - block_idx = position_ids // block_size - block_ids = 
block_tables[batch_id_per_token, block_idx] - block_offset = position_ids % block_size - return (block_ids * block_size + block_offset).cast(paddle.int64) - - -# --------------------------------------------------------------------------- -# 测试工具函数 -# --------------------------------------------------------------------------- - - -def _build_position_ids( - seq_lens_encoder: paddle.Tensor, - seq_lens_decoder: paddle.Tensor, - seq_lens_this_time: paddle.Tensor, -) -> paddle.Tensor: - """调用 custom op 得到 position_ids(与 pre_process 完全一致的路径)。""" - total_len = int(seq_lens_encoder.numpy().sum() + seq_lens_this_time.numpy().sum()) - position_ids = paddle.zeros([total_len], dtype="int32") - get_position_ids_and_mask_encoder_batch( - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - position_ids, - ) - return position_ids - - -def _build_batch_id_per_token( - seq_lens_encoder: paddle.Tensor, - seq_lens_this_time: paddle.Tensor, -) -> paddle.Tensor: - """根据 encoder 序列长度和本次处理长度构建 batch_id_per_token。""" - enc = seq_lens_encoder.numpy().tolist() - dec_this = seq_lens_this_time.numpy().tolist() - batch_ids = [] - for bid, (e, d) in enumerate(zip(enc, dec_this)): - batch_ids.extend([bid] * (e + d)) - return paddle.to_tensor(batch_ids, dtype="int32") - - -# --------------------------------------------------------------------------- -# 单测类 -# --------------------------------------------------------------------------- - - -class TestPreProcessSlotMappingConsistency(unittest.TestCase): - """验证 pre_process 内联逻辑与 compute_slot_mapping 的一致性。""" - - def setUp(self): - paddle.set_device("gpu") - - def _assert_slot_mapping_equal( - self, - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - block_tables, - block_size, - test_name="", - ): - """通用断言:两种计算路径的 slot_mapping 完全相等。""" - position_ids = _build_position_ids(seq_lens_encoder, seq_lens_decoder, seq_lens_this_time) - batch_id_per_token = _build_batch_id_per_token(seq_lens_encoder, seq_lens_this_time) - - # 
pre_process 内联逻辑 - ref = _pre_process_slot_mapping(block_tables, batch_id_per_token, position_ids, block_size) - - # compute_slot_mapping - got = compute_slot_mapping(block_tables, position_ids, batch_id_per_token, block_size) - - np.testing.assert_array_equal( - ref.numpy(), - got.numpy(), - err_msg=f"[{test_name}] slot_mapping mismatch", - ) - - # ------------------------------------------------------------------ - # case 1: 纯 prefill(batch_size=2,无 decode) - # ------------------------------------------------------------------ - def test_pure_prefill(self): - """两条请求均处于 prefill 阶段。""" - block_size = 4 - seq_lens_encoder = paddle.to_tensor([0, 0], dtype="int32") - seq_lens_decoder = paddle.to_tensor([0, 0], dtype="int32") - seq_lens_this_time = paddle.to_tensor([3, 5], dtype="int32") - - # block_tables: [2 reqs, 4 blocks each],随机填充合法 block id - block_tables = paddle.to_tensor([[10, 11, 12, 13], [20, 21, 22, 23]], dtype="int32") - - self._assert_slot_mapping_equal( - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - block_tables, - block_size, - test_name="pure_prefill", - ) - - # ------------------------------------------------------------------ - # case 2: 纯 decode(每条请求本次只处理 1 个 token) - # ------------------------------------------------------------------ - def test_pure_decode(self): - """两条请求均处于 decode 阶段,seq_lens_decoder 反映历史已填充长度。""" - block_size = 8 - seq_lens_encoder = paddle.to_tensor([0, 0], dtype="int32") - # decode 历史已填充:req0=7 tokens,req1=15 tokens - seq_lens_decoder = paddle.to_tensor([7, 15], dtype="int32") - seq_lens_this_time = paddle.to_tensor([1, 1], dtype="int32") - - block_tables = paddle.to_tensor([[5, 6, 0, 0], [30, 31, 0, 0]], dtype="int32") - - self._assert_slot_mapping_equal( - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - block_tables, - block_size, - test_name="pure_decode", - ) - - # ------------------------------------------------------------------ - # case 3: 混合(prefill + decode)batch - # 
------------------------------------------------------------------ - def test_mixed_prefill_decode(self): - """batch 中同时包含 prefill 和 decode 请求(MIXED 模式)。""" - block_size = 4 - # req0: prefill(encoder=3, this_time=3) - # req1: decode(decoder=5, this_time=1) - seq_lens_encoder = paddle.to_tensor([3, 0], dtype="int32") - seq_lens_decoder = paddle.to_tensor([1, 5], dtype="int32") - seq_lens_this_time = paddle.to_tensor([1, 1], dtype="int32") - - block_tables = paddle.to_tensor([[100, 101, 102, 103], [200, 201, 202, 203]], dtype="int32") - - self._assert_slot_mapping_equal( - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - block_tables, - block_size, - test_name="mixed_prefill_decode", - ) - - # ------------------------------------------------------------------ - # case 4: block_size=1(每个 block 仅容纳 1 个 token,边界条件) - # ------------------------------------------------------------------ - def test_block_size_one(self): - """block_size=1 时 block_offset 恒为 0,slot == block_id。""" - block_size = 1 - seq_lens_encoder = paddle.to_tensor([0, 0], dtype="int32") - seq_lens_decoder = paddle.to_tensor([0, 0], dtype="int32") - seq_lens_this_time = paddle.to_tensor([4, 3], dtype="int32") - - # 每个 token 占一个 block - block_tables = paddle.to_tensor( - [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], - dtype="int32", - ) - - self._assert_slot_mapping_equal( - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - block_tables, - block_size, - test_name="block_size_one", - ) - - # ------------------------------------------------------------------ - # case 5: 较大 batch_size,验证批量正确性 - # ------------------------------------------------------------------ - def test_large_batch(self): - """batch_size=8,混合 prefill 和 decode,较多 token。""" - block_size = 16 - bsz = 8 - np.random.seed(0) - - enc_lens = np.random.randint(0, 32, size=bsz).astype(np.int32) - dec_lens = np.random.randint(0, 16, size=bsz).astype(np.int32) - this_lens = np.random.randint(1, 8, 
size=bsz).astype(np.int32) - - # 确保 decoder 历史不超过 block_tables 能索引的范围 - max_blocks = 8 - block_tables_np = np.random.randint(0, 1024, size=(bsz, max_blocks), dtype=np.int32) - - # 检查 this_lens 对应的最大位置不超过 max_blocks * block_size - total_pos = enc_lens + dec_lens + this_lens - 1 - cap = max_blocks * block_size - 1 - # 超出容量的请求截断到安全范围 - this_lens = np.where(total_pos > cap, np.maximum(1, cap - enc_lens - dec_lens + 1), this_lens) - this_lens = np.maximum(this_lens, 1).astype(np.int32) - - seq_lens_encoder = paddle.to_tensor(enc_lens) - seq_lens_decoder = paddle.to_tensor(dec_lens) - seq_lens_this_time = paddle.to_tensor(this_lens) - block_tables = paddle.to_tensor(block_tables_np) - - self._assert_slot_mapping_equal( - seq_lens_encoder, - seq_lens_decoder, - seq_lens_this_time, - block_tables, - block_size, - test_name="large_batch", - ) - - # ------------------------------------------------------------------ - # case 6: 单请求,精确验证数值 - # ------------------------------------------------------------------ - def test_single_request_exact_values(self): - """ - 单请求 prefill,block_size=4,seq_len=6。 - 手工推导期望值并验证。 - - seq_lens_encoder=0, seq_lens_decoder=0, seq_lens_this_time=6 - => position_ids = [0,1,2,3,4,5] - block_tables[0] = [10, 20, 30, ...] 
- - slot[i] = block_tables[0, pos//4] * 4 + pos%4 - pos=0 -> block=10, off=0 -> slot=40 - pos=1 -> block=10, off=1 -> slot=41 - pos=2 -> block=10, off=2 -> slot=42 - pos=3 -> block=10, off=3 -> slot=43 - pos=4 -> block=20, off=0 -> slot=80 - pos=5 -> block=20, off=1 -> slot=81 - """ - block_size = 4 - seq_lens_encoder = paddle.to_tensor([0], dtype="int32") - seq_lens_decoder = paddle.to_tensor([0], dtype="int32") - seq_lens_this_time = paddle.to_tensor([6], dtype="int32") - block_tables = paddle.to_tensor([[10, 20, 30, 40]], dtype="int32") - - position_ids = _build_position_ids(seq_lens_encoder, seq_lens_decoder, seq_lens_this_time) - batch_id_per_token = _build_batch_id_per_token(seq_lens_encoder, seq_lens_this_time) - - expected = np.array([40, 41, 42, 43, 80, 81], dtype=np.int64) - - ref = _pre_process_slot_mapping(block_tables, batch_id_per_token, position_ids, block_size) - got = compute_slot_mapping(block_tables, position_ids, batch_id_per_token, block_size) - - np.testing.assert_array_equal(ref.numpy(), expected, err_msg="pre_process mismatch expected") - np.testing.assert_array_equal(got.numpy(), expected, err_msg="compute_slot_mapping mismatch expected") - np.testing.assert_array_equal(ref.numpy(), got.numpy(), err_msg="two paths mismatch") - - -if __name__ == "__main__": - unittest.main() From 00e013f1040922c77ab482f8d0d073c573651fe8 Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Thu, 16 Apr 2026 14:34:25 +0800 Subject: [PATCH 11/12] Fix test file --- ...st_get_position_ids_and_mask_encoder_batch.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/tests/operators/test_get_position_ids_and_mask_encoder_batch.py b/tests/operators/test_get_position_ids_and_mask_encoder_batch.py index 41474b4726c..2d1dd8e2f7c 100644 --- a/tests/operators/test_get_position_ids_and_mask_encoder_batch.py +++ b/tests/operators/test_get_position_ids_and_mask_encoder_batch.py @@ -33,24 +33,17 @@ def 
test_basic_functionality(self): total_len = int(seq_lens_encoder.numpy().sum() + seq_lens_this_time.numpy().sum()) position_ids = paddle.zeros([total_len], dtype="int32") - mask_encoder_batch = paddle.zeros([total_len], dtype="int32") # Call the custom operator - get_position_ids_and_mask_encoder_batch( - seq_lens_encoder, seq_lens_decoder, seq_lens_this_time, position_ids, mask_encoder_batch - ) + get_position_ids_and_mask_encoder_batch(seq_lens_encoder, seq_lens_decoder, seq_lens_this_time, position_ids) expected_position_ids = np.array([0, 1, 2, 1, 0, 1, 2, 3], dtype=np.int32) - expected_mask = np.array([1, 1, 1, 0, 1, 1, 0, 0], dtype=np.int32) - # Convert to numpy for comparison position_ids_np = position_ids.numpy() - mask_encoder_batch_np = mask_encoder_batch.numpy() # Assert equality np.testing.assert_array_equal(position_ids_np, expected_position_ids) - np.testing.assert_array_equal(mask_encoder_batch_np, expected_mask) def test_empty_decoder(self): # Test case where decoder length is 0 @@ -59,17 +52,12 @@ def test_empty_decoder(self): seq_lens_this_time = paddle.to_tensor([0], dtype="int32") position_ids = paddle.zeros([2], dtype="int32") - mask_encoder_batch = paddle.zeros([2], dtype="int32") - get_position_ids_and_mask_encoder_batch( - seq_lens_encoder, seq_lens_decoder, seq_lens_this_time, position_ids, mask_encoder_batch - ) + get_position_ids_and_mask_encoder_batch(seq_lens_encoder, seq_lens_decoder, seq_lens_this_time, position_ids) expected_position_ids = np.array([0, 1], dtype=np.int32) - expected_mask = np.array([1, 1], dtype=np.int32) np.testing.assert_array_equal(position_ids.numpy(), expected_position_ids) - np.testing.assert_array_equal(mask_encoder_batch.numpy(), expected_mask) if __name__ == "__main__": From 736ddc1d1c3d7d484715e131b1eac64219bc4f15 Mon Sep 17 00:00:00 2001 From: ShaneGZhu <1092841848@qq.com> Date: Thu, 16 Apr 2026 17:05:44 +0800 Subject: [PATCH 12/12] add mock config args --- tests/distributed/chunked_moe.py | 1 + 
tests/worker/test_reorder_split_prefill_and_decode.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/distributed/chunked_moe.py b/tests/distributed/chunked_moe.py index d4535327059..49e3f9b2cab 100644 --- a/tests/distributed/chunked_moe.py +++ b/tests/distributed/chunked_moe.py @@ -85,6 +85,7 @@ class SchedulerConfig: name = "default" splitwise_role = "mixed" max_num_seqs = 2 + max_num_batched_tokens = 2048 parallel_config = ParallelConfig() scheduler_config = SchedulerConfig() diff --git a/tests/worker/test_reorder_split_prefill_and_decode.py b/tests/worker/test_reorder_split_prefill_and_decode.py index d2d9e3a1f61..147e9581201 100644 --- a/tests/worker/test_reorder_split_prefill_and_decode.py +++ b/tests/worker/test_reorder_split_prefill_and_decode.py @@ -59,6 +59,7 @@ def create_mock_config(): scheduler_config = Mock(spec=SchedulerConfig) scheduler_config.max_num_seqs = 10 + scheduler_config.max_num_batched_tokens = 2048 speculative_config = Mock(spec=SpeculativeConfig) speculative_config.method = None