diff --git a/tests/engine/test_kv_cache_interface.py b/tests/engine/test_kv_cache_interface.py
new file mode 100644
index 00000000000..69d0f9f8ad8
--- /dev/null
+++ b/tests/engine/test_kv_cache_interface.py
@@ -0,0 +1,62 @@
+import unittest
+
+from fastdeploy.engine.kv_cache_interface import AttentionSpec, KVCacheSpec
+
+
+class TestKVCacheSpec(unittest.TestCase):
+    """Unit tests for constructing and merging ``KVCacheSpec`` / ``AttentionSpec`` objects."""
+
+    def test_merge_valid(self):
+        """Merging two specs with identical fields keeps those field values."""
+        spec1 = KVCacheSpec(block_size=256, block_memory_used=1024)
+        spec2 = KVCacheSpec(block_size=256, block_memory_used=1024)
+
+        merged_spec = KVCacheSpec.merge([spec1, spec2])
+
+        self.assertEqual(merged_spec.block_size, spec1.block_size)
+        self.assertEqual(merged_spec.block_memory_used, spec1.block_memory_used)
+
+    def test_merge_invalid(self):
+        """Merging specs whose ``block_size`` differs is expected to raise ``AssertionError``."""
+        spec1 = KVCacheSpec(block_size=256, block_memory_used=1024)
+        spec2 = KVCacheSpec(block_size=512, block_memory_used=1024)
+
+        with self.assertRaises(AssertionError):
+            KVCacheSpec.merge([spec1, spec2])
+
+    def test_attention_spec_inheritance(self):
+        """An ``AttentionSpec`` exposes both the base spec fields and its own fields."""
+        attention_spec = AttentionSpec(
+            block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32"
+        )
+
+        self.assertEqual(attention_spec.block_size, 256)
+        self.assertEqual(attention_spec.block_memory_used, 1024)
+        self.assertEqual(attention_spec.num_kv_heads, 12)
+        self.assertEqual(attention_spec.head_size, 64)
+        self.assertEqual(attention_spec.dtype, "float32")
+
+    def test_attention_spec_merge(self):
+        """Merging two identical ``AttentionSpec`` objects keeps every field value."""
+        spec1 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
+        spec2 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
+
+        merged_spec = AttentionSpec.merge([spec1, spec2])
+
+        self.assertEqual(merged_spec.block_size, spec1.block_size)
+        self.assertEqual(merged_spec.block_memory_used, spec1.block_memory_used)
+        self.assertEqual(merged_spec.num_kv_heads, spec1.num_kv_heads)
+        self.assertEqual(merged_spec.head_size, spec1.head_size)
+        self.assertEqual(merged_spec.dtype, spec1.dtype)
+
+    def test_attention_spec_merge_invalid(self):
+        """Merging ``AttentionSpec`` objects whose ``block_size`` differs raises ``AssertionError``."""
+        spec1 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
+        spec2 = AttentionSpec(block_size=512, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
+
+        with self.assertRaises(AssertionError):
+            AttentionSpec.merge([spec1, spec2])
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/layers/test_moba_attention_backend.py b/tests/layers/test_moba_attention_backend.py
new file mode 100644
index 00000000000..7b0d24e6187
--- /dev/null
+++ b/tests/layers/test_moba_attention_backend.py
@@ -0,0 +1,166 @@
+import unittest
+from unittest.mock import patch
+
+import numpy as np
+import paddle
+
+from fastdeploy.model_executor.layers.attention.moba_attention_backend import (
+    PlasAttentionBackend,
+    PlasAttentionMetadata,
+)
+
+
+class DummyFDConfig:
+    """Minimal stand-in for the config object passed to ``PlasAttentionBackend``."""
+
+    def __init__(self):
+        # Each sub-config is an instance of an anonymous class exposing only the attributes set here.
+        self.cache_config = type("CacheConfig", (), {"block_size": 4})()
+        self.model_config = type("ModelConfig", (), {"max_model_len": 16, "head_dim": 8, "num_hidden_layers": 2})()
+        self.scheduler_config = type("SchedulerConfig", (), {"max_num_seqs": 2})()
+        self.plas_attention_config = type(
+            "PlasConfig",
+            (),
+            {
+                "plas_block_size": 4,
+                "plas_encoder_top_k_left": 1,
+                "plas_encoder_top_k_right": 1,
+                "plas_use_encoder_seq_limit": 1,
+                "plas_decoder_top_k_left": 1,
+                "plas_decoder_top_k_right": 1,
+                "plas_use_decoder_seq_limit": 1,
+                "plas_max_seq_length": 32,
+            },
+        )()
+        self.graph_opt_config = type("GraphOptConfig", (), {"cudagraph_capture_sizes": None})()
+        self.parallel_config = type("ParallelConfig", (), {"block_size": 4})()
+
+
+class DummyForwardMeta:
+    """Minimal stand-in for the ``forward_meta`` object handed to the attention backend.
+
+    Args:
+        enc_seq: Values stored as ``seq_lens_encoder``; defaults to ``[4, 4]``.
+        dec_seq: Values stored as ``seq_lens_decoder``; defaults to ``[2, 2]``.
+    """
+
+    def __init__(self, enc_seq=None, dec_seq=None):
+        # ``None`` sentinels avoid sharing one mutable default list across instances.
+        enc_seq = [4, 4] if enc_seq is None else enc_seq
+        dec_seq = [2, 2] if dec_seq is None else dec_seq
+        self.seq_lens_encoder = paddle.to_tensor(enc_seq, dtype="int64")
+        self.seq_lens_decoder = paddle.to_tensor(dec_seq, dtype="int64")
+        self.seq_lens_this_time = sum(dec_seq)
+        # Prefix sums of ``dec_seq`` with a leading 0.
+        self.cu_seqlens_q = paddle.to_tensor([0] + list(np.cumsum(dec_seq)), dtype="int64")
+        # Build four separate tensors; ``[t] * 4`` would alias a single tensor four times.
+        self.caches = [paddle.zeros([2, 4, 8]) for _ in range(4)]
+        self.block_tables = None
+        self.rotary_embs = None
+
+
+class DummyLayer:
+    """Minimal stand-in for the ``layer`` argument of ``forward_mixed``."""
+
+    def __init__(self, layer_id=0, cache_quant_type_str=None, plas_use_mlp=True):
+        self.layer_id = layer_id
+        self.qkv_bias = None
+        self.cache_k_block_means = None
+        self.cache_quant_type_str = cache_quant_type_str
+        self.plas_use_mlp = plas_use_mlp
+
+
+class TestPlasAttentionBackend(unittest.TestCase):
+    """Tests for ``PlasAttentionBackend`` with its helper ops patched out."""
+
+    @patch(
+        "fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
+        return_value=(paddle.to_tensor([1, 2]), paddle.to_tensor([1, 2]), paddle.to_tensor([2])),
+    )
+    def test_init_attention_metadata(self, mock_get_cu_seq):
+        """``init_attention_metadata`` builds a ``PlasAttentionMetadata`` with a non-empty ``q_input``."""
+        fd_config = DummyFDConfig()
+        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
+        forward_meta = DummyForwardMeta()
+        backend.init_attention_metadata(forward_meta)
+
+        self.assertIsInstance(backend.attention_metadata, PlasAttentionMetadata)
+        self.assertGreater(backend.attention_metadata.q_input.shape[0], 0)
+
+    @patch(
+        "fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
+        return_value=(
+            paddle.to_tensor([0]),  # cu_seq_q_pack
+            paddle.to_tensor([0]),  # cu_seqlens_k
+            paddle.to_tensor([0]),  # q_pack_tokens
+        ),
+    )
+    def test_init_attention_metadata_empty_seq(self, mock_get_cu_seq):
+        """Smoke test: ``init_attention_metadata`` must not raise for zero-length sequences."""
+        fd_config = DummyFDConfig()
+        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
+        forward_meta = DummyForwardMeta()
+        forward_meta.seq_lens_encoder = paddle.to_tensor([0])
+        forward_meta.seq_lens_decoder = paddle.to_tensor([0])
+        forward_meta.cu_seqlens_q = paddle.to_tensor([0, 0])
+        backend.init_attention_metadata(forward_meta)
+
+    def test_get_kv_cache_shape(self):
+        """Check the cache shapes expected for the default, ``int4_zp`` and other quant types."""
+        fd_config = DummyFDConfig()
+        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
+
+        # Default
+        shape = backend.get_kv_cache_shape(max_num_blocks=2)
+        self.assertEqual(shape, (2, 2, 4, 8))
+
+        # int4_zp quant
+        shape_int4 = backend.get_kv_cache_shape(max_num_blocks=2, kv_cache_quant_type="int4_zp")
+        self.assertEqual(shape_int4, (2, 2, 4, 4))
+
+        # Other quant types
+        shape_other = backend.get_kv_cache_shape(max_num_blocks=2, kv_cache_quant_type="int8")
+        self.assertEqual(shape_other, (2, 2, 4, 8))
+
+    @patch(
+        "fastdeploy.model_executor.layers.attention.moba_attention_backend.moba_attention",
+        return_value=(paddle.ones([4, 4]), None),
+    )
+    @patch(
+        "fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
+        return_value=(paddle.to_tensor([1, 2]), paddle.to_tensor([1, 2]), paddle.to_tensor([2])),
+    )
+    def test_forward_mixed(self, mock_get_cu_seq, mock_moba_attention):
+        """``forward_mixed`` is expected to yield all ones (the patched ``moba_attention`` returns ones)."""
+        fd_config = DummyFDConfig()
+        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
+        forward_meta = DummyForwardMeta()
+        backend.init_attention_metadata(forward_meta)
+
+        qkv = paddle.zeros([4, 4])
+        compressed_kv = paddle.zeros([4, 4])
+        k_pe = paddle.zeros([4, 4])
+
+        # The inputs are identical for every case; only the layer configuration varies.
+        layers = {
+            "default layer": DummyLayer(),
+            "second layer, no cache quant": DummyLayer(layer_id=1, cache_quant_type_str=None),
+            "second layer, int4_zp cache quant": DummyLayer(layer_id=1, cache_quant_type_str="int4_zp"),
+        }
+        for case, layer in layers.items():
+            with self.subTest(case=case):
+                out = backend.forward_mixed(
+                    q=None,
+                    k=None,
+                    v=None,
+                    qkv=qkv,
+                    compressed_kv=compressed_kv,
+                    k_pe=k_pe,
+                    layer=layer,
+                    forward_meta=forward_meta,
+                )
+                self.assertTrue((out.numpy() == 1).all())
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/output/test_pooler.py b/tests/output/test_pooler.py
new file mode 100644
index 00000000000..42c33affc65
--- /dev/null
+++ b/tests/output/test_pooler.py
@@ -0,0 +1,109 @@
+import unittest
+
+import numpy as np
+import paddle
+
+from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput
+
+
+class TestPoolingSequenceGroupOutput(unittest.TestCase):
+    """Tests for ``PoolingSequenceGroupOutput``: byte counting, ``repr`` and ``==``."""
+
+    def test_get_data_nbytes_tensor(self):
+        """A paddle tensor reports ``numel() * element_size()`` bytes."""
+        tensor = paddle.to_tensor([[1, 2], [3, 4]], dtype="float32")
+        output = PoolingSequenceGroupOutput(data=tensor)
+        expected = tensor.numel() * tensor.element_size()
+        self.assertEqual(output.get_data_nbytes(), expected)
+
+    def test_get_data_nbytes_numpy(self):
+        """A NumPy array reports its ``nbytes``."""
+        arr = np.ones((2, 3), dtype=np.float32)
+        output = PoolingSequenceGroupOutput(data=arr)
+        self.assertEqual(output.get_data_nbytes(), arr.nbytes)
+
+    def test_get_data_nbytes_none(self):
+        """``None`` data reports zero bytes."""
+        output = PoolingSequenceGroupOutput(data=None)
+        self.assertEqual(output.get_data_nbytes(), 0)
+
+    def test_repr(self):
+        """``repr`` contains the class name followed by the ``data=`` prefix."""
+        output = PoolingSequenceGroupOutput(data=123)
+        self.assertIn("PoolingSequenceGroupOutput(data=", repr(output))
+
+    def test_eq_same(self):
+        """Two outputs wrapping equal data compare equal with ``==``."""
+        output1 = PoolingSequenceGroupOutput(data=5)
+        output2 = PoolingSequenceGroupOutput(data=5)
+        self.assertTrue(output1 == output2)
+
+    def test_eq_diff(self):
+        """Two outputs wrapping different data do not compare equal with ``==``."""
+        output1 = PoolingSequenceGroupOutput(data=5)
+        output2 = PoolingSequenceGroupOutput(data=6)
+        self.assertFalse(output1 == output2)
+
+    def test_eq_not_implemented(self):
+        """Comparing with a value of another type is expected to raise ``NotImplementedError``."""
+        output = PoolingSequenceGroupOutput(data=5)
+        with self.assertRaises(NotImplementedError):
+            _ = output == 123
+
+
+class TestPoolerOutput(unittest.TestCase):
+    """Tests for ``PoolerOutput``: byte counting, sequence access and ``==``."""
+
+    def test_get_data_nbytes_empty(self):
+        """An empty ``outputs`` list reports zero bytes."""
+        pooler = PoolerOutput(outputs=[])
+        self.assertEqual(pooler.get_data_nbytes(), 0)
+
+    def test_get_data_nbytes_multiple(self):
+        """The total equals the sum of the byte counts of the contained outputs."""
+        outputs = [
+            PoolingSequenceGroupOutput(data=paddle.to_tensor([1, 2])),
+            PoolingSequenceGroupOutput(data=np.ones(3, dtype=np.float32)),
+        ]
+        pooler = PoolerOutput(outputs=outputs)
+        expected = outputs[0].get_data_nbytes() + outputs[1].get_data_nbytes()
+        self.assertEqual(pooler.get_data_nbytes(), expected)
+
+    def test_len_and_index(self):
+        """``len`` and integer indexing reflect the outputs passed to the constructor."""
+        outputs = [PoolingSequenceGroupOutput(data=1), PoolingSequenceGroupOutput(data=2)]
+        pooler = PoolerOutput(outputs=outputs)
+        self.assertEqual(len(pooler), 2)
+        self.assertIs(pooler[0], outputs[0])
+        self.assertIs(pooler[1], outputs[1])
+
+    def test_setitem(self):
+        """Item assignment replaces the output stored at that index."""
+        outputs = [PoolingSequenceGroupOutput(data=1), PoolingSequenceGroupOutput(data=2)]
+        pooler = PoolerOutput(outputs=outputs)
+        new_output = PoolingSequenceGroupOutput(data=999)
+        pooler[1] = new_output
+        self.assertIs(pooler[1], new_output)
+
+    def test_eq_same(self):
+        """Two poolers holding equal outputs compare equal with ``==``."""
+        outputs1 = [PoolingSequenceGroupOutput(data=1)]
+        outputs2 = [PoolingSequenceGroupOutput(data=1)]
+        pooler1 = PoolerOutput(outputs=outputs1)
+        pooler2 = PoolerOutput(outputs=outputs2)
+        self.assertTrue(pooler1 == pooler2)
+
+    def test_eq_diff(self):
+        """Two poolers holding different outputs do not compare equal with ``==``."""
+        pooler1 = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=1)])
+        pooler2 = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=2)])
+        self.assertFalse(pooler1 == pooler2)
+
+    def test_eq_type_mismatch(self):
+        """Comparing with a non-``PoolerOutput`` value gives a falsy result instead of raising."""
+        pooler = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=1)])
+        self.assertFalse(pooler == 123)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/output/test_stream_transfer_data.py b/tests/output/test_stream_transfer_data.py
new file mode 100644
index 00000000000..c35db771350
--- /dev/null
+++ b/tests/output/test_stream_transfer_data.py
@@ -0,0 +1,59 @@
+import unittest
+
+import numpy as np
+
+from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData
+
+
+class TestStreamTransferData(unittest.TestCase):
+    """Attribute round-trip tests for ``StreamTransferData``.
+
+    NOTE(review): instances are created with ``__new__``, so ``__init__`` is never called here.
+    """
+
+    def test_dataclass_initialization(self):
+        """Every attribute assigned on the instance reads back unchanged."""
+        tokens = np.array([1, 2, 3])
+        logprobs = np.array([0.1, 0.2, 0.3])
+        accept_tokens = np.array([1, 0, 1])
+        accept_num = np.array([2])
+        # Fixed values (instead of random ones) keep the test deterministic.
+        pooler_output = np.arange(8, dtype="float64").reshape(2, 4)
+
+        data = StreamTransferData.__new__(StreamTransferData)
+        data.decoder_state = DecoderState.TEXT
+        data.batch_id = 42
+        data.tokens = tokens
+        data.speculaive_decoding = True
+        data.logprobs = logprobs
+        data.accept_tokens = accept_tokens
+        data.accept_num = accept_num
+        data.pooler_output = pooler_output
+
+        self.assertEqual(data.decoder_state, DecoderState.TEXT)
+        self.assertEqual(data.batch_id, 42)
+        np.testing.assert_array_equal(data.tokens, tokens)
+        self.assertTrue(data.speculaive_decoding)
+        np.testing.assert_array_equal(data.logprobs, logprobs)
+        np.testing.assert_array_equal(data.accept_tokens, accept_tokens)
+        np.testing.assert_array_equal(data.accept_num, accept_num)
+        np.testing.assert_array_equal(data.pooler_output, pooler_output)
+
+    def test_optional_fields_none(self):
+        """Attributes that were never assigned read as ``None`` (falsy for ``speculaive_decoding``)."""
+        data = StreamTransferData.__new__(StreamTransferData)
+        data.decoder_state = DecoderState.IMAGE
+        data.batch_id = 1
+
+        self.assertEqual(data.decoder_state, DecoderState.IMAGE)
+        self.assertEqual(data.batch_id, 1)
+        self.assertIsNone(getattr(data, "tokens", None))
+        self.assertFalse(getattr(data, "speculaive_decoding", False))
+        self.assertIsNone(getattr(data, "logprobs", None))
+        self.assertIsNone(getattr(data, "accept_tokens", None))
+        self.assertIsNone(getattr(data, "accept_num", None))
+        self.assertIsNone(getattr(data, "pooler_output", None))
+
+
+if __name__ == "__main__":
+    unittest.main()