From a7c1b8ad547b48eaf07051c4abd995591094d2a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E6=B0=B8=E6=96=8C?= Date: Thu, 18 Sep 2025 15:10:00 +0800 Subject: [PATCH 1/4] add model_register_gpu MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白永斌 --- vllm/distributed/eplb/model_register.py | 127 -------------------- vllm/distributed/eplb/model_register_gpu.py | 66 ++++++++++ vllm/model_executor/models/deepseek_v2.py | 26 ++-- vllm/model_executor/models/glm4_moe.py | 27 ++--- vllm/model_executor/models/qwen3_moe.py | 26 ++-- vllm/v1/worker/gpu_model_runner.py | 3 + 6 files changed, 96 insertions(+), 179 deletions(-) delete mode 100644 vllm/distributed/eplb/model_register.py create mode 100644 vllm/distributed/eplb/model_register_gpu.py diff --git a/vllm/distributed/eplb/model_register.py b/vllm/distributed/eplb/model_register.py deleted file mode 100644 index 4bbc83195832..000000000000 --- a/vllm/distributed/eplb/model_register.py +++ /dev/null @@ -1,127 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import types - -import torch - - -def get_expert_map(self, layer_id): - """ - Retrieves the expert map for a specific MoE layer. This map typically - indicates the mapping from logical expert IDs to physical expert IDs or - some other internal representation of expert distribution within that layer. - - Args: - self: The model instance (e.g., an instance of a vLLM model). - layer_id: The index of the MoE layer. - - Returns: - A torch.Tensor representing the expert map for the specified layer. - """ - return self.model.layers[layer_id].mlp.experts.get_map() - - -def get_log2phy_map(self, layer_id): - """ - Retrieves the logical-to-physical expert mapping for a specific MoE layer. - This map determines which physical expert (identified by its global - physical ID) a logical expert ID should map to for the current rank. - - Args: - self: The model instance. - layer_id: The index of the MoE layer. - - Returns: - A torch.Tensor representing the logical-to-physical mapping for - the specified layer. - """ - return self.model.layers[layer_id].mlp.experts.get_log2phy_map() - - -def get_all_expert_map(self, num_moe_layers): - """ - Aggregates the expert maps from all MoE layers into a single tensor. - - Args: - self: The model instance. - num_moe_layers: The total number of MoE layers in the model. - - Returns: - A torch.Tensor of shape (num_moe_layers, num_experts_per_layer), - where each row is the expert map for a corresponding MoE layer. - """ - all_loads = [] - for layer_id in range(num_moe_layers): - load_tensor = self.get_expert_map(self.num_dense_layers + layer_id) - all_loads.append(load_tensor) - - return torch.stack(all_loads, dim=0) - - -def get_all_moe_loads(self): - """ - Retrieves the current expert load (e.g., token counts routed to each - expert) for all MoE layers. This typically reflects the load - accumulated during the most recent forward pass. - - Args: - self: The model instance. - - Returns: - A torch.Tensor of shape (num_moe_layers, ...), where the dimensions - after the first depend on how expert_load_view is structured (e.g., - (num_physical_experts) or (num_ranks, num_physical_experts)). 
- """ - if self.num_moe_layers == 0: - return torch.empty((0, 0), dtype=torch.int64) - - return torch.stack( - [self.model.layers[self.num_dense_layers + \ - layer_id].mlp.experts.expert_load_view \ - for layer_id in range(self.num_moe_layers)], dim=0 - ) - - -def clear_all_moe_loads(self): - """ - Resets the expert load counters for all MoE layers. - This is typically called after an aggregation step or at the beginning of - a new load measurement period. - - Args: - self: The model instance. - """ - for layer_id in range(self.num_moe_layers): - self.model.layers[self.num_dense_layers + \ - layer_id].mlp.experts.clear_moe_load() - - -def model_register(model, model_config): - """ - Registers custom methods related to Expert Parallel Load Balancing (EPLB) - onto the vLLM model instance. It also determines the number of MoE layers - based on the model configuration. - - Args: - model: The vLLM model instance to which the methods will be added. - model_config: The configuration object for the model, containing - details like model_type and layer counts. - """ - model.get_expert_map = types.MethodType(get_expert_map, model) - model.get_log2phy_map = types.MethodType(get_log2phy_map, model) - model.get_all_expert_map = types.MethodType(get_all_expert_map, model) - model.get_all_moe_loads = types.MethodType(get_all_moe_loads, model) - model.clear_all_moe_loads = types.MethodType(clear_all_moe_loads, model) - - config = model_config.hf_config - - if config.model_type == "qwen3_moe": - model.num_dense_layers = 0 - model.num_moe_layers = config.num_hidden_layers - elif config.model_type == "deepseek_v2": - model.num_dense_layers = config.first_k_dense_replace - model.num_moe_layers = config.num_hidden_layers - \ - model.num_dense_layers - else: - raise NotImplementedError("EPLB is not supported.") diff --git a/vllm/distributed/eplb/model_register_gpu.py b/vllm/distributed/eplb/model_register_gpu.py new file mode 100644 index 000000000000..d7d49031f0f1 --- /dev/null +++ b/vllm/distributed/eplb/model_register_gpu.py @@ -0,0 +1,66 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import types + +import torch +from vllm.model_executor.layers.fused_moe import FusedMoE + +def set_eplb_state( + self, + expert_load_view: torch.Tensor, + logical_to_physical_map: torch.Tensor, + logical_replica_count: torch.Tensor, +) -> None: + for layer_idx, layer in enumerate(self.moe_layers): + # Register the expert weights. 
+ self.expert_weights.append(layer.get_expert_weights()) + layer.set_eplb_state( + moe_layer_idx=layer_idx, + expert_load_view=expert_load_view, + logical_to_physical_map=logical_to_physical_map, + logical_replica_count=logical_replica_count, + ) + +def update_physical_experts_metadata( + self, + num_physical_experts: int, + num_local_physical_experts: int, +) -> None: + assert self.num_local_physical_experts == num_local_physical_experts + self.num_physical_experts = num_physical_experts + self.num_local_physical_experts = num_local_physical_experts + self.num_redundant_experts = (num_physical_experts - + self.num_logical_experts) + for layer in self.model.layers: + if isinstance(layer.mlp, self.example_moe): + moe = layer.mlp + moe.n_local_physical_experts = num_local_physical_experts + moe.n_physical_experts = num_physical_experts + moe.n_redundant_experts = self.num_redundant_experts + moe.experts.update_expert_map() + +def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: + # Params for weights, fp8 weight scales, fp8 activation scales + # (param_name, weight_name, expert_id, shard_id) + return FusedMoE.make_expert_params_mapping( + ckpt_gate_proj_name="gate_proj", + ckpt_down_proj_name="down_proj", + ckpt_up_proj_name="up_proj", + num_experts=self.config.n_routed_experts + num_redundant_experts=self.num_redundant_experts) + +def model_register(model): + """ + Registers custom methods related to Expert Parallel Load Balancing (EPLB) + onto the vLLM model instance. It also determines the number of MoE layers + based on the model configuration. + + Args: + model: The vLLM model instance to which the methods will be added. + """ + model.set_eplb_state = types.MethodType(set_eplb_state, model) + model.update_physical_experts_metadata = \ + types.MethodType(update_physical_experts_metadata, model) + model.model.get_expert_mapping = \ + types.MethodType(get_expert_mapping, model.model) diff --git a/vllm/model_executor/models/deepseek_v2.py b/vllm/model_executor/models/deepseek_v2.py index e4a21febc5bd..92ac7e023f2f 100644 --- a/vllm/model_executor/models/deepseek_v2.py +++ b/vllm/model_executor/models/deepseek_v2.py @@ -839,7 +839,7 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): self.num_expert_groups = config.n_group self.moe_layers: list[FusedMoE] = [] - example_moe = None + self.example_moe = None for layer in self.model.layers: if isinstance(layer, PPMissingLayer): continue @@ -847,18 +847,18 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): assert isinstance(layer, DeepseekV2DecoderLayer) if isinstance(layer.mlp, DeepseekV2MoE): # Pick last one layer since the first ones may be dense layers. 
- example_moe = layer.mlp + self.example_moe = layer.mlp self.moe_layers.append(layer.mlp.experts) - if example_moe is None: + if self.example_moe is None: raise RuntimeError("No DeepseekV2MoE layer found in model.layers.") - self.num_logical_experts = example_moe.n_logical_experts - self.num_physical_experts = example_moe.n_physical_experts - self.num_local_physical_experts = example_moe.n_local_physical_experts - self.num_routed_experts = example_moe.n_routed_experts - self.num_shared_experts = example_moe.n_shared_experts - self.num_redundant_experts = example_moe.n_redundant_experts + self.num_logical_experts = self.example_moe.n_logical_experts + self.num_physical_experts = self.example_moe.n_physical_experts + self.num_local_physical_experts = self.example_moe.n_local_physical_experts + self.num_routed_experts = self.example_moe.n_routed_experts + self.num_shared_experts = self.example_moe.n_shared_experts + self.num_redundant_experts = self.example_moe.n_redundant_experts def set_eplb_state( self, @@ -929,13 +929,7 @@ def load_weights(self, weights: Iterable[tuple[str, # Params for weights, fp8 weight scales, fp8 activation scales # (param_name, weight_name, expert_id, shard_id) - expert_params_mapping = FusedMoE.make_expert_params_mapping( - ckpt_gate_proj_name="gate_proj", - ckpt_down_proj_name="down_proj", - ckpt_up_proj_name="up_proj", - num_experts=self.config.n_routed_experts, - num_redundant_experts=self.num_redundant_experts) - + expert_params_mapping = self.model.get_expert_mapping() params_dict = dict(self.named_parameters()) loaded_params: set[str] = set() for name, loaded_weight in weights: diff --git a/vllm/model_executor/models/glm4_moe.py b/vllm/model_executor/models/glm4_moe.py index 1fb457609289..a7b1d759c985 100644 --- a/vllm/model_executor/models/glm4_moe.py +++ b/vllm/model_executor/models/glm4_moe.py @@ -471,15 +471,6 @@ def make_empty_intermediate_tensors( device=device), }) - def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: - # Params for weights, fp8 weight scales, fp8 activation scales - # (param_name, weight_name, expert_id, shard_id) - return FusedMoE.make_expert_params_mapping( - ckpt_gate_proj_name="gate_proj", - ckpt_down_proj_name="down_proj", - ckpt_up_proj_name="up_proj", - num_experts=self.config.n_routed_experts) - def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]: stacked_params_mapping = [ @@ -622,7 +613,7 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): self.num_expert_groups = config.n_group self.moe_layers: list[FusedMoE] = [] - example_moe = None + self.example_moe = None for layer in self.model.layers: if isinstance(layer, PPMissingLayer): continue @@ -630,18 +621,18 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): assert isinstance(layer, Glm4MoeDecoderLayer) if isinstance(layer.mlp, Glm4MoE): # Pick last one layer since the first ones may be dense layers. 
- example_moe = layer.mlp + self.example_moe = layer.mlp self.moe_layers.append(layer.mlp.experts) - if example_moe is None: + if self.example_moe is None: raise RuntimeError("No Glm4MoE layer found in model.layers.") - self.num_logical_experts = example_moe.n_logical_experts - self.num_physical_experts = example_moe.n_physical_experts - self.num_local_physical_experts = example_moe.n_local_physical_experts - self.num_routed_experts = example_moe.n_routed_experts - self.num_shared_experts = example_moe.n_shared_experts - self.num_redundant_experts = example_moe.n_redundant_experts + self.num_logical_experts = self.example_moe.n_logical_experts + self.num_physical_experts = self.example_moe.n_physical_experts + self.num_local_physical_experts = self.example_moe.n_local_physical_experts + self.num_routed_experts = self.example_moe.n_routed_experts + self.num_shared_experts = self.example_moe.n_shared_experts + self.num_redundant_experts = self.example_moe.n_redundant_experts def set_eplb_state( self, diff --git a/vllm/model_executor/models/qwen3_moe.py b/vllm/model_executor/models/qwen3_moe.py index 85429b3a01f9..556625313d5b 100644 --- a/vllm/model_executor/models/qwen3_moe.py +++ b/vllm/model_executor/models/qwen3_moe.py @@ -438,16 +438,6 @@ def forward( hidden_states, _ = self.norm(hidden_states, residual) return hidden_states - def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: - # Params for weights, fp8 weight scales, fp8 activation scales - # (param_name, weight_name, expert_id, shard_id) - return FusedMoE.make_expert_params_mapping( - ckpt_gate_proj_name="gate_proj", - ckpt_down_proj_name="down_proj", - ckpt_up_proj_name="up_proj", - num_experts=self.config.num_experts, - num_redundant_experts=self.num_redundant_experts) - def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]: stacked_params_mapping = [ @@ -616,27 +606,27 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): self.expert_weights = [] self.moe_layers: list[FusedMoE] = [] - example_layer = None + self.example_moe = None for layer in self.model.layers: if isinstance(layer, PPMissingLayer): continue assert isinstance(layer, Qwen3MoeDecoderLayer) if isinstance(layer.mlp, Qwen3MoeSparseMoeBlock): - example_layer = layer.mlp + self.example_moe = layer.mlp self.moe_layers.append(layer.mlp.experts) - if example_layer is None: + if self.example_moe is None: raise RuntimeError("No Qwen3MoE layer found in the model.layers.") self.num_moe_layers = len(self.moe_layers) self.num_expert_groups = 1 self.num_shared_experts = 0 - self.num_logical_experts = example_layer.n_logical_experts - self.num_physical_experts = example_layer.n_physical_experts - self.num_local_physical_experts = example_layer.n_local_physical_experts - self.num_routed_experts = example_layer.n_routed_experts - self.num_redundant_experts = example_layer.n_redundant_experts + self.num_logical_experts = self.example_moe.n_logical_experts + self.num_physical_experts = self.example_moe.n_physical_experts + self.num_local_physical_experts = self.example_moe.n_local_physical_experts + self.num_routed_experts = self.example_moe.n_routed_experts + self.num_redundant_experts = self.example_moe.n_redundant_experts def set_eplb_state( self, diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index d4afaf51e6e8..5ba595f30912 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -26,6 +26,7 @@ from vllm.config import (CompilationLevel, CUDAGraphMode, 
VllmConfig, get_layers_from_vllm_config, update_config) from vllm.distributed.eplb.eplb_state import EplbState +from vllm.distributed.eplb.model_register_gpu import model_register from vllm.distributed.kv_transfer import (get_kv_transfer_group, has_kv_transfer_group) from vllm.distributed.kv_transfer.kv_connector.utils import copy_kv_blocks @@ -2386,6 +2387,8 @@ def load_model(self, eep_scale_up: bool = False) -> None: logger.info("Loading model from scratch...") self.model = model_loader.load_model( vllm_config=self.vllm_config, model_config=self.model_config) + if self.parallel_config.enable_eplb: + model_register(self.model) if self.lora_config: self.model = self.load_lora_model(self.model, self.model_config, From 860c9245e18f0d763ee0be6fe60e7305d5783483 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E6=B0=B8=E6=96=8C?= Date: Thu, 18 Sep 2025 16:07:03 +0800 Subject: [PATCH 2/4] extract load_expert_weight method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白永斌 --- vllm/distributed/eplb/model_register_gpu.py | 58 ++++++++++++++++++++- vllm/model_executor/models/deepseek_v2.py | 31 ++--------- vllm/model_executor/models/glm4_moe.py | 31 ++--------- vllm/model_executor/models/qwen3_moe.py | 37 ++----------- 4 files changed, 68 insertions(+), 89 deletions(-) diff --git a/vllm/distributed/eplb/model_register_gpu.py b/vllm/distributed/eplb/model_register_gpu.py index d7d49031f0f1..ee68767dff9d 100644 --- a/vllm/distributed/eplb/model_register_gpu.py +++ b/vllm/distributed/eplb/model_register_gpu.py @@ -5,7 +5,11 @@ import torch from vllm.model_executor.layers.fused_moe import FusedMoE - +from vllm.model_executor.models.utils import (PPMissingLayer, is_pp_missing_parameter, + make_empty_intermediate_tensors_factory, make_layers, + maybe_prefix) +import typing +from typing import Callable def set_eplb_state( self, expert_load_view: torch.Tensor, @@ -50,6 +54,55 @@ def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: num_experts=self.config.n_routed_experts num_redundant_experts=self.num_redundant_experts) +def load_expert_weight(self, mapping, loaded_weight, params_dict): + ignore_suffixes = (".bias", "_bias", ".k_scale", "_k_scale", + ".v_scale", "_v_scale", ".weight_scale", + "_weight_scale", ".input_scale", "_input_scale") + + is_continue = False + is_expert_weight = False + success = False + + param_name, weight_name, expert_id, shard_id = mapping + if weight_name not in name: + is_continue = True + return is_continue, is_expert_weight, success + + # Anyway, this is an expert weight and should not be + # attempted to load as other weights later + is_expert_weight = True + + # Do not modify `name` since the loop may continue here + # Instead, create a new variable + name_mapped = name.replace(weight_name, param_name) + + if is_pp_missing_parameter(name_mapped, self): + is_continue = True + return is_continue, is_expert_weight, success + + # Skip loading extra parameters for GPTQ/modelopt models. + if name_mapped.endswith( + ignore_suffixes + ) and name_mapped not in params_dict: + is_continue = True + return is_continue, is_expert_weight, success + + param = params_dict[name_mapped] + # We should ask the weight loader to return success or not + # here since otherwise we may skip experts with other + # available replicas. 
+ weight_loader = typing.cast(Callable[..., bool], + param.weight_loader) + success = weight_loader(param, + loaded_weight, + name_mapped, + shard_id=shard_id, + expert_id=expert_id, + return_success=True) + if success: + name = name_mapped + return is_continue, is_expert_weight, success + def model_register(model): """ Registers custom methods related to Expert Parallel Load Balancing (EPLB) @@ -60,7 +113,8 @@ def model_register(model): model: The vLLM model instance to which the methods will be added. """ model.set_eplb_state = types.MethodType(set_eplb_state, model) + model.load_expert_weight = types.MethodType(load_expert_weight, model) model.update_physical_experts_metadata = \ - types.MethodType(update_physical_experts_metadata, model) + types.MethodType(update_physical_experts_metadata, model) model.model.get_expert_mapping = \ types.MethodType(get_expert_mapping, model.model) diff --git a/vllm/model_executor/models/deepseek_v2.py b/vllm/model_executor/models/deepseek_v2.py index 92ac7e023f2f..a0cfeb6b739c 100644 --- a/vllm/model_executor/models/deepseek_v2.py +++ b/vllm/model_executor/models/deepseek_v2.py @@ -976,35 +976,12 @@ def load_weights(self, weights: Iterable[tuple[str, else: is_expert_weight = False for mapping in expert_params_mapping: - param_name, weight_name, expert_id, shard_id = mapping - if weight_name not in name: + is_continue, is_expert_weight, success = \ + self.load_expert_weight( + mapping, loaded_weight, params_dict) + if is_continue: continue - - # Anyway, this is an expert weight and should not be - # attempted to load as other weights later - is_expert_weight = True - - # Do not modify `name` since the loop may continue here - # Instead, create a new variable - name_mapped = name.replace(weight_name, param_name) - - if is_pp_missing_parameter(name_mapped, self): - continue - - param = params_dict[name_mapped] - # We should ask the weight loader to return success or not - # here since otherwise we may skip experts with other - # available replicas. - weight_loader = typing.cast(Callable[..., bool], - param.weight_loader) - success = weight_loader(param, - loaded_weight, - name_mapped, - shard_id=shard_id, - expert_id=expert_id, - return_success=True) if success: - name = name_mapped break else: if is_expert_weight: diff --git a/vllm/model_executor/models/glm4_moe.py b/vllm/model_executor/models/glm4_moe.py index a7b1d759c985..3d800e548a64 100644 --- a/vllm/model_executor/models/glm4_moe.py +++ b/vllm/model_executor/models/glm4_moe.py @@ -515,35 +515,12 @@ def load_weights(self, weights: Iterable[tuple[str, else: is_expert_weight = False for mapping in expert_params_mapping: - param_name, weight_name, expert_id, shard_id = mapping - if weight_name not in name: + is_continue, is_expert_weight, success = \ + self.load_expert_weight( + mapping, loaded_weight, params_dict) + if is_continue: continue - - # Anyway, this is an expert weight and should not be - # attempted to load as other weights later - is_expert_weight = True - - # Do not modify `name` since the loop may continue here - # Instead, create a new variable - name_mapped = name.replace(weight_name, param_name) - - if is_pp_missing_parameter(name_mapped, self): - continue - - param = params_dict[name_mapped] - # We should ask the weight loader to return success or not - # here since otherwise we may skip experts with other - # available replicas. 
- weight_loader = typing.cast(Callable[..., bool], - param.weight_loader) - success = weight_loader(param, - loaded_weight, - name_mapped, - shard_id=shard_id, - expert_id=expert_id, - return_success=True) if success: - name = name_mapped break else: if is_expert_weight: diff --git a/vllm/model_executor/models/qwen3_moe.py b/vllm/model_executor/models/qwen3_moe.py index 556625313d5b..b34e5bfa2e59 100644 --- a/vllm/model_executor/models/qwen3_moe.py +++ b/vllm/model_executor/models/qwen3_moe.py @@ -498,41 +498,12 @@ def load_weights(self, weights: Iterable[tuple[str, else: is_expert_weight = False for mapping in expert_params_mapping: - param_name, weight_name, expert_id, shard_id = mapping - if weight_name not in name: + is_continue, is_expert_weight, success = \ + self.load_expert_weight( + mapping, loaded_weight, params_dict) + if is_continue: continue - - # Anyway, this is an expert weight and should not be - # attempted to load as other weights later - is_expert_weight = True - - # Do not modify `name` since the loop may continue here - # Instead, create a new variable - name_mapped = name.replace(weight_name, param_name) - - if is_pp_missing_parameter(name_mapped, self): - continue - - # Skip loading extra parameters for GPTQ/modelopt models. - if name_mapped.endswith( - ignore_suffixes - ) and name_mapped not in params_dict: - continue - - param = params_dict[name_mapped] - # We should ask the weight loader to return success or not - # here since otherwise we may skip experts with other - # available replicas. - weight_loader = typing.cast(Callable[..., bool], - param.weight_loader) - success = weight_loader(param, - loaded_weight, - name_mapped, - shard_id=shard_id, - expert_id=expert_id, - return_success=True) if success: - name = name_mapped break else: if is_expert_weight: From f1c53a6c1b15e46e614d9bd2ba454ca752e3972c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E6=B0=B8=E6=96=8C?= Date: Fri, 19 Sep 2025 09:35:14 +0800 Subject: [PATCH 3/4] fix bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白永斌 --- ..._register_gpu.py => gpu_model_register.py} | 8 ++--- vllm/model_executor/models/deepseek_v2.py | 34 ------------------- vllm/model_executor/models/glm4_moe.py | 16 --------- vllm/model_executor/models/qwen3_moe.py | 34 ------------------- vllm/v1/worker/gpu_model_runner.py | 2 +- 5 files changed, 4 insertions(+), 90 deletions(-) rename vllm/distributed/eplb/{model_register_gpu.py => gpu_model_register.py} (94%) diff --git a/vllm/distributed/eplb/model_register_gpu.py b/vllm/distributed/eplb/gpu_model_register.py similarity index 94% rename from vllm/distributed/eplb/model_register_gpu.py rename to vllm/distributed/eplb/gpu_model_register.py index ee68767dff9d..2b32b48231d5 100644 --- a/vllm/distributed/eplb/model_register_gpu.py +++ b/vllm/distributed/eplb/gpu_model_register.py @@ -5,11 +5,9 @@ import torch from vllm.model_executor.layers.fused_moe import FusedMoE -from vllm.model_executor.models.utils import (PPMissingLayer, is_pp_missing_parameter, - make_empty_intermediate_tensors_factory, make_layers, - maybe_prefix) -import typing +from vllm.model_executor.models.utils import is_pp_missing_parameter from typing import Callable + def set_eplb_state( self, expert_load_view: torch.Tensor, @@ -51,7 +49,7 @@ def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: ckpt_gate_proj_name="gate_proj", ckpt_down_proj_name="down_proj", ckpt_up_proj_name="up_proj", - num_experts=self.config.n_routed_experts + 
num_experts=self.config.n_routed_experts, num_redundant_experts=self.num_redundant_experts) def load_expert_weight(self, mapping, loaded_weight, params_dict): diff --git a/vllm/model_executor/models/deepseek_v2.py b/vllm/model_executor/models/deepseek_v2.py index d14c1b9f3a12..20e41b80e34d 100644 --- a/vllm/model_executor/models/deepseek_v2.py +++ b/vllm/model_executor/models/deepseek_v2.py @@ -863,40 +863,6 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): self.num_shared_experts = self.example_moe.n_shared_experts self.num_redundant_experts = self.example_moe.n_redundant_experts - def set_eplb_state( - self, - expert_load_view: torch.Tensor, - logical_to_physical_map: torch.Tensor, - logical_replica_count: torch.Tensor, - ) -> None: - for layer_idx, layer in enumerate(self.moe_layers): - # Register the expert weights. - self.expert_weights.append(layer.get_expert_weights()) - layer.set_eplb_state( - moe_layer_idx=layer_idx, - expert_load_view=expert_load_view, - logical_to_physical_map=logical_to_physical_map, - logical_replica_count=logical_replica_count, - ) - - def update_physical_experts_metadata( - self, - num_physical_experts: int, - num_local_physical_experts: int, - ) -> None: - assert self.num_local_physical_experts == num_local_physical_experts - self.num_physical_experts = num_physical_experts - self.num_local_physical_experts = num_local_physical_experts - self.num_redundant_experts = (num_physical_experts - - self.num_logical_experts) - for layer in self.model.layers: - if isinstance(layer.mlp, DeepseekV2MoE): - moe = layer.mlp - moe.n_local_physical_experts = num_local_physical_experts - moe.n_physical_experts = num_physical_experts - moe.n_redundant_experts = self.num_redundant_experts - moe.experts.update_expert_map() - def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor: return self.model.get_input_embeddings(input_ids) diff --git a/vllm/model_executor/models/glm4_moe.py b/vllm/model_executor/models/glm4_moe.py index bede947cba66..0bad9eaec319 100644 --- a/vllm/model_executor/models/glm4_moe.py +++ b/vllm/model_executor/models/glm4_moe.py @@ -638,22 +638,6 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): self.num_shared_experts = self.example_moe.n_shared_experts self.num_redundant_experts = self.example_moe.n_redundant_experts - def set_eplb_state( - self, - expert_load_view: torch.Tensor, - logical_to_physical_map: torch.Tensor, - logical_replica_count: torch.Tensor, - ) -> None: - for layer_idx, layer in enumerate(self.moe_layers): - # Register the expert weights. 
- self.expert_weights.append(layer.get_expert_weights()) - layer.set_eplb_state( - moe_layer_idx=layer_idx, - expert_load_view=expert_load_view, - logical_to_physical_map=logical_to_physical_map, - logical_replica_count=logical_replica_count, - ) - def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor: return self.model.get_input_embeddings(input_ids) diff --git a/vllm/model_executor/models/qwen3_moe.py b/vllm/model_executor/models/qwen3_moe.py index 4653701f9db3..5ad7d85a8b9e 100644 --- a/vllm/model_executor/models/qwen3_moe.py +++ b/vllm/model_executor/models/qwen3_moe.py @@ -600,40 +600,6 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): self.num_routed_experts = self.example_moe.n_routed_experts self.num_redundant_experts = self.example_moe.n_redundant_experts - def set_eplb_state( - self, - expert_load_view: torch.Tensor, - logical_to_physical_map: torch.Tensor, - logical_replica_count: torch.Tensor, - ) -> None: - for layer_idx, layer in enumerate(self.moe_layers): - # Register the expert weights. - self.expert_weights.append(layer.get_expert_weights()) - layer.set_eplb_state( - moe_layer_idx=layer_idx, - expert_load_view=expert_load_view, - logical_to_physical_map=logical_to_physical_map, - logical_replica_count=logical_replica_count, - ) - - def update_physical_experts_metadata( - self, - num_physical_experts: int, - num_local_physical_experts: int, - ) -> None: - assert self.num_local_physical_experts == num_local_physical_experts - self.num_physical_experts = num_physical_experts - self.num_local_physical_experts = num_local_physical_experts - self.num_redundant_experts = (num_physical_experts - - self.num_logical_experts) - for layer in self.model.layers: - if isinstance(layer.mlp, Qwen3MoeSparseMoeBlock): - moe = layer.mlp - moe.n_local_physical_experts = num_local_physical_experts - moe.n_physical_experts = num_physical_experts - moe.n_redundant_experts = self.num_redundant_experts - moe.experts.update_expert_map() - def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor: return self.model.get_input_embeddings(input_ids) diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index 64d5051e1611..dd0f62011619 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -27,7 +27,7 @@ from vllm.config import (CompilationLevel, CUDAGraphMode, VllmConfig, get_layers_from_vllm_config, update_config) from vllm.distributed.eplb.eplb_state import EplbState -from vllm.distributed.eplb.model_register_gpu import model_register +from vllm.distributed.eplb.gpu_model_register import model_register from vllm.distributed.kv_transfer import (get_kv_transfer_group, has_kv_transfer_group) from vllm.distributed.kv_transfer.kv_connector.utils import copy_kv_blocks From 1f7ead35491e41448e98069ffa120f0299ccb40b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E6=B0=B8=E6=96=8C?= Date: Mon, 22 Sep 2025 15:05:03 +0800 Subject: [PATCH 4/4] fix expert weight identified incorrectly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白永斌 --- vllm/distributed/eplb/gpu_model_register.py | 26 ++++++++++----------- vllm/model_executor/models/deepseek_v2.py | 16 +++++++++---- vllm/model_executor/models/glm4_moe.py | 16 +++++++++---- vllm/model_executor/models/qwen3_moe.py | 16 +++++++++---- 4 files changed, 45 insertions(+), 29 deletions(-) diff --git a/vllm/distributed/eplb/gpu_model_register.py b/vllm/distributed/eplb/gpu_model_register.py 
index 2b32b48231d5..9238205741c3 100644 --- a/vllm/distributed/eplb/gpu_model_register.py +++ b/vllm/distributed/eplb/gpu_model_register.py @@ -2,7 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import types - +import typing import torch from vllm.model_executor.layers.fused_moe import FusedMoE from vllm.model_executor.models.utils import is_pp_missing_parameter @@ -52,23 +52,23 @@ def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: num_experts=self.config.n_routed_experts, num_redundant_experts=self.num_redundant_experts) -def load_expert_weight(self, mapping, loaded_weight, params_dict): +def load_expert_weight(self, mapping, name, loaded_weight, params_dict): ignore_suffixes = (".bias", "_bias", ".k_scale", "_k_scale", ".v_scale", "_v_scale", ".weight_scale", "_weight_scale", ".input_scale", "_input_scale") + expert_matched = False is_continue = False - is_expert_weight = False success = False - + name_mapped = '' param_name, weight_name, expert_id, shard_id = mapping if weight_name not in name: is_continue = True - return is_continue, is_expert_weight, success + return expert_matched, is_continue, success, name_mapped # Anyway, this is an expert weight and should not be # attempted to load as other weights later - is_expert_weight = True + expert_matched = True # Do not modify `name` since the loop may continue here # Instead, create a new variable @@ -76,14 +76,13 @@ def load_expert_weight(self, mapping, loaded_weight, params_dict): if is_pp_missing_parameter(name_mapped, self): is_continue = True - return is_continue, is_expert_weight, success + return expert_matched, is_continue, success, name_mapped # Skip loading extra parameters for GPTQ/modelopt models. - if name_mapped.endswith( - ignore_suffixes - ) and name_mapped not in params_dict: + if name_mapped.endswith(ignore_suffixes) \ + and name_mapped not in params_dict: is_continue = True - return is_continue, is_expert_weight, success + return expert_matched, is_continue, success, name_mapped param = params_dict[name_mapped] # We should ask the weight loader to return success or not @@ -97,9 +96,7 @@ def load_expert_weight(self, mapping, loaded_weight, params_dict): shard_id=shard_id, expert_id=expert_id, return_success=True) - if success: - name = name_mapped - return is_continue, is_expert_weight, success + return expert_matched, is_continue, success, name_mapped def model_register(model): """ @@ -116,3 +113,4 @@ def model_register(model): types.MethodType(update_physical_experts_metadata, model) model.model.get_expert_mapping = \ types.MethodType(get_expert_mapping, model.model) + print("register complete") diff --git a/vllm/model_executor/models/deepseek_v2.py b/vllm/model_executor/models/deepseek_v2.py index 20e41b80e34d..da2c91eae77f 100644 --- a/vllm/model_executor/models/deepseek_v2.py +++ b/vllm/model_executor/models/deepseek_v2.py @@ -895,10 +895,10 @@ def load_weights(self, weights: Iterable[tuple[str, ("fused_qkv_a_proj", "q_a_proj", 0), ("fused_qkv_a_proj", "kv_a_proj_with_mqa", 1), ] - + from vllm.distributed.eplb.gpu_model_register import get_expert_mapping, load_expert_weight # Params for weights, fp8 weight scales, fp8 activation scales # (param_name, weight_name, expert_id, shard_id) - expert_params_mapping = self.model.get_expert_mapping() + expert_params_mapping = get_expert_mapping(self) params_dict = dict(self.named_parameters()) loaded_params: set[str] = set() for name, loaded_weight in weights: @@ -944,13 +944,19 @@ def load_weights(self, weights: 
Iterable[tuple[str, break else: is_expert_weight = False + is_continue = False for mapping in expert_params_mapping: - is_continue, is_expert_weight, success = \ - self.load_expert_weight( - mapping, loaded_weight, params_dict) + expert_matched, is_continue, success, name_mapped = \ + load_expert_weight(self, mapping, name, + loaded_weight, params_dict) + if expert_matched: + is_expert_weight = True + if is_continue: continue + if success: + name = name_mapped break else: if is_expert_weight: diff --git a/vllm/model_executor/models/glm4_moe.py b/vllm/model_executor/models/glm4_moe.py index 0bad9eaec319..c8c25abb0c8e 100644 --- a/vllm/model_executor/models/glm4_moe.py +++ b/vllm/model_executor/models/glm4_moe.py @@ -506,10 +506,10 @@ def load_weights(self, weights: Iterable[tuple[str, ("gate_up_proj", "gate_proj", 0), ("gate_up_proj", "up_proj", 1), ] - + from vllm.distributed.eplb.gpu_model_register import get_expert_mapping, load_expert_weight params_dict = dict(self.named_parameters()) loaded_params: set[str] = set() - expert_params_mapping = self.get_expert_mapping() + expert_params_mapping = get_expert_mapping(self) for name, loaded_weight in weights: spec_layer = get_spec_layer_idx_from_weight_name(self.config, name) if spec_layer is not None: @@ -539,13 +539,19 @@ def load_weights(self, weights: Iterable[tuple[str, break else: is_expert_weight = False + is_continue = False for mapping in expert_params_mapping: - is_continue, is_expert_weight, success = \ - self.load_expert_weight( - mapping, loaded_weight, params_dict) + expert_matched, is_continue, success, name_mapped = \ + load_expert_weight(self, mapping, name, + loaded_weight, params_dict) + if expert_matched: + is_expert_weight = True + if is_continue: continue + if success: + name = name_mapped break else: if is_expert_weight: diff --git a/vllm/model_executor/models/qwen3_moe.py b/vllm/model_executor/models/qwen3_moe.py index 5ad7d85a8b9e..416912e6ec63 100644 --- a/vllm/model_executor/models/qwen3_moe.py +++ b/vllm/model_executor/models/qwen3_moe.py @@ -453,10 +453,10 @@ def load_weights(self, weights: Iterable[tuple[str, ignore_suffixes = (".bias", "_bias", ".k_scale", "_k_scale", ".v_scale", "_v_scale", ".weight_scale", "_weight_scale", ".input_scale", "_input_scale") - + from vllm.distributed.eplb.gpu_model_register import get_expert_mapping, load_expert_weight params_dict = dict(self.named_parameters()) loaded_params: set[str] = set() - expert_params_mapping = self.get_expert_mapping() + expert_params_mapping = get_expert_mapping(self) for name, loaded_weight in weights: for (param_name, weight_name, shard_id) in stacked_params_mapping: # Skip non-stacked layers and experts (experts handled below). @@ -497,13 +497,19 @@ def load_weights(self, weights: Iterable[tuple[str, break else: is_expert_weight = False + is_continue = False for mapping in expert_params_mapping: - is_continue, is_expert_weight, success = \ - self.load_expert_weight( - mapping, loaded_weight, params_dict) + expert_matched, is_continue, success, name_mapped = \ + load_expert_weight(self, mapping, name, + loaded_weight, params_dict) + if expert_matched: + is_expert_weight = True + if is_continue: continue + if success: + name = name_mapped break else: if is_expert_weight:
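
A minimal sketch of the binding pattern these patches rely on: EPLB helpers are defined at module level and attached to an already-constructed model instance with types.MethodType, so the per-model classes do not need to inherit a mixin. The ToyExpertLayer / ToyMoEModel classes and their attribute names below are hypothetical stand-ins chosen for illustration only; they are not vLLM's FusedMoE or model classes, and the tensor shapes are assumptions.

    import types
    import torch


    class ToyExpertLayer:
        """Hypothetical MoE layer exposing the hooks the patch expects."""

        def __init__(self, num_experts: int) -> None:
            self.weight = torch.zeros(num_experts, 8)

        def get_expert_weights(self) -> list[torch.Tensor]:
            return [self.weight]

        def set_eplb_state(self, moe_layer_idx, expert_load_view,
                           logical_to_physical_map, logical_replica_count):
            # Keep a per-layer view of the shared load tensor, mirroring the
            # per-layer slicing a real MoE layer would do.
            self.expert_load_view = expert_load_view[moe_layer_idx]


    class ToyMoEModel:
        """Hypothetical model holding a flat list of MoE layers."""

        def __init__(self, num_layers: int, num_experts: int) -> None:
            self.moe_layers = [ToyExpertLayer(num_experts)
                               for _ in range(num_layers)]
            self.expert_weights: list[list[torch.Tensor]] = []


    def set_eplb_state(self, expert_load_view, logical_to_physical_map,
                       logical_replica_count) -> None:
        # Same shape as the registered helper above: record the expert
        # weights, then push the shared EPLB tensors down to every MoE layer.
        for layer_idx, layer in enumerate(self.moe_layers):
            self.expert_weights.append(layer.get_expert_weights())
            layer.set_eplb_state(
                moe_layer_idx=layer_idx,
                expert_load_view=expert_load_view,
                logical_to_physical_map=logical_to_physical_map,
                logical_replica_count=logical_replica_count,
            )


    def model_register(model) -> None:
        # Bind the free function as an instance method, as model_register
        # in the patches does for the real model.
        model.set_eplb_state = types.MethodType(set_eplb_state, model)


    model = ToyMoEModel(num_layers=2, num_experts=4)
    model_register(model)
    model.set_eplb_state(
        expert_load_view=torch.zeros(2, 4, dtype=torch.int64),
        logical_to_physical_map=torch.arange(4).repeat(2, 1),
        logical_replica_count=torch.ones(2, 4, dtype=torch.int64),
    )
    assert len(model.expert_weights) == 2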