diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index a2cc620a8cf..6ac5c12a05f 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -1588,10 +1588,6 @@ def check(self):
                 f"be less than or equal to max_num_partial_prefills: {self.max_num_partial_prefills}"
             )
         assert self.scheduler_config.splitwise_role in ["mixed", "prefill", "decode"]
-        # TODO(@wufeisheng): TP and EP need to be supported simultaneously.
-        assert (self.parallel_config.tensor_parallel_size == 1 and self.parallel_config.expert_parallel_size >= 1) or (
-            self.parallel_config.tensor_parallel_size >= 1 and self.parallel_config.expert_parallel_size == 1
-        ), "TP and EP cannot be enabled at the same time"
 
         if not self.cache_config.enable_chunked_prefill:
             if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
diff --git a/fastdeploy/model_executor/load_weight_utils.py b/fastdeploy/model_executor/load_weight_utils.py
index 4bc30b779f1..5f7839955ec 100644
--- a/fastdeploy/model_executor/load_weight_utils.py
+++ b/fastdeploy/model_executor/load_weight_utils.py
@@ -188,7 +188,7 @@ def load_reordered_experts(model_path: str, key_name: str):
     return weight
 
 
-def load_ep_checkpoint(model_path: str, fd_config: FDConfig, return_numpy: bool = False):
+def load_ep_checkpoint(cls: PretrainedModel, model_path: str, fd_config: FDConfig, return_numpy: bool = False):
     """
     load ep checkpoint
     """
@@ -266,6 +266,10 @@ def get_expert_ranges(fd_config):
            if k in weight_list:
                filtered_map[k] = weight_list[k]
 
+    if fd_config.parallel_config.tensor_parallel_size > 1:
+        tp_actions = cls._get_tensor_parallel_mappings(fd_config.model_config.pretrained_config)
+        new_actions = {k: v for k, v in tp_actions.items() if k not in num_local_ffn_keys}
+
     state_dict = {}
     # Get all safetensor file paths that need to be opened
     safetensor_paths = set(filtered_map.values())
@@ -281,6 +285,9 @@ def get_expert_ranges(fd_config):
            for k in filtered_map:
                if filtered_map[k] == safetensor_path and k in f.keys():
                    weight = f.get_tensor(k)
+                    if fd_config.parallel_config.tensor_parallel_size > 1:
+                        if k in new_actions:
+                            weight = new_actions[k](weight)
                    if not return_numpy:
                        weight = paddle.Tensor(weight, zero_copy=True)
                        weight = weight._copy_to(paddle.framework._current_expected_place(), False)
@@ -456,13 +463,8 @@ def load_composite_checkpoint(
     # 2. Tensor Parallel (TP)
     # 3. Pre-sharded (pre-split)
     """
-    # (TODO: remove in the future)
-    if (
-        fd_config.parallel_config.use_ep
-        and fd_config.speculative_config.model_type != "mtp"
-        and fd_config.parallel_config.tensor_parallel_size == 1
-    ):
-        state_dict = load_ep_checkpoint(model_path, fd_config, return_numpy=True)
+    if fd_config.parallel_config.use_ep and fd_config.speculative_config.model_type != "mtp":
+        state_dict = load_ep_checkpoint(cls, model_path, fd_config, return_numpy=True)
     else:
         rank_dirs = [
             f for f in os.listdir(model_path) if f.startswith("rank") and os.path.isdir(os.path.join(model_path, f))
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index ab03d8ce427..6e58aa16386 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -720,7 +720,9 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
         num_experts_per_rank = num_experts // parallel_config.expert_parallel_size
         num_experts_start_offset = expert_parallel_rank * num_experts_per_rank
         max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
-        parallel_config.local_data_parallel_id = expert_parallel_rank % max_chips_per_node
+        parallel_config.local_data_parallel_id = parallel_config.data_parallel_rank % (
+            max_chips_per_node // parallel_config.tensor_parallel_size
+        )
 
         parallel_config.expert_parallel_rank = expert_parallel_rank
         parallel_config.num_experts_per_rank = num_experts_per_rank
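
Note on the changed local_data_parallel_id computation in worker_process.py: a minimal standalone sketch of the arithmetic, with made-up example values (8 chips per node, tensor_parallel_size = 2) that are not part of the patch. Each node then hosts 8 // 2 = 4 tensor-parallel groups, so the data-parallel rank is folded into a node-local id in [0, 4).

# Illustrative sketch only; values are assumed, not taken from the patch.
max_chips_per_node = 8
tensor_parallel_size = 2
groups_per_node = max_chips_per_node // tensor_parallel_size  # 4 DP groups fit on one node

for data_parallel_rank in range(8):
    local_data_parallel_id = data_parallel_rank % groups_per_node
    print(data_parallel_rank, "->", local_data_parallel_id)  # 0->0, 1->1, 2->2, 3->3, 4->0, ...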