Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1588,10 +1588,6 @@ def check(self):
f"be less than or equal to max_num_partial_prefills: {self.max_num_partial_prefills}"
)
assert self.scheduler_config.splitwise_role in ["mixed", "prefill", "decode"]
# TODO(@wufeisheng): TP and EP need to be supported simultaneously.
assert (self.parallel_config.tensor_parallel_size == 1 and self.parallel_config.expert_parallel_size >= 1) or (
self.parallel_config.tensor_parallel_size >= 1 and self.parallel_config.expert_parallel_size == 1
), "TP and EP cannot be enabled at the same time"

if not self.cache_config.enable_chunked_prefill:
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
Expand Down
18 changes: 10 additions & 8 deletions fastdeploy/model_executor/load_weight_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def load_reordered_experts(model_path: str, key_name: str):
return weight


def load_ep_checkpoint(model_path: str, fd_config: FDConfig, return_numpy: bool = False):
def load_ep_checkpoint(cls: PretrainedModel, model_path: str, fd_config: FDConfig, return_numpy: bool = False):
"""
load ep checkpoint
"""
Expand Down Expand Up @@ -266,6 +266,10 @@ def get_expert_ranges(fd_config):
if k in weight_list:
filtered_map[k] = weight_list[k]

if fd_config.parallel_config.tensor_parallel_size > 1:
tp_actions = cls._get_tensor_parallel_mappings(fd_config.model_config.pretrained_config)
new_actions = {k: v for k, v in tp_actions.items() if k not in num_local_ffn_keys}

state_dict = {}
# Get all safetensor file paths that need to be opened
safetensor_paths = set(filtered_map.values())
Expand All @@ -281,6 +285,9 @@ def get_expert_ranges(fd_config):
for k in filtered_map:
if filtered_map[k] == safetensor_path and k in f.keys():
weight = f.get_tensor(k)
if fd_config.parallel_config.tensor_parallel_size > 1:
if k in new_actions:
weight = new_actions[k](weight)
if not return_numpy:
weight = paddle.Tensor(weight, zero_copy=True)
weight = weight._copy_to(paddle.framework._current_expected_place(), False)
Expand Down Expand Up @@ -456,13 +463,8 @@ def load_composite_checkpoint(
# 2. Tensor Parallel (TP)
# 3. Pre-sharded (pre-split)
"""
# (TODO: remove in the future)
if (
fd_config.parallel_config.use_ep
and fd_config.speculative_config.model_type != "mtp"
and fd_config.parallel_config.tensor_parallel_size == 1
):
state_dict = load_ep_checkpoint(model_path, fd_config, return_numpy=True)
if fd_config.parallel_config.use_ep and fd_config.speculative_config.model_type != "mtp":
state_dict = load_ep_checkpoint(cls, model_path, fd_config, return_numpy=True)
else:
rank_dirs = [
f for f in os.listdir(model_path) if f.startswith("rank") and os.path.isdir(os.path.join(model_path, f))
Expand Down
4 changes: 3 additions & 1 deletion fastdeploy/worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,9 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
num_experts_per_rank = num_experts // parallel_config.expert_parallel_size
num_experts_start_offset = expert_parallel_rank * num_experts_per_rank
max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
parallel_config.local_data_parallel_id = expert_parallel_rank % max_chips_per_node
parallel_config.local_data_parallel_id = parallel_config.data_parallel_rank % (
max_chips_per_node // parallel_config.tensor_parallel_size
)

parallel_config.expert_parallel_rank = expert_parallel_rank
parallel_config.num_experts_per_rank = num_experts_per_rank
Expand Down
Loading