
Conversation

@xunyoyo (Contributor) commented Nov 15, 2025

Motivation

No.33: add unit tests for the fastdeploy/cache_manager/cache_messager.py module

Modifications

add tests/cache_manager/test_cache_messager.py

Usage or Command

cache_manager/cache_messager.py:

python -m coverage run -m unittest tests.cache_manager.test_cache_messager \
&& python -m coverage report -m --include='fastdeploy/cache_manager/cache_messager.py'

Accuracy Tests

tests/cache_manager/test_cache_messager.py:

Name                                         Stmts   Miss  Cover   Missing
--------------------------------------------------------------------------
fastdeploy/cache_manager/cache_messager.py     495     71    86%   256-265, 276-277, 279-280, 283, 295, 297, 299, 302-307, 314, 328-333, 360, 382-383, 430, 555, 557, 581, 592-600, 604, 613-617, 621-629, 651-652, 679, 696-697, 706-707, 733, 740-741, 755, 824, 863-864, 870-876
--------------------------------------------------------------------------
TOTAL                                          495     71    86%

Previous coverage:

File                                        Stmts  Miss  Branch  BrPart  Cover(%)  Missing
fastdeploy/cache_manager/cache_messager.py  488    488   148     0       0        17-857

Coverage improved by 417 lines (missed lines reduced from 488 to 71).

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code and run pre-commit before committing.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings November 15, 2025 10:21
@paddle-bot bot commented Nov 15, 2025

Thanks for your contribution!

@paddle-bot paddle-bot bot added the contributor External developers label Nov 15, 2025
Copilot finished reviewing on behalf of xunyoyo November 15, 2025 10:24
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR adds unit tests for the fastdeploy/cache_manager/cache_messager.py module, reaching 86% test coverage, as part of the Hackathon 9th Sprint No.33 task.

Main changes:

  • Added a complete unit test file, tests/cache_manager/test_cache_messager.py
  • Implemented tests for the two main classes, CacheMessager and CacheMessagerV1
  • Isolated Paddle and other dependencies via mocks so the tests can run independently
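The dependency-isolation approach described above can be sketched as follows. The module name `heavy_gpu_lib` below is hypothetical, used only for illustration; the actual test file stubs paddle and fastdeploy submodules with the same mechanism: register a stub module in sys.modules before the code under test is imported, so the import machinery resolves the heavy dependency to the stub.

```python
import sys
import types

# "heavy_gpu_lib" is a hypothetical dependency name used only for illustration;
# the real test file stubs paddle and fastdeploy submodules the same way.
stub = types.ModuleType("heavy_gpu_lib")
stub.device_count = lambda: 0  # minimal surface the code under test touches
sys.modules["heavy_gpu_lib"] = stub

# Any later import now resolves to the stub instead of a real package,
# because the import system checks sys.modules first.
import heavy_gpu_lib

assert heavy_gpu_lib.device_count() == 0
```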

return cache


class CacheMessagerInitTest(unittest.TestCase):
Copilot AI Nov 15, 2025
The test classes lack docstrings. Consider adding docstrings to test classes such as CacheMessagerInitTest, PrefillThreadTest, and HandleConnectTaskTest, describing the module under test and the purpose of each class. This helps other developers understand the intent of the tests.

For example:

class CacheMessagerInitTest(unittest.TestCase):
    """Test the initialization of CacheMessager with different configurations."""

Comment on lines +577 to +582
with mock.patch("threading.Thread") as thread_cls:

    def _fake_thread(*_args, **_kwargs):
        return types.SimpleNamespace(start=lambda: None)

    thread_cls.side_effect = _fake_thread
Copilot AI Nov 15, 2025

Using mock.patch("threading.Thread") prevents the thread from starting, but it affects every thread created anywhere in the test environment. Consider a more precise patch target, such as mock.patch.object(self.module, "threading"), or mock only the thread-creation logic inside CacheMessagerV1.

Also, the _fake_thread function can be simplified to a lambda:

thread_cls.side_effect = lambda *args, **kwargs: types.SimpleNamespace(start=lambda: None)
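As a minimal sketch of the narrower patching suggested here (the module object and function names below are hypothetical, not the real cache_messager ones): patching the `threading` name inside just the target module leaves thread creation everywhere else untouched.

```python
import threading
import types
import unittest.mock as mock

# Hypothetical stand-in for the module under test; in the real suite this
# would be the imported cache_messager module object.
target_module = types.ModuleType("target_module")
target_module.threading = threading

def start_worker(module):
    # The code under test creates its thread through the module-level name.
    t = module.threading.Thread(target=lambda: None)
    t.start()
    return t

# Replace only this module's view of threading; other code keeps real threads.
fake = types.SimpleNamespace(
    Thread=lambda *a, **k: types.SimpleNamespace(start=lambda: None, join=lambda: None)
)
with mock.patch.object(target_module, "threading", fake):
    stub_thread = start_worker(target_module)  # no real thread is started

real_thread = start_worker(target_module)  # outside the patch: a real thread
real_thread.join()
```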

Comment on lines +489 to +495
def _set_signals(instance):
    step_key = f"splitwise_complete_prefilled_step_{instance.rank_id}_{instance.gpu_id}"
    layer_key = f"splitwise_complete_prefilled_layer_{instance.rank_id}_{instance.gpu_id}"
    _IPCSignal.instances[step_key].value[0] = 0
    _IPCSignal.instances[layer_key].value[0] = 0

queue.signal_initializer = lambda: _set_signals(messager)
Copilot AI Nov 15, 2025
The logic behind the lambda is not clear enough. Assigning queue.signal_initializer = lambda: _set_signals(messager) is concise, but it makes the code harder to understand. Consider inlining _set_signals or using a clearer name.

A better approach is to set the signal values directly in the test:

def _initialize_signals():
    step_key = f"splitwise_complete_prefilled_step_{messager.rank_id}_{messager.gpu_id}"
    layer_key = f"splitwise_complete_prefilled_layer_{messager.rank_id}_{messager.gpu_id}"
    _IPCSignal.instances[step_key].value[0] = 0
    _IPCSignal.instances[layer_key].value[0] = 0

queue.signal_initializer = _initialize_signals

This makes the intent of the test easier to understand.

Comment on lines +716 to +729
task = {
    "request_id": "req-1",
    "transfer_protocol": "ipc",
    "device_ids": {0: 0},
    "rdma_ports": {0: 0},
    "src_block_ids": [0, 1],
    "dest_block_ids": [2, 3],
    "status": "init",
    "sended_layer_id": -1,
    "sended_block_num": 0,
    "current_id": 0,
    "need_prefill_tokens": 4,
}

Copilot AI Nov 15, 2025

The test data structure (the task dictionary) contains many fields (10+ key-value pairs) with no explanation of what each field means. Consider adding comments explaining the key fields, or extracting a helper function to improve readability:

def _create_test_task(request_id, src_blocks, dest_blocks, current_id):
    """Create a test cache task with default values."""
    return {
        "request_id": request_id,
        "transfer_protocol": "ipc",
        "device_ids": {0: 0},
        "rdma_ports": {0: 0},
        "src_block_ids": src_blocks,
        "dest_block_ids": dest_blocks,
        # ... other fields
    }
Suggested change
task = {
    "request_id": "req-1",
    "transfer_protocol": "ipc",
    "device_ids": {0: 0},
    "rdma_ports": {0: 0},
    "src_block_ids": [0, 1],
    "dest_block_ids": [2, 3],
    "status": "init",
    "sended_layer_id": -1,
    "sended_block_num": 0,
    "current_id": 0,
    "need_prefill_tokens": 4,
}
def _create_test_task(
    request_id="req-1",
    transfer_protocol="ipc",
    device_ids=None,
    rdma_ports=None,
    src_block_ids=None,
    dest_block_ids=None,
    status="init",
    sended_layer_id=-1,
    sended_block_num=0,
    current_id=0,
    need_prefill_tokens=4,
):
    """
    Helper to create a test cache task dictionary.

    Args:
        request_id (str): Unique identifier for the request.
        transfer_protocol (str): Protocol used for transfer ("ipc", "rdma", etc.).
        device_ids (dict): Mapping of device indices to IDs.
        rdma_ports (dict): Mapping of device indices to RDMA port numbers.
        src_block_ids (list): Source block indices.
        dest_block_ids (list): Destination block indices.
        status (str): Status of the task ("init", "pending", etc.).
        sended_layer_id (int): Last sent layer index.
        sended_block_num (int): Number of blocks sent.
        current_id (int): Current block/task ID.
        need_prefill_tokens (int): Number of tokens to prefill.

    Returns:
        dict: Task dictionary for cache operations.
    """
    return {
        "request_id": request_id,
        "transfer_protocol": transfer_protocol,
        "device_ids": device_ids if device_ids is not None else {0: 0},
        "rdma_ports": rdma_ports if rdma_ports is not None else {0: 0},
        "src_block_ids": src_block_ids if src_block_ids is not None else [0, 1],
        "dest_block_ids": dest_block_ids if dest_block_ids is not None else [2, 3],
        "status": status,
        "sended_layer_id": sended_layer_id,
        "sended_block_num": sended_block_num,
        "current_id": current_id,
        "need_prefill_tokens": need_prefill_tokens,
    }

task = _create_test_task()



class ParseArgsTest(unittest.TestCase):
    def test_parse_args_reads_cli_values(self):
Copilot AI Nov 15, 2025

The test method names could be more descriptive. The current names follow the test_<what> convention, but more detailed names would better describe each scenario. For example:

  • test_parse_args_reads_cli_values → test_parse_args_with_all_parameters_returns_correct_values
  • test_initializes_with_ipc_and_rdma → test_cache_messager_initialization_with_ipc_and_rdma_protocols

This makes it easier to understand which scenario failed when a test breaks.

Suggested change
def test_parse_args_reads_cli_values(self):
def test_parse_args_with_all_parameters_returns_correct_values(self):

Collaborator

You can follow this comment and update the test names starting with Test/test accordingly.

Comment on lines +707 to +715
class _QueueStub:
    def __init__(self, payloads):
        self._payloads = list(payloads)

    def get(self):
        if not self._payloads:
            raise SystemExit("stop prefill v1")
        return self._payloads.pop(0)

Copilot AI Nov 15, 2025

The inner class _QueueStub could be reused or extracted to module level. If other tests also need to simulate queue behavior, consider extracting it as a generic mock class placed at the top of the file with the other mock classes. This improves code reuse.

Suggested change
class _QueueStub:
    def __init__(self, payloads):
        self._payloads = list(payloads)

    def get(self):
        if not self._payloads:
            raise SystemExit("stop prefill v1")
        return self._payloads.pop(0)

Comment on lines +148 to +189
class _IPCCommManager:
    def __init__(self, rank, gpu_id, cache_k, cache_v):  # pylint: disable=unused-argument
        self.rank = rank
        self.gpu_id = gpu_id
        self.cache_k = cache_k
        self.cache_v = cache_v
        self.write_calls = []
        self.sync_targets = []

    def write_cache(self, target_ip, target_id, src_block_ids, dest_block_ids, layer_idx):
        self.write_calls.append((target_ip, target_id, tuple(src_block_ids), tuple(dest_block_ids), layer_idx))
        return 0

    def write_block_by_sync(self, target_id):
        self.sync_targets.append(target_id)


class _RDMACommManager:
    def __init__(
        self,
        splitwise_role,
        rank,
        gpu_id,
        cache_k_ptr_list,
        cache_v_ptr_list,
        max_block_num,
        block_bytes,
        rdma_port,
    ):  # pylint: disable=unused-argument
        self.rank = rank
        self.calls = []
        self.connect_results = []

    def connect(self, target_ip, target_id):
        result = True if not self.connect_results else self.connect_results.pop(0)
        self.calls.append((target_ip, target_id, result))
        return result

    def write_cache(self, *args, **kwargs):  # pragma: no cover - compatibility helper
        return 0


Copilot AI Nov 15, 2025

The mock classes lack type hints and docstrings. Consider adding docstrings to mock classes such as _IPCCommManager, _RDMACommManager, and _EngineWorkerQueue, describing their purpose and the behavior they simulate, to improve the maintainability of the test code.

For example:

class _IPCCommManager:
    """Mock implementation of IPCCommManager for testing purposes."""
    def __init__(self, rank, gpu_id, cache_k, cache_v):
        ...

Comment on lines 261 to 368
def _install_dependency_stubs():
    paddle = _ensure_module("paddle")
    paddle.Tensor = _FakeTensor
    paddle.bfloat16 = "bfloat16"

    def _full(shape, fill_value=0, dtype="float32"):
        dtype_str = dtype if isinstance(dtype, str) else str(dtype)
        return _FakeTensor(np.full(shape, fill_value), dtype=dtype_str)

    def _to_tensor(data, dtype="float32", place=None):  # pylint: disable=unused-argument
        dtype_str = dtype if isinstance(dtype, str) else str(dtype)
        return _FakeTensor(np.array(data), dtype=dtype_str)

    paddle.full = _full
    paddle.to_tensor = _to_tensor

    def _set_device(_name):
        return None

    paddle.set_device = _set_device

    device_mod = types.ModuleType("paddle.device")
    device_mod.set_device = lambda _name: None
    cuda_mod = types.ModuleType("paddle.device.cuda")
    cuda_mod.memory_allocated = lambda: 0
    device_mod.cuda = cuda_mod
    paddle.device = device_mod
    sys.modules["paddle.device"] = device_mod
    sys.modules["paddle.device.cuda"] = cuda_mod

    fastdeploy_pkg = _ensure_module("fastdeploy")
    fastdeploy_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy")]

    utils_module = types.ModuleType("fastdeploy.utils")
    envs_module = types.ModuleType("fastdeploy.utils.envs")
    envs_module.FD_ENGINE_TASK_QUEUE_WITH_SHM = False
    envs_module.ENABLE_V1_KVCACHE_SCHEDULER = False

    class _Logger:
        def __init__(self):
            self.messages = {"info": [], "debug": [], "error": []}

        def info(self, msg):
            self.messages["info"].append(msg)

        def debug(self, msg):
            self.messages["debug"].append(msg)

        def error(self, msg):
            self.messages["error"].append(msg)

    def _get_logger(_name, _filename=None):  # pylint: disable=unused-argument
        return _Logger()

    utils_module.envs = envs_module
    utils_module.get_logger = _get_logger
    sys.modules["fastdeploy.utils"] = utils_module
    sys.modules["fastdeploy.utils.envs"] = envs_module
    fastdeploy_pkg.utils = utils_module

    transfer_factory = types.ModuleType("fastdeploy.cache_manager.transfer_factory")
    transfer_factory.IPCCommManager = _IPCCommManager
    transfer_factory.RDMACommManager = _RDMACommManager
    sys.modules["fastdeploy.cache_manager.transfer_factory"] = transfer_factory

    config_module = types.ModuleType("fastdeploy.config")

    class _SpeculativeConfig:
        def __init__(self, config_dict):
            self.num_extra_cache_layer = config_dict.get("num_extra_cache_layer", 0)
            self.num_gpu_block_expand_ratio = config_dict.get("num_gpu_block_expand_ratio", 0)

    config_module.SpeculativeConfig = _SpeculativeConfig
    sys.modules["fastdeploy.config"] = config_module
    fastdeploy_pkg.config = config_module

    inter_comm_module = types.ModuleType("fastdeploy.inter_communicator")
    inter_comm_module.EngineWorkerQueue = _EngineWorkerQueue
    inter_comm_module.IPCSignal = _IPCSignal
    inter_comm_module.shared_memory_exists = lambda _name: False
    sys.modules["fastdeploy.inter_communicator"] = inter_comm_module

    ops_gpu_module = types.ModuleType("fastdeploy.model_executor.ops.gpu")

    def _get_output_kv_signal(buffer, rank_id, flag):  # pylint: disable=unused-argument
        sequence = getattr(_get_output_kv_signal, "sequence", None)
        if not sequence:
            raise SystemExit("kv signal stop")

        step = sequence.pop(0)
        if step.get("stop"):
            raise SystemExit("kv signal stop")

        data = buffer.numpy()
        data.fill(-1)
        tasks = step.get("tasks", -1)
        data[0] = tasks
        if tasks == -1:
            return
        data[1] = step.get("layer", 0)
        data[2] = step.get("engine", 0)
        data[3] = step.get("offset", 0)
        data[4] = step.get("current", 0)

    ops_gpu_module.get_output_kv_signal = _get_output_kv_signal
    ops_gpu_module.set_data_ipc = lambda *args, **kwargs: None
    sys.modules["fastdeploy.model_executor.ops.gpu"] = ops_gpu_module

Copilot AI Nov 15, 2025

The function _install_dependency_stubs is too long (over 100 lines) and has too many responsibilities. Consider splitting it into smaller functions, for example:

  • _install_paddle_stubs()
  • _install_fastdeploy_stubs()
  • _install_transfer_factory_stubs()
  • _install_inter_comm_stubs()
  • _install_ops_gpu_stubs()

This improves the readability and maintainability of the code.
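A minimal sketch of that decomposition (the function names follow the review's suggestion; the bodies are placeholder stubs, not the real stub logic from the test file):

```python
import sys
import types

def _install_paddle_stubs():
    # Placeholder body: register a stub "paddle" module carrying only the
    # attributes the code under test touches.
    paddle = types.ModuleType("paddle")
    paddle.bfloat16 = "bfloat16"
    sys.modules["paddle"] = paddle

def _install_ops_gpu_stubs():
    # Placeholder body: stub out the GPU custom-op module.
    ops = types.ModuleType("fastdeploy.model_executor.ops.gpu")
    ops.set_data_ipc = lambda *args, **kwargs: None
    sys.modules["fastdeploy.model_executor.ops.gpu"] = ops

def _install_dependency_stubs():
    # The original monolithic installer becomes a thin orchestrator that
    # calls one well-named helper per dependency group.
    _install_paddle_stubs()
    _install_ops_gpu_stubs()

_install_dependency_stubs()
```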

Comment on lines +345 to +365
def _get_output_kv_signal(buffer, rank_id, flag):  # pylint: disable=unused-argument
    sequence = getattr(_get_output_kv_signal, "sequence", None)
    if not sequence:
        raise SystemExit("kv signal stop")

    step = sequence.pop(0)
    if step.get("stop"):
        raise SystemExit("kv signal stop")

    data = buffer.numpy()
    data.fill(-1)
    tasks = step.get("tasks", -1)
    data[0] = tasks
    if tasks == -1:
        return
    data[1] = step.get("layer", 0)
    data[2] = step.get("engine", 0)
    data[3] = step.get("offset", 0)
    data[4] = step.get("current", 0)

ops_gpu_module.get_output_kv_signal = _get_output_kv_signal
Copilot AI Nov 15, 2025

A mutable function attribute is used to store state (_get_output_kv_signal.sequence). This is unclear and error-prone. Consider a class or a more explicit form of state management, for example storing sequence in an instance variable of the test class, or creating a dedicated mock class that manages this state.

For example:

class _MockOpsGpu:
    def __init__(self):
        self.kv_signal_sequence = []
    
    def get_output_kv_signal(self, buffer, rank_id, flag):
        if not self.kv_signal_sequence:
            raise SystemExit("kv signal stop")
        # ... rest of logic
Suggested change
def _get_output_kv_signal(buffer, rank_id, flag):  # pylint: disable=unused-argument
    sequence = getattr(_get_output_kv_signal, "sequence", None)
    if not sequence:
        raise SystemExit("kv signal stop")
    step = sequence.pop(0)
    if step.get("stop"):
        raise SystemExit("kv signal stop")
    data = buffer.numpy()
    data.fill(-1)
    tasks = step.get("tasks", -1)
    data[0] = tasks
    if tasks == -1:
        return
    data[1] = step.get("layer", 0)
    data[2] = step.get("engine", 0)
    data[3] = step.get("offset", 0)
    data[4] = step.get("current", 0)

ops_gpu_module.get_output_kv_signal = _get_output_kv_signal

class _MockGetOutputKvSignal:
    def __init__(self):
        self.sequence = []

    def __call__(self, buffer, rank_id, flag):  # pylint: disable=unused-argument
        if not self.sequence:
            raise SystemExit("kv signal stop")
        step = self.sequence.pop(0)
        if step.get("stop"):
            raise SystemExit("kv signal stop")
        data = buffer.numpy()
        data.fill(-1)
        tasks = step.get("tasks", -1)
        data[0] = tasks
        if tasks == -1:
            return
        data[1] = step.get("layer", 0)
        data[2] = step.get("engine", 0)
        data[3] = step.get("offset", 0)
        data[4] = step.get("current", 0)

ops_gpu_module.get_output_kv_signal = _MockGetOutputKvSignal()

import math
import sys
import types
import unittest
Copilot AI Nov 15, 2025

Module 'unittest' is imported with both 'import' and 'import from'.
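A minimal sketch of the cleanup this lint warning suggests: pick a single import style for the package and reference everything through it (the test class below is illustrative, not from the PR).

```python
import unittest
import unittest.mock as mock  # one consistent style; no "from unittest import ..." mixed in


class ImportStyleTest(unittest.TestCase):
    def test_patch_via_single_import_style(self):
        # mock.patch behaves the same whether imported via "from unittest import mock"
        # or "import unittest.mock as mock"; using one style avoids the lint warning.
        with mock.patch("os.path.exists", return_value=True):
            import os.path
            self.assertTrue(os.path.exists("/definitely/not/a/real/path"))
```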

@xunyoyo (Contributor, Author) commented Nov 17, 2025

@CSWYF3634076
Collaborator

You can follow the AI assistant's suggestions to normalize and streamline parts of the code and add some code comments. cache_messager.py is a large file; it may help to first understand how it works and then test it as a whole.
@YuanRisheng could you also help review this?

@YuanRisheng
Collaborator

Please add to the PR description how much the number of missed lines was reduced. Also, I marked with a thumbs-up the AI review suggestions that I think should be addressed; please update accordingly.

Updated copyright information and modified function names for clarity.
@xunyoyo (Contributor, Author) commented Nov 19, 2025

> Please add to the PR description how much the number of missed lines was reduced. Also, I marked with a thumbs-up the AI review suggestions that I think should be addressed; please update accordingly.

Updated as requested and added the license header.

@YuanRisheng
Collaborator

0.4⭐️

@codecov-commenter

Codecov Report

✅ All modified and coverable lines are covered by tests.
⚠️ Please upload report for BASE (develop@7bac016).

Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #5056   +/-   ##
==========================================
  Coverage           ?   58.06%           
==========================================
  Files              ?      317           
  Lines              ?    38456           
  Branches           ?     5765           
==========================================
  Hits               ?    22329           
  Misses             ?    14331           
  Partials           ?     1796           
Flag Coverage Δ
GPU 58.06% <ø> (?)

Flags with carried forward coverage won't be shown.

