[Loader] Add multi-thread model loading #6877
Conversation
Thanks for your contribution!

Force-pushed from 7423eac to 2c8f1e0.
Codecov Report

❌ Patch coverage is

```
@@           Coverage Diff            @@
##           develop    #6877   +/-   ##
==========================================
  Coverage         ?    74.16%
==========================================
  Files            ?       383
  Lines            ?     53593
  Branches         ?      8399
==========================================
  Hits             ?     39748
  Misses           ?     11153
  Partials         ?      2692
```
```python
    help="The format of the model weights to load. default/default_v1/dummy.",
)
parser.add_argument(
```
Can this be enabled by default? Would the earlier GPU memory fragmentation issue recur?

It does cause GPU memory fragmentation, and it has hardware requirements, so it cannot be enabled by default.
```python
from fastdeploy.model_executor.layers.linear import KVBatchLinear
from fastdeploy.model_executor.utils import multi_switch_config_context

DEFAULT_NUM_THREADS = 8
```
Pull request overview

This PR introduces optional multi-threaded loading for safetensors-based model weights and threads the related configuration from the API Server/Engine down to the Worker and the weight-loading logic, to speed up startup for large models.

Changes:
- Add the `model_loader_extra_config` config field and CLI argument, and pass it through when the Engine launches Workers
- Extend `get_weight_iterator` to read `LoadConfig` and enable multi-threaded safetensors shard loading when configured
- Add `multi_thread_safetensors_weights_iterator` to read multiple safetensors shards in parallel

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fastdeploy/worker/worker_process.py | Add Worker-side parsing of the `--model_loader_extra_config` argument (JSON) |
| fastdeploy/model_executor/model_loader/default_loader_v1.py | Pass `fd_config.load_config` into the weight-iterator lookup to support extended loading strategies |
| fastdeploy/model_executor/load_weight_utils.py | Add the multi-threaded safetensors shard iterator, and select the loading path in `get_weight_iterator` based on config |
| fastdeploy/engine/engine.py | Pass `model_loader_extra_config` through when launching Worker processes |
| fastdeploy/engine/common_engine.py | Same pass-through in the alternate engine startup path |
| fastdeploy/engine/args_utils.py | Add the `--model-loader-extra-config` CLI argument for the API Server/Engine and wire it into `EngineArgs` |
| fastdeploy/config.py | Add the `model_loader_extra_config` field to `LoadConfig` to carry loader extension config |
```python
extra_config = load_config.model_loader_extra_config if load_config else None
if extra_config is not None and extra_config.get("enable_multithread_load", False):
    weights_iterator = multi_thread_safetensors_weights_iterator(
        files_list,
        max_workers=extra_config.get("num_threads", DEFAULT_NUM_THREADS),
        disable_mmap=extra_config.get("disable_mmap", False),
    )
```
`num_threads` is passed straight through to `ThreadPoolExecutor(max_workers=...)`; if the user supplies 0, a negative number, or a non-int, this raises `ValueError` and loading fails outright. Validate the thread count with a fallback when reading `extra_config` (e.g. coerce to `int` and clamp to >= 1, optionally capping at `len(files_list)`).
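A minimal sketch of the suggested guard (the helper `resolve_num_threads` is hypothetical; `DEFAULT_NUM_THREADS` and the `extra_config` dict follow the snippet above):

```python
DEFAULT_NUM_THREADS = 8

def resolve_num_threads(extra_config: dict, num_files: int) -> int:
    """Hypothetical helper: clamp a user-supplied thread count to a sane range."""
    raw = extra_config.get("num_threads", DEFAULT_NUM_THREADS)
    try:
        n = int(raw)
    except (TypeError, ValueError):
        # Non-numeric input falls back to the default instead of crashing the load.
        n = DEFAULT_NUM_THREADS
    # At least one worker, and no more workers than there are shards to read.
    return max(1, min(n, max(1, num_files)))

print(resolve_num_threads({"num_threads": 0}, 16))    # 1
print(resolve_num_threads({"num_threads": "4"}, 16))  # 4
print(resolve_num_threads({}, 3))                     # 3
```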
```python
def get_weight_iterator(model_path: str, load_config: Optional[LoadConfig] = None):
    files_list, ordered_weight_map, use_safetensors, is_layers_are_grouped = get_all_weights_file(model_path)
    if use_safetensors:
        if is_layers_are_grouped:
            weights_iterator = safetensors_weights_iterator(files_list)
        extra_config = load_config.model_loader_extra_config if load_config else None
        if extra_config is not None and extra_config.get("enable_multithread_load", False):
            weights_iterator = multi_thread_safetensors_weights_iterator(
                files_list,
                max_workers=extra_config.get("num_threads", DEFAULT_NUM_THREADS),
                disable_mmap=extra_config.get("disable_mmap", False),
            )
```
The new multi-threaded safetensors loading branch (`enable_multithread_load` / `disable_mmap`) currently has no unit-test coverage. The repository already has tests/model_executor/test_load_weight_utils.py covering the other iterator behavior; add cases verifying that the multi-threaded branch yields correct weights, covering both the `disable_mmap=True` and `disable_mmap=False` paths.
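Only fragments of `multi_thread_safetensors_weights_iterator` are quoted in this review. Assuming it fans the shard files out to a `ThreadPoolExecutor` and yields `(name, tensor)` pairs as shards are read, its shape is roughly as below (`load_shard` is a stand-in for `safetensors.paddle.load_file`, so the sketch runs without model files):

```python
import concurrent.futures

def multi_thread_weights_iterator(files_list, load_shard, max_workers=8):
    """Yield (name, tensor) pairs, reading shards in parallel.

    load_shard(path) must return a dict mapping weight names to tensors;
    in the real loader this would be the safetensors read.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order while overlapping the file reads.
        for state_dict in executor.map(load_shard, files_list):
            yield from state_dict.items()

# Toy demonstration with fake shards instead of .safetensors files.
shards = {"a.safetensors": {"w1": 1, "w2": 2}, "b.safetensors": {"w3": 3}}
weights = dict(multi_thread_weights_iterator(shards, shards.get, max_workers=2))
print(weights)  # {'w1': 1, 'w2': 2, 'w3': 3}
```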
Force-pushed from 64436f9 to a891bfb.

Force-pushed from d8814cb to 6d9d9e5.
PaddlePaddle-bot left a comment

🤖 AI Code Review | 2026-04-10 02:57 CST
📋 Review Summary

PR overview: implements multi-threaded model loading to improve weight-loading performance on NVMe SSDs
Scope of change: load_weight_utils.py (core implementation), the config pass-through chain, docs, tests
Impact tags: [Loader] [Optimization]

📝 PR Convention Check

✓ PR title includes the [Loader] tag
✓ Description includes Motivation and Modifications
✓ Usage example and performance test data are provided

No PR convention issues.
Issues

| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | load_weight_utils.py:433 | Exception handling is too broad |
| 🟡 Suggestion | load_weight_utils.py:441 | `_load_file` lacks exception handling |
| 🟡 Suggestion | load_weight_utils.py:439 | `disable_mmap=True` may cause OOM |
| 🟡 Suggestion | load_weight_utils.py:445 | `ThreadPoolExecutor` lacks `thread_name_prefix` |
| 🟡 Suggestion | load_weight_utils.py:127 | Missing parameter validation |
Overall assessment

The feature is implemented correctly and delivers a clear performance gain (1.6× speedup). The code is well structured and the config pass-through chain is complete. That said, strengthen exception handling and parameter validation to improve robustness.
```python
    """
    try:
        enable_tqdm = dist.get_rank() == 0
    except Exception:
```
🟡 Suggestion: exception handling is too broad

The current `except Exception` catches every exception without handling or logging it. Catch specific exception types (e.g. `RuntimeError`), or log when the exception occurs, to ease debugging in distributed environments.
```python
    with open(st_file, "rb") as f:
        result = safetensors.paddle.load(f.read())
else:
    result = safetensors.paddle.load_file(st_file, device="cpu")
```
🟡 Suggestion: `_load_file` lacks exception handling

If the file is missing, the read fails, or safetensors parsing fails, the whole loading process aborts. Add a try-except with a meaningful error message:

```python
def _load_file(st_file: str):
    try:
        if disable_mmap:
            with open(st_file, "rb") as f:
                result = safetensors.paddle.load(f.read())
        else:
            result = safetensors.paddle.load_file(st_file, device="cpu")
        return result
    except FileNotFoundError:
        logger.error(f"Weight file not found: {st_file}")
        raise
    except Exception as e:
        logger.error(f"Failed to load weight file {st_file}: {e}")
        raise
```

```python
def _load_file(st_file: str):
    if disable_mmap:
        with open(st_file, "rb") as f:
            result = safetensors.paddle.load(f.read())
```
🟡 Suggestion: `disable_mmap=True` may cause OOM

`f.read()` loads the entire file into memory. For large model files (100 GB+), this can exhaust memory. Suggestions:
- Add a file-size check and warning
- Document clearly that `disable_mmap` is only suitable for small files or scenarios where mmap is unsupported
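The suggested size check could look like the sketch below (the helper name `warn_if_large` and the 10 GiB threshold are assumptions, not part of the PR):

```python
import logging
import os

logger = logging.getLogger("loader")
# Assumed cutoff: above this, reading the whole file into memory is risky.
LARGE_FILE_BYTES = 10 * 1024**3  # 10 GiB

def warn_if_large(st_file: str, disable_mmap: bool) -> bool:
    """Return True (and log a warning) when disable_mmap would read a
    suspiciously large file fully into memory."""
    if disable_mmap and os.path.getsize(st_file) > LARGE_FILE_BYTES:
        logger.warning(
            "disable_mmap reads %s fully into memory (%d bytes); "
            "consider mmap-based loading for files this large",
            st_file, os.path.getsize(st_file),
        )
        return True
    return False
```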
```python
        return result

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
```
🟡 Suggestion: add `thread_name_prefix` to the `ThreadPoolExecutor`

Adding `thread_name_prefix="safetensors_loader"` makes the loader threads easy to identify when debugging.
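For illustration, `thread_name_prefix` shows up directly in the worker thread names, which is what makes stack dumps and profiler output attributable:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Each worker thread is named "safetensors_loader_<n>" instead of the
# default "ThreadPoolExecutor-<pool>_<n>".
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="safetensors_loader") as ex:
    names = list(ex.map(lambda _: threading.current_thread().name, range(4)))

print(all(n.startswith("safetensors_loader") for n in names))  # True
```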
```python
if extra_config is not None and extra_config.get("enable_multithread_load", False):
    weights_iterator = multi_thread_safetensors_weights_iterator(
        files_list,
        max_workers=extra_config.get("num_threads", DEFAULT_NUM_THREADS),
```
🟡 Suggestion: missing parameter validation

Add parameter validation in `get_weight_iterator`:

```python
num_threads = extra_config.get("num_threads", DEFAULT_NUM_THREADS)
if not isinstance(num_threads, int) or num_threads <= 0:
    raise ValueError(f"num_threads must be a positive integer, got {num_threads}")
if num_threads > 32:
    logger.warning(f"num_threads={num_threads} is unusually high, consider reducing it")
```
Motivation
Modifications
Tested Deepseek-V3 tp8 with fp8 dynamic quantization on an NVMe SSD (PCIe Gen4, Speed 16 GT/s, Width x4): loading performance improves noticeably, at the cost of additional memory usage.
Usage or Command
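The new loading path is driven by the `--model-loader-extra-config` flag added in this PR; a hypothetical invocation (the serving entrypoint and model path are placeholders, only the JSON keys `enable_multithread_load`, `num_threads`, and `disable_mmap` come from this PR) might look like:

```shell
# Hypothetical FastDeploy serving command; only the last flag is from this PR.
python -m fastdeploy.entrypoints.openai.api_server \
    --model /path/to/model \
    --model-loader-extra-config '{"enable_multithread_load": true, "num_threads": 8, "disable_mmap": false}'
```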
Accuracy Tests
Checklist
- Add at least one tag to the PR title, chosen from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run `pre-commit` before commit.
- For a `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the [Cherry-Pick] PR tag.