From 06e3ae160dedc19b08f262ea7e5e4ec09d14d75b Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Tue, 11 Nov 2025 20:07:30 +0800 Subject: [PATCH 01/17] support r3 --- fastdeploy/model_executor/forward_meta.py | 2 + .../layers/moe/fused_moe_backend_base.py | 7 +- .../layers/moe/fused_moe_deepgemm_backend.py | 7 + fastdeploy/model_executor/layers/moe/moe.py | 16 +- .../layers/moe/routing_indices_cache.py | 147 ++++++++++++++++++ 5 files changed, 174 insertions(+), 5 deletions(-) create mode 100644 fastdeploy/model_executor/layers/moe/routing_indices_cache.py diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index 06ef4b75537..f73b6c3caf2 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -110,6 +110,8 @@ class ForwardMeta: block_tables: Optional[paddle.Tensor] = None # KV caches caches: Optional[list[paddle.Tensor]] = None + # Routing Replay table buffer + routing_table_buffer: Optional[paddle.Tensor] = None def clear_caches(self): """Safely clean up the caches""" diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index 039bb2b134f..e532768ec62 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -155,6 +155,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc, ) -> paddle.Tensor: """ Paddle Cutlass compute Fused MoE. @@ -163,13 +164,13 @@ def apply( if layer.fd_config.parallel_config.moe_phase.phase == "prefill": if layer.fd_config.parallel_config.splitwise_role == "mixed" and layer.layer_idx == 0: self.ep_prefill_runner.clean_low_latency_buffer() - return self.apply_ep_prefill(layer, x, gate) + return self.apply_ep_prefill(layer, x, gate, topk_ids_hookfunc) else: if layer.fd_config.parallel_config.splitwise_role == "mixed" and layer.layer_idx == 0: self.ep_decoder_runner.clean_low_latency_buffer() - return self.apply_ep_decode(layer, x, gate) + return self.apply_ep_decode(layer, x, gate, topk_ids_hookfunc) else: - return self.apply_tp(layer, x, gate) + return self.apply_tp(layer, x, gate, topk_ids_hookfunc) class UnquantizedFusedMoEMethod(MoEMethodBase): diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py index 178bc74b342..44e4eb9fd7a 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py @@ -298,6 +298,7 @@ def apply_ep_prefill( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc, ) -> paddle.Tensor: """ Apply the EP prefill method. @@ -305,6 +306,7 @@ def apply_ep_prefill( gate_out = gate(x.cast("float32")) # 1. Select topk experts and weights topk_idx, topk_weights = self.ep_prefill_runner.moe_select(layer, gate_out) + topk_ids_hookfunc(topk_ids=topk_idx) # 2. Dynamic compute blockwise quantization scales x, x_scale_tensor = fastdeploy.model_executor.ops.gpu.per_token_quant( x, self.quant_config.weight_block_size[0] @@ -406,6 +408,7 @@ def apply_ep_decode( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc, ) -> paddle.Tensor: """ Apply the EP decoder method. @@ -413,6 +416,7 @@ def apply_ep_decode( gate_out = gate(x.cast("float32")) # 1. 
Select topk experts and weights topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) + topk_ids_hookfunc(topk_ids=topk_idx) # 2. EP Dispatch permute_input, token_nums_per_expert, handle = self.ep_decoder_runner.dispatch( x, topk_idx, topk_weights, use_fp8=True @@ -477,6 +481,7 @@ def apply_tp( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc, ) -> paddle.Tensor: """ Paddle Use DeepGemm compute Fused MoE. @@ -504,6 +509,8 @@ def apply_tp( False, ) + topk_ids_hookfunc(topk_ids=topk_ids) + tmp = count_tokens_per_expert_func(topk_ids, layer.num_experts) recv_x, recv_x_scale = fastdeploy.model_executor.ops.gpu.per_token_quant(x, 128) diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 8b83aeccabe..73091bb80cf 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -26,6 +26,9 @@ from fastdeploy.model_executor.utils import slice_fn from fastdeploy.platforms import current_platform from fastdeploy.worker.experts_manager import RedundantExpertManger +from fastdeploy.model_executor.forward_meta import ForwardMeta +from fastdeploy.model_executor.layers.moe.routing_indices_cache import save_routing_to_buffer +from functools import partial try: from fastdeploy.model_executor.ops.gpu import noaux_tc, noaux_tc_redundant @@ -532,7 +535,7 @@ def load_state_dict(self, state_dict, is_rearrange: bool = False): else: self.quant_method.process_loaded_weights(self, state_dict) - def forward(self, x: paddle.Tensor, gate: nn.Layer): + def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta): """ Defines the forward computation of the moe layer. @@ -543,5 +546,14 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer): Tensor: Output tensor.s """ - out = self.quant_method.apply(self, x, gate) + topk_ids_hookfunc = partial( + save_routing_to_buffer, + routing_table_buffer=forward_meta.routing_table_buffer, + batch_id_per_token=forward_meta.batch_id_per_token, + seq_lens_decoder=forward_meta.seq_lens_decoder, + cu_seqlens_q=forward_meta.cu_seqlens_q, + layer_idx=self.layer_idx, + ) + + out = self.quant_method.apply(self, x, gate, topk_ids_hookfunc) return out diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py new file mode 100644 index 00000000000..f36d729553f --- /dev/null +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -0,0 +1,147 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" + +import paddle +import triton +import triton.language as tl + + +@triton.jit +def _save_routing_kernel( + ROUTING_TABLE_BUFFER_PTR, + TOPK_IDS_PTR, + BATCH_ID_PER_TOKEN_PTR, + TOKEN_RELATIVE_INDICES_PTR, + # SEQ_LENS_ENCODER_PTR, + SEQ_LENS_DECODER_PTR, + # SEQ_LENS_THIS_TIME_PTR, + LAYER_IDX, + TOKEN_NUM, + TOP_K, + NUM_HIDDEN_LAYERS, + MAX_MODEL_LEN, + BLOCK_SIZE_M: tl.constexpr, + BLOCK_SIZE_K: tl.constexpr, +): + pid_m = tl.program_id(axis=0) + + token_offsets = pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M) + token_mask = token_offsets < TOKEN_NUM + + k_offsets = tl.arange(0, BLOCK_SIZE_K) + topk_ids_ptrs = TOPK_IDS_PTR + token_offsets[:, None] * TOP_K + k_offsets[None, :] + # [BLOCK_SIZE_M, BLOCK_SIZE_K] + topk_vals = tl.load(topk_ids_ptrs, mask=token_mask[:, None]) + + batch_ids = tl.load(BATCH_ID_PER_TOKEN_PTR + token_offsets, mask=token_mask) + token_relative_index = tl.load(TOKEN_RELATIVE_INDICES_PTR + token_offsets, mask=token_mask) + len_decoder = tl.load(SEQ_LENS_DECODER_PTR + batch_ids, mask=token_mask) + + # [BLOCK_SIZE_M] + token_seq_pos = len_decoder + token_relative_index + + STRIDE_BUF_SEQ = NUM_HIDDEN_LAYERS * MAX_MODEL_LEN * TOP_K + STRIDE_BUF_LAYER = MAX_MODEL_LEN * TOP_K + STRIDE_BUF_TOKEN = TOP_K + + # [BLOCK_SIZE_M, BLOCK_SIZE_K] + output_ptrs = ( + ROUTING_TABLE_BUFFER_PTR + + batch_ids[:, None] * STRIDE_BUF_SEQ + + LAYER_IDX * STRIDE_BUF_LAYER + + token_seq_pos[:, None] * STRIDE_BUF_TOKEN + + k_offsets[None, :] + ) + + pos_mask = token_seq_pos < MAX_MODEL_LEN + final_mask = token_mask[:, None] & pos_mask[:, None] + + tl.store(output_ptrs, topk_vals, mask=final_mask) + + +def save_routing_to_buffer( + routing_table_buffer: paddle.Tensor, # [max_num_seqs, num_layers, max_len, top_k] + topk_ids: paddle.Tensor, # [token_num, top_k] + batch_id_per_token: paddle.Tensor, # [token_num] + # seq_lens_encoder: paddle.Tensor, # [max_num_seqs] + seq_lens_decoder: paddle.Tensor, # [max_num_seqs] + # seq_lens_this_time: paddle.Tensor, # [max_num_seqs] + cu_seqlens_q: paddle.Tensor, # [max_num_seqs + 1] + layer_idx: int, +): + token_num, top_k = topk_ids.shape + if token_num == 0: + return + + max_num_seqs, num_hidden_layers, max_model_len, _ = routing_table_buffer.shape + assert topk_ids.shape[1] == routing_table_buffer.shape[3] + assert batch_id_per_token.shape[0] == token_num + assert seq_lens_decoder.shape[0] == max_num_seqs + + token_indices = paddle.arange(token_num, dtype="int32") + # [0, 3, 4, 10, 12][0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3] + # -> [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] + # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] - [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] + # -> [0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1] + token_relative_indices = token_indices - cu_seqlens_q.view([-1])[batch_id_per_token].view([-1]) + + BLOCK_SIZE_M = 128 + BLOCK_SIZE_K = top_k # 值一般很小,直接设为 top_k + + grid = (triton.cdiv(token_num, BLOCK_SIZE_M),) + _save_routing_kernel[grid]( + routing_table_buffer, + topk_ids, + batch_id_per_token, + token_relative_indices, + # seq_lens_encoder, + seq_lens_decoder, + # seq_lens_this_time, + LAYER_IDX=layer_idx, + TOKEN_NUM=token_num, + TOP_K=top_k, + NUM_HIDDEN_LAYERS=num_hidden_layers, + MAX_MODEL_LEN=max_model_len, + BLOCK_SIZE_M=BLOCK_SIZE_M, + BLOCK_SIZE_K=BLOCK_SIZE_K, + ) + + +# max_num_seqs = 4 +# num_layers = 1 +# max_len = 10 +# top_k = 8 +# token_num = 12 + +# routing_table_buffer = paddle.full([max_num_seqs, num_layers, max_len, top_k], -1, dtype="int32") +# topk_ids = paddle.randint(0, 384, [token_num, top_k], dtype="int32") +# 
batch_id_per_token = paddle.to_tensor([0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3], dtype="int32").reshape([-1, 1]) +# # seq_lens_encoder = paddle.to_tensor([3, 1, 6, 2], dtype="int32").reshape([-1, 1]) +# seq_lens_decoder = paddle.to_tensor([0, 2, 0, 3], dtype="int32").reshape([-1, 1]) +# # seq_lens_this_time = paddle.to_tensor([3, 1, 6, 2], dtype="int32").reshape([-1, 1]) +# cu_seqlens_q = paddle.to_tensor([0, 3, 4, 10, 12], dtype="int32").reshape([-1, 1]) +# current_layer_idx = 0 + +# save_routing_to_buffer( +# routing_table_buffer=routing_table_buffer, +# topk_ids=topk_ids, +# batch_id_per_token=batch_id_per_token, +# # seq_lens_encoder=seq_lens_encoder, +# seq_lens_decoder=seq_lens_decoder, +# # seq_lens_this_time=seq_lens_this_time, +# cu_seqlens_q=cu_seqlens_q, +# layer_idx=current_layer_idx, +# ) From 2592a7b290b1ee4b6689b304263f103e87102a9c Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Tue, 11 Nov 2025 20:24:43 +0800 Subject: [PATCH 02/17] update --- fastdeploy/envs.py | 1 + .../layers/moe/fused_moe_backend_base.py | 9 +++--- .../layers/moe/fused_moe_deepgemm_backend.py | 21 +++++++++---- fastdeploy/model_executor/layers/moe/moe.py | 30 +++++++++++-------- 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 782fef74164..5fb858a4845 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -115,6 +115,7 @@ "ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"), "FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")), "FD_MAX_EXTRA_NUM_BATCHED_TOKENS": lambda: int(os.getenv("FD_MAX_EXTRA_NUM_BATCHED_TOKENS", "16384")), + "FD_ENABLE_ROLLOUT_ROUTING_REPLAY": lambda: bool(int(os.getenv("FD_ENABLE_ROLLOUT_ROUTING_REPLAY", "0"))), } diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index e532768ec62..27be88c87d6 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -15,6 +15,7 @@ """ from abc import abstractmethod +from typing import Callable import paddle from paddle import nn @@ -155,7 +156,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, - topk_ids_hookfunc, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle Cutlass compute Fused MoE. 
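The `topk_ids_hookfunc: Callable = None` default above is what lets every backend stay usable without routing replay: the hook only fires when a caller supplies one. A minimal, self-contained sketch of the pattern (plain Paddle, no Triton; `record_topk` below is a hypothetical stand-in for the kernel-backed `save_routing_to_buffer`, not code from this patch):

    from functools import partial
    from typing import Callable, Optional

    import paddle

    def record_topk(table, layer_idx, topk_ids):
        # Stand-in recorder: write this layer's expert ids into a table slice.
        table[layer_idx, : topk_ids.shape[0]] = topk_ids

    def apply(x: paddle.Tensor, topk_ids: paddle.Tensor, topk_ids_hookfunc: Optional[Callable] = None):
        if topk_ids_hookfunc is not None:
            topk_ids_hookfunc(topk_ids=topk_ids)  # keyword call, matching the patch
        return x

    table = paddle.full([2, 4, 8], -1, dtype="int32")  # [layers, tokens, top_k]
    hook = partial(record_topk, table, 0)              # pre-bind everything but topk_ids
    apply(paddle.randn([4, 16]), paddle.randint(0, 64, [4, 8], dtype="int32"), hook)

`moe.py` builds the real hook the same way, pre-binding the forward-meta buffers with `functools.partial` so the backends never need to know about `ForwardMeta`.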
@@ -164,13 +165,13 @@ def apply( if layer.fd_config.parallel_config.moe_phase.phase == "prefill": if layer.fd_config.parallel_config.splitwise_role == "mixed" and layer.layer_idx == 0: self.ep_prefill_runner.clean_low_latency_buffer() - return self.apply_ep_prefill(layer, x, gate, topk_ids_hookfunc) + return self.apply_ep_prefill(layer, x, gate, topk_ids_hookfunc=topk_ids_hookfunc) else: if layer.fd_config.parallel_config.splitwise_role == "mixed" and layer.layer_idx == 0: self.ep_decoder_runner.clean_low_latency_buffer() - return self.apply_ep_decode(layer, x, gate, topk_ids_hookfunc) + return self.apply_ep_decode(layer, x, gate, topk_ids_hookfunc=topk_ids_hookfunc) else: - return self.apply_tp(layer, x, gate, topk_ids_hookfunc) + return self.apply_tp(layer, x, gate, topk_ids_hookfunc=topk_ids_hookfunc) class UnquantizedFusedMoEMethod(MoEMethodBase): diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py index 44e4eb9fd7a..b23615fd43e 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn from paddleformers.utils.log import logger @@ -298,7 +300,7 @@ def apply_ep_prefill( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, - topk_ids_hookfunc, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP prefill method. @@ -306,7 +308,10 @@ def apply_ep_prefill( gate_out = gate(x.cast("float32")) # 1. Select topk experts and weights topk_idx, topk_weights = self.ep_prefill_runner.moe_select(layer, gate_out) - topk_ids_hookfunc(topk_ids=topk_idx) + + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_idx) + # 2. Dynamic compute blockwise quantization scales x, x_scale_tensor = fastdeploy.model_executor.ops.gpu.per_token_quant( x, self.quant_config.weight_block_size[0] @@ -408,7 +413,7 @@ def apply_ep_decode( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, - topk_ids_hookfunc, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP decoder method. @@ -416,7 +421,10 @@ def apply_ep_decode( gate_out = gate(x.cast("float32")) # 1. Select topk experts and weights topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) - topk_ids_hookfunc(topk_ids=topk_idx) + + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_idx) + # 2. EP Dispatch permute_input, token_nums_per_expert, handle = self.ep_decoder_runner.dispatch( x, topk_idx, topk_weights, use_fp8=True @@ -481,7 +489,7 @@ def apply_tp( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, - topk_ids_hookfunc, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle Use DeepGemm compute Fused MoE. @@ -509,7 +517,8 @@ def apply_tp( False, ) - topk_ids_hookfunc(topk_ids=topk_ids) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) tmp = count_tokens_per_expert_func(topk_ids, layer.num_experts) diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 73091bb80cf..6e00414efe5 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -14,6 +14,7 @@ # limitations under the License. 
""" +from functools import partial from typing import Optional import numpy as np @@ -22,13 +23,14 @@ from paddleformers.utils.log import logger from fastdeploy import envs +from fastdeploy.model_executor.forward_meta import ForwardMeta +from fastdeploy.model_executor.layers.moe.routing_indices_cache import ( + save_routing_to_buffer, +) from fastdeploy.model_executor.layers.utils import get_tensor from fastdeploy.model_executor.utils import slice_fn from fastdeploy.platforms import current_platform from fastdeploy.worker.experts_manager import RedundantExpertManger -from fastdeploy.model_executor.forward_meta import ForwardMeta -from fastdeploy.model_executor.layers.moe.routing_indices_cache import save_routing_to_buffer -from functools import partial try: from fastdeploy.model_executor.ops.gpu import noaux_tc, noaux_tc_redundant @@ -535,7 +537,7 @@ def load_state_dict(self, state_dict, is_rearrange: bool = False): else: self.quant_method.process_loaded_weights(self, state_dict) - def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta): + def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta = None): """ Defines the forward computation of the moe layer. @@ -546,14 +548,16 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta): Tensor: Output tensor.s """ - topk_ids_hookfunc = partial( - save_routing_to_buffer, - routing_table_buffer=forward_meta.routing_table_buffer, - batch_id_per_token=forward_meta.batch_id_per_token, - seq_lens_decoder=forward_meta.seq_lens_decoder, - cu_seqlens_q=forward_meta.cu_seqlens_q, - layer_idx=self.layer_idx, - ) + topk_ids_hookfunc = None + if envs.FD_ENABLE_ROLLOUT_ROUTING_REPLAY: + topk_ids_hookfunc = partial( + save_routing_to_buffer, + routing_table_buffer=forward_meta.routing_table_buffer, + batch_id_per_token=forward_meta.batch_id_per_token, + seq_lens_decoder=forward_meta.seq_lens_decoder, + cu_seqlens_q=forward_meta.cu_seqlens_q, + layer_idx=self.layer_idx, + ) - out = self.quant_method.apply(self, x, gate, topk_ids_hookfunc) + out = self.quant_method.apply(self, x, gate, topk_ids_hookfunc=topk_ids_hookfunc) return out From fe3d14c16df57a3f3f81e3ef4268cf2ea0e799e1 Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Wed, 12 Nov 2025 21:54:10 +0800 Subject: [PATCH 03/17] support tp>1&&ep>1 --- fastdeploy/model_executor/layers/moe/moe.py | 20 ++++---- .../layers/moe/routing_indices_cache.py | 46 +++++++++---------- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 6e00414efe5..03dd0c88695 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -550,14 +550,18 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta = """ topk_ids_hookfunc = None if envs.FD_ENABLE_ROLLOUT_ROUTING_REPLAY: - topk_ids_hookfunc = partial( - save_routing_to_buffer, - routing_table_buffer=forward_meta.routing_table_buffer, - batch_id_per_token=forward_meta.batch_id_per_token, - seq_lens_decoder=forward_meta.seq_lens_decoder, - cu_seqlens_q=forward_meta.cu_seqlens_q, - layer_idx=self.layer_idx, - ) + if forward_meta is not None: # forward_meta is None when execute empty_input_forward + topk_ids_hookfunc = partial( + save_routing_to_buffer, + routing_table_buffer=forward_meta.routing_table_buffer, + batch_id_per_token=forward_meta.batch_id_per_token, + seq_lens_decoder=forward_meta.seq_lens_decoder, + 
cu_seqlens_q=forward_meta.cu_seqlens_q, + layer_idx=self.layer_idx, + tp_size=self.fd_config.parallel_config.tensor_parallel_size, + ep_size=self.fd_config.parallel_config.expert_parallel_size, + tp_group=self.fd_config.parallel_config.tp_group, + ) out = self.quant_method.apply(self, x, gate, topk_ids_hookfunc=topk_ids_hookfunc) return out diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index f36d729553f..40d59c27204 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -15,6 +15,7 @@ """ import paddle +import paddle.distributed as dist import triton import triton.language as tl @@ -24,10 +25,8 @@ def _save_routing_kernel( ROUTING_TABLE_BUFFER_PTR, TOPK_IDS_PTR, BATCH_ID_PER_TOKEN_PTR, - TOKEN_RELATIVE_INDICES_PTR, - # SEQ_LENS_ENCODER_PTR, + CU_SEQLENS_Q_PTR, SEQ_LENS_DECODER_PTR, - # SEQ_LENS_THIS_TIME_PTR, LAYER_IDX, TOKEN_NUM, TOP_K, @@ -47,10 +46,15 @@ def _save_routing_kernel( topk_vals = tl.load(topk_ids_ptrs, mask=token_mask[:, None]) batch_ids = tl.load(BATCH_ID_PER_TOKEN_PTR + token_offsets, mask=token_mask) - token_relative_index = tl.load(TOKEN_RELATIVE_INDICES_PTR + token_offsets, mask=token_mask) - len_decoder = tl.load(SEQ_LENS_DECODER_PTR + batch_ids, mask=token_mask) + # [0, 3, 4, 10, 12][0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3] + # -> [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] + # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] - [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] + # -> [0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1] + start_offsets = tl.load(CU_SEQLENS_Q_PTR + batch_ids, mask=token_mask) + token_relative_index = token_offsets - start_offsets # [BLOCK_SIZE_M] + len_decoder = tl.load(SEQ_LENS_DECODER_PTR + batch_ids, mask=token_mask) token_seq_pos = len_decoder + token_relative_index STRIDE_BUF_SEQ = NUM_HIDDEN_LAYERS * MAX_MODEL_LEN * TOP_K @@ -76,27 +80,27 @@ def save_routing_to_buffer( routing_table_buffer: paddle.Tensor, # [max_num_seqs, num_layers, max_len, top_k] topk_ids: paddle.Tensor, # [token_num, top_k] batch_id_per_token: paddle.Tensor, # [token_num] - # seq_lens_encoder: paddle.Tensor, # [max_num_seqs] seq_lens_decoder: paddle.Tensor, # [max_num_seqs] - # seq_lens_this_time: paddle.Tensor, # [max_num_seqs] cu_seqlens_q: paddle.Tensor, # [max_num_seqs + 1] layer_idx: int, + tp_size: int, + ep_size: int, + tp_group: dist.communication.group.Group, ): + if tp_size > 1 and ep_size > 1: + token_num_per_rank = topk_ids.shape[0] + topk_ids_all = paddle.zeros([token_num_per_rank * tp_size, topk_ids.shape[1]], dtype=topk_ids.dtype) + paddle.distributed.all_gather(topk_ids_all, topk_ids, tp_group) + topk_ids = topk_ids_all[:batch_id_per_token.shape[0], :] + token_num, top_k = topk_ids.shape if token_num == 0: return max_num_seqs, num_hidden_layers, max_model_len, _ = routing_table_buffer.shape - assert topk_ids.shape[1] == routing_table_buffer.shape[3] - assert batch_id_per_token.shape[0] == token_num - assert seq_lens_decoder.shape[0] == max_num_seqs - - token_indices = paddle.arange(token_num, dtype="int32") - # [0, 3, 4, 10, 12][0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3] - # -> [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] - # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] - [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] - # -> [0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1] - token_relative_indices = token_indices - cu_seqlens_q.view([-1])[batch_id_per_token].view([-1]) + assert topk_ids.shape[1] == routing_table_buffer.shape[3], (topk_ids.shape[1], 
routing_table_buffer.shape[3]) + assert batch_id_per_token.shape[0] == token_num, (batch_id_per_token.shape[0], token_num) + assert seq_lens_decoder.shape[0] == max_num_seqs, (seq_lens_decoder.shape[0], max_num_seqs) BLOCK_SIZE_M = 128 BLOCK_SIZE_K = top_k # 值一般很小,直接设为 top_k @@ -106,10 +110,8 @@ def save_routing_to_buffer( routing_table_buffer, topk_ids, batch_id_per_token, - token_relative_indices, - # seq_lens_encoder, + cu_seqlens_q, seq_lens_decoder, - # seq_lens_this_time, LAYER_IDX=layer_idx, TOKEN_NUM=token_num, TOP_K=top_k, @@ -129,9 +131,7 @@ def save_routing_to_buffer( # routing_table_buffer = paddle.full([max_num_seqs, num_layers, max_len, top_k], -1, dtype="int32") # topk_ids = paddle.randint(0, 384, [token_num, top_k], dtype="int32") # batch_id_per_token = paddle.to_tensor([0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3], dtype="int32").reshape([-1, 1]) -# # seq_lens_encoder = paddle.to_tensor([3, 1, 6, 2], dtype="int32").reshape([-1, 1]) # seq_lens_decoder = paddle.to_tensor([0, 2, 0, 3], dtype="int32").reshape([-1, 1]) -# # seq_lens_this_time = paddle.to_tensor([3, 1, 6, 2], dtype="int32").reshape([-1, 1]) # cu_seqlens_q = paddle.to_tensor([0, 3, 4, 10, 12], dtype="int32").reshape([-1, 1]) # current_layer_idx = 0 @@ -139,9 +139,7 @@ def save_routing_to_buffer( # routing_table_buffer=routing_table_buffer, # topk_ids=topk_ids, # batch_id_per_token=batch_id_per_token, -# # seq_lens_encoder=seq_lens_encoder, # seq_lens_decoder=seq_lens_decoder, -# # seq_lens_this_time=seq_lens_this_time, # cu_seqlens_q=cu_seqlens_q, # layer_idx=current_layer_idx, # ) From daa83d3d5583f115ddaee149b356b6c56cde9487 Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Wed, 12 Nov 2025 22:39:37 +0800 Subject: [PATCH 04/17] support cudagraph padding --- .../layers/moe/routing_indices_cache.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index 40d59c27204..d97886db772 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -46,15 +46,16 @@ def _save_routing_kernel( topk_vals = tl.load(topk_ids_ptrs, mask=token_mask[:, None]) batch_ids = tl.load(BATCH_ID_PER_TOKEN_PTR + token_offsets, mask=token_mask) + pad_mask = token_mask & (batch_ids != -1) # [0, 3, 4, 10, 12][0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3] # -> [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] - [0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 10, 10] # -> [0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1] - start_offsets = tl.load(CU_SEQLENS_Q_PTR + batch_ids, mask=token_mask) + start_offsets = tl.load(CU_SEQLENS_Q_PTR + batch_ids, mask=pad_mask) token_relative_index = token_offsets - start_offsets # [BLOCK_SIZE_M] - len_decoder = tl.load(SEQ_LENS_DECODER_PTR + batch_ids, mask=token_mask) + len_decoder = tl.load(SEQ_LENS_DECODER_PTR + batch_ids, mask=pad_mask) token_seq_pos = len_decoder + token_relative_index STRIDE_BUF_SEQ = NUM_HIDDEN_LAYERS * MAX_MODEL_LEN * TOP_K @@ -71,6 +72,7 @@ def _save_routing_kernel( ) pos_mask = token_seq_pos < MAX_MODEL_LEN + pos_mask = pos_mask & pad_mask final_mask = token_mask[:, None] & pos_mask[:, None] tl.store(output_ptrs, topk_vals, mask=final_mask) @@ -91,13 +93,11 @@ def save_routing_to_buffer( token_num_per_rank = topk_ids.shape[0] topk_ids_all = paddle.zeros([token_num_per_rank * tp_size, topk_ids.shape[1]], dtype=topk_ids.dtype) paddle.distributed.all_gather(topk_ids_all, 
topk_ids, tp_group) - topk_ids = topk_ids_all[:batch_id_per_token.shape[0], :] + topk_ids = topk_ids_all[: batch_id_per_token.shape[0], :] token_num, top_k = topk_ids.shape - if token_num == 0: - return - max_num_seqs, num_hidden_layers, max_model_len, _ = routing_table_buffer.shape + assert token_num > 0 assert topk_ids.shape[1] == routing_table_buffer.shape[3], (topk_ids.shape[1], routing_table_buffer.shape[3]) assert batch_id_per_token.shape[0] == token_num, (batch_id_per_token.shape[0], token_num) assert seq_lens_decoder.shape[0] == max_num_seqs, (seq_lens_decoder.shape[0], max_num_seqs) From 7007e45a55c92192b504d7315f862e89a158bc5d Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Thu, 13 Nov 2025 16:10:44 +0800 Subject: [PATCH 05/17] support all backends --- .../backends/dcu/fused_moe_triton_backends.py | 6 ++++++ .../gcu/moe/fused_moe_method_gcu_backend.py | 6 ++++++ .../metax/moe/fused_moe_triton_metax_backend.py | 7 +++++++ .../layers/moe/fused_moe_backend_base.py | 2 ++ .../layers/moe/fused_moe_cutlass_backend.py | 16 ++++++++++++++++ .../layers/moe/fused_moe_marlin_backend.py | 6 ++++++ .../layers/moe/fused_moe_triton_backend.py | 14 ++++++++++++++ .../layers/moe/fused_moe_wint2_backend.py | 10 ++++++++++ .../layers/moe/fused_moe_xpu_backend.py | 6 ++++++ 9 files changed, 73 insertions(+) diff --git a/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py b/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py index 0038ed14973..43e0bf8c756 100644 --- a/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py +++ b/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn @@ -102,6 +104,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Triton compute Fused MoE. @@ -119,6 +122,9 @@ def apply( topk_weights, topk_ids = paddle.topk(scores, k=top_k, axis=-1, sorted=False) topk_weights = topk_weights / topk_weights.sum(axis=-1, keepdim=True) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + intermediate_cache1 = paddle.empty( [token_num * top_k, moe_intermediate_size * 2], dtype=x.dtype, diff --git a/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py b/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py index c899cafc792..019416ec696 100644 --- a/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py +++ b/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py @@ -16,6 +16,7 @@ import multiprocessing import os +from typing import Callable import numpy as np import paddle @@ -189,6 +190,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle gcu compute Fused MoE. @@ -201,6 +203,7 @@ def apply_ep_prefill( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP prefill method. @@ -212,6 +215,7 @@ def apply_ep_decode( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP decoder method. 
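A note on the padding guard from PATCH 04 above, which the recording kernel relies on when CUDA graphs pad the batch: trailing token slots carry `batch_id_per_token == -1`, and the kernel folds that into every gather and into the final store mask, so padded slots never write into the routing table. A tiny NumPy check of that guard (illustrative only, not code from the patch):

    import numpy as np

    # Six token slots; the last two are CUDA-graph padding.
    batch_ids = np.array([0, 0, 1, 1, -1, -1])   # batch_id_per_token
    token_mask = np.array([True] * 6)            # slots below TOKEN_NUM
    pad_mask = token_mask & (batch_ids != -1)    # the guard added in PATCH 04

    assert pad_mask.tolist() == [True, True, True, True, False, False]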
@@ -223,6 +227,7 @@ def apply_tp( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle Cutlass compute Fused MoE. @@ -388,6 +393,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle gcu compute Fused MoE. diff --git a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py index e945a189a09..78ac8982d06 100644 --- a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py +++ b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn @@ -132,6 +134,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Triton compute Fused MoE. @@ -151,6 +154,10 @@ def apply( True, # apply_norm_weight, False, ) + + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + up_gate_proj_out = paddle.empty( [token_num * top_k, moe_intermediate_size * 2], dtype=x.dtype, diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index 27be88c87d6..0c5bccc3ddf 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -121,6 +121,7 @@ def apply_ep_prefill( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP prefill method. @@ -145,6 +146,7 @@ def apply_tp( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle Cutlass compute Fused MoE. diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py index 9dd5c9984a9..f0623bcc991 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn from paddle.nn.quant import weight_quantize @@ -105,6 +107,7 @@ def apply_ep_prefill( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP prefill method. @@ -121,6 +124,10 @@ def apply_ep_prefill( handle, _, ) = self.ep_prefill_runner.dispatch(x, topk_idx, topk_weights) + + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_idx) + token_all_num = sum(recv_num_tokens_per_expert_list) # 3. Compute ffn @@ -178,6 +185,7 @@ def apply_ep_decode( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP decoder method. @@ -186,6 +194,10 @@ def apply_ep_decode( estimate_total_token_nums = gate_out.shape[0] * layer.top_k # 1. 
Select topk experts and weights topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) + + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_idx) + expertwise_scale = None if hasattr(layer, "up_gate_proj_in_scale_all_experts"): # only use in w4a8 expertwise_scale = getattr(layer, "up_gate_proj_in_scale_all_experts", None) @@ -220,6 +232,7 @@ def apply_tp( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle Cutlass compute Fused MoE. @@ -277,6 +290,9 @@ def apply_tp( topk_only_mode=False, ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_idx) + if self.moe_quant_type != "w4a8" and self.moe_quant_type != "w4afp8": # only w4a8 need expert_idx_per_token # Other need not this tensor, so we make it None. diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py index b3aa306e90e..062c4123dd7 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn @@ -240,6 +242,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Marlin compute Fused MoE. @@ -275,6 +278,9 @@ def apply( False, ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + block_size_m = 64 for m in [8, 16, 32, 48, 64]: diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py index 735662642c0..170557c8d9b 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn @@ -156,6 +158,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Triton compute Fused MoE. @@ -186,6 +189,9 @@ def apply( False, ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + up_gate_proj_out = paddle.empty( [token_num * top_k, moe_intermediate_size * 2], dtype=x.dtype, @@ -420,6 +426,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Triton compute Fused MoE. @@ -451,6 +458,9 @@ def apply( False, ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + up_gate_proj_out = paddle.empty( [token_num * top_k, moe_intermediate_size * 2], dtype=x.dtype, @@ -840,6 +850,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Triton compute Fused MoE. 
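Across all of these backends the hook always receives the same tensor: the per-token expert ids produced by top-k selection, before any permutation, so the recorded routing stays in the original token order. A quick emulation of that shape contract with plain `paddle.topk` (the fused `moe_topk_select` op used above produces ids of the same shape; this snippet is illustrative, not from the patch):

    import paddle

    token_num, num_experts, top_k = 5, 16, 4
    gate_out = paddle.rand([token_num, num_experts])
    topk_weights, topk_ids = paddle.topk(gate_out, k=top_k, axis=-1)

    # [token_num, top_k] is exactly what every backend hands to the hook.
    assert tuple(topk_ids.shape) == (token_num, top_k)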
@@ -871,6 +882,9 @@ def apply( False, ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + config = { "BLOCK_SIZE_M": 64, "BLOCK_SIZE_N": self.quant_config.weight_block_size[1], diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py index f9f717d3139..3c22d99b4c0 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn @@ -261,6 +263,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Use Wint2 Triton Fusedmoe compute Fused MoE. @@ -288,6 +291,9 @@ def apply( topk_only_mode=False, ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_idx) + ffn_out = fastdeploy.model_executor.ops.gpu.moe_expert_ffn_wint2( permute_input, token_nums_per_expert, @@ -333,6 +339,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Use Wint2 Triton Fusedmoe compute Fused MoE. @@ -348,6 +355,9 @@ def apply( False, ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + num_tokens, K = x.shape E, _, N = layer.up_gate_proj_weight.shape M = num_tokens diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py index b83cce96d21..c860e2ad443 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py @@ -14,6 +14,8 @@ # limitations under the License. """ +from typing import Callable + import paddle from paddle import nn @@ -47,6 +49,7 @@ def apply_tp( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Paddle Cutlass compute Fused MoE. @@ -82,6 +85,7 @@ def apply_ep_prefill( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP prefill method. @@ -93,6 +97,7 @@ def apply_ep_decode( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ Apply the EP decoder method. @@ -227,6 +232,7 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, + topk_ids_hookfunc: Callable = None, ) -> paddle.Tensor: """ XPU compute Fused MoE. 
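Before the series moves on to configuration plumbing, a compact reference for the flat addressing used by the Triton kernel from PATCH 01: the entry for (batch, layer, pos, k) lives at `batch*L*M*K + layer*M*K + pos*K + k`, which is what the `STRIDE_BUF_SEQ`, `STRIDE_BUF_LAYER`, and `STRIDE_BUF_TOKEN` constants encode. A NumPy sanity check (illustrative only, not code from the patch):

    import numpy as np

    S, L, M, K = 2, 3, 5, 4  # max_num_seqs, moe layers, max_model_len, top_k
    flat = np.full(S * L * M * K, -1, dtype=np.int32)

    def offset(batch, layer, pos, k):
        # Mirrors STRIDE_BUF_SEQ = L*M*K, STRIDE_BUF_LAYER = M*K, STRIDE_BUF_TOKEN = K.
        return batch * (L * M * K) + layer * (M * K) + pos * K + k

    flat[offset(1, 2, 4, 3)] = 7
    assert flat.reshape(S, L, M, K)[1, 2, 4, 3] == 7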
From 7fa66904e58577afb8c9d3a42db01a43bd0167b3 Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Thu, 13 Nov 2025 17:25:00 +0800 Subject: [PATCH 06/17] replace env with options --- fastdeploy/config.py | 2 ++ fastdeploy/engine/args_utils.py | 12 ++++++++++++ fastdeploy/engine/engine.py | 1 + fastdeploy/envs.py | 1 - fastdeploy/model_executor/layers/moe/moe.py | 5 +++-- .../layers/moe/routing_indices_cache.py | 6 +++--- fastdeploy/rl/rollout_config.py | 2 ++ fastdeploy/worker/worker_process.py | 7 +++++++ 8 files changed, 30 insertions(+), 6 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index fb838d04540..665bc28e105 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1200,6 +1200,7 @@ def __init__( test_mode=False, enable_attention_dp_balance: bool = False, attention_dp_time_out_iters: int = 0, + enable_rollout_routing_replay: bool = False, ): self.model_config: ModelConfig = model_config # type: ignore self.cache_config: CacheConfig = cache_config # type: ignore @@ -1217,6 +1218,7 @@ def __init__( self.moba_attention_config: Optional[MobaAttentionConfig] = moba_attention_config self.enable_attention_dp_balance = enable_attention_dp_balance self.attention_dp_time_out_iters = attention_dp_time_out_iters + self.enable_rollout_routing_replay = enable_rollout_routing_replay # Initialize cuda graph capture list max_capture_shape = self.parallel_config.max_num_seqs if self.speculative_config is not None and self.speculative_config.method == "mtp": diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index e41154cd0fd..f1558740ef3 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -407,6 +407,11 @@ class EngineArgs: Configuration for eplb. """ + enable_rollout_routing_replay: bool = False + """ + Flag to enable rollout routing replay(r3) + """ + def __post_init__(self): """ Post-initialization processing to set default tokenizer if not provided. 
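With the environment variable replaced by an engine option, the feature is now enabled per deployment. A hedged usage sketch (the model path is a placeholder and real deployments pass many more arguments; `EngineArgs` and its new field are the pieces touched by this patch):

    from fastdeploy.engine.args_utils import EngineArgs

    args = EngineArgs(
        model="/path/to/moe-model",          # placeholder
        enable_rollout_routing_replay=True,  # new flag added in this patch
    )
    fd_config = args.create_engine_config()
    assert fd_config.enable_rollout_routing_replay

The equivalent command-line switch, registered in the parser hunk just below, is `--enable-rollout-routing-replay`.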
@@ -715,6 +720,12 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: default=EngineArgs.eplb_config, help="Config of eplb.", ) + parallel_group.add_argument( + "--enable-rollout-routing-replay", + action="store_true", + default=EngineArgs.enable_rollout_routing_replay, + help="Flag to enable rollout routing replay(r3).", + ) # Load group load_group = parser.add_argument_group("Load Configuration") @@ -1141,4 +1152,5 @@ def create_engine_config(self, port_availability_check: bool = True) -> FDConfig early_stop_config=early_stop_cfg, enable_attention_dp_balance=self.enable_attention_dp_balance, attention_dp_time_out_iters=self.attention_dp_time_out_iters, + enable_rollout_routing_replay=self.enable_rollout_routing_replay, ) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index b26337da3ef..fd08a62442a 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -477,6 +477,7 @@ def _start_worker_service(self): "enable_logprob": self.cfg.model_config.enable_logprob, "lm_head_fp32": self.cfg.model_config.lm_head_fp32, "enable_attention_dp_balance": self.cfg.enable_attention_dp_balance, + "enable_rollout_routing_replay": self.cfg.enable_rollout_routing_replay, } for worker_flag, value in worker_append_flag.items(): if value: diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 5fb858a4845..782fef74164 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -115,7 +115,6 @@ "ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"), "FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")), "FD_MAX_EXTRA_NUM_BATCHED_TOKENS": lambda: int(os.getenv("FD_MAX_EXTRA_NUM_BATCHED_TOKENS", "16384")), - "FD_ENABLE_ROLLOUT_ROUTING_REPLAY": lambda: bool(int(os.getenv("FD_ENABLE_ROLLOUT_ROUTING_REPLAY", "0"))), } diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 03dd0c88695..c9054aac18a 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -200,6 +200,7 @@ def __init__( if self.ep_size > 1: self.quant_method.init_ep(self) + self.enable_rollout_routing_replay = fd_config.enable_rollout_routing_replay # Merge normal and RL build model if gate_correction_bias is not None: self.gate_correction_bias = gate_correction_bias @@ -549,8 +550,8 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta = """ topk_ids_hookfunc = None - if envs.FD_ENABLE_ROLLOUT_ROUTING_REPLAY: - if forward_meta is not None: # forward_meta is None when execute empty_input_forward + if self.enable_rollout_routing_replay: + if forward_meta is not None: # forward_meta is None when execute empty_input_forward topk_ids_hookfunc = partial( save_routing_to_buffer, routing_table_buffer=forward_meta.routing_table_buffer, diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index d97886db772..e6d84d2cd0c 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -81,9 +81,9 @@ def _save_routing_kernel( def save_routing_to_buffer( routing_table_buffer: paddle.Tensor, # [max_num_seqs, num_layers, max_len, top_k] topk_ids: paddle.Tensor, # [token_num, top_k] - batch_id_per_token: paddle.Tensor, # [token_num] - seq_lens_decoder: paddle.Tensor, # [max_num_seqs] - cu_seqlens_q: paddle.Tensor, # [max_num_seqs + 1] + 
batch_id_per_token: paddle.Tensor, # [token_num, 1] + seq_lens_decoder: paddle.Tensor, # [max_num_seqs, 1] + cu_seqlens_q: paddle.Tensor, # [max_num_seqs + 1, 1] layer_idx: int, tp_size: int, ep_size: int, diff --git a/fastdeploy/rl/rollout_config.py b/fastdeploy/rl/rollout_config.py index 940d378215a..8d1b1ce6d49 100644 --- a/fastdeploy/rl/rollout_config.py +++ b/fastdeploy/rl/rollout_config.py @@ -67,6 +67,7 @@ def __init__( enable_attention_dp_balance: bool = False, attention_dp_time_out_iters: int = 0, eplb_config: str = {}, + enable_rollout_routing_replay: bool = False, ): # Required parameters self.model = model_name_or_path @@ -117,6 +118,7 @@ def __init__( self.enable_attention_dp_balance = enable_attention_dp_balance self.attention_dp_time_out_iters = attention_dp_time_out_iters self.eplb_config = eplb_config + self.enable_rollout_routing_replay = enable_rollout_routing_replay def __str__(self): return "\n".join(f"{k}: {v}" for k, v in self.__dict__.items()) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 84ed49948e2..4d72a49295d 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -840,6 +840,12 @@ def parse_args(): help="EPLB Configuration.", ) + parser.add_argument( + "--enable_rollout_routing_replay", + action="store_true", + help="enable rollout routing replay(r3)", + ) + args = parser.parse_args() return args @@ -998,6 +1004,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: enable_attention_dp_balance=args.enable_attention_dp_balance, attention_dp_time_out_iters=args.attention_dp_time_out_iters, eplb_config=eplb_config, + enable_rollout_routing_replay=args.enable_rollout_routing_replay, ) update_fd_config_for_mm(fd_config) From 9c9bb0279edd3abde48b12ca323e93e799b0c2bd Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Thu, 13 Nov 2025 22:34:15 +0800 Subject: [PATCH 07/17] modularize --- .../layers/moe/routing_indices_cache.py | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index e6d84d2cd0c..bea7eca1d0a 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -14,11 +14,17 @@ # limitations under the License. 
""" +import copy +import os +from typing import Dict + import paddle import paddle.distributed as dist import triton import triton.language as tl +from fastdeploy.config import FDConfig + @triton.jit def _save_routing_kernel( @@ -143,3 +149,72 @@ def save_routing_to_buffer( # cu_seqlens_q=cu_seqlens_q, # layer_idx=current_layer_idx, # ) + + +class RoutingReplayManager: + def __init__( + self, + fd_config: FDConfig, + output_dir: str = "./routing_replay_output", + ): + self.max_num_seqs = fd_config.parallel_config.max_num_seqs + self.max_model_len = fd_config.model_config.max_model_len + self.num_moe_layers = fd_config.model_config.num_hidden_layers - fd_config.model_config.moe_layer_start_index + self.moe_top_k = fd_config.model_config.moe_k + self.tp_rank = fd_config.parallel_config.tensor_parallel_rank + + self.output_dir = output_dir + + self.routing_batch_to_request: Dict[int, str] = {} + + self.routing_table_buffer = paddle.full( + shape=[self.max_num_seqs, self.num_moe_layers, self.max_model_len, self.moe_top_k], + fill_value=-1, + dtype="int32", + ) + + def register_request(self, batch_id: int, request_id: str): + if batch_id in self.routing_batch_to_request: + pre_request_id = self.deregister_request(batch_id) + self.save_routing_to_file(batch_id, pre_request_id) + + self.routing_batch_to_request[batch_id] = request_id + + def deregister_request(self, batch_id: int) -> str: + assert batch_id in self.routing_batch_to_request + return self.routing_batch_to_request.pop(batch_id) + + def get_buffer(self) -> paddle.Tensor: + return self.routing_table_buffer + + def clear_buffer_slot(self, batch_id: int): + assert 0 <= batch_id < self.max_num_seqs + self.routing_table_buffer[batch_id].fill_(-1) + + def clear_buffer(self): + self.routing_table_buffer.fill_(-1) + + def save_routing_to_file( + self, + batch_id: int, + request_id: str, + ): + if self.tp_rank == 0: + dir_path = os.path.join(self.output_dir, f"{request_id}") + os.makedirs(dir_path, exist_ok=True) + batch_buffer = self.routing_table_buffer[batch_id] + for layer_id in range(self.num_moe_layers): + layer_buffer = batch_buffer[layer_id] + print(f"{layer_id=}, {layer_buffer=}") + file_path = os.path.join( + dir_path, f"layer_{layer_id}_shape_{self.max_model_len}x{self.moe_top_k}.pdtensor" + ) + paddle.save(layer_buffer, file_path) + + self.clear_buffer_slot(batch_id) + + def save_tail_routing(self): + batch_ids = copy.deepcopy(list(self.routing_batch_to_request.keys())) + for batch_id in batch_ids: + request_id = self.deregister_request(batch_id) + self.save_routing_to_file(batch_id, request_id) From d8bc82e648d5fa982556e011fa484922ad93a65e Mon Sep 17 00:00:00 2001 From: yuanlehome Date: Thu, 13 Nov 2025 22:42:29 +0800 Subject: [PATCH 08/17] update --- .../layers/moe/routing_indices_cache.py | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index bea7eca1d0a..7bc0d35ee70 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -173,28 +173,15 @@ def __init__( dtype="int32", ) - def register_request(self, batch_id: int, request_id: str): - if batch_id in self.routing_batch_to_request: - pre_request_id = self.deregister_request(batch_id) - self.save_routing_to_file(batch_id, pre_request_id) - - self.routing_batch_to_request[batch_id] = request_id - - def deregister_request(self, 
batch_id: int) -> str:
+    def _deregister_request(self, batch_id: int) -> str:
         assert batch_id in self.routing_batch_to_request
         return self.routing_batch_to_request.pop(batch_id)
 
-    def get_buffer(self) -> paddle.Tensor:
-        return self.routing_table_buffer
-
-    def clear_buffer_slot(self, batch_id: int):
+    def _clear_buffer_slot(self, batch_id: int):
         assert 0 <= batch_id < self.max_num_seqs
         self.routing_table_buffer[batch_id].fill_(-1)
 
-    def clear_buffer(self):
-        self.routing_table_buffer.fill_(-1)
-
-    def save_routing_to_file(
+    def _save_routing_to_file(
         self,
         batch_id: int,
         request_id: str,
@@ -211,10 +198,23 @@ def save_routing_to_file(
         )
         paddle.save(layer_buffer, file_path)
 
-        self.clear_buffer_slot(batch_id)
+        self._clear_buffer_slot(batch_id)
+
+    def clear_buffer(self):
+        self.routing_table_buffer.fill_(-1)
+
+    def register_request(self, batch_id: int, request_id: str):
+        if batch_id in self.routing_batch_to_request:
+            pre_request_id = self._deregister_request(batch_id)
+            self._save_routing_to_file(batch_id, pre_request_id)
+
+        self.routing_batch_to_request[batch_id] = request_id
+
+    def get_buffer(self) -> paddle.Tensor:
+        return self.routing_table_buffer
 
     def save_tail_routing(self):
         batch_ids = copy.deepcopy(list(self.routing_batch_to_request.keys()))
         for batch_id in batch_ids:
-            request_id = self.deregister_request(batch_id)
-            self.save_routing_to_file(batch_id, request_id)
+            request_id = self._deregister_request(batch_id)
+            self._save_routing_to_file(batch_id, request_id)

From 60b69e95683c3bcc65849d57d6dcecdc112d6d5d Mon Sep 17 00:00:00 2001
From: gongshaotian
Date: Tue, 18 Nov 2025 21:00:41 +0800
Subject: [PATCH 09/17] Add RoutingStore and refine code

---
 fastdeploy/model_executor/forward_meta.py     |   2 +-
 fastdeploy/model_executor/layers/moe/moe.py   |   2 +-
 .../layers/moe/routing_indices_cache.py       | 235 +++++++++++++-----
 3 files changed, 174 insertions(+), 65 deletions(-)

diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py
index f73b6c3caf2..bead52c9d18 100644
--- a/fastdeploy/model_executor/forward_meta.py
+++ b/fastdeploy/model_executor/forward_meta.py
@@ -111,7 +111,7 @@ class ForwardMeta:
     # KV caches
     caches: Optional[list[paddle.Tensor]] = None
     # Routing Replay table buffer
-    routing_table_buffer: Optional[paddle.Tensor] = None
+    routing_replay_table: Optional[paddle.Tensor] = None
 
     def clear_caches(self):
         """Safely clean up the caches"""
diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py
index c9054aac18a..d8d4ac286c8 100644
--- a/fastdeploy/model_executor/layers/moe/moe.py
+++ b/fastdeploy/model_executor/layers/moe/moe.py
@@ -554,7 +554,7 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta =
         if forward_meta is not None:  # forward_meta is None when execute empty_input_forward
             topk_ids_hookfunc = partial(
                 save_routing_to_buffer,
-                routing_table_buffer=forward_meta.routing_table_buffer,
+                routing_replay_table=forward_meta.routing_replay_table,
                 batch_id_per_token=forward_meta.batch_id_per_token,
                 seq_lens_decoder=forward_meta.seq_lens_decoder,
                 cu_seqlens_q=forward_meta.cu_seqlens_q,
diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py
index 7bc0d35ee70..01f9d81c866 100644
--- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py
+++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py
@@ -16,7 +16,9 @@
 
 import copy
 import os
+import 
shutil +from abc import ABC, abstractmethod +from typing import Dict, List, Optional import paddle import paddle.distributed as dist @@ -28,7 +30,7 @@ @triton.jit def _save_routing_kernel( - ROUTING_TABLE_BUFFER_PTR, + ROUTING_REPLAY_TABLE_PTR, TOPK_IDS_PTR, BATCH_ID_PER_TOKEN_PTR, CU_SEQLENS_Q_PTR, @@ -70,7 +72,7 @@ def _save_routing_kernel( # [BLOCK_SIZE_M, BLOCK_SIZE_K] output_ptrs = ( - ROUTING_TABLE_BUFFER_PTR + ROUTING_REPLAY_TABLE_PTR + batch_ids[:, None] * STRIDE_BUF_SEQ + LAYER_IDX * STRIDE_BUF_LAYER + token_seq_pos[:, None] * STRIDE_BUF_TOKEN @@ -85,7 +87,7 @@ def _save_routing_kernel( def save_routing_to_buffer( - routing_table_buffer: paddle.Tensor, # [max_num_seqs, num_layers, max_len, top_k] + routing_replay_table: paddle.Tensor, # [max_num_seqs, num_layers, max_len, top_k] topk_ids: paddle.Tensor, # [token_num, top_k] batch_id_per_token: paddle.Tensor, # [token_num, 1] seq_lens_decoder: paddle.Tensor, # [max_num_seqs, 1] @@ -102,18 +104,18 @@ def save_routing_to_buffer( topk_ids = topk_ids_all[: batch_id_per_token.shape[0], :] token_num, top_k = topk_ids.shape - max_num_seqs, num_hidden_layers, max_model_len, _ = routing_table_buffer.shape + max_num_seqs, num_hidden_layers, max_model_len, _ = routing_replay_table.shape assert token_num > 0 - assert topk_ids.shape[1] == routing_table_buffer.shape[3], (topk_ids.shape[1], routing_table_buffer.shape[3]) + assert topk_ids.shape[1] == routing_replay_table.shape[3], (topk_ids.shape[1], routing_replay_table.shape[3]) assert batch_id_per_token.shape[0] == token_num, (batch_id_per_token.shape[0], token_num) assert seq_lens_decoder.shape[0] == max_num_seqs, (seq_lens_decoder.shape[0], max_num_seqs) BLOCK_SIZE_M = 128 - BLOCK_SIZE_K = top_k # 值一般很小,直接设为 top_k + BLOCK_SIZE_K = top_k grid = (triton.cdiv(token_num, BLOCK_SIZE_M),) _save_routing_kernel[grid]( - routing_table_buffer, + routing_replay_table, topk_ids, batch_id_per_token, cu_seqlens_q, @@ -128,34 +130,12 @@ def save_routing_to_buffer( ) -# max_num_seqs = 4 -# num_layers = 1 -# max_len = 10 -# top_k = 8 -# token_num = 12 - -# routing_table_buffer = paddle.full([max_num_seqs, num_layers, max_len, top_k], -1, dtype="int32") -# topk_ids = paddle.randint(0, 384, [token_num, top_k], dtype="int32") -# batch_id_per_token = paddle.to_tensor([0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3], dtype="int32").reshape([-1, 1]) -# seq_lens_decoder = paddle.to_tensor([0, 2, 0, 3], dtype="int32").reshape([-1, 1]) -# cu_seqlens_q = paddle.to_tensor([0, 3, 4, 10, 12], dtype="int32").reshape([-1, 1]) -# current_layer_idx = 0 - -# save_routing_to_buffer( -# routing_table_buffer=routing_table_buffer, -# topk_ids=topk_ids, -# batch_id_per_token=batch_id_per_token, -# seq_lens_decoder=seq_lens_decoder, -# cu_seqlens_q=cu_seqlens_q, -# layer_idx=current_layer_idx, -# ) - - class RoutingReplayManager: + """Request level routing replay table manager""" + def __init__( self, fd_config: FDConfig, - output_dir: str = "./routing_replay_output", ): self.max_num_seqs = fd_config.parallel_config.max_num_seqs self.max_model_len = fd_config.model_config.max_model_len @@ -163,58 +143,187 @@ def __init__( self.moe_top_k = fd_config.model_config.moe_k self.tp_rank = fd_config.parallel_config.tensor_parallel_rank - self.output_dir = output_dir - + self.routing_store = get_routing_store(fd_config=fd_config) self.routing_batch_to_request: Dict[int, str] = {} - - self.routing_table_buffer = paddle.full( + self.routing_replay_table = paddle.full( shape=[self.max_num_seqs, self.num_moe_layers, self.max_model_len, self.moe_top_k], 
fill_value=-1, dtype="int32", ) + def register_request(self, batch_id: int, request_id: str): + """ + Register a new request in the routing replay table + Args: + batch_id: The batch ID of this request + request_id: The global ID of the request, typically assigned by the training process in RL + """ + # Persist the finished request that previously occupied this slot + if batch_id in self.routing_batch_to_request: + pre_request_id = self._deregister_request(batch_id) + self._put_request_to_store(batch_id, pre_request_id) + # Register the new request + self.routing_batch_to_request[batch_id] = request_id + def _deregister_request(self, batch_id: int) -> str: + """ + Deregister a request from the routing replay table + """ assert batch_id in self.routing_batch_to_request return self.routing_batch_to_request.pop(batch_id) - def _clear_buffer_slot(self, batch_id: int): - assert 0 <= batch_id < self.max_num_seqs - self.routing_table_buffer[batch_id].fill_(-1) - - def _save_routing_to_file( + def _put_request_to_store( self, batch_id: int, request_id: str, ): + layer_buffer_shape = [self.max_model_len, self.moe_top_k] if self.tp_rank == 0: - dir_path = os.path.join(self.output_dir, f"{request_id}") - os.makedirs(dir_path, exist_ok=True) - batch_buffer = self.routing_table_buffer[batch_id] + batch_buffer = self.routing_replay_table[batch_id] for layer_id in range(self.num_moe_layers): layer_buffer = batch_buffer[layer_id] - print(f"{layer_id=}, {layer_buffer=}") - file_path = os.path.join( - dir_path, f"layer_{layer_id}_shape_{self.max_model_len}x{self.moe_top_k}.pdtensor" + self.routing_store.put( + routing_indices=layer_buffer, request_id=request_id, layer_idx=layer_id, shape=layer_buffer_shape ) - paddle.save(layer_buffer, file_path) - self._clear_buffer_slot(batch_id) + self._clear_table_slot(batch_id) + + def put_table_to_store(self): + """Put all routing indices registered in the table into the store""" + batch_ids = copy.deepcopy(list(self.routing_batch_to_request.keys())) + for batch_id in batch_ids: + request_id = self._deregister_request(batch_id) + self._put_request_to_store(batch_id, request_id) - def clear_buffer(self): - self.routing_table_buffer.fill_(-1) + def _clear_table_slot(self, batch_id: int): + assert 0 <= batch_id < self.max_num_seqs + self.routing_replay_table[batch_id].fill_(-1) - def register_request(self, batch_id: int, request_id: str): - if batch_id in self.routing_batch_to_request: - pre_request_id = self._deregister_request(batch_id) - self._save_routing_to_file(batch_id, pre_request_id) + def clear_routing_table(self): + """Clear all slots of the routing replay table""" + self.routing_replay_table.fill_(-1) - self.routing_batch_to_request[batch_id] = request_id + def _clear_store(self): + """Clear routing store""" + self.routing_store.clear_store() - def get_buffer(self) -> paddle.Tensor: - return self.routing_table_buffer + def _clear_request_of_store(self, request_id): + """Clear one request of routing store""" + for layer_idx in range(self.num_moe_layers): + self.routing_store.clear(request_id=request_id, layer_idx=layer_idx) - def save_tail_routing(self): - batch_ids = copy.deepcopy(list(self.routing_batch_to_request.keys())) - for batch_id in batch_ids: - request_id = self._deregister_request(batch_id) - self._save_routing_to_file(batch_id, request_id) + def get_request_from_store(self, request_id: str) -> List[paddle.Tensor]: + """Get the routing indices of the request from store""" + routing_list = [] + for layer_idx in range(self.num_moe_layers): + one_layer_routing = self.routing_store.get(request_id, 
layer_idx) + routing_list.append(one_layer_routing) + + return routing_list + + def get_routing_table(self) -> paddle.Tensor: + return self.routing_replay_table + + +class RoutingStoreBase(ABC): + """Base class for routing store""" + + def __init__(self, fd_config: FDConfig) -> None: + self.fd_config = fd_config + + @abstractmethod + def put(self, routing_indices: paddle.Tensor, request_id: str, layer_idx: Optional[int] = None) -> None: + """Put the routing indices into store""" + raise NotImplementedError + + @abstractmethod + def get(self, request_id: str, layer_idx: Optional[int] = None) -> paddle.Tensor: + """Get the routing indices from store""" + raise NotImplementedError + + @abstractmethod + def clear(self, request_id: str, layer_idx: Optional[int] = None) -> None: + """Clear the routing indices of the request""" + raise NotImplementedError + + @abstractmethod + def clear_store( + self, + ): + """Clear the routing indices store""" + raise NotImplementedError + + +class RoutingStoreLocal(RoutingStoreBase): + """Routing Store backed by the local filesystem""" + + def __init__(self, fd_config) -> None: + super().__init__(fd_config=fd_config) + self.output_dir = fd_config.routing_replay_config.output_dir # output_dir: str = "./routing_replay_output", + + def put(self, routing_indices: paddle.Tensor, request_id: str, layer_idx: int) -> None: + """Put the routing indices into store""" + dir_path = os.path.join(self.output_dir, f"{request_id}") + os.makedirs(dir_path, exist_ok=True) + file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") + paddle.save(routing_indices, file_path) + + def get( + self, + request_id: str, + layer_idx: int = None, + ) -> paddle.Tensor: + """Get the routing indices from store""" + dir_path = os.path.join(self.output_dir, f"{request_id}") + file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") + assert os.path.exists(file_path), f"File not found: {file_path}" + layer_routing_indices = paddle.load(file_path) + + return layer_routing_indices + + def clear( + self, + request_id: str, + layer_idx: int = None, + ) -> None: + """Clear the routing indices of the request""" + dir_path = os.path.join(self.output_dir, f"{request_id}") + file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") + assert os.path.exists(file_path), f"File not found: {file_path}" + os.remove(file_path) + + # Delete empty directory + if len(os.listdir(dir_path)) == 0: + os.rmdir(dir_path) + + def clear_store(self): + """Clear the routing indices store""" + if os.path.isdir(self.output_dir): + for file_name in os.listdir(self.output_dir): + file_path = os.path.join(self.output_dir, file_name) + shutil.rmtree(file_path) + + +class RoutingStoreAFS(RoutingStoreBase): + """Routing Store using AFS""" + + def __init__(self) -> None: + super().__init__() + + +class RoutingStoreRDMA(RoutingStoreBase): + """Routing Store using RDMA""" + + def __init__(self) -> None: + super().__init__() + + +def get_routing_store(fd_config: FDConfig) -> RoutingStoreBase: + if fd_config.store_type == "local": + return RoutingStoreLocal() + elif fd_config.store_type == "afs": + return RoutingStoreAFS() + elif fd_config.store_type == "rdma": + return RoutingStoreRDMA() + else: + raise ValueError("Invalid store type") From b49377cea2a868af9eca9c4a34141fe88995f503 Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Tue, 18 Nov 2025 21:36:07 +0800 Subject: [PATCH 10/17] add routing replay config --- fastdeploy/config.py | 35 +++++++++++++++++-- .../layers/moe/routing_indices_cache.py | 14 ++++---- 2 files 
changed, 40 insertions(+), 9 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 665bc28e105..150c34f87ef 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1157,6 +1157,34 @@ def print(self): logger.info("=============================================================") +class RoutingReplayConfig: + """Configuration for Routing Replay used in RL training""" + + def __init__(self, args) -> None: + self.enable_routing_replay: bool = False + self.routing_store_type: str = "local" + + # Local routing store + self.local_store_dir: str = "./routing_replay_output" + + # AFS routing store + self.afs_ip: str = "" + self.afs_port: int = 0 + self.afs_password: str = "" + self.afs_user: str = "" + self.afs_store_dir: str = "" + + # RDMA routing store + pass + + for key, value in args.items(): + if hasattr(self, key) and value != "None": + setattr(self, key, value) + + def __str__(self) -> str: + return json.dumps({key: value for key, value in self.__dict__.items()}) + + class FDConfig: """ The configuration class which contains all fastdeploy-related configuration. This @@ -1200,7 +1228,8 @@ def __init__( test_mode=False, enable_attention_dp_balance: bool = False, attention_dp_time_out_iters: int = 0, - enable_rollout_routing_replay: bool = False, + # enable_rollout_routing_replay: bool = False, + routing_replay_config: Optional[RoutingReplayConfig] = None, ): self.model_config: ModelConfig = model_config # type: ignore self.cache_config: CacheConfig = cache_config # type: ignore @@ -1216,9 +1245,11 @@ def __init__( self.cache_config: CacheConfig = cache_config # type: ignore self.eplb_config: Optional[EPLBConfig] = eplb_config self.moba_attention_config: Optional[MobaAttentionConfig] = moba_attention_config + self.routing_replay_config = routing_replay_config self.enable_attention_dp_balance = enable_attention_dp_balance self.attention_dp_time_out_iters = attention_dp_time_out_iters - self.enable_rollout_routing_replay = enable_rollout_routing_replay + # self.enable_rollout_routing_replay = enable_rollout_routing_replay + # Initialize cuda graph capture list max_capture_shape = self.parallel_config.max_num_seqs if self.speculative_config is not None and self.speculative_config.method == "mtp": diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index 01f9d81c866..134d6533705 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -259,11 +259,11 @@ class RoutingStoreLocal(RoutingStoreBase): def __init__(self, fd_config) -> None: super().__init__(fd_config=fd_config) - self.output_dir = fd_config.routing_replay_config.output_dir # output_dir: str = "./routing_replay_output", + self.routing_store_dir = fd_config.routing_replay_config.routing_store_dir def put(self, routing_indices: paddle.Tensor, request_id: str, layer_idx: int) -> None: """Put the routing indices into store""" - dir_path = os.path.join(self.output_dir, f"{request_id}") + dir_path = os.path.join(self.routing_store_dir, f"{request_id}") os.makedirs(dir_path, exist_ok=True) file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") paddle.save(routing_indices, file_path) @@ -274,7 +274,7 @@ def get( layer_idx: int = None, ) -> paddle.Tensor: """Get the routing indices from store""" - dir_path = os.path.join(self.output_dir, f"{request_id}") + dir_path = os.path.join(self.routing_store_dir, f"{request_id}") file_path = 
os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") assert os.path.exists(file_path), f"File not found: {file_path}" layer_routing_indices = paddle.load(file_path) @@ -287,7 +287,7 @@ def clear( layer_idx: int = None, ) -> None: """Clear the routing indices of the request""" - dir_path = os.path.join(self.output_dir, f"{request_id}") + dir_path = os.path.join(self.routing_store_dir, f"{request_id}") file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") assert os.path.exists(file_path), f"File not found: {file_path}" os.remove(file_path) @@ -298,9 +298,9 @@ def clear( def clear_store(self): """Clear the routing indices store""" - if os.path.isdir(self.output_dir): - for file_name in os.listdir(self.output_dir): - file_path = os.path.join(self.output_dir, file_name) + if os.path.isdir(self.routing_store_dir): + for file_name in os.listdir(self.routing_store_dir): + file_path = os.path.join(self.routing_store_dir, file_name) shutil.rmtree(file_path) From cfb0e31409979e0506de81be44e54d0d2ed7101f Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Wed, 19 Nov 2025 16:46:31 +0800 Subject: [PATCH 11/17] add routing replay config --- fastdeploy/config.py | 14 ++++------- fastdeploy/engine/args_utils.py | 24 +++++++++++++------ fastdeploy/engine/engine.py | 2 +- fastdeploy/model_executor/layers/moe/moe.py | 4 ++-- .../layers/moe/routing_indices_cache.py | 9 ------- fastdeploy/rl/rollout_config.py | 4 ++-- fastdeploy/worker/worker_process.py | 9 +++---- 7 files changed, 31 insertions(+), 35 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 150c34f87ef..100e61ed511 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1167,13 +1167,6 @@ def __init__(self, args) -> None: # Local routing store self.local_store_dir: str = "./routing_replay_output" - # AFS routing store - self.afs_ip: str = "" - self.afs_port: int = 0 - self.afs_password: str = "" - self.afs_user: str = "" - self.afs_store_dir: str = "" - # RDMA routing store pass @@ -1181,7 +1174,10 @@ def __init__(self, args) -> None: if hasattr(self, key) and value != "None": setattr(self, key, value) - def __str__(self) -> str: + def to_json_string(self): + """ + Convert routing replay config to json string. + """ return json.dumps({key: value for key, value in self.__dict__.items()}) @@ -1228,7 +1224,6 @@ def __init__( test_mode=False, enable_attention_dp_balance: bool = False, attention_dp_time_out_iters: int = 0, - # enable_rollout_routing_replay: bool = False, routing_replay_config: Optional[RoutingReplayConfig] = None, ): self.model_config: ModelConfig = model_config # type: ignore @@ -1248,7 +1243,6 @@ def __init__( self.routing_replay_config = routing_replay_config self.enable_attention_dp_balance = enable_attention_dp_balance self.attention_dp_time_out_iters = attention_dp_time_out_iters - # self.enable_rollout_routing_replay = enable_rollout_routing_replay # Initialize cuda graph capture list max_capture_shape = self.parallel_config.max_num_seqs diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index f1558740ef3..daf6d323ea7 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -33,6 +33,7 @@ MobaAttentionConfig, ModelConfig, ParallelConfig, + RoutingReplayConfig, SpeculativeConfig, TaskOption, ) @@ -407,9 +408,9 @@ class EngineArgs: Configuration for eplb. 
""" - enable_rollout_routing_replay: bool = False + routing_replay_config: Optional[Dict[str, Any]] = None """ - Flag to enable rollout routing replay(r3) + Flag to rollout routing replay(r3) """ def __post_init__(self): @@ -721,10 +722,10 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: help="Config of eplb.", ) parallel_group.add_argument( - "--enable-rollout-routing-replay", - action="store_true", - default=EngineArgs.enable_rollout_routing_replay, - help="Flag to enable rollout routing replay(r3).", + "--routing-replay-config", + type=json.loads, + default=EngineArgs.routing_replay_config, + help="Flag of rollout routing replay(r3).", ) # Load group @@ -1065,6 +1066,14 @@ def create_eplb_config(self) -> EPLBConfig: eplb_args[k] = v return EPLBConfig(eplb_args) + def create_routing_repaly_config(self) -> RoutingReplayConfig: + """ """ + routing_replay_args = asdict(self) + if self.eplb_config is not None: + for k, v in self.eplb_config.items(): + routing_replay_args[k] = v + return RoutingReplayConfig(routing_replay_args) + def create_engine_config(self, port_availability_check: bool = True) -> FDConfig: """ Create and return a Config object based on the current settings. @@ -1107,6 +1116,7 @@ def create_engine_config(self, port_availability_check: bool = True) -> FDConfig graph_opt_cfg.update_use_cudagraph(self.use_cudagraph) moba_attention_config = self.create_moba_attention_config() eplb_cfg = self.create_eplb_config() + routing_replay_config = self.create_routing_repaly_config() early_stop_cfg = self.create_early_stop_config() early_stop_cfg.update_enable_early_stop(self.enable_early_stop) @@ -1152,5 +1162,5 @@ def create_engine_config(self, port_availability_check: bool = True) -> FDConfig early_stop_config=early_stop_cfg, enable_attention_dp_balance=self.enable_attention_dp_balance, attention_dp_time_out_iters=self.attention_dp_time_out_iters, - enable_rollout_routing_replay=self.enable_rollout_routing_replay, + routing_replay_config=routing_replay_config, ) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index fd08a62442a..4b712614d62 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -462,6 +462,7 @@ def _start_worker_service(self): f" --moba_attention_config '{self.cfg.moba_attention_config.to_json_string()}'" f" --attention_dp_time_out_iters {self.cfg.attention_dp_time_out_iters}" f" --eplb_config '{self.cfg.eplb_config.to_json_string()}'" + f" --routing_replay_config '{self.cfg.routing_replay_config.to_json_string()}'" f" --ips {ips}" ) @@ -477,7 +478,6 @@ def _start_worker_service(self): "enable_logprob": self.cfg.model_config.enable_logprob, "lm_head_fp32": self.cfg.model_config.lm_head_fp32, "enable_attention_dp_balance": self.cfg.enable_attention_dp_balance, - "enable_rollout_routing_replay": self.cfg.enable_rollout_routing_replay, } for worker_flag, value in worker_append_flag.items(): if value: diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index d8d4ac286c8..8bb7a3b79e9 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -200,7 +200,7 @@ def __init__( if self.ep_size > 1: self.quant_method.init_ep(self) - self.enable_rollout_routing_replay = fd_config.enable_rollout_routing_replay + self.enable_routing_replay = fd_config.routing_replay_config.enable_routing_replay # Merge normal and RL build model if gate_correction_bias is not None: self.gate_correction_bias = 
gate_correction_bias @@ -550,7 +550,7 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta = """ topk_ids_hookfunc = None - if self.enable_rollout_routing_replay: + if self.enable_routing_replay: if forward_meta is not None: # forward_meta is None when execute empty_input_forward topk_ids_hookfunc = partial( save_routing_to_buffer, diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index 134d6533705..32b248423f8 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -304,13 +304,6 @@ def clear_store(self): shutil.rmtree(file_path) -class RoutingStoreAFS(RoutingStoreBase): - """Routing Store using AFS""" - - def __init__(self) -> None: - super().__init__() - - class RoutingStoreRDMA(RoutingStoreBase): """Routing Store using RDMA""" def __init__(self) -> None: super().__init__() @@ -321,8 +314,6 @@ def __init__(self) -> None: def get_routing_store(fd_config: FDConfig) -> RoutingStoreBase: if fd_config.store_type == "local": return RoutingStoreLocal() - elif fd_config.store_type == "afs": - return RoutingStoreAFS() elif fd_config.store_type == "rdma": return RoutingStoreRDMA() else: raise ValueError("Invalid store type") diff --git a/fastdeploy/rl/rollout_config.py b/fastdeploy/rl/rollout_config.py index 8d1b1ce6d49..58238dfadde 100644 --- a/fastdeploy/rl/rollout_config.py +++ b/fastdeploy/rl/rollout_config.py @@ -67,7 +67,7 @@ def __init__( enable_attention_dp_balance: bool = False, attention_dp_time_out_iters: int = 0, eplb_config: str = {}, - enable_rollout_routing_replay: bool = False, + enable_routing_replay: bool = False, ): # Required parameters self.model = model_name_or_path @@ -118,7 +118,7 @@ def __init__( self.enable_attention_dp_balance = enable_attention_dp_balance self.attention_dp_time_out_iters = attention_dp_time_out_iters self.eplb_config = eplb_config - self.enable_rollout_routing_replay = enable_rollout_routing_replay + self.enable_routing_replay = enable_routing_replay def __str__(self): return "\n".join(f"{k}: {v}" for k, v in self.__dict__.items()) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 4d72a49295d..a959d78bcf5 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -841,9 +841,10 @@ def parse_args(): ) parser.add_argument( - "--enable_rollout_routing_replay", - action="store_true", - help="enable rollout routing replay(r3)", + "--routing_replay_config", + type=json.loads, + default=None, + help="Configuration of Rollout Routing Replay.", ) args = parser.parse_args() @@ -1004,7 +1005,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: enable_attention_dp_balance=args.enable_attention_dp_balance, attention_dp_time_out_iters=args.attention_dp_time_out_iters, eplb_config=eplb_config, - enable_rollout_routing_replay=args.enable_rollout_routing_replay, + routing_replay_config=args.routing_replay_config, ) update_fd_config_for_mm(fd_config) From f58b3df02665ef93490f34cd3d332018f6020e3a Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Fri, 21 Nov 2025 16:22:00 +0800 Subject: [PATCH 12/17] successfully run routing store --- fastdeploy/engine/args_utils.py | 5 ++-- fastdeploy/model_executor/forward_meta.py | 2 +- fastdeploy/model_executor/layers/moe/moe.py | 2 +- .../layers/moe/routing_indices_cache.py | 27 +++++++++---------- fastdeploy/worker/worker_process.py | 5 +++- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git 
a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index daf6d323ea7..7790693a110 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -1069,8 +1069,8 @@ def create_eplb_config(self) -> EPLBConfig: def create_routing_replay_config(self) -> RoutingReplayConfig: """Create the routing replay config from engine args.""" routing_replay_args = asdict(self) - if self.eplb_config is not None: - for k, v in self.eplb_config.items(): + if self.routing_replay_config is not None: + for k, v in self.routing_replay_config.items(): routing_replay_args[k] = v return RoutingReplayConfig(routing_replay_args) @@ -1117,6 +1117,7 @@ def create_engine_config(self, port_availability_check: bool = True) -> FDConfig moba_attention_config = self.create_moba_attention_config() eplb_cfg = self.create_eplb_config() routing_replay_config = self.create_routing_replay_config() + print("after create {routing_replay_config}") early_stop_cfg = self.create_early_stop_config() early_stop_cfg.update_enable_early_stop(self.enable_early_stop) diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index bead52c9d18..e66289fdbbc 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -111,7 +111,7 @@ class ForwardMeta: # KV caches caches: Optional[list[paddle.Tensor]] = None # Routing Replay table buffer - routing_repaly_table: Optional[paddle.Tensor] = None + routing_replay_table: Optional[paddle.Tensor] = None def clear_caches(self): """Safely clean up the caches""" diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 8bb7a3b79e9..5afd2268818 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -554,7 +554,7 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta = if forward_meta is not None: # forward_meta is None when execute empty_input_forward topk_ids_hookfunc = partial( save_routing_to_buffer, - routing_repaly_table=forward_meta.routing_repaly_table, + routing_replay_table=forward_meta.routing_replay_table, batch_id_per_token=forward_meta.batch_id_per_token, seq_lens_decoder=forward_meta.seq_lens_decoder, cu_seqlens_q=forward_meta.cu_seqlens_q, diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index 32b248423f8..add86e36e03 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -177,14 +177,11 @@ def _put_request_to_store( batch_id: int, request_id: str, ): - layer_buffer_shape = [self.max_model_len, self.moe_top_k] if self.tp_rank == 0: batch_buffer = self.routing_replay_table[batch_id] for layer_id in range(self.num_moe_layers): layer_buffer = batch_buffer[layer_id] - self.routing_store.put( - routing_indices=layer_buffer, request_id=request_id, layer_idx=layer_id, shape=layer_buffer_shape - ) + self.routing_store.put(routing_indices=layer_buffer, request_id=request_id, layer_idx=layer_id) self._clear_table_slot(batch_id) @@ -259,11 +256,11 @@ class RoutingStoreLocal(RoutingStoreBase): def __init__(self, fd_config) -> None: super().__init__(fd_config=fd_config) - self.routing_store_dir = fd_config.routing_replay_config.routing_store_dir + self.local_store_dir = fd_config.routing_replay_config.local_store_dir def put(self, routing_indices: paddle.Tensor, request_id: str, layer_idx: int) -> None: """Put the routing 
indices into store""" - dir_path = os.path.join(self.routing_store_dir, f"{request_id}") + dir_path = os.path.join(self.local_store_dir, f"{request_id}") os.makedirs(dir_path, exist_ok=True) file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") paddle.save(routing_indices, file_path) @@ -274,7 +271,7 @@ def get( layer_idx: int = None, ) -> paddle.Tensor: """Get the routing indices from store""" - dir_path = os.path.join(self.routing_store_dir, f"{request_id}") + dir_path = os.path.join(self.local_store_dir, f"{request_id}") file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") assert os.path.exists(file_path), f"File not found: {file_path}" layer_routing_indices = paddle.load(file_path) @@ -287,7 +284,7 @@ def clear( layer_idx: int = None, ) -> None: """Clear the routing indices of the request""" - dir_path = os.path.join(self.routing_store_dir, f"{request_id}") + dir_path = os.path.join(self.local_store_dir, f"{request_id}") file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") assert os.path.exists(file_path), f"File not found: {file_path}" os.remove(file_path) @@ -298,9 +295,9 @@ def clear( def clear_store(self): """Clear the routing indices store""" - if os.path.isdir(self.routing_store_dir): - for file_name in os.listdir(self.routing_store_dir): - file_path = os.path.join(self.routing_store_dir, file_name) + if os.path.isdir(self.local_store_dir): + for file_name in os.listdir(self.local_store_dir): + file_path = os.path.join(self.local_store_dir, file_name) shutil.rmtree(file_path) @@ -312,9 +309,9 @@ def __init__(self) -> None: def get_routing_store(fd_config: FDConfig) -> RoutingStoreBase: - if fd_config.store_type == "local": - return RoutingStoreLocal() - elif fd_config.store_type == "rdma": - return RoutingStoreRDMA() + if fd_config.routing_replay_config.routing_store_type == "local": + return RoutingStoreLocal(fd_config=fd_config) + elif fd_config.routing_replay_config.routing_store_type == "rdma": + return RoutingStoreRDMA(fd_config=fd_config) else: raise ValueError("Invalid store type") diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index a959d78bcf5..fe379954b08 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -38,6 +38,7 @@ MobaAttentionConfig, ModelConfig, ParallelConfig, + RoutingReplayConfig, SpeculativeConfig, ) from fastdeploy.engine.request import RequestType @@ -907,6 +908,8 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: eplb_config = EPLBConfig(args.eplb_config) + routing_replay_config = RoutingReplayConfig(args.routing_replay_config) + # Note(tangbinhan): used for load_checkpoint model_config.pretrained_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank model_config.pretrained_config.tensor_parallel_degree = parallel_config.tensor_parallel_size @@ -1005,7 +1008,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: enable_attention_dp_balance=args.enable_attention_dp_balance, attention_dp_time_out_iters=args.attention_dp_time_out_iters, eplb_config=eplb_config, - routing_replay_config=args.routing_replay_config, + routing_replay_config=routing_replay_config, ) update_fd_config_for_mm(fd_config) From cd5ce869cb35ca933b460ad3ea4a3d0e4ed0d2e1 Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Tue, 25 Nov 2025 21:31:35 +0800 Subject: [PATCH 13/17] convert request id to rollout id --- .../layers/moe/routing_indices_cache.py | 11 ++++++++++- 1 file changed, 10 
insertions(+), 1 deletion(-) diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index add86e36e03..22d5f59fc25 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -190,7 +190,8 @@ def put_table_to_store(self): batch_ids = copy.deepcopy(list(self.routing_batch_to_request.keys())) for batch_id in batch_ids: request_id = self._deregister_request(batch_id) - self._put_request_to_store(batch_id, request_id) + rollout_id = self.split_request_id(request_id) + self._put_request_to_store(batch_id, rollout_id) def _clear_table_slot(self, batch_id: int): assert 0 <= batch_id < self.max_num_seqs @@ -221,6 +222,14 @@ def get_request_from_store(self, request_id: str) -> List[paddle.Tensor]: def get_routing_table(self) -> paddle.Tensor: return self.routing_replay_table + def split_request_id(self, request_id: str): + """Split the request id to get rollout id""" + chat_type, tmp_str = request_id.split("-", 1) + assert chat_type == "chatcmpl" + reversed_tmp_str = tmp_str[::-1].split("-", 5) + rollout_id = reversed_tmp_str[-1][::-1] + return rollout_id + class RoutingStoreBase(ABC): """Base class for routing store""" From ca0e7b622bbb61eae55b5fe967abf3d51782b557 Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Thu, 27 Nov 2025 20:09:36 +0800 Subject: [PATCH 14/17] fix rollout config bug --- fastdeploy/config.py | 7 ++++--- fastdeploy/rl/rollout_config.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 100e61ed511..123d7a48ca2 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1170,9 +1170,10 @@ def __init__(self, args) -> None: # RDMA routing store pass - for key, value in args.items(): - if hasattr(self, key) and value != "None": - setattr(self, key, value) + if args is not None: + for key, value in args.items(): + if hasattr(self, key) and value != "None": + setattr(self, key, value) def to_json_string(self): """ diff --git a/fastdeploy/rl/rollout_config.py b/fastdeploy/rl/rollout_config.py index 58238dfadde..58535236d68 100644 --- a/fastdeploy/rl/rollout_config.py +++ b/fastdeploy/rl/rollout_config.py @@ -67,7 +67,7 @@ def __init__( enable_attention_dp_balance: bool = False, attention_dp_time_out_iters: int = 0, eplb_config: str = {}, - enable_routing_replay: bool = False, + routing_replay_config: str = None, ): # Required parameters self.model = model_name_or_path @@ -118,7 +118,7 @@ def __init__( self.enable_attention_dp_balance = enable_attention_dp_balance self.attention_dp_time_out_iters = attention_dp_time_out_iters self.eplb_config = eplb_config - self.enable_routing_replay = enable_routing_replay + self.routing_replay_config = routing_replay_config def __str__(self): return "\n".join(f"{k}: {v}" for k, v in self.__dict__.items()) From a3a91d35ea1f402b8756beb6dadcf2484942729e Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Mon, 1 Dec 2025 16:47:31 +0800 Subject: [PATCH 15/17] unify code --- .../model_executor/layers/moe/routing_indices_cache.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index 22d5f59fc25..4049eb21e7b 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -181,7 +181,8 @@ 
def _put_request_to_store( batch_buffer = self.routing_replay_table[batch_id] for layer_id in range(self.num_moe_layers): layer_buffer = batch_buffer[layer_id] - self.routing_store.put(routing_indices=layer_buffer, request_id=request_id, layer_idx=layer_id) + rollout_id = self.split_request_id(request_id) + self.routing_store.put(routing_indices=layer_buffer, request_id=rollout_id, layer_idx=layer_id) self._clear_table_slot(batch_id) @@ -190,8 +191,7 @@ def put_table_to_store(self): batch_ids = copy.deepcopy(list(self.routing_batch_to_request.keys())) for batch_id in batch_ids: request_id = self._deregister_request(batch_id) - rollout_id = self.split_request_id(request_id) - self._put_request_to_store(batch_id, rollout_id) + self._put_request_to_store(batch_id, request_id) From e49203f25e3ad7f3f66b4b9f300b8b5fce3f321c Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Mon, 1 Dec 2025 17:08:20 +0800 Subject: [PATCH 16/17] replace request_id with rollout_id in routing store --- .../layers/moe/routing_indices_cache.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py index 4049eb21e7b..d4d971a9326 100644 --- a/fastdeploy/model_executor/layers/moe/routing_indices_cache.py +++ b/fastdeploy/model_executor/layers/moe/routing_indices_cache.py @@ -182,7 +182,7 @@ def _put_request_to_store( for layer_id in range(self.num_moe_layers): layer_buffer = batch_buffer[layer_id] rollout_id = self.split_request_id(request_id) - self.routing_store.put(routing_indices=layer_buffer, request_id=rollout_id, layer_idx=layer_id) + self.routing_store.put(routing_indices=layer_buffer, rollout_id=rollout_id, layer_idx=layer_id) self._clear_table_slot(batch_id) @@ -207,14 +207,16 @@ def _clear_store(self): def _clear_request_of_store(self, request_id): """Clear one request of routing store""" + rollout_id = self.split_request_id(request_id) for layer_idx in range(self.num_moe_layers): - self.routing_store.clear(request_id=request_id, layer_idx=layer_idx) + self.routing_store.clear(rollout_id=rollout_id, layer_idx=layer_idx) def get_request_from_store(self, request_id: str) -> List[paddle.Tensor]: """Get the routing indices of the request from store""" routing_list = [] + rollout_id = self.split_request_id(request_id) for layer_idx in range(self.num_moe_layers): - one_layer_routing = self.routing_store.get(request_id, layer_idx) + one_layer_routing = self.routing_store.get(rollout_id, layer_idx) routing_list.append(one_layer_routing) return routing_list @@ -238,17 +240,17 @@ def __init__(self, fd_config: FDConfig) -> None: self.fd_config = fd_config @abstractmethod - def put(self, routing_indices: paddle.Tensor, request_id: str, layer_idx: Optional[int] = None) -> None: + def put(self, routing_indices: paddle.Tensor, rollout_id: str, layer_idx: Optional[int] = None) -> None: """Put the routing indices into store""" raise NotImplementedError @abstractmethod - def get(self, request_id: str, layer_idx: Optional[int] = None) -> paddle.Tensor: + def get(self, rollout_id: str, layer_idx: Optional[int] = None) -> paddle.Tensor: """Get the routing indices from store""" raise NotImplementedError @abstractmethod - def clear(self, request_id: str, layer_idx: Optional[int] = None) -> None: + def clear(self, rollout_id: str, layer_idx: Optional[int] = None) -> None: """Clear the routing 
indices of the request""" raise NotImplementedError @@ -267,20 +269,20 @@ def __init__(self, fd_config) -> None: super().__init__(fd_config=fd_config) self.local_store_dir = fd_config.routing_replay_config.local_store_dir - def put(self, routing_indices: paddle.Tensor, request_id: str, layer_idx: int) -> None: + def put(self, routing_indices: paddle.Tensor, rollout_id: str, layer_idx: int) -> None: """Put the routing indices into store""" - dir_path = os.path.join(self.local_store_dir, f"{request_id}") + dir_path = os.path.join(self.local_store_dir, f"{rollout_id}") os.makedirs(dir_path, exist_ok=True) file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") paddle.save(routing_indices, file_path) def get( self, - request_id: str, + rollout_id: str, layer_idx: int = None, ) -> paddle.Tensor: """Get the routing indices from store""" - dir_path = os.path.join(self.local_store_dir, f"{request_id}") + dir_path = os.path.join(self.local_store_dir, f"{rollout_id}") file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") assert os.path.exists(file_path), f"File not found: {file_path}" layer_routing_indices = paddle.load(file_path) @@ -289,11 +291,11 @@ def get( def clear( self, - request_id: str, + rollout_id: str, layer_idx: int = None, ) -> None: """Clear the routing indices of the request""" - dir_path = os.path.join(self.local_store_dir, f"{request_id}") + dir_path = os.path.join(self.local_store_dir, f"{rollout_id}") file_path = os.path.join(dir_path, f"layer_{layer_idx}.pdtensor") assert os.path.exists(file_path), f"File not found: {file_path}" os.remove(file_path) From 77beeccb51502628960761cc985592e9a7b6483e Mon Sep 17 00:00:00 2001 From: gongshaotian Date: Thu, 4 Dec 2025 15:40:46 +0800 Subject: [PATCH 17/17] delete code --- fastdeploy/engine/args_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 7f844dbf914..70ed2d5637f 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -1141,7 +1141,6 @@ def create_engine_config(self, port_availability_check: bool = True) -> FDConfig moba_attention_config = self.create_moba_attention_config() eplb_cfg = self.create_eplb_config() routing_replay_config = self.create_routing_replay_config() - print("after create {routing_replay_config}") early_stop_cfg = self.create_early_stop_config() early_stop_cfg.update_enable_early_stop(self.enable_early_stop)
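
The smoke test deleted in PATCH 09 doubles as the clearest usage sketch of save_routing_to_buffer, so it is worth keeping in runnable form. The version below reconstructs it with the post-PATCH-12 keyword routing_replay_table; it assumes a CUDA device with Triton available, since the scatter into the table is done by the Triton kernel.

# Reconstructed from the commented-out test removed in PATCH 09.
# Assumes a CUDA device with Triton available.
import paddle

from fastdeploy.model_executor.layers.moe.routing_indices_cache import save_routing_to_buffer

max_num_seqs, num_layers, max_len, top_k, token_num = 4, 1, 10, 8, 12

# One slot per sequence: [max_num_seqs, num_layers, max_len, top_k]; -1 marks unwritten positions.
routing_replay_table = paddle.full([max_num_seqs, num_layers, max_len, top_k], -1, dtype="int32")
# Expert ids picked by the router for each of the 12 tokens in this step.
topk_ids = paddle.randint(0, 384, [token_num, top_k], dtype="int32")
# Tokens 0-3 belong to batch slot 0, tokens 4-9 to slot 2, tokens 10-11 to slot 3.
batch_id_per_token = paddle.to_tensor([0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 3, 3], dtype="int32").reshape([-1, 1])
# Tokens already decoded per slot; offsets each sequence's write position in the table.
seq_lens_decoder = paddle.to_tensor([0, 2, 0, 3], dtype="int32").reshape([-1, 1])
cu_seqlens_q = paddle.to_tensor([0, 3, 4, 10, 12], dtype="int32").reshape([-1, 1])

save_routing_to_buffer(
    routing_replay_table=routing_replay_table,
    topk_ids=topk_ids,
    batch_id_per_token=batch_id_per_token,
    seq_lens_decoder=seq_lens_decoder,
    cu_seqlens_q=cu_seqlens_q,
    layer_idx=0,
)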
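
On the store side, a minimal put/get/clear round trip through the local routing store shows the API as it stands after PATCH 16. The SimpleNamespace stand-in for FDConfig is a hypothetical shortcut for illustration only; in FastDeploy the real FDConfig carries a RoutingReplayConfig built from --routing-replay-config.

# Minimal round-trip sketch for the local routing store (post-PATCH-16 API).
from types import SimpleNamespace

import paddle

from fastdeploy.config import RoutingReplayConfig
from fastdeploy.model_executor.layers.moe.routing_indices_cache import get_routing_store

cfg = RoutingReplayConfig(
    {"enable_routing_replay": True, "routing_store_type": "local", "local_store_dir": "/tmp/r3_demo"}
)
fd_config = SimpleNamespace(routing_replay_config=cfg)  # illustrative stand-in, not the real FDConfig

store = get_routing_store(fd_config=fd_config)  # returns RoutingStoreLocal for "local"

# One [max_model_len, top_k] slice per MoE layer, keyed by rollout id and layer index.
routing = paddle.full([10, 8], -1, dtype="int32")
store.put(routing_indices=routing, rollout_id="demo-rollout", layer_idx=0)

restored = store.get(rollout_id="demo-rollout", layer_idx=0)
assert (restored.numpy() == routing.numpy()).all()

store.clear(rollout_id="demo-rollout", layer_idx=0)  # also removes the now-empty request directory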
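
Finally, the parsing in split_request_id (PATCH 13) implies a request id of the form "chatcmpl-<rollout_id>-<uuid>": the reversed split strips the five dash-separated groups of a trailing UUID, and everything before them is taken as the rollout id. That layout is inferred from the code rather than stated by the series; a quick self-contained check of the inference:

# Hypothetical request id matching the layout split_request_id expects.
request_id = "chatcmpl-rollout_42-123e4567-e89b-12d3-a456-426614174000"
chat_type, tmp_str = request_id.split("-", 1)
assert chat_type == "chatcmpl"
# Reverse, drop the five trailing UUID groups, reverse back.
rollout_id = tmp_str[::-1].split("-", 5)[-1][::-1]
assert rollout_id == "rollout_42"
# Works even when the rollout id itself contains dashes, since exactly
# five groups are stripped from the end.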