60 changes: 60 additions & 0 deletions tests/engine/test_kv_cache_interface.py
@@ -0,0 +1,60 @@
import unittest

from fastdeploy.engine.kv_cache_interface import AttentionSpec, KVCacheSpec


class TestKVCacheSpec(unittest.TestCase):

def test_merge_valid(self):
# Create two valid KVCacheSpec objects with the same block_size and block_memory_used
spec1 = KVCacheSpec(block_size=256, block_memory_used=1024)
spec2 = KVCacheSpec(block_size=256, block_memory_used=1024)

merged_spec = KVCacheSpec.merge([spec1, spec2])

self.assertEqual(merged_spec.block_size, spec1.block_size)
self.assertEqual(merged_spec.block_memory_used, spec1.block_memory_used)

def test_merge_invalid(self):
spec1 = KVCacheSpec(block_size=256, block_memory_used=1024)
spec2 = KVCacheSpec(block_size=512, block_memory_used=1024)

with self.assertRaises(AssertionError):
KVCacheSpec.merge([spec1, spec2])

def test_attention_spec_inheritance(self):
# Create an AttentionSpec object
attention_spec = AttentionSpec(
block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32"
)

self.assertEqual(attention_spec.block_size, 256)
self.assertEqual(attention_spec.block_memory_used, 1024)
self.assertEqual(attention_spec.num_kv_heads, 12)
self.assertEqual(attention_spec.head_size, 64)
self.assertEqual(attention_spec.dtype, "float32")

def test_attention_spec_merge(self):
# Create two AttentionSpec objects with the same attributes
spec1 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
spec2 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")

merged_spec = AttentionSpec.merge([spec1, spec2])

self.assertEqual(merged_spec.block_size, spec1.block_size)
self.assertEqual(merged_spec.block_memory_used, spec1.block_memory_used)
self.assertEqual(merged_spec.num_kv_heads, spec1.num_kv_heads)
self.assertEqual(merged_spec.head_size, spec1.head_size)
self.assertEqual(merged_spec.dtype, spec1.dtype)

def test_attention_spec_merge_invalid(self):
# Create two AttentionSpec objects with different attributes
spec1 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
spec2 = AttentionSpec(block_size=512, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")

with self.assertRaises(AssertionError):
AttentionSpec.merge([spec1, spec2])


if __name__ == "__main__":
unittest.main()
169 changes: 169 additions & 0 deletions tests/layers/test_moba_attention_backend.py
@@ -0,0 +1,169 @@
import unittest
from unittest.mock import patch

import numpy as np
import paddle

from fastdeploy.model_executor.layers.attention.moba_attention_backend import (
PlasAttentionBackend,
PlasAttentionMetadata,
)


class DummyFDConfig:
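    # Lightweight stand-in for FDConfig; it exposes only the config attributes these tests need.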
def __init__(self):
self.cache_config = type("CacheConfig", (), {"block_size": 4})()
self.model_config = type("ModelConfig", (), {"max_model_len": 16, "head_dim": 8, "num_hidden_layers": 2})()
self.scheduler_config = type("SchedulerConfig", (), {"max_num_seqs": 2})()
self.plas_attention_config = type(
"PlasConfig",
(),
{
"plas_block_size": 4,
"plas_encoder_top_k_left": 1,
"plas_encoder_top_k_right": 1,
"plas_use_encoder_seq_limit": 1,
"plas_decoder_top_k_left": 1,
"plas_decoder_top_k_right": 1,
"plas_use_decoder_seq_limit": 1,
"plas_max_seq_length": 32,
},
)()
self.graph_opt_config = type("GraphOptConfig", (), {"cudagraph_capture_sizes": None})()
self.parallel_config = type("ParallelConfig", (), {"block_size": 4})()


class DummyForwardMeta:
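    # Minimal stand-in for ForwardMeta, carrying just the sequence-length fields and caches these tests use.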
    def __init__(self, enc_seq=None, dec_seq=None):
        # Use None defaults to avoid sharing mutable default arguments across calls.
        enc_seq = [4, 4] if enc_seq is None else enc_seq
        dec_seq = [2, 2] if dec_seq is None else dec_seq
self.seq_lens_encoder = paddle.to_tensor(enc_seq, dtype="int64")
self.seq_lens_decoder = paddle.to_tensor(dec_seq, dtype="int64")
self.seq_lens_this_time = sum(dec_seq)
self.cu_seqlens_q = paddle.to_tensor([0] + list(np.cumsum(dec_seq)), dtype="int64")
self.caches = [paddle.zeros([2, 4, 8])] * 4
self.block_tables = None
self.rotary_embs = None


class DummyLayer:
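    # Minimal attention-layer stub with the attributes forward_mixed is expected to read.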
def __init__(self, layer_id=0, cache_quant_type_str=None, plas_use_mlp=True):
self.layer_id = layer_id
self.qkv_bias = None
self.cache_k_block_means = None
self.cache_quant_type_str = cache_quant_type_str
self.plas_use_mlp = plas_use_mlp


class TestPlasAttentionBackend(unittest.TestCase):
@patch(
"fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
return_value=(paddle.to_tensor([1, 2]), paddle.to_tensor([1, 2]), paddle.to_tensor([2])),
)
def test_init_attention_metadata(self, mock_get_cu_seq):
# Test initialization of attention metadata
fd_config = DummyFDConfig()
backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
forward_meta = DummyForwardMeta()
backend.init_attention_metadata(forward_meta)

self.assertIsInstance(backend.attention_metadata, PlasAttentionMetadata)
self.assertTrue(backend.attention_metadata.q_input.shape[0] > 0)

@patch(
"fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
return_value=(
paddle.to_tensor([0]), # cu_seq_q_pack
paddle.to_tensor([0]), # cu_seqlens_k
paddle.to_tensor([0]), # q_pack_tokens
),
)
def test_init_attention_metadata_empty_seq(self, mock_get_cu_seq):
# Test metadata init with empty sequences
fd_config = DummyFDConfig()
backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
forward_meta = DummyForwardMeta()
forward_meta.seq_lens_encoder = paddle.to_tensor([0])
forward_meta.seq_lens_decoder = paddle.to_tensor([0])
forward_meta.cu_seqlens_q = paddle.to_tensor([0, 0])
backend.init_attention_metadata(forward_meta)

def test_get_kv_cache_shape(self):
# Test KV cache shape calculation under different quant types
fd_config = DummyFDConfig()
backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)

# Default
shape = backend.get_kv_cache_shape(max_num_blocks=2)
self.assertEqual(shape, (2, 2, 4, 8))

# int4_zp quant
shape_int4 = backend.get_kv_cache_shape(max_num_blocks=2, kv_cache_quant_type="int4_zp")
self.assertEqual(shape_int4, (2, 2, 4, 4))

# Other quant types
shape_other = backend.get_kv_cache_shape(max_num_blocks=2, kv_cache_quant_type="int8")
self.assertEqual(shape_other, (2, 2, 4, 8))

@patch(
"fastdeploy.model_executor.layers.attention.moba_attention_backend.moba_attention",
return_value=(paddle.ones([4, 4]), None),
)
@patch(
"fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
return_value=(paddle.to_tensor([1, 2]), paddle.to_tensor([1, 2]), paddle.to_tensor([2])),
)
def test_forward_mixed(self, mock_get_cu_seq, mock_moba_attention):
# Test mixed forward path with various layer configurations
fd_config = DummyFDConfig()
backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
forward_meta = DummyForwardMeta()
backend.init_attention_metadata(forward_meta)

# Complete layer attributes
layer = DummyLayer()
qkv = paddle.zeros([4, 4])
compressed_kv = paddle.zeros([4, 4])
k_pe = paddle.zeros([4, 4])

out = backend.forward_mixed(
q=None,
k=None,
v=None,
qkv=qkv,
compressed_kv=compressed_kv,
k_pe=k_pe,
layer=layer,
forward_meta=forward_meta,
)
self.assertTrue((out.numpy() == 1).all())

# Layer with missing attributes, no cache quant
layer_missing = DummyLayer(layer_id=1, cache_quant_type_str=None)
out2 = backend.forward_mixed(
q=None,
k=None,
v=None,
qkv=qkv,
compressed_kv=compressed_kv,
k_pe=k_pe,
layer=layer_missing,
forward_meta=forward_meta,
)
self.assertTrue((out2.numpy() == 1).all())

# Layer with int4_zp cache quant
layer_int4 = DummyLayer(layer_id=1, cache_quant_type_str="int4_zp")
out3 = backend.forward_mixed(
q=None,
k=None,
v=None,
qkv=qkv,
compressed_kv=compressed_kv,
k_pe=k_pe,
layer=layer_int4,
forward_meta=forward_meta,
)
self.assertTrue((out3.numpy() == 1).all())


if __name__ == "__main__":
unittest.main()
93 changes: 93 additions & 0 deletions tests/output/test_pooler.py
@@ -0,0 +1,93 @@
import unittest

import numpy as np
import paddle

from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput


class TestPoolingSequenceGroupOutput(unittest.TestCase):

def test_get_data_nbytes_tensor(self):
tensor = paddle.to_tensor([[1, 2], [3, 4]], dtype="float32")
output = PoolingSequenceGroupOutput(data=tensor)
expected = tensor.numel() * tensor.element_size()
self.assertEqual(output.get_data_nbytes(), expected)

def test_get_data_nbytes_numpy(self):
arr = np.ones((2, 3), dtype=np.float32)
output = PoolingSequenceGroupOutput(data=arr)
self.assertEqual(output.get_data_nbytes(), arr.nbytes)

def test_get_data_nbytes_none(self):
output = PoolingSequenceGroupOutput(data=None)
self.assertEqual(output.get_data_nbytes(), 0)

def test_repr(self):
output = PoolingSequenceGroupOutput(data=123)
self.assertIn("PoolingSequenceGroupOutput(data=", repr(output))

def test_eq_same(self):
output1 = PoolingSequenceGroupOutput(data=5)
output2 = PoolingSequenceGroupOutput(data=5)
self.assertTrue(output1 == output2)

def test_eq_diff(self):
output1 = PoolingSequenceGroupOutput(data=5)
output2 = PoolingSequenceGroupOutput(data=6)
self.assertFalse(output1 == output2)

def test_eq_not_implemented(self):
output = PoolingSequenceGroupOutput(data=5)
with self.assertRaises(NotImplementedError):
output == 123


class TestPoolerOutput(unittest.TestCase):

def test_get_data_nbytes_empty(self):
pooler = PoolerOutput(outputs=[])
self.assertEqual(pooler.get_data_nbytes(), 0)

def test_get_data_nbytes_multiple(self):
outputs = [
PoolingSequenceGroupOutput(data=paddle.to_tensor([1, 2])),
PoolingSequenceGroupOutput(data=np.ones(3, dtype=np.float32)),
]
pooler = PoolerOutput(outputs=outputs)
expected = outputs[0].get_data_nbytes() + outputs[1].get_data_nbytes()
self.assertEqual(pooler.get_data_nbytes(), expected)

def test_len_and_index(self):
outputs = [PoolingSequenceGroupOutput(data=1), PoolingSequenceGroupOutput(data=2)]
pooler = PoolerOutput(outputs=outputs)
self.assertEqual(len(pooler), 2)
self.assertIs(pooler[0], outputs[0])
self.assertIs(pooler[1], outputs[1])

def test_setitem(self):
outputs = [PoolingSequenceGroupOutput(data=1), PoolingSequenceGroupOutput(data=2)]
pooler = PoolerOutput(outputs=outputs)
new_output = PoolingSequenceGroupOutput(data=999)
pooler[1] = new_output
self.assertIs(pooler[1], new_output)

def test_eq_same(self):
outputs1 = [PoolingSequenceGroupOutput(data=1)]
outputs2 = [PoolingSequenceGroupOutput(data=1)]
pooler1 = PoolerOutput(outputs=outputs1)
pooler2 = PoolerOutput(outputs=outputs2)
self.assertTrue(pooler1 == pooler2)

def test_eq_diff(self):
pooler1 = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=1)])
pooler2 = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=2)])
self.assertFalse(pooler1 == pooler2)

def test_eq_type_mismatch(self):
pooler = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=1)])
self.assertFalse(pooler == 123)


if __name__ == "__main__":
unittest.main()
52 changes: 52 additions & 0 deletions tests/output/test_stream_transfer_data.py
@@ -0,0 +1,52 @@
import unittest

import numpy as np

from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData


class TestStreamTransferData(unittest.TestCase):

def test_dataclass_initialization(self):
tokens = np.array([1, 2, 3])
logprobs = np.array([0.1, 0.2, 0.3])
accept_tokens = np.array([1, 0, 1])
accept_num = np.array([2])
pooler_output = np.random.rand(2, 4)

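        # __new__ bypasses the dataclass __init__, so fields can be populated one at a time below.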
data = StreamTransferData.__new__(StreamTransferData)
data.decoder_state = DecoderState.TEXT
data.batch_id = 42
data.tokens = tokens
data.speculaive_decoding = True
data.logprobs = logprobs
data.accept_tokens = accept_tokens
data.accept_num = accept_num
data.pooler_output = pooler_output

self.assertEqual(data.decoder_state, DecoderState.TEXT)
self.assertEqual(data.batch_id, 42)
self.assertTrue(np.array_equal(data.tokens, tokens))
self.assertTrue(data.speculaive_decoding)
self.assertTrue(np.array_equal(data.logprobs, logprobs))
self.assertTrue(np.array_equal(data.accept_tokens, accept_tokens))
self.assertTrue(np.array_equal(data.accept_num, accept_num))
self.assertTrue(np.array_equal(data.pooler_output, pooler_output))

def test_optional_fields_none(self):
data = StreamTransferData.__new__(StreamTransferData)
data.decoder_state = DecoderState.IMAGE
data.batch_id = 1

self.assertEqual(data.decoder_state, DecoderState.IMAGE)
self.assertEqual(data.batch_id, 1)
self.assertIsNone(getattr(data, "tokens", None))
self.assertFalse(getattr(data, "speculaive_decoding", False))
self.assertIsNone(getattr(data, "logprobs", None))
self.assertIsNone(getattr(data, "accept_tokens", None))
self.assertIsNone(getattr(data, "accept_num", None))
self.assertIsNone(getattr(data, "pooler_output", None))


if __name__ == "__main__":
unittest.main()