From 3106ee91e9410e9c662cbf74b5a0c3c9ae4c8fc5 Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Fri, 3 Apr 2026 11:08:06 +0800 Subject: [PATCH 01/10] [XPU] cherry-pick PR-6947 --- custom_ops/xpu_ops/src/ops/pybind/pybind.cc | 26 +++++++++++++++++++ .../xpu_ops/src/plugin/include/xpu/plugin.h | 1 - 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc index b73a3d8f151..21c292332bc 100644 --- a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc +++ b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc @@ -1073,6 +1073,32 @@ PYBIND11_MODULE(fastdeploy_ops, m) { py::arg("max_draft_tokens"), "Unified update model status"); + m.def("verify_draft_tokens", + &VerifyDraftTokens, + py::arg("step_output_ids"), + py::arg("step_output_len"), + py::arg("step_input_ids"), + py::arg("target_tokens"), + py::arg("candidate_ids"), + py::arg("candidate_scores"), + py::arg("candidate_lens"), + py::arg("topp"), + py::arg("stop_flags"), + py::arg("seq_lens_encoder"), + py::arg("seq_lens_this_time"), + py::arg("end_tokens"), + py::arg("is_block_step"), + py::arg("cu_seqlens_q_output"), + py::arg("reasoning_status"), + py::arg("max_dec_len"), + py::arg("step_idx"), + py::arg("max_seq_len"), + py::arg("verify_window"), + py::arg("verify_strategy"), + py::arg("reject_all"), + py::arg("accept_all"), + "Perform speculative verification for decoding v2"); + m.def("mtp_step_paddle", &MTPStepPaddle, py::arg("base_model_stop_flags"), diff --git a/custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h b/custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h index 76e684dc498..f7bcc2042dd 100644 --- a/custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h +++ b/custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h @@ -766,7 +766,6 @@ DLL_EXPORT int speculate_limit_thinking_content_length_kernel( const int eos_token_id_len, const int inject_len, const bool splitwise_role_is_decode); - DLL_EXPORT int verify_draft_tokens( api::Context* ctx, // Core I/O From de888d9a95b7c2f2968903c937b2b340fbbf2dbc Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Fri, 3 Apr 2026 11:10:57 +0800 Subject: [PATCH 02/10] [XPU] use unified_update_model_status. --- .../xpu_pre_and_post_process.py | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/fastdeploy/model_executor/xpu_pre_and_post_process.py b/fastdeploy/model_executor/xpu_pre_and_post_process.py index 0674f2b6d7f..b1e18d30fb7 100644 --- a/fastdeploy/model_executor/xpu_pre_and_post_process.py +++ b/fastdeploy/model_executor/xpu_pre_and_post_process.py @@ -43,11 +43,10 @@ speculate_pre_process, speculate_save_output, speculate_set_stop_value_multi_seqs, - speculate_set_value_by_flags_and_idx, speculate_step_paddle, speculate_step_reschedule, speculate_step_system_cache, - speculate_update, + unified_update_model_status, step_paddle, update_inputs, update_inputs_v1, @@ -408,6 +407,8 @@ def xpu_post_process_specualate( share_inputs: Dict[str, paddle.Tensor], save_each_rank: bool = False, skip_save_output: bool = False, + is_naive_mode: bool = False, + prefill_one_step_stop: bool = False, ): """""" @@ -424,7 +425,7 @@ def xpu_post_process_specualate( model_output.min_tokens, ) - speculate_update( + unified_update_model_status( model_output.seq_lens_encoder, model_output.seq_lens_decoder, model_output.not_need_stop, @@ -436,6 +437,13 @@ def xpu_post_process_specualate( model_output.seq_lens_this_time, model_output.is_block_step, model_output.mask_rollback, + model_output.pre_ids, + model_output.prompt_lens, + model_output.step_idx, + model_output.eos_token_id, + model_output.max_dec_len, + is_naive_mode, + prefill_one_step_stop, ) if not skip_save_output: if sampler_output.logprobs_tensors is None: @@ -456,18 +464,6 @@ def xpu_post_process_specualate( speculate_clear_accept_nums(model_output.accept_num, model_output.seq_lens_decoder) - # Update pre_ids through accept tokens - speculate_set_value_by_flags_and_idx( - model_output.pre_ids, - model_output.accept_tokens, - model_output.accept_num, - model_output.stop_flags, - model_output.seq_lens_this_time, - model_output.seq_lens_encoder, - model_output.seq_lens_decoder, - model_output.step_idx, - ) - def step_xpu( share_inputs: Dict[str, paddle.Tensor], From b5b42739df74b5dd74ae6f3234aaab69ce858c23 Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Fri, 3 Apr 2026 11:37:50 +0800 Subject: [PATCH 03/10] refactor xpu_model_runner. --- fastdeploy/worker/xpu_model_runner.py | 54 ++++++++++++++------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index b983be82084..54a4d1fc105 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -135,8 +135,8 @@ def __init__( self.encoder_cache = None self.device_id = device_id - self.speculative_method = self.fd_config.speculative_config.method - self.speculative_decoding = self.speculative_method is not None + self.spec_method = self.fd_config.speculative_config.method + self.speculative_decoding = self.spec_method is not None # used by SamplingMetadata self.enable_logprob = fd_config.model_config.enable_logprob # fd_config.model_config.enable_logprob @@ -728,7 +728,7 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int): if has_prefill_task or has_decode_task: self.share_inputs["not_need_stop"][0] = True - if self.speculative_method == SpecMethod.MTP: + if self.spec_method == SpecMethod.MTP: self.proposer.insert_tasks_v1(req_dicts, num_running_requests) def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int): @@ -877,7 +877,7 @@ def get_attr_from_request(request, attr, default_value=None): self.share_inputs["not_need_stop"][0] = True - if self.speculative_method == SpecMethod.MTP: + if self.spec_method == SpecMethod.MTP: self.share_inputs["temp_scaled_logprobs"][idx : idx + 1] = get_attr_from_request( request, "temp_scaled_logprobs", False ) @@ -1070,11 +1070,11 @@ def _init_share_inputs(self, max_num_seqs: int): fill_value=max_draft_token_num, dtype="int32", ) - self.share_inputs["output_cum_offsets"] = paddle.full(shape=[max_num_seqs, 1], fill_value=0, dtype="int32") - self.share_inputs["output_padding_offset"] = paddle.full( - shape=[max_num_seqs * (max_draft_token_num + 1)], - fill_value=0, - dtype="int32", + # reasoning_status: per-sequence reasoning phase indicator + # 0=thinking, 1=emitting boundary, 2=response, 3=end + # verify_draft_tokens 在 reasoning_status==1 时强制拒绝所有 draft token + self.share_inputs["reasoning_status"] = paddle.full( + shape=[max_num_seqs, 1], fill_value=0, dtype="int32" ) # For V1_KVCACHE_SCHEDULER self.share_inputs["step_draft_tokens"] = paddle.full( @@ -1438,7 +1438,7 @@ def _dummy_run( block_num=block_num, ) - if self.speculative_method == SpecMethod.MTP: + if self.spec_method == SpecMethod.MTP: self.proposer.dummy_prefill_inputs( num_tokens=num_tokens, batch_size=batch_size, @@ -1455,19 +1455,16 @@ def _init_speculative_proposer(self): """ Init speculative proposer """ - if self.speculative_method == SpecMethod.NGRAM: - # xpu not support ngram proposer now - self.proposer = None - elif self.speculative_method == SpecMethod.MTP: - self.proposer = self.speculative_method.create_proposer( - self.fd_config, - main_model=self.get_model(), - local_rank=self.local_rank, - device_id=self.device_id, - share_inputs=self.share_inputs, - ) - else: + if self.spec_method is None: self.proposer = None + return + self.proposer = self.spec_method.create_proposer( + self.fd_config, + main_model=self.get_model(), + local_rank=self.local_rank, + device_id=self.device_id, + share_inputs=self.share_inputs, + ) def _set_debug_level( self, debug_level: int = 0x1, model_forward_batch: Optional[List[Request]] = None, is_dummy_run: bool = False @@ -1669,6 +1666,8 @@ class at the server level, which is too granular for ModelRunner. self.share_inputs, self.parallel_config.data_parallel_size > 1, skip_save_output, + is_naive_mode=(self.speculative_decoding and self.proposer is None), + prefill_one_step_stop=self.parallel_config.prefill_one_step_stop, ) else: xpu_post_process_normal( @@ -1684,8 +1683,11 @@ class at the server level, which is too granular for ModelRunner. ) # 6. Draft model propose - if self.speculative_method == SpecMethod.MTP: - self.proposer.run(full_hidden_states=model_output) + if self.speculative_decoding and self.proposer is not None: + if self.spec_method == SpecMethod.MTP: + self.proposer.run(full_hidden_states=model_output) + else: + self.proposer.run(share_inputs=self.share_inputs) # 7. Updata 'infer_seed' and step_paddle() self.share_inputs["infer_seed"].add_(self.infer_seed_increment) @@ -1737,7 +1739,7 @@ def profile_run(self) -> None: """Execute a forward pass with dummy inputs to profile the memory usage of the model""" self.num_gpu_blocks = self.cache_config.total_block_num - if self.speculative_method == SpecMethod.MTP: + if self.spec_method == SpecMethod.MTP: self.proposer.initialize_kv_cache(main_model_num_blocks=self.num_gpu_blocks, profile=True) self.initialize_kv_cache(profile=True) @@ -1759,7 +1761,7 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None: self.num_gpu_blocks = num_gpu_blocks # Reset block table and kv cache with global block num - if self.speculative_method == SpecMethod.MTP: + if self.spec_method == SpecMethod.MTP: self.proposer.initialize_kv_cache(main_model_num_blocks=self.num_gpu_blocks) self.initialize_kv_cache() From 495ac5d5b15b0dd11e3f180948920535510b89eb Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Fri, 3 Apr 2026 12:20:20 +0800 Subject: [PATCH 04/10] refactor sampler. --- .../model_executor/layers/sample/sampler.py | 175 ++++++++++++------ 1 file changed, 118 insertions(+), 57 deletions(-) diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py index 08a33c11096..2e8a64ed2c0 100644 --- a/fastdeploy/model_executor/layers/sample/sampler.py +++ b/fastdeploy/model_executor/layers/sample/sampler.py @@ -1044,6 +1044,114 @@ def forward_cuda( sampler_output.cu_batch_token_offset = cu_batch_token_offset.cpu() return sampler_output + def _normal_sample_xpu( + self, + logits: paddle.Tensor, + probs: paddle.Tensor, + sampling_metadata: SamplingMetadata, + share_inputs: List[paddle.Tensor], + ) -> SamplerOutput: + """Normal sampling for NAIVE mode on XPU.""" + top_p, top_k, topp_seed = padding_sampling_params( + sampling_metadata.top_p, + sampling_metadata.top_k, + sampling_metadata.seed, + paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]), + paddle.reshape(share_inputs["seq_lens_encoder"], shape=[-1]), + ) + _, next_tokens = top_k_top_p_sampling( + probs, top_p=top_p, top_k=top_k, + top_k_list=sampling_metadata.top_k_list, topp_seed=topp_seed, + ) + real_bsz = share_inputs["seq_lens_this_time"].shape[0] + running_mask = ( + paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]) > 0 + ).cast("int32") + share_inputs["accept_tokens"][:real_bsz, 0] = next_tokens.squeeze(-1) + share_inputs["accept_num"][:real_bsz] = running_mask + return SamplerOutput( + sampled_token_ids=share_inputs["accept_tokens"], + logprobs_tensors=None, + token_num_per_batch=share_inputs["accept_num"], + logits=logits, + ) + + def _verify_and_sample_xpu( + self, + logits: paddle.Tensor, + probs: paddle.Tensor, + sampling_metadata: SamplingMetadata, + max_model_len: int, + share_inputs: List[paddle.Tensor], + accept_all_drafts: bool = False, + reject_all_drafts: bool = False, + ) -> SamplerOutput: + """Verify draft tokens (MTP/Ngram mode) on XPU using verify_draft_tokens.""" + from fastdeploy.model_executor.ops.xpu import verify_draft_tokens, top_p_candidates + + target_tokens = None + candidate_ids, candidate_scores, candidate_lens = None, None, None + + if self.verify_strategy == VerifyStrategy.TARGET_MATCH: + top_p, top_k, topp_seed = padding_sampling_params( + sampling_metadata.top_p, + sampling_metadata.top_k, + sampling_metadata.seed, + paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]), + paddle.reshape(share_inputs["seq_lens_encoder"], shape=[-1]), + ) + _, target_tokens = top_k_top_p_sampling( + probs, top_p=top_p, top_k=top_k, + top_k_list=sampling_metadata.top_k_list, topp_seed=topp_seed, + ) + elif self.verify_strategy == VerifyStrategy.GREEDY: + target_tokens = paddle.argmax(probs, axis=-1) + elif self.verify_strategy == VerifyStrategy.TOPP: + candidate_scores, candidate_ids, candidate_lens = top_p_candidates( + probs, + sampling_metadata.top_p, + share_inputs["batch_id_per_token_output"], + self.speculative_max_candidate_len, + max_model_len, + ) + else: + raise ValueError(f"Unknown verify strategy: {self.verify_strategy}") + + final_accept_all = self.config_accept_all or accept_all_drafts + final_reject_all = (self.config_reject_all or reject_all_drafts + or self.speculative_benchmark_mode) + + verify_draft_tokens( + share_inputs["accept_tokens"], + share_inputs["accept_num"], + share_inputs["draft_tokens"], + target_tokens, + candidate_ids, + candidate_scores, + candidate_lens, + sampling_metadata.top_p, + share_inputs["stop_flags"], + share_inputs["seq_lens_encoder"], + share_inputs["seq_lens_this_time"], + sampling_metadata.eos_token_ids, + share_inputs["is_block_step"], + share_inputs["cu_seqlens_q_output"], + share_inputs["reasoning_status"], + share_inputs["max_dec_len"], + share_inputs["step_idx"], + max_model_len, + self.speculative_verify_window, + self.verify_strategy.value, + final_reject_all, + final_accept_all, + ) + return SamplerOutput( + sampled_token_ids=share_inputs["accept_tokens"], + logprobs_tensors=None, + token_num_per_batch=share_inputs["accept_num"], + logits=logits, + ) + def forward_xpu( self, logits: paddle.Tensor, @@ -1052,8 +1160,8 @@ def forward_xpu( share_inputs: List[paddle.Tensor], accept_all_drafts: bool = False, reject_all_drafts: bool = False, - ) -> paddle.Tensor: - from fastdeploy.model_executor.ops.xpu import speculate_verify, top_p_candidates + ) -> SamplerOutput: + from fastdeploy.model_executor.ops.xpu import apply_speculative_penalty_multi_scores logits = apply_speculative_penalty_multi_scores( sampling_metadata.token_ids_all, @@ -1077,61 +1185,14 @@ def forward_xpu( probs = F.softmax(logits) - top_p, top_k, topp_seed = padding_sampling_params( - sampling_metadata.top_p, - sampling_metadata.top_k, - sampling_metadata.seed, - paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]), - paddle.reshape(share_inputs["seq_lens_encoder"], shape=[-1]), - ) - _, sampled_token_ids = top_k_top_p_sampling( - probs, top_p=top_p, top_k=top_k, top_k_list=sampling_metadata.top_k_list, topp_seed=topp_seed - ) - - verify_scores, verify_tokens, actual_candidate_len = top_p_candidates( - probs, - sampling_metadata.top_p, - share_inputs["batch_id_per_token_output"], - self.speculative_max_candidate_len, - max_model_len, - ) - - speculate_verify( - sampled_token_ids, - share_inputs["accept_tokens"], - share_inputs["accept_num"], - share_inputs["step_idx"], - share_inputs["stop_flags"], - share_inputs["seq_lens_encoder"], - share_inputs["seq_lens_decoder"], - share_inputs[ - "draft_tokens" - ], # Both input and output, need to write the last 1 token accepted to position 0. - share_inputs["seq_lens_this_time"], - verify_tokens, - verify_scores, - share_inputs["max_dec_len"], - sampling_metadata.eos_token_ids, - share_inputs["is_block_step"], - share_inputs["cu_seqlens_q_output"], - actual_candidate_len, - share_inputs["actual_draft_token_num"], - sampling_metadata.top_p, - max_model_len, - self.speculative_verify_window, - True, # enable_topp - (self.speculative_benchmark_mode or reject_all_drafts), - accept_all_drafts, - ) - # TODO(chenhuan09): support return logprobs - token_ids = share_inputs["accept_tokens"] - sampler_output = SamplerOutput( - sampled_token_ids=token_ids, - logprobs_tensors=None, - token_num_per_batch=share_inputs["accept_num"], - cu_batch_token_offset=None, - ) - return sampler_output + is_naive = self.spec_method is None or self.spec_method == SpecMethod.NAIVE + if is_naive: + return self._normal_sample_xpu(logits, probs, sampling_metadata, share_inputs) + else: + return self._verify_and_sample_xpu( + logits, probs, sampling_metadata, max_model_len, share_inputs, + accept_all_drafts, reject_all_drafts, + ) class MTPSampler(nn.Layer): From 085cfcc5da56aa8194f7c73e3f6944a95a9bf995 Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Fri, 3 Apr 2026 12:44:34 +0800 Subject: [PATCH 05/10] fix codestyle. --- .../model_executor/layers/sample/sampler.py | 39 ++++++++++++------- .../xpu_pre_and_post_process.py | 2 +- fastdeploy/worker/xpu_model_runner.py | 4 +- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py index 2e8a64ed2c0..9ad2679c24a 100644 --- a/fastdeploy/model_executor/layers/sample/sampler.py +++ b/fastdeploy/model_executor/layers/sample/sampler.py @@ -1060,13 +1060,14 @@ def _normal_sample_xpu( paddle.reshape(share_inputs["seq_lens_encoder"], shape=[-1]), ) _, next_tokens = top_k_top_p_sampling( - probs, top_p=top_p, top_k=top_k, - top_k_list=sampling_metadata.top_k_list, topp_seed=topp_seed, + probs, + top_p=top_p, + top_k=top_k, + top_k_list=sampling_metadata.top_k_list, + topp_seed=topp_seed, ) real_bsz = share_inputs["seq_lens_this_time"].shape[0] - running_mask = ( - paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]) > 0 - ).cast("int32") + running_mask = (paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]) > 0).cast("int32") share_inputs["accept_tokens"][:real_bsz, 0] = next_tokens.squeeze(-1) share_inputs["accept_num"][:real_bsz] = running_mask return SamplerOutput( @@ -1087,7 +1088,10 @@ def _verify_and_sample_xpu( reject_all_drafts: bool = False, ) -> SamplerOutput: """Verify draft tokens (MTP/Ngram mode) on XPU using verify_draft_tokens.""" - from fastdeploy.model_executor.ops.xpu import verify_draft_tokens, top_p_candidates + from fastdeploy.model_executor.ops.xpu import ( + top_p_candidates, + verify_draft_tokens, + ) target_tokens = None candidate_ids, candidate_scores, candidate_lens = None, None, None @@ -1101,8 +1105,11 @@ def _verify_and_sample_xpu( paddle.reshape(share_inputs["seq_lens_encoder"], shape=[-1]), ) _, target_tokens = top_k_top_p_sampling( - probs, top_p=top_p, top_k=top_k, - top_k_list=sampling_metadata.top_k_list, topp_seed=topp_seed, + probs, + top_p=top_p, + top_k=top_k, + top_k_list=sampling_metadata.top_k_list, + topp_seed=topp_seed, ) elif self.verify_strategy == VerifyStrategy.GREEDY: target_tokens = paddle.argmax(probs, axis=-1) @@ -1118,8 +1125,7 @@ def _verify_and_sample_xpu( raise ValueError(f"Unknown verify strategy: {self.verify_strategy}") final_accept_all = self.config_accept_all or accept_all_drafts - final_reject_all = (self.config_reject_all or reject_all_drafts - or self.speculative_benchmark_mode) + final_reject_all = self.config_reject_all or reject_all_drafts or self.speculative_benchmark_mode verify_draft_tokens( share_inputs["accept_tokens"], @@ -1161,7 +1167,9 @@ def forward_xpu( accept_all_drafts: bool = False, reject_all_drafts: bool = False, ) -> SamplerOutput: - from fastdeploy.model_executor.ops.xpu import apply_speculative_penalty_multi_scores + from fastdeploy.model_executor.ops.xpu import ( + apply_speculative_penalty_multi_scores, + ) logits = apply_speculative_penalty_multi_scores( sampling_metadata.token_ids_all, @@ -1190,8 +1198,13 @@ def forward_xpu( return self._normal_sample_xpu(logits, probs, sampling_metadata, share_inputs) else: return self._verify_and_sample_xpu( - logits, probs, sampling_metadata, max_model_len, share_inputs, - accept_all_drafts, reject_all_drafts, + logits, + probs, + sampling_metadata, + max_model_len, + share_inputs, + accept_all_drafts, + reject_all_drafts, ) diff --git a/fastdeploy/model_executor/xpu_pre_and_post_process.py b/fastdeploy/model_executor/xpu_pre_and_post_process.py index b1e18d30fb7..653f28a3185 100644 --- a/fastdeploy/model_executor/xpu_pre_and_post_process.py +++ b/fastdeploy/model_executor/xpu_pre_and_post_process.py @@ -46,8 +46,8 @@ speculate_step_paddle, speculate_step_reschedule, speculate_step_system_cache, - unified_update_model_status, step_paddle, + unified_update_model_status, update_inputs, update_inputs_v1, ) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 54a4d1fc105..38ecb53328d 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -1073,9 +1073,7 @@ def _init_share_inputs(self, max_num_seqs: int): # reasoning_status: per-sequence reasoning phase indicator # 0=thinking, 1=emitting boundary, 2=response, 3=end # verify_draft_tokens 在 reasoning_status==1 时强制拒绝所有 draft token - self.share_inputs["reasoning_status"] = paddle.full( - shape=[max_num_seqs, 1], fill_value=0, dtype="int32" - ) + self.share_inputs["reasoning_status"] = paddle.full(shape=[max_num_seqs, 1], fill_value=0, dtype="int32") # For V1_KVCACHE_SCHEDULER self.share_inputs["step_draft_tokens"] = paddle.full( shape=[max_num_seqs, max_draft_token_num + 1], From 85be497681b2c6b81281c2383242f0d657a5a571 Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Fri, 3 Apr 2026 16:05:52 +0800 Subject: [PATCH 06/10] Fix XPU speculative decoding: rename output tensors to cu_seqlens_q_output/batch_id_per_token_output, correct WRAPPER_CHECK_PTR types, and fix dynamic gather shape in verify_draft_tokens path. --- fastdeploy/model_executor/layers/sample/sampler.py | 4 ---- .../model_executor/xpu_pre_and_post_process.py | 6 ++++++ fastdeploy/worker/input_batch.py | 14 +++++++------- fastdeploy/worker/xpu_model_runner.py | 6 ++++++ 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py index 9ad2679c24a..1afb9493897 100644 --- a/fastdeploy/model_executor/layers/sample/sampler.py +++ b/fastdeploy/model_executor/layers/sample/sampler.py @@ -1167,10 +1167,6 @@ def forward_xpu( accept_all_drafts: bool = False, reject_all_drafts: bool = False, ) -> SamplerOutput: - from fastdeploy.model_executor.ops.xpu import ( - apply_speculative_penalty_multi_scores, - ) - logits = apply_speculative_penalty_multi_scores( sampling_metadata.token_ids_all, sampling_metadata.prompt_lens, diff --git a/fastdeploy/model_executor/xpu_pre_and_post_process.py b/fastdeploy/model_executor/xpu_pre_and_post_process.py index 653f28a3185..1c21276ca1c 100644 --- a/fastdeploy/model_executor/xpu_pre_and_post_process.py +++ b/fastdeploy/model_executor/xpu_pre_and_post_process.py @@ -242,8 +242,14 @@ def xpu_process_output( if isinstance(share_inputs, dict): output_padding_offset = share_inputs.get("output_padding_offset", None) + if output_padding_offset is None: + # For XPU speculative decoding, force mtp gather path to keep + # output shape dynamic (real output token num) instead of max_bsz. + output_padding_offset = share_inputs.get("batch_id_per_token_output", None) else: output_padding_offset = getattr(share_inputs, "output_padding_offset", None) + if output_padding_offset is None: + output_padding_offset = getattr(share_inputs, "batch_id_per_token_output", None) hidden_states = gather_next_token( forward_output, diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index 55a3f39a2ee..48e0384aa92 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -294,8 +294,8 @@ def init_share_inputs(self): dtype="int32", ) else: - self.output_cum_offsets = paddle.full(shape=[max_num_seqs, 1], fill_value=0, dtype="int32") - self.output_padding_offset = paddle.full( + self.cu_seqlens_q_output = paddle.full(shape=[max_num_seqs + 1, 1], fill_value=0, dtype="int32") + self.batch_id_per_token_output = paddle.full( shape=[max_num_seqs * (max_draft_token_num + 1)], fill_value=0, dtype="int32", @@ -437,7 +437,7 @@ def swap_data(tensor, idx1, idx2): if current_platform.is_cuda(): swap_data(self.cu_seqlens_q_output, i1, i2) else: - swap_data(self.output_cum_offsets, i1, i2) + swap_data(self.cu_seqlens_q_output, i1, i2) swap_data(self.step_draft_tokens, i1, i2) swap_data(self.step_seq_lens_this_time, i1, i2) swap_data(self.draft_logits, i1, i2) @@ -628,8 +628,8 @@ def reset_share_inputs(self): fill_paddle_tensor(self, "accept_num", 0) fill_paddle_tensor(self, "draft_tokens", -1) fill_paddle_tensor(self, "actual_draft_token_num", max_draft_token_num) - fill_paddle_tensor(self, "output_cum_offsets", 0) - fill_paddle_tensor(self, "output_padding_offset", 0) + fill_paddle_tensor(self, "cu_seqlens_q_output", 0) + fill_paddle_tensor(self, "batch_id_per_token_output", 0) fill_paddle_tensor(self, "step_draft_tokens", 0) fill_paddle_tensor(self, "step_seq_lens_this_time", 0) fill_paddle_tensor(self, "draft_logits", -1) @@ -742,8 +742,8 @@ def init_share_inputs(self): self.pre_ids = paddle.clone(self.target_model_input_batch["pre_ids"]) self.token_ids_all = None else: - self.output_cum_offsets = paddle.clone(self.target_model_input_batch["output_cum_offsets"]) - self.output_padding_offset = paddle.clone(self.target_model_input_batch["output_padding_offset"]) + self.cu_seqlens_q_output = paddle.clone(self.target_model_input_batch["cu_seqlens_q_output"]) + self.batch_id_per_token_output = paddle.clone(self.target_model_input_batch["batch_id_per_token_output"]) self.pre_ids = paddle.clone(self.target_model_input_batch["pre_ids"]) self.ids_remove_padding = paddle.clone(self.target_model_input_batch["ids_remove_padding"]) self.batch_id_per_token = paddle.clone(self.target_model_input_batch["batch_id_per_token"]) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 38ecb53328d..e92502b0ca5 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -1070,6 +1070,12 @@ def _init_share_inputs(self, max_num_seqs: int): fill_value=max_draft_token_num, dtype="int32", ) + self.share_inputs["cu_seqlens_q_output"] = paddle.full(shape=[max_num_seqs + 1, 1], fill_value=0, dtype="int32") + self.share_inputs["batch_id_per_token_output"] = paddle.full( + shape=[max_num_seqs * (max_draft_token_num + 1)], + fill_value=0, + dtype="int32", + ) # reasoning_status: per-sequence reasoning phase indicator # 0=thinking, 1=emitting boundary, 2=response, 3=end # verify_draft_tokens 在 reasoning_status==1 时强制拒绝所有 draft token From 964881f9c5d6ce05975b17e7daccad9175da3e7b Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Fri, 3 Apr 2026 16:08:54 +0800 Subject: [PATCH 07/10] fix codestyle. --- fastdeploy/worker/xpu_model_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index e92502b0ca5..40526d523ac 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -1070,7 +1070,9 @@ def _init_share_inputs(self, max_num_seqs: int): fill_value=max_draft_token_num, dtype="int32", ) - self.share_inputs["cu_seqlens_q_output"] = paddle.full(shape=[max_num_seqs + 1, 1], fill_value=0, dtype="int32") + self.share_inputs["cu_seqlens_q_output"] = paddle.full( + shape=[max_num_seqs + 1, 1], fill_value=0, dtype="int32" + ) self.share_inputs["batch_id_per_token_output"] = paddle.full( shape=[max_num_seqs * (max_draft_token_num + 1)], fill_value=0, From 7ceefe407c6d4bcc212e660d27c65b2943a011d1 Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Thu, 9 Apr 2026 14:48:24 +0800 Subject: [PATCH 08/10] replace output_padding_offset with is_speculative flag in gather_next_token. --- .../xpu_ops/src/ops/gather_next_token.cc | 22 +++++++------------ custom_ops/xpu_ops/src/ops/pybind/pybind.cc | 4 ++-- ...test_adjust_batch_and_gather_next_token.py | 17 +++++--------- fastdeploy/model_executor/forward_meta.py | 1 + .../xpu_pre_and_post_process.py | 16 +++----------- 5 files changed, 20 insertions(+), 40 deletions(-) diff --git a/custom_ops/xpu_ops/src/ops/gather_next_token.cc b/custom_ops/xpu_ops/src/ops/gather_next_token.cc index 31c2142ca07..ee261965d6b 100644 --- a/custom_ops/xpu_ops/src/ops/gather_next_token.cc +++ b/custom_ops/xpu_ops/src/ops/gather_next_token.cc @@ -32,7 +32,7 @@ std::vector GatherNextToken( const paddle::Tensor& encoder_batch_map_cpu, const paddle::Tensor& decoder_batch_map_cpu, const paddle::Tensor& len_info_cpu, - const paddle::optional& output_padding_offset, + bool is_speculative, int max_bsz) { phi::XPUPlace place(phi::backends::xpu::GetXPUCurrentDeviceId()); auto dev_ctx = paddle::experimental::DeviceContextPool::Instance().Get(place); @@ -73,7 +73,7 @@ std::vector GatherNextToken( const_cast(decoder_batch_map.data())}; paddle::Tensor out; - if (output_padding_offset) { + if (is_speculative) { int need_delete_token_num = 0; if (enc_batch > 0) { need_delete_token_num = @@ -88,7 +88,7 @@ std::vector GatherNextToken( return {out}; } - if (output_padding_offset) { + if (is_speculative) { int r = fastdeploy::plugin::eb_mtp_gather_next_token( ctx, reinterpret_cast(x.data()), @@ -124,14 +124,10 @@ std::vector> GatherNextTokenInferShape( const std::vector& encoder_batch_map_cpu_shape, const std::vector& decoder_batch_map_cpu_shape, const std::vector& len_info_cpu_shape, - const paddle::optional>& output_padding_offset_shape) { - // if (output_padding_offset_shape) { - // PD_THROW("speculative decoding is not supported in XPU."); - // } - // int64_t bsz = cum_offsets_shape[0]; + bool is_speculative) { int64_t bsz = 0; int64_t dim_embed = x_shape[1]; - if (output_padding_offset_shape) { + if (is_speculative) { return {{-1, dim_embed}}; } else { return {{bsz, dim_embed}}; @@ -148,8 +144,7 @@ std::vector GatherNextTokenInferDtype( const paddle::DataType& decoder_seq_lod_cpu_dtype, const paddle::DataType& encoder_batch_map_cpu_dtype, const paddle::DataType& decoder_batch_map_cpu_dtype, - const paddle::DataType& len_info_cpu_dtype, - const paddle::optional& output_padding_offset_dtype) { + const paddle::DataType& len_info_cpu_dtype) { return {x_dtype}; } @@ -163,10 +158,9 @@ PD_BUILD_STATIC_OP(gather_next_token) "decoder_seq_lod_cpu", "encoder_batch_map_cpu", "decoder_batch_map_cpu", - "len_info_cpu", - paddle::Optional("output_padding_offset")}) + "len_info_cpu"}) .Outputs({"out"}) - .Attrs({"max_bsz: int"}) + .Attrs({"is_speculative: bool", "max_bsz: int"}) .SetKernelFn(PD_KERNEL(GatherNextToken)) .SetInferShapeFn(PD_INFER_SHAPE(GatherNextTokenInferShape)) .SetInferDtypeFn(PD_INFER_DTYPE(GatherNextTokenInferDtype)); diff --git a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc index 21c292332bc..c7daaeae588 100644 --- a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc +++ b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc @@ -421,7 +421,7 @@ std::vector GatherNextToken( const paddle::Tensor& encoder_batch_map_cpu, const paddle::Tensor& decoder_batch_map_cpu, const paddle::Tensor& len_info_cpu, - const paddle::optional& output_padding_offset, + bool is_speculative, int max_bsz); std::vector GetImgBoundaries( @@ -945,7 +945,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) { py::arg("encoder_batch_map_cpu"), py::arg("decoder_batch_map_cpu"), py::arg("len_info_cpu"), - py::arg("output_padding_offset"), + py::arg("is_speculative"), py::arg("max_bsz"), "Gather next token for XPU"); diff --git a/custom_ops/xpu_ops/test/test_adjust_batch_and_gather_next_token.py b/custom_ops/xpu_ops/test/test_adjust_batch_and_gather_next_token.py index 758dff17e58..bc074242b4e 100644 --- a/custom_ops/xpu_ops/test/test_adjust_batch_and_gather_next_token.py +++ b/custom_ops/xpu_ops/test/test_adjust_batch_and_gather_next_token.py @@ -24,7 +24,7 @@ ) -def _run_test_base(seq_lens_this_time_data, output_padding_offset): +def _run_test_base(seq_lens_this_time_data, is_speculative): """ 通用的基础测试执行函数,包含了两个场景共有的逻辑。 """ @@ -120,7 +120,7 @@ def _run_test_base(seq_lens_this_time_data, output_padding_offset): encoder_batch_map_cpu, decoder_batch_map_cpu, len_info_cpu, - output_padding_offset, + is_speculative, -1, ) @@ -136,14 +136,14 @@ def _run_test_base(seq_lens_this_time_data, output_padding_offset): encoder_batch_map_cpu, decoder_batch_map_cpu, len_info_cpu, - output_padding_offset, + is_speculative, -1, ) gather_out_np = gather_out.astype("float32").cpu().numpy() gather_out_cpu_np = gather_out_cpu.astype("float32").cpu().numpy() - if output_padding_offset is not None: + if is_speculative: np.testing.assert_allclose(gather_out_np, gather_out_cpu_np, err_msg="gather_next_token check failed!") else: for i in range(gather_out_cpu.shape[0]): @@ -160,19 +160,14 @@ def test_mix_with_mtp(self): """测试混合批次处理中的 MTP (Multi-Token Prediction) 场景""" print("\nRunning test: test_mix_with_mtp") seq_lens_this_time_data = [100, 2, 0, 1, 120, 140, 3] - bsz = len(seq_lens_this_time_data) - output_padding_offset = paddle.zeros(bsz, dtype="int32") - - _run_test_base(seq_lens_this_time_data, output_padding_offset) + _run_test_base(seq_lens_this_time_data, True) print("Test passed for scenario: With MTP") def test_mix_without_mtp(self): """测试非 MTP (Single-Token Prediction) 场景下的功能""" print("\nRunning test: test_mix_without_mtp") seq_lens_this_time_data = [100, 1, 0, 1, 120, 140, 1] - output_padding_offset = None # 非 MTP 场景下,此参数为 None - - _run_test_base(seq_lens_this_time_data, output_padding_offset) + _run_test_base(seq_lens_this_time_data, False) print("Test passed for scenario: Without MTP") diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index 44cf528bed3..ff5f32057c7 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -275,6 +275,7 @@ class XPUForwardMeta(ForwardMeta): hidden_states: Optional[paddle.Tensor] = None is_draft: bool = False + is_speculative: bool = False # max bs max_num_seqs: int = 0 diff --git a/fastdeploy/model_executor/xpu_pre_and_post_process.py b/fastdeploy/model_executor/xpu_pre_and_post_process.py index 1c21276ca1c..fe506c5e995 100644 --- a/fastdeploy/model_executor/xpu_pre_and_post_process.py +++ b/fastdeploy/model_executor/xpu_pre_and_post_process.py @@ -170,6 +170,7 @@ def xpu_pre_process( block_tables=share_inputs["block_tables"], caches=share_inputs["caches"], max_num_seqs=share_inputs["seq_lens_this_time"].shape[0], + is_speculative=use_speculate_method, ) ( @@ -240,18 +241,7 @@ def xpu_process_output( ) -> paddle.Tensor: """ """ - if isinstance(share_inputs, dict): - output_padding_offset = share_inputs.get("output_padding_offset", None) - if output_padding_offset is None: - # For XPU speculative decoding, force mtp gather path to keep - # output shape dynamic (real output token num) instead of max_bsz. - output_padding_offset = share_inputs.get("batch_id_per_token_output", None) - else: - output_padding_offset = getattr(share_inputs, "output_padding_offset", None) - if output_padding_offset is None: - output_padding_offset = getattr(share_inputs, "batch_id_per_token_output", None) - - hidden_states = gather_next_token( + hiddden_states = gather_next_token( forward_output, xpu_forward_meta.encoder_seq_lod, xpu_forward_meta.decoder_seq_lod, @@ -262,7 +252,7 @@ def xpu_process_output( xpu_forward_meta.encoder_batch_map_cpu, xpu_forward_meta.decoder_batch_map_cpu, xpu_forward_meta.len_info_cpu, - output_padding_offset, # output_padding_offset + xpu_forward_meta.is_speculative, xpu_forward_meta.max_num_seqs, ) return hidden_states From 6b188b8c9702e65add85c6790e502ca90accf8e9 Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Wed, 15 Apr 2026 14:19:49 +0800 Subject: [PATCH 09/10] rename hiddden_states. --- fastdeploy/model_executor/xpu_pre_and_post_process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/model_executor/xpu_pre_and_post_process.py b/fastdeploy/model_executor/xpu_pre_and_post_process.py index fe506c5e995..f58aa251d41 100644 --- a/fastdeploy/model_executor/xpu_pre_and_post_process.py +++ b/fastdeploy/model_executor/xpu_pre_and_post_process.py @@ -241,7 +241,7 @@ def xpu_process_output( ) -> paddle.Tensor: """ """ - hiddden_states = gather_next_token( + hidden_states = gather_next_token( forward_output, xpu_forward_meta.encoder_seq_lod, xpu_forward_meta.decoder_seq_lod, From 56567657efe9aada9b55032406a9f3a08f52edf8 Mon Sep 17 00:00:00 2001 From: Jiajun-Ji Date: Wed, 15 Apr 2026 20:53:02 +0800 Subject: [PATCH 10/10] unify cu_seqlens_q_output and batch_id_per_token_output init. --- fastdeploy/worker/input_batch.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index 48e0384aa92..4c313586842 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -286,20 +286,12 @@ def init_share_inputs(self): fill_value=max_draft_token_num, dtype="int32", ) - if current_platform.is_cuda(): - self.cu_seqlens_q_output = paddle.full(shape=[max_num_seqs + 1, 1], fill_value=0, dtype="int32") - self.batch_id_per_token_output = paddle.full( - shape=[max_num_seqs * (max_draft_token_num + 1)], - fill_value=0, - dtype="int32", - ) - else: - self.cu_seqlens_q_output = paddle.full(shape=[max_num_seqs + 1, 1], fill_value=0, dtype="int32") - self.batch_id_per_token_output = paddle.full( - shape=[max_num_seqs * (max_draft_token_num + 1)], - fill_value=0, - dtype="int32", - ) + self.cu_seqlens_q_output = paddle.full(shape=[max_num_seqs + 1, 1], fill_value=0, dtype="int32") + self.batch_id_per_token_output = paddle.full( + shape=[max_num_seqs * (max_draft_token_num + 1)], + fill_value=0, + dtype="int32", + ) # For V1_KVCACHE_SCHEDULER self.step_draft_tokens = paddle.full( shape=[max_num_seqs, max_draft_token_num + 1],