[Cherry-Pick][Optimization] Enable distributed communication environment variables by default (#7746) #7784
…ITH_SHM (PaddlePaddle#7746)
* [test] Stop server with /dev/shm cleanup
* cleanup shm by clean_ports
* kill_process_by_unix_socket
* add engine_worker_queue.is_broken
* Failed to connect to engine worker queue, retry after 5 seconds
* test_Qwen2-7B-Instruct_offline
* sys.path.insert(0, project_root)
* Cleaning unix socket for all ports
* add is_file_socket_available
* clearup dev/shm/* for xpu

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
Thanks for your contribution!
PaddlePaddle-bot
left a comment
🤖 Paddle-CI-Agent | pr_review | 2026-05-12 13:43:49
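📋 Review Summary
PR overview: Enables the two environment variables FD_ENABLE_E2W_TENSOR_CONVERT and FD_ENGINE_TASK_QUEUE_WITH_SHM by default, and adds Unix socket lifecycle management for SHM mode (is_broken() detection, a combined port-availability check, and test cleanup utilities).
Scope of changes: fastdeploy/envs.py, fastdeploy/engine/common_engine.py, fastdeploy/inter_communicator/engine_worker_queue.py, fastdeploy/utils.py, test utilities
Impact tags: [FDConfig] [Engine] [CI]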
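To make "enabled by default" concrete, here is a minimal sketch of an integer-valued environment flag whose default is 1 and which an explicit 0 switches off. The helper `env_flag` is hypothetical and is not claimed to mirror how `fastdeploy/envs.py` is written; only the two variable names come from the PR.

```python
import os


def env_flag(name: str, default: str = "1") -> bool:
    """Interpret an integer-valued environment variable as an on/off switch.

    Hypothetical helper for illustration; the default of "1" models a flag
    that is enabled unless the user overrides it.
    """
    return bool(int(os.getenv(name, default)))


if __name__ == "__main__":
    # Variable names taken from the PR; the parsing logic is an assumption.
    for flag in ("FD_ENABLE_E2W_TENSOR_CONVERT", "FD_ENGINE_TASK_QUEUE_WITH_SHM"):
        print(flag, env_flag(flag))
```

Under this reading, exporting a flag as `0` before launching would opt out of the new default, assuming the real flags are parsed as integers.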
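📝 PR Convention Check
The PR targets the release/2.6 branch, which makes it a cherry-pick, but the title lacks the [Cherry-Pick] prefix. Every section of the PR description (Motivation / Modifications / Usage or Command / Accuracy Tests) is still an empty placeholder with no actual content.
Suggested title (can be copied directly):
[Cherry-Pick][FDConfig] Enable FD_ENABLE_E2W_TENSOR_CONVERT and FD_ENGINE_TASK_QUEUE_WITH_SHM by default (#7746)
Suggested PR description (can be copied directly):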
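## Motivation
Enable the two environment variables `FD_ENABLE_E2W_TENSOR_CONVERT` and `FD_ENGINE_TASK_QUEUE_WITH_SHM` by default, and add Unix socket cleanup logic for SHM mode, to improve system stability and make resource release more reliable.
## Modifications
- `fastdeploy/envs.py`: change the default values of `FD_ENABLE_E2W_TENSOR_CONVERT` and `FD_ENGINE_TASK_QUEUE_WITH_SHM` from `0` to `1`, so both features are on by default
- `fastdeploy/inter_communicator/engine_worker_queue.py`: add an `is_broken()` method that detects a disconnected queue by attempting a connection
- `fastdeploy/engine/common_engine.py`: when task insertion fails, check the queue connection state; if the queue is disconnected, wait 5 seconds and retry
- `fastdeploy/utils.py`: add `is_file_socket_available()`, so that in SHM mode Unix socket availability is part of the port-availability check
- `tests/e2e/utils/serving_utils.py`: add the `kill_process_by_unix_socket()` and `cleanup_unix_socket()` utilities, and add Unix socket cleanup to `clean_ports()`
- `tests/ci_use/*`: move the duplicated `is_port_open`, `clean_ports`, and similar implementations into the shared `e2e.utils.serving_utils` module
- `tests/utils/test_find_free_ports.py`: add unit tests for `find_free_ports`
- `tests/xpu_ci/conftest.py`: add cleanup of temporary files in `/dev/shm` during teardown
## Usage or Command
N/A
## Accuracy Tests
N/A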
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [x] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.

Issues
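| Severity | File | Summary |
|---|---|---|
| 🟡 Suggestion | fastdeploy/utils.py:672 | In is_file_socket_available(), an unexpected OSError (such as permission denied) returns False, which may misreport the port as unavailable |
| 🟡 Suggestion | tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py:92 | rm -rf /dev/shm/* cleans too broadly and risks interference in a shared CI environment |
| 🟡 Suggestion | tests/xpu_ci/conftest.py:104 | Same as above: rm -rf /dev/shm/* cleans too broadly |
| 🟡 Suggestion | tests/ci_use/Qwen2-7B-Instruct_offline/test_Qwen2-7B-Instruct_offline.py:84 | time.sleep(2) replaces the original timeout polling logic, risking flaky tests in slow environments |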
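Overall assessment
The functional changes are clear, the new defaults for the two environment variables and the SHM lifecycle management logic are reasonable, and the test-utility refactoring is heading in the right direction. The PR description needs actual content. To improve CI reliability, the /dev/shm cleanup should also be narrowed and the port-readiness polling restored.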
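The PR's lifecycle handling (check whether the queue is broken after a failed insert, then wait 5 seconds and retry) can be pictured with the sketch below. Everything in it is hypothetical: `FlakyQueue` is a stand-in rather than FastDeploy's queue class, `put_with_retry` is an illustrative name, and the attempt cap is an assumption, since the source only mentions the 5-second wait.

```python
import time
from typing import Callable, List


class FlakyQueue:
    """Hypothetical stand-in for a queue client; fails a fixed number of times."""

    def __init__(self, failures_before_ok: int) -> None:
        self._remaining = failures_before_ok
        self._broken = False
        self.tasks: List[str] = []

    def is_broken(self) -> bool:
        # The real method is described as probing with a connection attempt;
        # this fake simply remembers whether the last put() failed.
        return self._broken

    def put(self, task: str) -> None:
        if self._remaining > 0:
            self._remaining -= 1
            self._broken = True
            raise ConnectionError("queue unreachable")
        self._broken = False
        self.tasks.append(task)


def put_with_retry(
    queue: FlakyQueue,
    task: str,
    retry_delay: float = 5.0,
    max_attempts: int = 3,  # assumption: the source does not state a cap
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Insert a task, sleeping and retrying only while the queue reports broken."""
    for attempt in range(max_attempts):
        try:
            queue.put(task)
            return True
        except ConnectionError:
            if not queue.is_broken():
                raise  # the failure is not a connectivity problem
            if attempt + 1 < max_attempts:
                sleep(retry_delay)
    return False


if __name__ == "__main__":
    waits: List[float] = []
    ok = put_with_retry(FlakyQueue(2), "task-0", sleep=waits.append)
    print(ok, waits)
```

Passing `sleep` as a parameter keeps the retry delay testable without actually waiting.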
        if e.errno in (errno.ECONNREFUSED, errno.ENOENT):
            # Stale socket file: exists but nobody is listening
            return True
        return False
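🟡 Suggestion: In is_file_socket_available(), any OSError other than ECONNREFUSED/ENOENT (for example EACCES for permission denied, or ECONNABORTED) returns False directly. The port is then misjudged as unavailable, and find_free_ports may end up unable to find a free port.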
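The error codes in question can be reproduced with the standard library alone. The sketch below is not the PR's implementation: `probe_unix_socket` is a hypothetical helper that returns the errno raised when connecting to a Unix socket path, so the missing-file, stale-file, and live-listener cases can be compared. The stale-file result assumes Linux semantics.

```python
import errno
import os
import socket
import tempfile


def probe_unix_socket(path: str) -> int:
    """Return 0 if a listener accepts the connection, else the errno from connect()."""
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        client.connect(path)
        return 0
    except OSError as exc:
        return exc.errno
    finally:
        client.close()


if __name__ == "__main__":
    sock_path = os.path.join(tempfile.mkdtemp(), "demo.sock")

    # Case 1: no file at all.
    print("missing:", errno.errorcode[probe_unix_socket(sock_path)])

    # Case 2: stale file, bound once and then closed without unlinking.
    stale = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    stale.bind(sock_path)
    stale.close()
    print("stale:", errno.errorcode[probe_unix_socket(sock_path)])
    os.unlink(sock_path)

    # Case 3: a live listener.
    live = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    live.bind(sock_path)
    live.listen(1)
    print("live:", probe_unix_socket(sock_path))
    live.close()
    os.unlink(sock_path)
```

Any other errno coming back from such a probe says nothing definite about whether the path is in use, which is why treating it as "unavailable" is questionable.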
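Suggested fix: log unexpected errors and return True (a conservative policy that lets the TCP-layer bind act as the fallback), or explicitly enumerate the error codes that should return False:

    except OSError as e:
        if e.errno in (errno.ECONNREFUSED, errno.ENOENT):
            return True
        # Other OSErrors (such as EACCES) are inconclusive; conservatively return True
        llm_logger.warning(f"Unexpected OSError when checking socket {socket_path}: {e}")
        return True

Diff context for the next comment:

        - Tears down server after all tests finish
        """
        # Clean up temporary files in /dev/shm
        try: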
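🟡 Suggestion: rm -rf /dev/shm/* deletes every file under /dev/shm. In a shared CI environment it can remove files belonging to other concurrent tests or processes (such as other instances' socket files or POSIX shared memory), causing unrelated jobs to fail.
Clean up only the files known to belong to this test run (for example fd_task_queue_*.sock), or run the full wipe only on dedicated CI machines:

    import glob
    import os

    for f in glob.glob("/dev/shm/fd_task_queue_*.sock"):
        try:
            os.remove(f)
        except OSError:
            # File already gone or not removable; ignore and continue
            pass

Diff context for the next comment:

        for cmd in commands:
            safe_kill_cmd(cmd)

        try: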
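🟡 Suggestion: Same as test_eblite_serving.py: rm -rf /dev/shm/* is too broad and risks interfering with other jobs in a shared multi-job CI environment. Consider deleting only the files FastDeploy itself creates (/dev/shm/fd_task_queue_*.sock and the like).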
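A scoped cleanup along these lines could be factored into a small helper. The sketch below is an assumption-laden illustration: `remove_matching` is a hypothetical name, and the `fd_task_queue_*.sock` pattern is only the example the reviewer gives, not a confirmed list of the files FastDeploy creates. The demo runs against a temporary directory rather than `/dev/shm`.

```python
import glob
import os
import tempfile
from typing import List


def remove_matching(directory: str, pattern: str) -> List[str]:
    """Delete only the entries in `directory` matching `pattern`; return removed paths."""
    removed: List[str] = []
    for path in sorted(glob.glob(os.path.join(directory, pattern))):
        try:
            os.remove(path)
            removed.append(path)
        except OSError:
            # Already gone, a directory, or not removable by this user: skip it.
            continue
    return removed


if __name__ == "__main__":
    demo_dir = tempfile.mkdtemp()
    for name in ("fd_task_queue_8000.sock", "someone_elses_segment"):
        open(os.path.join(demo_dir, name), "w").close()

    print(remove_matching(demo_dir, "fd_task_queue_*.sock"))
    print(os.listdir(demo_dir))  # the unrelated file is left in place
```

Returning the removed paths makes it easy for a teardown fixture to log exactly what was cleaned.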
        cache_queue_port=FD_CACHE_QUEUE_PORT,
        max_model_len=32768,
        quantization="wint8",
        logits_processors=["LogitBiasLogitsProcessor"],
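🟡 Suggestion: time.sleep(2) replaces the original timeout polling that waited for the port to become ready. On slow machines or heavily loaded CI environments, 2 seconds may not be enough, so subsequent inference calls can fail and the test becomes intermittently unstable (flaky).
Restore the polling, or use a more reliable health check:

    wait_start = time.time()
    while not is_port_open("127.0.0.1", FD_ENGINE_QUEUE_PORT):
        if time.time() - wait_start > MAX_WAIT_SECONDS:
            pytest.fail(f"Engine did not start within {MAX_WAIT_SECONDS}s")
        time.sleep(1)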
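The polling idea can be tried in isolation with the sketch below. It is a stand-in under stated assumptions: `tcp_port_open` and `wait_for_port` are hypothetical helpers written for this example and are not the `is_port_open` from `e2e.utils.serving_utils`. They use a monotonic deadline so that wall-clock adjustments cannot shorten or extend the wait.

```python
import socket
import time


def tcp_port_open(host: str, port: int, timeout: float = 0.5) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_port(
    host: str, port: int, max_wait_seconds: float = 30.0, interval: float = 1.0
) -> bool:
    """Poll until the port accepts connections; return False once the deadline passes."""
    deadline = time.monotonic() + max_wait_seconds
    while True:
        if tcp_port_open(host, port):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


if __name__ == "__main__":
    # Listen on an ephemeral port so the demo does not depend on a running server.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    print(wait_for_port("127.0.0.1", server.getsockname()[1], 2.0, 0.1))
    server.close()
```

In a test, a `False` result would typically be turned into an explicit failure message, as the reviewer's snippet does with `pytest.fail`.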
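CI report generated from the code below (refreshed every 30 minutes):
1 Task overview
2 Task status summary
2.1 Required tasks: 8/10 passed
2.2 Optional tasks: 21/25 passed
3 Failure details (required only)
Approval: code approval (confidence: high)
Root cause: 2 missing-approval errors detected.
Fix summary: RD members need to approve the envs.py change and the Cherry-Pick convention (2 items in total).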
Codecov Report
❌ Patch coverage is …

    @@            Coverage Diff             @@
    ##    release/2.6    #7784    +/-   ##
    ==============================================
      Coverage    ?    72.86%
    ==============================================
      Files       ?    378
      Lines       ?    53963
      Branches    ?    8440
    ==============================================
      Hits        ?    39321
      Misses      ?    11859
      Partials    ?    2783
0077822 into PaddlePaddle:release/2.6
Dev PR: #7746