diff --git a/custom_ops/gpu_ops/cpp_extensions.cc b/custom_ops/gpu_ops/cpp_extensions.cc
index 6ecc1ed1451..931938abd89 100644
--- a/custom_ops/gpu_ops/cpp_extensions.cc
+++ b/custom_ops/gpu_ops/cpp_extensions.cc
@@ -12,6 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <fcntl.h>
+#include <stdexcept>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
 #include "paddle/extension.h"
 #include "pybind11/pybind11.h"
 namespace py = pybind11;
@@ -49,6 +55,51 @@ void cuda_host_free(uintptr_t ptr) {
   check_cuda_error(cudaFreeHost(reinterpret_cast<void*>(ptr)));
 }
 
+// Create a shared memory region and register it with CUDA
+// The pinned shm can be shared between processes
+uintptr_t create_pinned_shm(const char* shm_name, size_t byte_size) {
+  int fd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
+  if (fd < 0) throw std::runtime_error("shm_open failed");
+
+  if (ftruncate(fd, byte_size) != 0)
+    throw std::runtime_error("ftruncate failed");
+
+  void* addr =
+      mmap(nullptr, byte_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  if (addr == MAP_FAILED) throw std::runtime_error("mmap failed");
+
+  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));
+
+  close(fd);
+  return reinterpret_cast<uintptr_t>(addr);
+}
+
+uintptr_t open_pinned_shm(const char* shm_name, size_t byte_size) {
+  int fd = shm_open(shm_name, O_RDWR, 0666);
+  if (fd < 0) throw std::runtime_error("shm_open failed");
+
+  void* addr =
+      mmap(nullptr, byte_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  if (addr == MAP_FAILED) throw std::runtime_error("mmap failed");
+
+  check_cuda_error(cudaHostRegister(addr, byte_size, cudaHostRegisterPortable));
+
+  close(fd);
+  return reinterpret_cast<uintptr_t>(addr);
+}
+
+void free_pinned_shm(const char* shm_name,
+                     uintptr_t addr_uint,
+                     size_t byte_size) {
+  void* addr = reinterpret_cast<void*>(addr_uint);
+
+  check_cuda_error(cudaHostUnregister(addr));
+
+  if (munmap(addr, byte_size) != 0) throw std::runtime_error("munmap failed");
+
+  if (shm_unlink(shm_name) != 0) throw std::runtime_error("shm_unlink failed");
+}
+
 std::vector<paddle::Tensor> AppendAttention(
     const paddle::Tensor& qkv,
     const paddle::Tensor& key_cache,
@@ -1131,6 +1182,22 @@ PYBIND11_MODULE(fastdeploy_ops, m) {
         py::arg("flags") = cudaHostAllocDefault);
   m.def(
       "cuda_host_free", &cuda_host_free, "Free pinned memory", py::arg("ptr"));
+  m.def("create_pinned_shm",
+        &create_pinned_shm,
+        "Allocate pinned memory for supporting inter process communication",
+        py::arg("name"),
+        py::arg("byte_size"));
+  m.def("open_pinned_shm",
+        &open_pinned_shm,
+        "Open pinned memory which has been allocated by another process",
+        py::arg("name"),
+        py::arg("byte_size"));
+  m.def("free_pinned_shm",
+        &free_pinned_shm,
+        "Free pinned memory which supports inter process communication",
+        py::arg("name"),
+        py::arg("addr_uint"),
+        py::arg("byte_size"));
   py::register_exception<CudaError>(m, "CudaError");
   /**
    * append_attention.cu
diff --git a/examples/splitwise/README.md b/examples/splitwise/README.md
new file mode 100644
index 00000000000..2b8d88ca061
--- /dev/null
+++ b/examples/splitwise/README.md
@@ -0,0 +1,36 @@
+# Run the Examples on NVIDIA CUDA GPU
+
+## Prepare the Environment
+Refer to [NVIDIA CUDA GPU Installation](https://paddlepaddle.github.io/FastDeploy/get_started/installation/nvidia_gpu/) to pull the Docker image, for example:
+```
+docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-cuda-12.6:2.3.0
+```
+
+In the Docker container, the [NVIDIA 
MLNX_OFED](https://network.nvidia.com/products/infiniband-drivers/linux/mlnx_ofed/) and [Redis](https://redis.io/) are pre-installed. + +## Build and install FastDeploy + +``` +git clone https://github.com/PaddlePaddle/FastDeploy +cd FastDeploy + +export ENABLE_FD_RDMA=1 + +# Argument 1: Whether to build wheel package (1 for yes, 0 for compile only) +# Argument 2: Python interpreter path +# Argument 3: Whether to compile CPU inference operators +# Argument 4: Target GPU architectures +bash build.sh 1 python false [80,90] +``` + +## Run the Examples + +Run the shell scripts in this directory, ```bash start_v0_tp1.sh``` or ```bash start_v1_tp1.sh``` + +Note that, there are two methods for splitwise deployment: +* v0: using splitwise_scheduler or dp_scheduler, in which the requests are scheduled in the engine. +* v1: using router, in which the requests are scheduled in the router. + +# Run the Examples on Kunlunxin XPU + +Coming soon... diff --git a/examples/splitwise/start_mixed.sh b/examples/splitwise/start_mixed.sh index c36027ac26a..37699caeb2b 100644 --- a/examples/splitwise/start_mixed.sh +++ b/examples/splitwise/start_mixed.sh @@ -3,33 +3,28 @@ set -e # Test mixed server + router -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." - sleep 2 - fi - done -} - # prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 -export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 unset http_proxy && unset https_proxy rm -rf log_* +source ./utils.sh S1_PORT=52400 S2_PORT=52500 ROUTER_PORT=52600 +ports=( + $S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3)) + $S2_PORT $((S2_PORT + 1)) $((S2_PORT + 2)) $((S2_PORT + 3)) + $ROUTER_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." + exit 1 +} + # start router export FD_LOG_DIR="log_router" mkdir -p ${FD_LOG_DIR} @@ -37,7 +32,6 @@ mkdir -p ${FD_LOG_DIR} nohup python -m fastdeploy.router.launch \ --port ${ROUTER_PORT} \ 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 # start modelserver 0 export CUDA_VISIBLE_DEVICES=0 @@ -53,7 +47,6 @@ nohup python -m fastdeploy.entrypoints.openai.api_server \ --max-model-len 32768 \ --router "0.0.0.0:${ROUTER_PORT}" \ 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 wait_for_health ${S1_PORT} @@ -76,6 +69,7 @@ wait_for_health ${S2_PORT} # send request sleep 10 # make sure server is registered to router +echo "send request..." curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ @@ -83,5 +77,5 @@ curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ {"role": "user", "content": "hello"} ], "max_tokens": 20, - "stream": true + "stream": false }' diff --git a/examples/splitwise/start_v0_tp1.sh b/examples/splitwise/start_v0_tp1.sh index 42f585a5a71..c91bcf9d302 100644 --- a/examples/splitwise/start_v0_tp1.sh +++ b/examples/splitwise/start_v0_tp1.sh @@ -6,22 +6,8 @@ set -e # v0: using splitwise_scheduler or dp_scheduler # v1: using local_scheduler + router -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. 
Retrying in 2s..." - sleep 2 - fi - done -} - # prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" export FD_DEBUG=1 export ENABLE_V1_KVCACHE_SCHEDULER=1 export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 @@ -37,10 +23,21 @@ fi unset http_proxy && unset https_proxy rm -rf log_* +source ./utils.sh P_PORT=52400 D_PORT=52500 -REDIS_PORT=56388 +REDIS_PORT="${REDIS_PORT:-56388}" + +ports=( + $P_PORT $((P_PORT + 1)) $((P_PORT + 2)) $((P_PORT + 3)) $((P_PORT + 4)) $((P_PORT + 5)) + $D_PORT $((D_PORT + 1)) $((D_PORT + 2)) $((D_PORT + 3)) $((D_PORT + 4)) $((D_PORT + 5)) + $REDIS_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." + exit 1 +} # start redis if ! redis-cli -p ${REDIS_PORT} ping &>/dev/null; then @@ -104,6 +101,7 @@ wait_for_health ${D_PORT} # send request sleep 10 # make sure server is registered to router +echo "send request..." curl -X POST "http://0.0.0.0:${D_PORT}/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ @@ -111,5 +109,5 @@ curl -X POST "http://0.0.0.0:${D_PORT}/v1/chat/completions" \ {"role": "user", "content": "hello"} ], "max_tokens": 20, - "stream": true + "stream": false }' diff --git a/examples/splitwise/start_v0_tp2.sh b/examples/splitwise/start_v0_tp2.sh deleted file mode 100644 index cb2015ec4ac..00000000000 --- a/examples/splitwise/start_v0_tp2.sh +++ /dev/null @@ -1,111 +0,0 @@ -#!/bin/bash -set -e - -# Test splitwise deployment -# There are two methods for splitwise deployment: -# v0: using splitwise_scheduler or dp_scheduler -# v1: using local_scheduler + router - -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." - sleep 2 - fi - done -} - -# prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - -export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 -export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 - -SCRIPT_PATH=$(readlink -f "$0") -SCRIPT_DIR=$(dirname "$SCRIPT_PATH") -export $(bash ${SCRIPT_DIR}/../../scripts/get_rdma_nics.sh gpu) -echo "KVCACHE_RDMA_NICS:${KVCACHE_RDMA_NICS}" -if [ -z "${KVCACHE_RDMA_NICS}" ]; then - echo "KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh" - exit 1 -fi - -unset http_proxy && unset https_proxy -rm -rf log_* - -# start redis -if ! redis-cli ping &>/dev/null; then - echo "Redis is not running. Starting redis-server..." - redis-server --daemonize yes - sleep 1 -else - echo "Redis is already running." 
-fi -sleep 1 - -# start prefill -export CUDA_VISIBLE_DEVICES=0,1 -export FD_LOG_DIR="log_prefill" -mkdir -p ${FD_LOG_DIR} - -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ - --max-model-len 32768 \ - --tensor-parallel-size 2 \ - --splitwise-role "prefill" \ - --cache-transfer-protocol "rdma,ipc" \ - --pd-comm-port 8104 \ - --rdma-comm-ports 8105,8106 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 6379 \ - --scheduler-ttl 9000 \ - 2>&1 >${FD_LOG_DIR}/nohup & -# wait_for_health 8100 - -# start decode -export CUDA_VISIBLE_DEVICES=2,3 -export FD_LOG_DIR="log_decode" -mkdir -p ${FD_LOG_DIR} - -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 9000 \ - --metrics-port 9001 \ - --engine-worker-queue-port 9002 \ - --cache-queue-port 9003 \ - --max-model-len 32768 \ - --tensor-parallel-size 2 \ - --splitwise-role "decode" \ - --cache-transfer-protocol "rdma,ipc" \ - --pd-comm-port 9004 \ - --rdma-comm-ports 9005,9006 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 6379 \ - --scheduler-ttl 9000 \ - 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 9000 - - -# send request -sleep 10 # make sure server is registered to router -port=9000 -curl -X POST "http://0.0.0.0:${port}/v1/chat/completions" \ --H "Content-Type: application/json" \ --d '{ - "messages": [ - {"role": "user", "content": "hello"} - ], - "max_tokens": 20, - "stream": true -}' diff --git a/examples/splitwise/start_v1_tp1.sh b/examples/splitwise/start_v1_tp1.sh index 31eca8ab77f..51f594f9e61 100644 --- a/examples/splitwise/start_v1_tp1.sh +++ b/examples/splitwise/start_v1_tp1.sh @@ -6,22 +6,8 @@ set -e # v0: using splitwise_scheduler or dp_scheduler # v1: using local_scheduler + router -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." - sleep 2 - fi - done -} - # prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" export FD_DEBUG=1 export ENABLE_V1_KVCACHE_SCHEDULER=1 export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 @@ -37,10 +23,21 @@ fi unset http_proxy && unset https_proxy rm -rf log_* +source ./utils.sh P_PORT=52400 D_PORT=52500 -ROUTER_PORT=52600 +ROUTER_PORT=52700 + +ports=( + $P_PORT $((P_PORT + 1)) $((P_PORT + 2)) $((P_PORT + 3)) $((P_PORT + 4)) $((P_PORT + 5)) + $D_PORT $((D_PORT + 1)) $((D_PORT + 2)) $((D_PORT + 3)) $((D_PORT + 4)) $((D_PORT + 5)) + $ROUTER_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." + exit 1 +} # start router export FD_LOG_DIR="log_router" @@ -50,7 +47,6 @@ nohup python -m fastdeploy.router.launch \ --port ${ROUTER_PORT} \ --splitwise \ 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 # start prefill export CUDA_VISIBLE_DEVICES=0 @@ -97,12 +93,13 @@ wait_for_health ${D_PORT} # send request sleep 10 # make sure server is registered to router +echo "send request..." 
curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ "messages": [ {"role": "user", "content": "hello"} ], - "max_tokens": 20, - "stream": true + "max_tokens": 100, + "stream": false }' diff --git a/examples/splitwise/start_v1_tp2.sh b/examples/splitwise/start_v1_tp2.sh deleted file mode 100644 index c58a8a9cead..00000000000 --- a/examples/splitwise/start_v1_tp2.sh +++ /dev/null @@ -1,110 +0,0 @@ -#!/bin/bash -set -e - -# Test splitwise deployment -# There are two methods for splitwise deployment: -# v0: using splitwise_scheduler or dp_scheduler -# v1: using local_scheduler + router - -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." - sleep 2 - fi - done -} - -# prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - -export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 -export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 - -SCRIPT_PATH=$(readlink -f "$0") -SCRIPT_DIR=$(dirname "$SCRIPT_PATH") -export $(bash ${SCRIPT_DIR}/../../scripts/get_rdma_nics.sh gpu) -echo "KVCACHE_RDMA_NICS:${KVCACHE_RDMA_NICS}" -if [ -z "${KVCACHE_RDMA_NICS}" ]; then - echo "KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh" - exit 1 -fi - -unset http_proxy && unset https_proxy -rm -rf log_* - -# start router -export FD_LOG_DIR="log_router" -mkdir -p ${FD_LOG_DIR} - -echo "start router" -router_port=9000 -nohup python -m fastdeploy.router.launch \ - --port ${router_port} \ - --splitwise \ - 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 - -# start prefill -export CUDA_VISIBLE_DEVICES=0,1 -export FD_LOG_DIR="log_prefill" -mkdir -p ${FD_LOG_DIR} - -echo "start prefill" -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ - --tensor-parallel-size 2 \ - --max-model-len 32768 \ - --splitwise-role "prefill" \ - --pd-comm-port 8104 \ - --rdma-comm-ports 8105,8106 \ - --router "0.0.0.0:${router_port}" \ - 2>&1 >${FD_LOG_DIR}/nohup & - -# wait_for_health 8100 - -# start decode -export CUDA_VISIBLE_DEVICES=2,3 -export FD_LOG_DIR="log_decode" -mkdir -p ${FD_LOG_DIR} - -echo "start decode" -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8200 \ - --metrics-port 8201 \ - --engine-worker-queue-port 8202 \ - --cache-queue-port 8203 \ - --max-model-len 32768 \ - --tensor-parallel-size 2 \ - --splitwise-role "decode" \ - --pd-comm-port 8204 \ - --rdma-comm-ports 8205,8206 \ - --router "0.0.0.0:${router_port}" \ - 2>&1 >${FD_LOG_DIR}/nohup & - -wait_for_health 8200 - - - -# send request -sleep 10 # make sure server is registered to router -port=9000 -curl -X POST "http://0.0.0.0:${port}/v1/chat/completions" \ --H "Content-Type: application/json" \ --d '{ - "messages": [ - {"role": "user", "content": "hello"} - ], - "max_tokens": 20, - "stream": true -}' diff --git a/examples/splitwise/stop.sh b/examples/splitwise/stop.sh index 5b0f13c5d95..943efa12c58 100644 --- a/examples/splitwise/stop.sh +++ b/examples/splitwise/stop.sh @@ -1,7 +1,6 @@ pkill -9 -f python pkill -9 -f fastdeploy -pkill -f -9 gunicorn +pkill -9 -f gunicorn +pkill -9 -f redis-server -if redis-cli ping >/dev/null 2>&1; then - redis-cli shutdown -fi +sleep 1 diff --git a/examples/splitwise/utils.sh 
b/examples/splitwise/utils.sh new file mode 100644 index 00000000000..b02784ac046 --- /dev/null +++ b/examples/splitwise/utils.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +check_ports() { + for port in "$@"; do + if ss -tuln | grep -q ":$port "; then + echo "❌ Port $port is already in use" + return 1 + fi + done + return 0 +} + +wait_for_health() { + local server_port=$1 + while true; do + status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") + if [ "$status_code" -eq 200 ]; then + break + else + echo "Service not ready. Retrying in 4s..." + sleep 4 + fi + done +} diff --git a/fastdeploy/cache_manager/cache_data.py b/fastdeploy/cache_manager/cache_data.py index 631f5efb05a..e8b49d59396 100644 --- a/fastdeploy/cache_manager/cache_data.py +++ b/fastdeploy/cache_manager/cache_data.py @@ -42,6 +42,7 @@ class CacheStatus(Enum): SWAP2CPU = 1 SWAP2GPU = 2 CPU = 3 + SPLITWISE_CPU2GPU = 4 class BlockNode: diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py index dc3d64099a8..bdd2991ce5b 100644 --- a/fastdeploy/cache_manager/cache_messager.py +++ b/fastdeploy/cache_manager/cache_messager.py @@ -32,7 +32,11 @@ IPCSignal, shared_memory_exists, ) -from fastdeploy.model_executor.ops.gpu import get_output_kv_signal, set_data_ipc +from fastdeploy.model_executor.ops.gpu import ( + create_pinned_shm, + get_output_kv_signal, + set_data_ipc, +) from fastdeploy.utils import envs, get_logger logger = get_logger("cache_messager", "cache_messager.log") @@ -64,7 +68,6 @@ def parse_args(): help="cache transfer protocol, only surport ipc now", ) parser.add_argument("--pod_ip", type=str, default="0.0.0.0", help="pod ip") - parser.add_argument("--cache_queue_port", type=int, default=9924, help="cache queue port") parser.add_argument( "--engine_worker_queue_port", type=int, @@ -72,11 +75,17 @@ def parse_args(): help="engine worker queue port", ) parser.add_argument( - "--cache_dtype", + "--splitwise_cache_buffer_block_num", + type=int, + default=0, + help="The block num of cpu buffer to receive cache from prefill", + ) + parser.add_argument( + "--cache_saved_dtype", type=str, default="bfloat16", choices=["uint8", "bfloat16"], - help="cache dtype", + help="cache saved dtype", ) parser.add_argument( "--speculative_config", @@ -396,6 +405,9 @@ def __init__( engine_worker_queue_port, local_data_parallel_id, gpu_cache_kvs, + splitwise_cache_buffer_block_num, + splitwise_cache_buffer_ptrs, + bytes_per_block, rank, nranks, num_layers, @@ -409,8 +421,13 @@ def __init__( Args: splitwise_role (str): splitwise_role only can be 'prefill' or 'decode'. transfer_protocol (str): support ipc and rdma + pod_ip (str): pod ip engine_worker_queue_port (int): engine_worker_queue port gpu_cache_kvs (dict): GPU kv cache + splitwise_cache_buffer_block_num (int): number of blocks in each splitwise cpu buffer + splitwise_cache_buffer_ptrs (dict): tensor ptrs for splitwise cpu buffer. + If set, then use cpu buffer to receive cache from prefill. 
+ bytes_per_block (int): bytes per block for cache kv rank (int): current rank nranks (int): global rank number num_layers (int): model layer number @@ -424,6 +441,14 @@ def __init__( self.gpu_cache_kvs = gpu_cache_kvs self.rank = rank self.nranks = nranks + self.bytes_per_block = bytes_per_block + self.block_size = block_size + self.splitwise_cache_buffer_block_num = splitwise_cache_buffer_block_num + if self.splitwise_cache_buffer_block_num > 0: + assert ( + splitwise_cache_buffer_ptrs is not None + ), "splitwise_cache_buffer_ptrs must be set when splitwise_cache_buffer_block_num > 0" + if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM: address = (pod_ip, engine_worker_queue_port) else: @@ -435,63 +460,65 @@ def __init__( client_id=self.rank, local_data_parallel_id=local_data_parallel_id, ) - self.block_size = block_size - transfer_protocol = transfer_protocol.split(",") - - logger.info(f"splitwise role: {splitwise_role}, {transfer_protocol}" f"rank: {rank}") # 1. initialize the cache_k_ptr_list and cache_v_ptr_list self.num_layers = num_layers - cache_k_ptr_list = [] - cache_v_ptr_list = [] - cache_k = [] - cache_v = [] + self.gpu_cache_k_tensors = [] + self.gpu_cache_v_tensors = [] + self.gpu_cache_k_ptrs = [] + self.gpu_cache_v_ptrs = [] + self.splitwise_cache_k_ptrs = [] + self.splitwise_cache_v_ptrs = [] self.messager = {} + for layer_idx in range(self.num_layers): key_cache = self.gpu_cache_kvs[f"key_caches_{layer_idx}_rank{self.rank}_device{gpu_id}"] val_cache = self.gpu_cache_kvs[f"value_caches_{layer_idx}_rank{self.rank}_device{gpu_id}"] - cache_k.append(key_cache) - cache_v.append(val_cache) - cache_k_ptr_list.append(key_cache.data_ptr()) - cache_v_ptr_list.append(val_cache.data_ptr()) - cache_k_ptr_list = np.array(cache_k_ptr_list) - cache_v_ptr_list = np.array(cache_v_ptr_list) - - # 2. initialize the block_bytes - cache_shape = key_cache.shape - max_block_num = cache_shape[0] - block_bytes = math.prod(cache_shape[1:]) - if key_cache.dtype == paddle.bfloat16: - block_bytes *= 2 - logger.info( - f"layers {num_layers} cache_shape: {cache_shape}, max_block_num: {max_block_num}, " - f"block_bytes: {block_bytes}, dtype: {key_cache.dtype}" - ) - self.block_bytes = block_bytes - - # 3. initialize the messager + self.gpu_cache_k_tensors.append(key_cache) + self.gpu_cache_v_tensors.append(val_cache) + self.gpu_cache_k_ptrs.append(key_cache.data_ptr()) + self.gpu_cache_v_ptrs.append(val_cache.data_ptr()) + + if splitwise_cache_buffer_block_num > 0: + logger.debug("use cpu buffer to receive cache from prefill") + for layer_idx in range(self.num_layers): + k_ptr = splitwise_cache_buffer_ptrs[f"key_layer{layer_idx}"] + v_ptr = splitwise_cache_buffer_ptrs[f"value_layer{layer_idx}"] + self.splitwise_cache_k_ptrs.append(k_ptr) + self.splitwise_cache_v_ptrs.append(v_ptr) + + # 2. 
initialize the messager + transfer_protocol = transfer_protocol.split(",") + logger.info(f"splitwise role: {splitwise_role}, {transfer_protocol}, rank: {rank}") for protocol in transfer_protocol: if protocol == "ipc": self.messager[protocol] = IPCCommManager( self.rank, gpu_id, - cache_k, - cache_v, + self.gpu_cache_k_tensors, + self.gpu_cache_v_tensors, ) - local_device_id = int(str(cache_k[0].place)[-2]) + local_device_id = int(str(self.gpu_cache_k_tensors[0].place)[-2]) logger.info(f"done create ipc_comm with local_device_id:{local_device_id}, ") elif protocol == "rdma": logger.info(f"splitwise_role rdma: {self.splitwise_role}, rank: {self.rank}, gpu_id: {gpu_id}") - + if splitwise_cache_buffer_block_num > 0: + register_k_ptrs = np.array(self.splitwise_cache_k_ptrs) + register_v_ptrs = np.array(self.splitwise_cache_v_ptrs) + register_blocks_num = splitwise_cache_buffer_block_num + else: + register_k_ptrs = np.array(self.gpu_cache_k_ptrs) + register_v_ptrs = np.array(self.gpu_cache_v_ptrs) + register_blocks_num = self.gpu_cache_k_tensors[0].shape[0] self.messager[protocol] = RDMACommManager( splitwise_role, rank, gpu_id, - cache_k_ptr_list, - cache_v_ptr_list, - max_block_num, - block_bytes, + register_k_ptrs, + register_v_ptrs, + register_blocks_num, + bytes_per_block, rdma_port, ) @@ -541,11 +568,14 @@ def _add_cache_task_thread(self): current_info["sended_layer_id"] = -1 current_info["sended_block_num"] = current_info["decode_cached_tokens"] // self.block_size current_info["status"] = "init" - logger.info(f"Get cache info from P: finish add cache task: {current_info}") + logger.info(f"Get cache info from D, finish add cache task: {current_info}") self.cache_info[info["request_id"]] = current_info self.idx_cache_task_dict[current_info["current_id"]] = current_info + + # TODO: create connection in advance + else: - logger.info(f"Get cache info from D: {info}") + logger.info(f"Get cache info from P: {info}") self.cache_info[info["request_id"]] = info if finished_add_cache_task_req_ids: @@ -661,7 +691,7 @@ def prefill_layerwise_send_cache_thread(self): cost_time = tok - tic block_num = len(src_block_ids) avg_time_per_block = cost_time * 1000 / block_num # ms - send_cache_speed = block_num * self.block_bytes / 1073741824 / cost_time # GB/s + send_cache_speed = block_num * self.bytes_per_block / 1073741824 / cost_time # GB/s logger.debug( f"finish write cache for a layer, {req_id}, {layer_idx}, {target_ip}, {target_id}," f"block_num: {block_num}, send_cache_speed(GB/s): {round(send_cache_speed, 5)}," @@ -764,7 +794,7 @@ def main(): device = args.device_id rank = args.rank paddle.set_device(f"gpu:{device}") - cache_type = args.cache_dtype + cache_saved_dtype = args.cache_saved_dtype speculative_config = SpeculativeConfig(args.speculative_config) num_extra_layers = speculative_config.num_extra_cache_layer key_cache_shape_list = [int(i) for i in args.key_cache_shape.split(",")] @@ -776,6 +806,15 @@ def main(): gpu_cache_kvs = {} gpu_cache_k_tensors = [] gpu_cache_v_tensors = [] + splitwise_cache_buffer_ptrs = {} + bytes_per_block = None # NOTE: key and value have the same shape and dtype + + if args.cache_saved_dtype == "bfloat16": + byte_size = 2 + elif args.cache_saved_dtype == "uint8": + byte_size = 1 + else: + raise ValueError(f"Unsupported cache dtype: {args.cache_saved_dtype}") logger.info(f"[rank {rank}/{args.mp_num}] Initializing kv cache for all layers.") for i in range(args.num_layers + num_extra_layers): @@ -798,10 +837,13 @@ def main(): f"[rank {rank}/{args.mp_num}] ..creating kv 
cache for layer {i}: {key_cache_shape} {value_cache_shape}" ) + if bytes_per_block is None: + bytes_per_block = math.prod(key_cache_shape_list[1:]) * byte_size + gpu_cache_kvs[f"key_caches_{i}_rank{rank}_device{device}"] = paddle.full( shape=key_cache_shape, fill_value=0, - dtype=cache_type, + dtype=cache_saved_dtype, ) gpu_cache_k_tensors.append(gpu_cache_kvs[f"key_caches_{i}_rank{rank}_device{device}"]) set_data_ipc( @@ -812,7 +854,7 @@ def main(): gpu_cache_kvs[f"value_caches_{i}_rank{rank}_device{device}"] = paddle.full( shape=value_cache_shape, fill_value=0, - dtype=cache_type, + dtype=cache_saved_dtype, ) gpu_cache_v_tensors.append(gpu_cache_kvs[f"value_caches_{i}_rank{rank}_device{device}"]) @@ -820,6 +862,25 @@ def main(): gpu_cache_kvs[f"value_caches_{i}_rank{rank}_device{device}"], f"value_caches_{i}_rank{rank}.device{device}", ) + + if args.splitwise_role == "decode" and args.splitwise_cache_buffer_block_num > 0: + logger.info(f"[rank {rank}/{args.mp_num}] Allocate splitwise cpu buffer for rdma register.") + for i in range(args.num_layers + num_extra_layers): + if i < args.num_layers: + num_blocks = args.splitwise_cache_buffer_block_num + else: + num_blocks = (num_extra_layer_gpu_blocks / total_gpu_blocks) * args.splitwise_cache_buffer_block_num + cache_shape = [num_blocks] + key_cache_shape_list[1:] + cache_bytes = num_blocks * bytes_per_block + logger.info( + f"[rank {rank}/{args.mp_num}] ..creating splitwise cpu buffer cache for layer {i}: " + f"shape: {cache_shape}, dtype: {cache_saved_dtype}, cache_bytes: {cache_bytes}" + ) + key_name = f"key_cpu_caches_{i}_rank{rank}" + val_name = f"value_cpu_caches_{i}_rank{rank}" + splitwise_cache_buffer_ptrs[f"key_layer{i}"] = create_pinned_shm(key_name, cache_bytes) + splitwise_cache_buffer_ptrs[f"value_layer{i}"] = create_pinned_shm(val_name, cache_bytes) + cache_kv_size_byte = sum([tmp.numel() * 1 for key, tmp in gpu_cache_kvs.items()]) logger.info(f"device :{device}") logger.info(f"cache_kv_size_byte : {cache_kv_size_byte}") @@ -833,6 +894,9 @@ def main(): engine_worker_queue_port=args.engine_worker_queue_port, local_data_parallel_id=args.local_data_parallel_id, gpu_cache_kvs=gpu_cache_kvs, + splitwise_cache_buffer_block_num=args.splitwise_cache_buffer_block_num, + splitwise_cache_buffer_ptrs=splitwise_cache_buffer_ptrs, + bytes_per_block=bytes_per_block, rank=rank, nranks=args.mp_num, num_layers=args.num_layers + num_extra_layers, @@ -878,4 +942,7 @@ def main(): logger.info("create cache messager...") logger.info(f"{args}") - main() + try: + main() + except Exception as e: + logger.error(f"Exception occurred in cache messager, error: {e}, traceback: {traceback.format_exc()}") diff --git a/fastdeploy/cache_manager/cache_transfer_manager.py b/fastdeploy/cache_manager/cache_transfer_manager.py index c9b6cd83f5b..bafc6c6e95b 100644 --- a/fastdeploy/cache_manager/cache_transfer_manager.py +++ b/fastdeploy/cache_manager/cache_transfer_manager.py @@ -32,6 +32,7 @@ cuda_host_alloc, cuda_host_free, memory_allocated, + open_pinned_shm, set_data_ipc, set_device, share_external_data_, @@ -60,7 +61,7 @@ def parse_args(): parser.add_argument("--num_layers", type=int, default=1, help="model num layers") parser.add_argument("--mp_num", type=int, default=1, help="number of model parallel") parser.add_argument( - "--cache_dtype", + "--cache_saved_dtype", type=str, default="bfloat16", choices=["uint8", "bfloat16"], @@ -78,6 +79,12 @@ def parse_args(): help="engine worker queue port", ) parser.add_argument("--num_cpu_blocks", type=int, default=4, 
help="cpu cache block number") + parser.add_argument( + "--splitwise_cache_buffer_block_num", + type=int, + default=0, + help="The block num of cpu buffer to receive cache from prefill", + ) parser.add_argument("--engine_pid", type=str, default=None, help="engine pid") parser.add_argument( "--protocol", @@ -120,6 +127,7 @@ def __init__(self, args): if args.value_cache_shape: self.value_cache_shape = [int(i) for i in args.value_cache_shape.split(",")] self.num_gpu_blocks = self.key_cache_shape[0] + self.bytes_per_block = None self.num_extra_layers = self.speculative_config.num_extra_cache_layer self.num_extra_layer_gpu_blocks = int(self.num_gpu_blocks * self.speculative_config.num_gpu_block_expand_ratio) @@ -159,10 +167,16 @@ def __init__(self, args): ) self.num_cpu_blocks = args.num_cpu_blocks + self.splitwise_cache_buffer_block_num = args.splitwise_cache_buffer_block_num + assert self.num_cpu_blocks <= 0 or self.splitwise_cache_buffer_block_num <= 0, ( + "Only one of num_cpu_blocks or splitwise_cache_buffer_block_num must be greater than zero. " + "In mixed or prefill, num_cpu_blocks is greater than 0 when prefix caching uses cpu buffer. " + "In decode, splitwise_cache_buffer_block_num is greater than 0 when using cpu buffer to receive cache." + "Note that the prefix caching is not supported yet in decode." + ) self._init_gpu_cache(args) - if self.num_cpu_blocks > 0: - self._init_cpu_cache(args) + self._init_cpu_cache(args) cache_task_broadcast_data = np.zeros(shape=[1], dtype=np.int32) self.cache_task_broadcast_signal = IPCSignal( @@ -227,17 +241,17 @@ def _init_gpu_cache(self, args): logger.info( f"[rank {self.rank}/{self.n_ranks}] ..creating kv cache for layer {i}: {key_cache_shape} {value_cache_shape}" ) - key_cache = paddle.full(shape=key_cache_shape, fill_value=0, dtype=args.cache_dtype) + key_cache = paddle.full(shape=key_cache_shape, fill_value=0, dtype=args.cache_saved_dtype) set_data_ipc(key_cache, key_name) if self.value_cache_shape: - val_cache = paddle.full(shape=value_cache_shape, fill_value=0, dtype=args.cache_dtype) + val_cache = paddle.full(shape=value_cache_shape, fill_value=0, dtype=args.cache_saved_dtype) set_data_ipc(val_cache, val_name) else: logger.info( f"[rank {self.rank}/{self.n_ranks}] ..attaching kv cache for layer {i}: {key_cache_shape} {value_cache_shape}" ) - key_cache = paddle.empty(shape=[], dtype=args.cache_dtype) - val_cache = paddle.empty(shape=[], dtype=args.cache_dtype) + key_cache = paddle.empty(shape=[], dtype=args.cache_saved_dtype) + val_cache = paddle.empty(shape=[], dtype=args.cache_saved_dtype) key_cache = share_external_data_(key_cache, key_name, key_cache_shape, True) if self.value_cache_shape: val_cache = share_external_data_(val_cache, val_name, value_cache_shape, True) @@ -258,40 +272,50 @@ def _init_gpu_cache(self, args): logger.info(f"[rank {self.rank}/{self.n_ranks}] done init cache (full) gmem alloc : {memory_allocated()}") def _init_cpu_cache(self, args): + def _allocate_or_open_cpu_cache(byte_size, name): + if self.num_cpu_blocks > 0: + return cuda_host_alloc(byte_size) + else: + # splitwise cpu cache buffer is allocated in cache messager process + return open_pinned_shm(name, byte_size) + + num_blocks = self.num_cpu_blocks if self.num_cpu_blocks > 0 else self.splitwise_cache_buffer_block_num + if num_blocks == 0: + logger.info(f"[rank {self.rank}/{self.n_ranks}] 💡 no swap space (cpu cache) is specified.") + self.swap_space_ready_signal.value[self.rank] = 1 + return + key_cache_size = self.key_cache_shape[1] * self.key_cache_shape[2] 
* self.key_cache_shape[3] if args.value_cache_shape: value_cache_size = self.value_cache_shape[1] * self.value_cache_shape[2] * self.value_cache_shape[3] else: value_cache_size = 0 - if args.cache_dtype == "bfloat16": - cache_bytes = 2 - elif args.cache_dtype == "uint8": - cache_bytes = 1 + if args.cache_saved_dtype == "bfloat16": + byte_size = 2 + elif args.cache_saved_dtype == "uint8": + byte_size = 1 else: - raise ValueError(f"Unsupported cache dtype: {args.cache_dtype}") - key_need_to_allocate_bytes = args.num_cpu_blocks * cache_bytes * key_cache_size - value_need_to_allocate_bytes = args.num_cpu_blocks * cache_bytes * value_cache_size - logger.info( - f"[rank {self.rank}/{self.n_ranks}] ..swap space size : {(key_need_to_allocate_bytes + value_need_to_allocate_bytes) / 1024 ** 3:.2f}GB" - ) - if args.num_cpu_blocks == 0: - logger.info(f"[rank {self.rank}/{self.n_ranks}] 💡 no swap space (cpu cache) is specified.") - self.swap_space_ready_signal.value[self.rank] = 1 - return + raise ValueError(f"Unsupported cache dtype: {args.cache_saved_dtype}") + + self.bytes_per_block = byte_size * key_cache_size + key_cache_bytes = num_blocks * self.bytes_per_block + value_cache_bytes = num_blocks * byte_size * value_cache_size + logger.info(f"[rank {self.rank}/{self.n_ranks}] Initializing swap space (cpu cache) for all layers.") paddle.set_device("cpu") self.k_dst_ptrs = [] self.v_dst_ptrs = [] for i in range(args.num_layers + self.num_extra_layers): - key_name = f"key_caches_{i}_rank{self.rank}" - val_name = f"value_caches_{i}_rank{self.rank}" + key_name = f"key_cpu_caches_{i}_rank{self.rank}" + val_name = f"value_cpu_caches_{i}_rank{self.rank}" logger.info( - f"[rank {self.rank}/{self.n_ranks}] ..creating cpu cache for layer {i}: {(key_need_to_allocate_bytes + value_need_to_allocate_bytes) / 1024 ** 3:.2f}GB" + f"[rank {self.rank}/{self.n_ranks}] ..allocate/open cpu cache for layer {i}: " + f"{(key_cache_bytes + value_cache_bytes) / 1024 ** 3:.2f}GB" ) - self.cpu_cache_kvs[key_name] = cuda_host_alloc(key_need_to_allocate_bytes) + self.cpu_cache_kvs[key_name] = _allocate_or_open_cpu_cache(key_cache_bytes, key_name) self.k_dst_ptrs.append(self.cpu_cache_kvs[key_name]) - if value_need_to_allocate_bytes > 0: - self.cpu_cache_kvs[val_name] = cuda_host_alloc(value_need_to_allocate_bytes) + if value_cache_bytes > 0: + self.cpu_cache_kvs[val_name] = _allocate_or_open_cpu_cache(value_cache_bytes, val_name) self.v_dst_ptrs.append(self.cpu_cache_kvs[val_name]) logger.info(f"[rank {self.rank}/{self.n_ranks}] ✅ swap space (cpu cache) is ready!") self.swap_space_ready_signal.value[self.rank] = 1 @@ -383,6 +407,7 @@ def do_data_transfer(self): self.cache_task_queue.barrier1.wait() if self.rank == 0: self.cache_task_queue.barrier1.reset() + if self.cache_task_broadcast_signal.value[0] == 1: data, read_finish = self.cache_task_queue.get_transfer_task() logger.debug(f"transfer data: get_transfer_task {data}") @@ -493,7 +518,7 @@ def _transfer_data( 0, ) - elif event_type.value == CacheStatus.SWAP2GPU.value: + elif event_type.value in (CacheStatus.SWAP2GPU.value, CacheStatus.SPLITWISE_CPU2GPU.value): swap_cache_all_layers( self.gpu_cache_k_tensors, self.k_dst_ptrs, @@ -517,14 +542,16 @@ def _transfer_data( f"transfer data: Get unexpected event type {event_type}, only SWAP2CPU and SWAP2GPU supported" ) except Exception as e: - logger.error(f"transfer data: error: {e}") + logger.error(f"transfer data: error: {e}, {str(traceback.format_exc())}.") raise e end_time = time.time() elasped_time = end_time - start_time + 
swap_speed = len(gpu_block_ids) * self.bytes_per_block / 1073741824 / elasped_time # GB/s logger.info( - f"transfer data: transfer_task_id {transfer_task_id} event_type {event_type}: " - + f"transfer {len(gpu_block_ids)} blocks done elapsed_time {elasped_time:.4f}" + f"finish transfer data: transfer_task_id {transfer_task_id}, blocks_num {len(gpu_block_ids)}," + + f"swap speed {swap_speed:.4f} GB/s, elapsed_time {elasped_time:.4f}" ) + return ( swap_node_ids, task_gpu_block_id, @@ -641,4 +668,7 @@ def main(): rank_id = args.rank + args.local_data_parallel_id * args.mp_num logger = get_logger("cache_transfer_manager", f"cache_transfer_manager_rank{rank_id}.log") set_device(args.device_id) - main() + try: + main() + except Exception as e: + logger.error(f"Exception occurred in cache transfer manager, error: {e}, traceback: {traceback.format_exc()}") diff --git a/fastdeploy/cache_manager/ops.py b/fastdeploy/cache_manager/ops.py index 260d275f5ce..0072b515ef9 100644 --- a/fastdeploy/cache_manager/ops.py +++ b/fastdeploy/cache_manager/ops.py @@ -4,8 +4,10 @@ if current_platform.is_cuda(): from fastdeploy.model_executor.ops.gpu import ( + create_pinned_shm, cuda_host_alloc, cuda_host_free, + open_pinned_shm, set_data_ipc, share_external_data, swap_cache_all_layers, @@ -23,6 +25,8 @@ ) unset_data_ipc = None + open_pinned_shm = None + create_pinned_shm = None memory_allocated = paddle.device.xpu.memory_allocated else: @@ -51,6 +55,8 @@ def share_external_data_(cache, cache_name, cache_shape, use_ipc): __all__ = [ "cuda_host_alloc", "cuda_host_free", + "open_pinned_shm", + "create_pinned_shm", "set_data_ipc", "share_external_data_", "swap_cache_all_layers", diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index 0b66bf24b26..dc201db600d 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -27,6 +27,7 @@ from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from threading import Event, Lock +from typing import List, Union import numpy as np @@ -79,8 +80,14 @@ def __init__( self.cpu_free_block_list = list(range(self.num_cpu_blocks - 1, -1, -1)) else: self.cpu_free_block_list = [] + self.splitwise_cpu_free_block_list = [] + if self.cache_config.splitwise_cache_buffer_block_num > 0: + self.splitwise_cpu_free_block_list = list( + range(self.cache_config.splitwise_cache_buffer_block_num - 1, -1, -1) + ) heapq.heapify(self.gpu_free_block_list) heapq.heapify(self.cpu_free_block_list) + heapq.heapify(self.splitwise_cpu_free_block_list) self.key_cache_shape = [] self.val_cache_shape = [] @@ -172,15 +179,10 @@ def launch_cache_manager( """ launch_cache_manager function used to initialize the cache manager. 
""" - broadcast_cache_task_flag_array = np.zeros([1], dtype=np.int32) - - self.shm_cache_task_flag_broadcast = IPCSignal( - name="cache_task_broadcast_signal", - array=broadcast_cache_task_flag_array, - dtype=np.int32, - suffix=engine_worker_queue_port, - create=True, - ) + key_cache_shape, val_cache_shape = self._get_kv_cache_shape(cache_config.total_block_num) + key_cache_shape = ",".join([str(i) for i in key_cache_shape]) + val_cache_shape = ",".join([str(i) for i in val_cache_shape]) + logger.info(f"key_cache_shape {key_cache_shape} value_cache_shape {val_cache_shape}") self.cache_task_queue = EngineCacheQueue( address=(pod_ip, cache_config.cache_queue_port), @@ -191,15 +193,7 @@ def launch_cache_manager( local_data_parallel_id=self.local_data_parallel_id, ) - current_dir_path = os.path.split(os.path.abspath(__file__))[0] - filename = "cache_transfer_manager.py" - py_path = os.path.join(current_dir_path, filename) - cache_messager_processes = [] - key_cache_shape, val_cache_shape = self._get_kv_cache_shape(cache_config.total_block_num) - key_cache_shape = ",".join([str(i) for i in key_cache_shape]) - val_cache_shape = ",".join([str(i) for i in val_cache_shape]) - logger.info(f"key_cache_shape {key_cache_shape} value_cache_shape {val_cache_shape}") if self.enable_splitwise: cache_messager_processes = self.launch_cache_messager( cache_config, @@ -213,23 +207,14 @@ def launch_cache_manager( ) if cache_messager_processes is None: raise RuntimeError("Launch cache messager failed") - return [] - cache_ready_signal_data = np.zeros(shape=[tensor_parallel_size], dtype=np.int32) - self.cache_ready_signal = IPCSignal( - name="cache_ready_signal", - array=cache_ready_signal_data, - dtype=np.int32, - suffix=engine_worker_queue_port, - create=False, - ) - swap_space_ready_data = np.zeros(shape=[tensor_parallel_size], dtype=np.int32) - self.swap_space_ready_signal = IPCSignal( - name="swap_space_ready_signal", - array=swap_space_ready_data, + broadcast_cache_task_flag_array = np.zeros([1], dtype=np.int32) + self.shm_cache_task_flag_broadcast = IPCSignal( + name="cache_task_broadcast_signal", + array=broadcast_cache_task_flag_array, dtype=np.int32, suffix=engine_worker_queue_port, - create=False, + create=True, ) prefix_tree_status = np.zeros([1], dtype=np.int32) self.prefix_tree_status_signal = IPCSignal( @@ -240,61 +225,87 @@ def launch_cache_manager( create=False, ) - # Run command to launch cache transfer managers - log_dir = envs.FD_LOG_DIR cache_manager_processes = [] - for i in range(tensor_parallel_size): - launch_cmd = ( - "FLAGS_allocator_strategy=auto_growth CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7" - + " NCCL_MAX_NCHANNELS=1 NCCL_BUFFSIZE=0" - + f" FD_ENABLE_SWAP_SPACE_CLEARING={envs.FD_ENABLE_SWAP_SPACE_CLEARING}" - + f" {sys.executable} {py_path}" - + f" --device_id {int(device_ids[i])}" - + f" --rank {i}" - + f" --splitwise_role {self.splitwise_role}" - + f" --num_layers {cache_config.model_cfg.num_hidden_layers}" - + f" --mp_num {tensor_parallel_size}" - + f" --cache_dtype {cache_config.cache_dtype}" - + f" --key_cache_shape {key_cache_shape}" - + f" --value_cache_shape {val_cache_shape}" - + f" --cache_queue_port {cache_config.cache_queue_port}" - + f" --enable_splitwise {int(self.enable_splitwise)}" - + f" --pod_ip {pod_ip}" - + f" --engine_worker_queue_port {engine_worker_queue_port}" - + f" --num_cpu_blocks {cache_config.num_cpu_blocks}" - + f" --engine_pid {pid_suffix}" - + f" --protocol {cache_config.cache_transfer_protocol}" - + f" --local_data_parallel_id 
{self.local_data_parallel_id}" - + f" --rdma_port {cache_config.rdma_comm_ports[i] if cache_config.rdma_comm_ports is not None else '0'}" - + f" --speculative_config '{self.speculative_config.to_json_string()}'" - + (" --create_cache_tensor" if create_cache_tensor else "") - + f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1" + is_launch = ( + (self.splitwise_role == "mixed" and cache_config.enable_prefix_caching) + or ( + self.splitwise_role == "prefill" and cache_config.enable_hierarchical_cache and self.num_cpu_blocks > 0 + ) + or (self.splitwise_role == "decode" and cache_config.splitwise_cache_buffer_block_num > 0) + ) + if is_launch: + current_dir_path = os.path.split(os.path.abspath(__file__))[0] + filename = "cache_transfer_manager.py" + py_path = os.path.join(current_dir_path, filename) + + cache_ready_signal_data = np.zeros(shape=[tensor_parallel_size], dtype=np.int32) + self.cache_ready_signal = IPCSignal( + name="cache_ready_signal", + array=cache_ready_signal_data, + dtype=np.int32, + suffix=engine_worker_queue_port, + create=False, + ) + swap_space_ready_data = np.zeros(shape=[tensor_parallel_size], dtype=np.int32) + self.swap_space_ready_signal = IPCSignal( + name="swap_space_ready_signal", + array=swap_space_ready_data, + dtype=np.int32, + suffix=engine_worker_queue_port, + create=False, ) - logger.info(f"Launch cache transfer manager, command:{launch_cmd}") - cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid)) - logger.info("PrefixCacheManager is waiting for kv cache to be initialized.") - while np.sum(self.cache_ready_signal.value) != tensor_parallel_size: - time.sleep(1) + log_dir = envs.FD_LOG_DIR + for i in range(tensor_parallel_size): + launch_cmd = ( + "FLAGS_allocator_strategy=auto_growth CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7" + + " NCCL_MAX_NCHANNELS=1 NCCL_BUFFSIZE=0" + + f" FD_ENABLE_SWAP_SPACE_CLEARING={envs.FD_ENABLE_SWAP_SPACE_CLEARING}" + + f" {sys.executable} {py_path}" + + f" --device_id {int(device_ids[i])}" + + f" --rank {i}" + + f" --splitwise_role {self.splitwise_role}" + + f" --num_layers {cache_config.model_cfg.num_hidden_layers}" + + f" --mp_num {tensor_parallel_size}" + + f" --cache_saved_dtype {cache_config.cache_saved_dtype}" + + f" --key_cache_shape {key_cache_shape}" + + f" --value_cache_shape {val_cache_shape}" + + f" --cache_queue_port {cache_config.cache_queue_port}" + + f" --enable_splitwise {int(self.enable_splitwise)}" + + f" --pod_ip {pod_ip}" + + f" --engine_worker_queue_port {engine_worker_queue_port}" + + f" --num_cpu_blocks {cache_config.num_cpu_blocks}" + + f" --splitwise_cache_buffer_block_num {cache_config.splitwise_cache_buffer_block_num}" + + f" --engine_pid {pid_suffix}" + + f" --protocol {cache_config.cache_transfer_protocol}" + + f" --local_data_parallel_id {self.local_data_parallel_id}" + + f" --rdma_port {cache_config.rdma_comm_ports[i] if cache_config.rdma_comm_ports is not None else '0'}" + + f" --speculative_config '{self.speculative_config.to_json_string()}'" + + (" --create_cache_tensor" if create_cache_tensor else "") + + f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1" + ) + logger.info(f"Launch cache transfer manager, command:{launch_cmd}") + cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid)) + + logger.info("PrefixCacheManager is waiting for kv cache to be initialized.") + while np.sum(self.cache_ready_signal.value) != tensor_parallel_size: + time.sleep(1) - if 
cache_config.enable_hierarchical_cache and self.num_cpu_blocks > 0: + logger.info("PrefixCacheManager is waiting for swap kv cache to be initialized.") while np.sum(self.swap_space_ready_signal.value) != tensor_parallel_size: time.sleep(1) - exit_code = cache_manager_processes[-1].poll() - if exit_code is None: - logger.info("Launch cache transfer manager successful") - else: - logger.info( - "Launch cache transfer manager failed, see launch_cache_transfer_manager.log for more information" - ) + exit_code = cache_manager_processes[-1].poll() + if exit_code is None: + logger.info("Launch cache transfer manager successful") + else: + logger.info( + "Launch cache transfer manager failed, see launch_cache_transfer_manager.log for more information" + ) - # Start additional threads - if cache_config.enable_hierarchical_cache and self.num_cpu_blocks > 0: - logger.info("Enable hierarchical cache.") threading.Thread(target=self.recv_data_transfer_result).start() - if cache_config.enable_prefix_caching: - threading.Thread(target=self.clear_prefix_cache, daemon=True).start() + + threading.Thread(target=self.clear_prefix_cache, daemon=True).start() all_cache_processes = cache_messager_processes + cache_manager_processes return all_cache_processes @@ -338,12 +349,12 @@ def launch_cache_messager( + f" --splitwise_role {self.splitwise_role}" + f" --num_layers {cache_config.model_cfg.num_hidden_layers}" + f" --mp_num {tensor_parallel_size}" - + f" --cache_dtype {cache_config.cache_dtype}" + + f" --cache_saved_dtype {cache_config.cache_saved_dtype}" + f" --key_cache_shape {key_cache_shape}" + f" --value_cache_shape {value_cache_shape}" + f" --pod_ip {pod_ip}" - + f" --cache_queue_port {cache_config.cache_queue_port}" + f" --engine_worker_queue_port {engine_worker_queue_port}" + + f" --splitwise_cache_buffer_block_num {cache_config.splitwise_cache_buffer_block_num}" + f" --protocol {cache_config.cache_transfer_protocol}" + f" --local_data_parallel_id {self.local_data_parallel_id}" + f" --engine_pid {pid_suffix}" @@ -459,23 +470,69 @@ def recycle_cpu_blocks(self, cpu_block_ids): else: heapq.heappush(self.cpu_free_block_list, cpu_block_ids) + def can_allocate_splitwise_blocks(self, num_blocks: int): + """ + Check if num_blocks can be allocated on splitwise cpu buffer. + """ + return num_blocks <= len(self.splitwise_cpu_free_block_list) + + def allocate_splitwise_blocks(self, num_blocks: int): + """ + Allocate the block ids of splitwise cpu cache buffer. + """ + assert self.can_allocate_splitwise_blocks( + num_blocks + ), f"splitwise cpu free block num: {len(self.splitwise_cpu_free_block_list)} < needed number {num_blocks}" + allocated_block_ids = [heapq.heappop(self.splitwise_cpu_free_block_list) for _ in range(num_blocks)] + logger.debug( + f"allocate_splitwise_cpu_blocks: {allocated_block_ids}, " + f"len(self.splitwise_cpu_free_block_list) {len(self.splitwise_cpu_free_block_list)}" + ) + return allocated_block_ids + + def recycle_splitwise_blocks(self, block_ids: Union[int, List[int]]): + """ + Recycle the block ids of splitwise cpu cache buffer. 
+ """ + logger.debug( + f"recycle_splitwise_blocks: {block_ids}, " + f"len(self.splitwise_cpu_free_block_list): {len(self.splitwise_cpu_free_block_list)}" + ) + if isinstance(block_ids, list): + for block_id in block_ids: + heapq.heappush(self.splitwise_cpu_free_block_list, block_id) + else: + heapq.heappush(self.splitwise_cpu_free_block_list, block_ids) + + def issue_splitwise_buffer_swap_task( + self, + request_id: str, + gpu_block_ids: List[int], + cpu_block_ids: List[int], + ): + """ + Swap splitwise cpu buffer to gpu cache. + # TODO: support async swap task + """ + self.issue_swap_task(request_id, gpu_block_ids, cpu_block_ids, CacheStatus.SPLITWISE_CPU2GPU, is_sync=True) + def issue_swap_task( self, transfer_task_id, - swap_node_ids, gpu_block_ids, cpu_block_ids, event_type, + swap_node_ids=None, is_sync=True, ): """ start data swap task args: transfer_task_id: transfer task id - swap_node_ids: to swap node id list gpu_block_ids: to swap gpu block id list cpu_block_ids: to swap cpu block id list event_type: CacheStatus.SWAP2GPU or CacheStatus.SWAP2CPU + swap_node_ids: to swap node id list is_sync: bool, whether to wait for the result of the swap task """ @@ -536,10 +593,10 @@ def _prepare_cpu_cache( logger.info(f"request_block_ids: req_id {req_id} issue_swap_task transfer_task_id {transfer_task_id}") self.issue_swap_task( transfer_task_id, - swap_node_ids, need_transfer_task_gpu_block_ids, need_transfer_task_cpu_block_ids, CacheStatus.SWAP2GPU, + swap_node_ids, True, ) @@ -1003,10 +1060,10 @@ def _evict_cache_async( ) self.issue_swap_task( transfer_task_id, - swap_node_ids, need_transfer_task_gpu_block_ids, need_transfer_task_cpu_block_ids, CacheStatus.SWAP2CPU, + swap_node_ids, True, ) @@ -1740,6 +1797,8 @@ def recv_data_transfer_result(self): if data is None: time.sleep(0.001) continue + + logger.debug(f"recv_data_transfer_result: start handling data {data}") ( swap_node_ids, task_gpu_block_id, @@ -1747,14 +1806,15 @@ def recv_data_transfer_result(self): event_type, transfer_task_id, ) = data - length = len(task_gpu_block_id) - for i in range(length): - self._handle_swap_result( - swap_node_ids[i], - task_gpu_block_id[i], - task_cpu_block_id[i], - event_type, - ) + if event_type.value != CacheStatus.SPLITWISE_CPU2GPU.value: + length = len(task_gpu_block_id) + for i in range(length): + self._handle_swap_result( + swap_node_ids[i], + task_gpu_block_id[i], + task_cpu_block_id[i], + event_type, + ) if transfer_task_id in self.task_swapping_event: self.task_swapping_event[transfer_task_id].set() logger.info( diff --git a/fastdeploy/cache_manager/utils.py b/fastdeploy/cache_manager/utils.py new file mode 100644 index 00000000000..08c3946b516 --- /dev/null +++ b/fastdeploy/cache_manager/utils.py @@ -0,0 +1,40 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + + +def cache_byte_size(raw_cache_dtype): + """ + Convert the cache dtype to the corresponding byte size. 
+ """ + if "int4" in raw_cache_dtype.lower() or "float4" in raw_cache_dtype.lower(): + byte_size = 0.5 + elif "int8" in raw_cache_dtype.lower() or "float8" in raw_cache_dtype.lower(): + byte_size = 1 + else: + byte_size = 2 + return byte_size + + +def convert_to_saved_dtype(raw_cache_dtype): + """ + Convert the input cache dtype to the real saved dtype. + """ + if "int4" in raw_cache_dtype.lower() or "float4" in raw_cache_dtype.lower(): + return "uint8" + elif "int8" in raw_cache_dtype.lower() or "float8" in raw_cache_dtype.lower(): + return "uint8" + else: + return raw_cache_dtype diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 83af4ebdd50..538287a6041 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -29,6 +29,7 @@ import fastdeploy from fastdeploy import envs +from fastdeploy.cache_manager.utils import cache_byte_size, convert_to_saved_dtype from fastdeploy.model_executor.layers.quantization.quant_base import QuantConfigBase from fastdeploy.platforms import current_platform from fastdeploy.scheduler import SchedulerConfig @@ -1193,6 +1194,8 @@ def __init__(self, args): enable_prefix_caching (bool): Enable prefix caching. max_encoder_cache(int): Maximum number of tokens in the encoder cache. max_processor_cache(int): Maximum number of bytes in the processor cache. + splitwise_cache_buffer_size (float): The amount of CPU memory in decode to receive the cache from prefill (GB). + In splitwise deployment, decode uses cpu buffer to receive the cache from prefill. """ self.block_size = 64 self.gpu_memory_utilization = 0.9 @@ -1215,6 +1218,9 @@ def __init__(self, args): self.swap_space = None self.max_encoder_cache = None self.max_processor_cache = None + self.enable_splitwise_cache_buffer = False + self.splitwise_cache_buffer_size = 0 + self.splitwise_cache_buffer_block_num = 0 self.disable_chunked_mm_input = False for key, value in args.items(): if hasattr(self, key): @@ -1245,14 +1251,9 @@ def __init__(self, args): kv_num_head = self.model_cfg.num_attention_heads self.model_cfg.kv_num_head = kv_num_head # TODO check name - if "int4" in self.cache_dtype.lower() or "float4" in self.cache_dtype.lower(): - byte_size = 0.5 - self.cache_dtype = "uint8" - elif "int8" in self.cache_dtype.lower() or "float8" in self.cache_dtype.lower(): - self.cache_dtype = "uint8" - byte_size = 1 - else: - byte_size = 2 + byte_size = cache_byte_size(self.cache_dtype) + self.cache_saved_dtype = convert_to_saved_dtype(self.cache_dtype) + self.each_token_cache_space = int( self.model_cfg.num_hidden_layers * kv_num_head * self.model_cfg.head_dim * byte_size ) @@ -1265,6 +1266,21 @@ def __init__(self, args): * byte_size ) + if self.splitwise_cache_buffer_size is not None: + block_num = int(self.splitwise_cache_buffer_size * 1024**3 / self.bytes_per_block) + if block_num > 0: + self.enable_splitwise_cache_buffer = True + self.splitwise_cache_buffer_block_num = block_num + logger.info( + f"splitwise_cache_buffer_size: {self.splitwise_cache_buffer_size} GB, " + f"splitwise_cache_buffer_block_num: {self.splitwise_cache_buffer_block_num}" + ) + else: + logger.warning( + f"splitwise_cache_buffer_size ({self.splitwise_cache_buffer_size}) " + "is too small, disable it!" + ) + if self.swap_space is None: self.num_cpu_blocks = 0 else: @@ -1280,6 +1296,10 @@ def _verify_args(self): raise ValueError("GPU memory utilization must be less than 1.0. Got " f"{self.gpu_memory_utilization}.") if self.kv_cache_ratio > 1.0: raise ValueError("KV cache ratio must be less than 1.0. 
Got " f"{self.kv_cache_ratio}.") + if self.splitwise_cache_buffer_size is not None and self.splitwise_cache_buffer_size < 0.0: + raise ValueError( + "splitwise_cache_buffer_size must be greater than 0.0. Got " f"{self.splitwise_cache_buffer_size}." + ) def postprocess(self, num_total_tokens, number_of_tasks): """ diff --git a/fastdeploy/demo/offline_demo.py b/fastdeploy/demo/offline_demo.py index 137960b3ed0..a87dfc4098a 100644 --- a/fastdeploy/demo/offline_demo.py +++ b/fastdeploy/demo/offline_demo.py @@ -17,10 +17,10 @@ from fastdeploy.engine.sampling_params import SamplingParams from fastdeploy.entrypoints.llm import LLM -model_name_or_path = "/workspace/ERNIE-4.5-0.3B-Paddle" +model_name_or_path = "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" # 超参设置 -sampling_params = SamplingParams(temperature=0.1, max_tokens=30, prompt_logprobs=100) +sampling_params = SamplingParams(temperature=0.1, max_tokens=30) llm = LLM(model=model_name_or_path, tensor_parallel_size=1, enable_prefix_caching=False) output = llm.generate(prompts="who are you?", use_tqdm=True, sampling_params=sampling_params) diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 23812e966a6..da46bad0cc5 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -149,6 +149,10 @@ class EngineArgs: """ Maximum number of bytes(in GiB) in the processor cache. """ + splitwise_cache_buffer_size: Optional[float] = None + """ + The amount of CPU memory in decode to receive the cache from prefill (GB). + """ reasoning_parser: str = None """ specifies the reasoning parser to use for extracting reasoning content from the model output @@ -220,7 +224,7 @@ class EngineArgs: swap_space: float = None """ - The amount of CPU memory to offload to. + The amount of CPU memory for saving swaped cache (GB). """ cache_queue_port: str = "0" @@ -527,6 +531,21 @@ def __post_init__(self): "please set num_gpu_blocks_override for prefill " "instance using ENABLE_V1_KVCACHE_SCHEDULER." ) + if self.splitwise_cache_buffer_size is not None: + if self.splitwise_cache_buffer_size < 0: + raise ValueError("splitwise_cache_buffer_size should >= 0.") + if self.splitwise_role != "decode": + raise NotImplementedError("splitwise_cache_buffer_size params only support in decode mode now.") + if "ipc" in self.cache_transfer_protocol: + raise NotImplementedError( + "CPU cache buffer (splitwise_cache_buffer_size > 0) is not compatible with IPC cache " + "transfer protocol. Please use only RDMA protocol." + ) + if envs.ENABLE_V1_KVCACHE_SCHEDULER == 0: + raise NotImplementedError( + "splitwise_cache_buffer_size params only support when ENABLE_V1_KVCACHE_SCHEDULER=1" + ) + if not current_platform.is_cuda() and not current_platform.is_xpu(): envs.ENABLE_V1_KVCACHE_SCHEDULER = 0 if self.guided_decoding_backend != "off": @@ -989,6 +1008,12 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: default=EngineArgs.disable_chunked_mm_input, help="Disable chunked mm input.", ) + model_group.add_argument( + "--splitwise-cache-buffer-size", + default=EngineArgs.splitwise_cache_buffer_size, + type=float, + help="The amount of CPU memory in decode to receive the cache from prefill (GB). 
Default is 0.", + ) # Router parameters group router_group = parser.add_argument_group("Router") diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 9f0db935cf5..95fa55e574c 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -23,7 +23,7 @@ import traceback import weakref from concurrent.futures import ThreadPoolExecutor -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple import numpy as np import paddle @@ -324,18 +324,18 @@ def start_worker_queue_service(self, start_queue): local_data_parallel_id=self.cfg.parallel_config.local_data_parallel_id, ) - def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current_id=-1): + def insert_tasks(self, tasks: List[Request], current_id=-1): """ - Insert tasks to engine. + Allocate resource and insert tasks to engine. + Used in v0_kvcache_scheduler. """ + if not isinstance(tasks, list): + tasks = [tasks] for task in tasks: start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER) self.resource_manager.check_and_free_block_tables() - if not isinstance(tasks, list): - tasks = [tasks] - need_delete_tasks = [] for task in tasks: if self.cfg.scheduler_config.splitwise_role != "mixed": @@ -388,7 +388,11 @@ def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current is_prefill = True self.token_processor.number_of_input_tokens += tasks[i].prompt_token_ids_len - self.split_connector.send_cache_infos(tasks, current_id) + if self.cfg.scheduler_config.splitwise_role == "prefill": + self.split_connector.send_cache_info_to_messager(tasks, current_id) + elif self.cfg.scheduler_config.splitwise_role == "decode": + self.split_connector.send_cache_info_to_prefill(tasks) + if not is_decode: self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}") for task in tasks: @@ -406,7 +410,8 @@ def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]): """ - insert prefilled requests into engine worker queue. + Decode insert prefilled requests into engine worker queue. + Used in v1_kvcache_scheduler. 
Args: request_outputs: a list of RequestOutput sent by prefill instance """ @@ -640,8 +645,9 @@ def _schedule_request_to_worker(self): time.sleep(0.001) continue if self.cfg.scheduler_config.splitwise_role == "decode": - # Decode will instert the request sent by prefill to engine, - # so the task sent by client will be ignored + # TODO: refine scheduler to remove this limitation + # Decode will process and schedule the request sent by prefill to engine, + # so the same request sent by the decode api server will be ignored continue llm_logger.debug(f"get tasks from scheduler: {tasks}") @@ -671,6 +677,8 @@ def _fetch_request(): try: nonlocal is_fetching is_fetching = True + + self.resource_manager.check_and_free_block_tables() num_prefill_batch = min( int(self.resource_manager.available_batch()), self.cfg.max_prefill_batch, @@ -692,8 +700,9 @@ def _fetch_request(): trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", "")) if self.cfg.scheduler_config.splitwise_role == "decode": - # Decode will instert the request sent by prefill to engine, - # so the task sent by client will be ignored + # TODO: refine scheduler to remove this limitation + # Decode will process and schedule the request sent by prefill to engine, + # so the same request sent by the decode api server will be ignored is_fetching = False return @@ -744,11 +753,11 @@ def _fetch_request(): for tmp_task in need_delete_tasks: tasks.remove(tmp_task) # release resource in P - self.resource_manager.prerelease_resource(tmp_task) + self.resource_manager.pre_recycle_resource(tmp_task.request_id) if self.cfg.scheduler_config.splitwise_role == "prefill": # to send cache info to cache messager if tasks: - self.split_connector.send_cache_infos(tasks, 0) + self.split_connector.send_cache_info_to_messager(tasks, 0) # ensure cache tasks has sent to cache_messager need_check_req_ids = [task.request_id for task in tasks] while need_check_req_ids: @@ -1002,7 +1011,7 @@ def _zmq_send_generated_tokens(self): else: new_contents.append(content) if len(new_contents): - llm_logger.debug(f"Send response for request id: {request_id}") + llm_logger.debug(f"Send response for request id: {request_id}, {new_contents}") self.send_response_server.send_response(request_id, new_contents) except Exception as e: llm_logger.error(f"Unexcepted error happend: {e}, {traceback.format_exc()!s}") @@ -1041,7 +1050,7 @@ def _process_allocate_resource_requests(): if envs.ENABLE_V1_KVCACHE_SCHEDULER: if self.resource_manager.preallocate_resource_in_d(task): self.llm_logger.info(f"Resource available, processing task {task.request_id}") - self.split_connector.send_cache_infos([task], -1) + self.split_connector.send_cache_info_to_prefill([task]) processed_indices.append(idx) is_success = True else: @@ -1054,7 +1063,7 @@ def _process_allocate_resource_requests(): if not is_success: if not self.enable_decode_cache_task: task.error_msg = "Not enough resources" - self.split_connector.send_cache_infos([task], -1) + self.split_connector.send_cache_info_to_prefill([task]) processed_indices.append(idx) else: self.llm_logger.debug(f"Still waiting for resources {task.request_id}") @@ -1067,13 +1076,22 @@ def _process_prefilled_requests(): nonlocal prefilled_request_ouputs ready_request_outputs = [] waiting_request_outputs = [] - # Waiting for the api_server and scheduler in decode to - # receive the request sent by the client - for task in prefilled_request_ouputs: - if not hasattr(self.scheduler, "has_request") or 
self.scheduler.has_request(task.request_id): - ready_request_outputs.append(task) - else: - waiting_request_outputs.append(task) + + for req_output in prefilled_request_ouputs: + if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_output.request_id): + # ensure the api_server and scheduler in decode have + # received the request sent by the client + waiting_request_outputs.append(req_output) + continue + if ( + self.cfg.cache_config.enable_splitwise_cache_buffer + and not self.resource_manager.has_resource_for_prefilled_req(req_output.request_id) + ): + waiting_request_outputs.append(req_output) + continue + + ready_request_outputs.append(req_output) + self.llm_logger.debug(f"there are enough resource for prefilled request: {req_output.request_id}") prefilled_request_ouputs = waiting_request_outputs if self.cfg.splitwise_version == "v1": @@ -1083,35 +1101,27 @@ def _process_prefilled_requests(): if not envs.ENABLE_V1_KVCACHE_SCHEDULER: self._insert_prefilled_requests(ready_request_outputs) else: - for task in ready_request_outputs: - if envs.FD_ENABLE_INTERNAL_ADAPTER: - if ( - not task.outputs.token_ids - ): # first token is eos in Prefill, just recycle resource and continue - cur_req = self.resource_manager.requests[task.request_id] - self.resource_manager.stop_flags[cur_req.idx] = True - self.resource_manager.tasks_list[cur_req.idx] = None - self.resource_manager._free_blocks(cur_req) - if cur_req.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.llm_logger.warning(f"{task.request_id} need not decode after first token") - del self.resource_manager.requests[task.request_id] - del self.resource_manager.req_dict[task.request_id] - continue - if task.error_code != 200: - cur_req = self.resource_manager.requests[task.request_id] - self.resource_manager.stop_flags[cur_req.idx] = True - self.resource_manager.tasks_list[cur_req.idx] = None - self.resource_manager._free_blocks(cur_req) - if cur_req.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.scheduler.put_results([task]) + for req_output in ready_request_outputs: + request_id = req_output.request_id + if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids: + # first token is eos in Prefill, just recycle resource and continue + self.llm_logger.warning(f"{request_id} need not decode after first token") + self.resource_manager.pre_recycle_resource(request_id) + if request_id in self.token_processor.tokens_counter: + del self.token_processor.tokens_counter[request_id] + continue + if req_output.error_code != 200: self.llm_logger.warning( - f"{task.request_id} prefill failed with msg:{task.error_msg}, recycle resource." + f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource." 
) + self.resource_manager.pre_recycle_resource(request_id) + if request_id in self.token_processor.tokens_counter: + del self.token_processor.tokens_counter[request_id] + self.scheduler.put_results([req_output]) continue - self.token_processor.tokens_counter[task.request_id] = 1 - self.resource_manager.insert_task_for_decoding(task) + self.token_processor.tokens_counter[request_id] = 1 + self.resource_manager.add_prefilled_request(req_output) + self.llm_logger.debug(f"add prefilled request success, {request_id}") def decode_loop(): while self.running: diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 40e02445bdc..0b3cc9c55f0 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -403,8 +403,10 @@ def _exit_sub_services(self): llm_logger.info("Engine shut down, exiting sub services...") if hasattr(self, "cache_manager_processes"): - self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() - self.engine.resource_manager.cache_manager.cache_ready_signal.clear() + if hasattr(self.engine.resource_manager.cache_manager, "shm_cache_task_flag_broadcast"): + self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() + if hasattr(self.engine.resource_manager.cache_manager, "cache_ready_signal"): + self.engine.resource_manager.cache_manager.cache_ready_signal.clear() for p in self.cache_manager_processes: llm_logger.info(f"Killing cache manager process {p.pid}") try: diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index b74c772d313..450da411763 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -165,6 +165,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l # Priority queues for requests. self.waiting: deque[Request] = deque() self.running: list[Request] = [] + self.preallocated_reqs: dict[str, Request] = {} self.enable_max_prefill = envs.FD_ENABLE_MAX_PREFILL self.finish_execution_pool = ThreadPoolExecutor(max_workers=1) self.lock = threading.Lock() @@ -898,18 +899,27 @@ def add_request(self, request: Request) -> None: self.waiting.append(request) self.requests[request.request_id] = request - def prerelease_resource(self, request: Request): + def pre_recycle_resource(self, request_id: str): """ - Release resource in P or D before finished due to unexpected error. + Recycle resource in P or D before finished due to unexpected error. 
""" with self.lock: - self.tasks_list[request.idx] = None - self.stop_flags[request.idx] = True - if request.request_id in self.requests: - del self.requests[request.request_id] - if request.request_id in self.req_dict: - del self.req_dict[request.request_id] - self._free_blocks(request) + if self.config.cache_config.enable_splitwise_cache_buffer: + if request_id not in self.preallocated_reqs: + return + req = self.preallocated_reqs[request_id] + self.cache_manager.recycle_splitwise_blocks(req.disaggregate_info["block_tables"]) + del self.preallocated_reqs[request_id] + else: + if request_id not in self.requests: + return + req = self.requests[request_id] + self.tasks_list[req.idx] = None + self.stop_flags[req.idx] = True + self._free_blocks(req) + del self.requests[request_id] + if request_id in self.req_dict: + del self.req_dict[request_id] def add_request_in_p(self, requests: list[Request]): with self.lock: @@ -943,7 +953,7 @@ def preallocate_resource_in_p(self, request: Request): if not success: self._free_blocks(request) return False - # consider for mtp, plus enc_dec_block_num + need_extra_prefill_blocks = need_prealloc_prefill_blocks - request.cache_info[0] if self.cache_manager.can_allocate_gpu_blocks(need_extra_prefill_blocks): request.block_tables.extend(self.cache_manager.allocate_gpu_blocks(need_extra_prefill_blocks)) @@ -979,18 +989,31 @@ def preallocate_resource_in_d(self, request: Request): If can not, return False """ assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" - with self.lock: - if len(self.waiting) > 0: - return False - if self.available_batch() == 0: - return False - if request.reasoning_max_tokens is not None: - request.reasoning_max_tokens -= 1 - request.need_prefill_tokens = len(request.prompt_token_ids) - need_prealloc_prefill_blocks = ( - request.need_prefill_tokens + self.config.cache_config.block_size - 1 - ) // self.config.cache_config.block_size + self.config.cache_config.enc_dec_block_num # consider for mtp, plus enc_dec_block_num - if self.cache_manager.can_allocate_gpu_blocks(need_prealloc_prefill_blocks): + if request.reasoning_max_tokens is not None: + request.reasoning_max_tokens -= 1 + request.need_prefill_tokens = len(request.prompt_token_ids) + need_prealloc_prefill_blocks = ( + request.need_prefill_tokens + self.config.cache_config.block_size - 1 + ) // self.config.cache_config.block_size + self.config.cache_config.enc_dec_block_num + + if self.config.cache_config.enable_splitwise_cache_buffer: + # allocate block ids from splitwise cache buffer + with self.lock: + if not self.cache_manager.can_allocate_splitwise_blocks(need_prealloc_prefill_blocks): + return False + block_tables = self.cache_manager.allocate_splitwise_blocks(need_prealloc_prefill_blocks) + request.num_computed_tokens = request.need_prefill_tokens + request.disaggregate_info["block_tables"] = block_tables + self.preallocated_reqs[request.request_id] = request + else: # allocate block ids from gpu cache + with self.lock: + if len(self.waiting) > 0: + return False + if self.available_batch() == 0: + return False + if not self.cache_manager.can_allocate_gpu_blocks(need_prealloc_prefill_blocks): + return False + request.block_tables.extend(self.cache_manager.allocate_gpu_blocks(need_prealloc_prefill_blocks)) request.num_computed_tokens = request.need_prefill_tokens request.disaggregate_info["block_tables"] = request.block_tables @@ -1000,28 +1023,62 @@ def preallocate_resource_in_d(self, request: Request): 
self.stop_flags[request.idx] = False self.requests[request.request_id] = request self.req_dict[request.request_id] = allocated_position - return True - return False + return True - def insert_task_for_decoding(self, request_output_in_p: RequestOutput): + def has_resource_for_prefilled_req(self, request_id: str): """ - In P/D aggregated deployment, D should continue to decode after recieving first token and cache from P. + Check whether there are enough slot and gpu resource for the prefilled request, + of which the cache is saved in cpu buffer. """ assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" - with self.lock: - request = self.requests[request_output_in_p.request_id] - request.output_token_ids.append(request_output_in_p.outputs.token_ids[0]) - request.num_cached_tokens = request_output_in_p.num_cached_tokens - if ( - self.config.speculative_config.method in ["mtp"] - and self.config.scheduler_config.splitwise_role == "decode" - ): - request.draft_token_ids = copy.deepcopy(request_output_in_p.outputs.draft_token_ids) - # update request.need_prefill_tokens - request.need_prefill_tokens = len(request.prompt_token_ids) + 1 - request.inference_start_time = time.time() - request.schedule_start_time = time.time() - self.running.append(request) + assert request_id in self.preallocated_reqs, "request_id must be in preallocate" + need_blocks_num = len(self.preallocated_reqs[request_id].disaggregate_info["block_tables"]) + return self.available_batch() > 0 and self.cache_manager.can_allocate_gpu_blocks(need_blocks_num) + + def add_prefilled_request(self, request_output: RequestOutput): + """ + In P/D aggregated deployment, D should continue to decode after receiving first token and cache from P. + NOTE: GPU resources should be checked in advance to ensure they are sufficient for the prefilled request. 
+ """ + assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" + if not self.config.cache_config.enable_splitwise_cache_buffer: + request = self.requests[request_output.request_id] + else: + # allocate gpu block ids, swap cpu buffer to gpu, recycle cpu buffer + request_id = request_output.request_id + request = self.preallocated_reqs.pop(request_id) + cpu_block_ids = request.disaggregate_info["block_tables"] + with self.lock: + gpu_block_ids = self.cache_manager.allocate_gpu_blocks(len(cpu_block_ids)) + request.block_tables = gpu_block_ids + + request.idx = self.get_available_position() + self.tasks_list[request.idx] = request + self.stop_flags[request.idx] = False + self.requests[request.request_id] = request + self.req_dict[request.request_id] = request.idx + + llm_logger.debug(f"call issue_splitwise_buffer_swap_task {request_id}") + tic = time.time() + self.cache_manager.issue_splitwise_buffer_swap_task(request_id, gpu_block_ids, cpu_block_ids) + llm_logger.debug(f"call issue_splitwise_buffer_swap_task {request_id} done, time: {time.time() - tic:.5f}") + + with self.lock: + llm_logger.debug(f"call recycle_splitwise_blocks {request_id}") + self.cache_manager.recycle_splitwise_blocks(cpu_block_ids) + + # update request and insert to running + request.output_token_ids.append(request_output.outputs.token_ids[0]) + request.num_cached_tokens = request_output.num_cached_tokens + if ( + self.config.speculative_config.method in ["mtp"] + and self.config.scheduler_config.splitwise_role == "decode" + ): + request.draft_token_ids = copy.deepcopy(request_output.outputs.draft_token_ids) + request.need_prefill_tokens = len(request.prompt_token_ids) + 1 + request.inference_start_time = time.time() + request.schedule_start_time = time.time() + self.running.append(request) def _free_blocks(self, request: Request): if self.config.cache_config.enable_prefix_caching: diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index 1171e6fb6e9..e0dec469cbb 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -671,7 +671,7 @@ def get_connect_rdma_task_response(self): self.connect_task_response_lock.release() return task_response - def put_cache_info(self, cache_info) -> None: + def put_cache_info(self, cache_info: List[Any]) -> None: """ Args: tasks: Tasks to be added to the queue diff --git a/fastdeploy/router/__init__.py b/fastdeploy/router/__init__.py index 31be300c18e..029a4b4fed0 100644 --- a/fastdeploy/router/__init__.py +++ b/fastdeploy/router/__init__.py @@ -13,3 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ + +from .router import RouterArgs, launch_router + +__all__ = ["RouterArgs", "launch_router"] diff --git a/fastdeploy/router/launch.py b/fastdeploy/router/launch.py index 421baa65e82..46c3c03a14f 100644 --- a/fastdeploy/router/launch.py +++ b/fastdeploy/router/launch.py @@ -14,41 +14,18 @@ # limitations under the License. 
""" -import argparse - -from fastdeploy.router.router import start_router +from fastdeploy.router.router import RouterArgs, launch_router +from fastdeploy.utils import FlexibleArgumentParser from fastdeploy.utils import router_logger as logger def main() -> None: - parser = argparse.ArgumentParser(description="Router for splitwise deployment testing") - parser.add_argument( - "--host", - type=str, - default="0.0.0.0", - help="Host address to bind the router server.", - ) - parser.add_argument( - "--port", - type=int, - default="9000", - help="Port number to bind the router server", - ) - parser.add_argument( - "--splitwise", - action="store_true", - help="Router uses splitwise deployment", - ) - parser.add_argument( - "--request-timeout-secs", - type=int, - default=1800, - help="Request timeout in seconds", - ) + parser = FlexibleArgumentParser() + parser = RouterArgs.add_cli_args(parser) args = parser.parse_args() try: - start_router(args) + launch_router(args) except Exception as e: logger.error(f"Error starting router: {e}") raise e diff --git a/fastdeploy/router/router.py b/fastdeploy/router/router.py index 3ff4cd37125..fb5522abbe6 100644 --- a/fastdeploy/router/router.py +++ b/fastdeploy/router/router.py @@ -6,6 +6,7 @@ import asyncio import random +from dataclasses import dataclass from itertools import chain from uuid import uuid4 @@ -19,11 +20,60 @@ InstanceRole, check_service_health_async, ) +from fastdeploy.utils import FlexibleArgumentParser from fastdeploy.utils import router_logger as logger app = FastAPI() +@dataclass +class RouterArgs: + host: str = "0.0.0.0" + """ + Host address to bind the router server + """ + port: str = "9000" + """ + Port to bind the router server. + """ + splitwise: bool = False + """ + Router uses splitwise deployment + """ + request_timeout_secs: int = 1800 + """ + Request timeout in seconds + """ + + @staticmethod + def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: + parser.add_argument( + "--host", + type=str, + default=RouterArgs.host, + help="Host address to bind the router server.", + ) + parser.add_argument( + "--port", + type=int, + default=RouterArgs.port, + help="Port number to bind the router server", + ) + parser.add_argument( + "--splitwise", + action="store_true", + default=RouterArgs.splitwise, + help="Router uses splitwise deployment", + ) + parser.add_argument( + "--request-timeout-secs", + type=int, + default=RouterArgs.request_timeout_secs, + help="Request timeout in seconds", + ) + return parser + + class Router: """ Router class that handles requests from client and @@ -306,8 +356,9 @@ async def health_generate(): return Response(status_code=200) -def start_router(router_args): +def launch_router(router_args: RouterArgs): app.state.router_args = router_args + print(f"Starting router with args: {router_args}") @app.on_event("startup") async def startup_event(): diff --git a/fastdeploy/scheduler/local_scheduler.py b/fastdeploy/scheduler/local_scheduler.py index 548789f7a79..8684270cbdf 100644 --- a/fastdeploy/scheduler/local_scheduler.py +++ b/fastdeploy/scheduler/local_scheduler.py @@ -285,7 +285,7 @@ def get_requests( if short_partial_requests + long_partial_requests > self.max_num_partial_prefills: break else: - if current_prefill_tokens > max_num_batched_tokens: + if current_prefill_tokens > max_num_batched_tokens and len(requests) > 0: break requests.append(request.raw) diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 8daab42ddf1..5952c3ef829 
100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -53,7 +53,6 @@ def __init__(self, cfg, worker_queue, resource_manager): self.engine_worker_queue = worker_queue self.resource_manager = resource_manager self.connect_innode_instances = {} - self.temp_cache_info = dict() self.current_request_ids = dict() self.idx = self.cfg.parallel_config.local_data_parallel_id self.enable_decode_cache_task = envs.FD_ENABLE_CACHE_TASK == "1" @@ -291,98 +290,96 @@ def check_decode_allocated(self, task): self.logger.error(f"Receive_decode_allocated error: {msg}") return False, msg - def send_cache_infos(self, tasks: List[Request], current_id): + def send_cache_info_to_messager(self, tasks: List[Request], current_id): """ - Send cache information to specific port. + Prefill sends the request with allocated block ids to cache messager by engine worker queue. - Parameters: - tasks (list): List of tasks. - current_id (int): Current id to indicate the prefill number. - - Returns: - bool: Whether it is in decode status. + args: + tasks (list): List of tasks. + current_id (int): Current id to indicate the prefill number. """ - is_decode = False - temp_cache_info = dict() + cache_info = [] for i in range(len(tasks)): - if tasks[i].disaggregate_info is None: + dsg_info = tasks[i].disaggregate_info + if dsg_info is None: continue - self.logger.info(f"{tasks[i].disaggregate_info}") - if tasks[i].disaggregate_info["role"] == "decode": - if tasks[i].disaggregate_info["transfer_protocol"] == "ipc": - cache_info = { - "request_id": tasks[i].request_id, - "device_ids": self.cfg.parallel_config.device_ids.split(","), - "transfer_protocol": "ipc", - "dest_block_ids": tasks[i].disaggregate_info["block_tables"], - } - if tasks[i].disaggregate_info["cache_info"]["ipc"]["port"] not in temp_cache_info: - temp_cache_info[tasks[i].disaggregate_info["cache_info"]["ipc"]["port"]] = [] - temp_cache_info[tasks[i].disaggregate_info["cache_info"]["ipc"]["port"]].append(cache_info) - else: - addr = ( - f"{tasks[i].disaggregate_info['cache_info']['rdma']['ip']}:" - + f"{tasks[i].disaggregate_info['cache_info']['rdma']['port']}" - ) - if tasks[i].get("error_msg", None) is not None: - cache_info = { - "request_id": tasks[i].request_id, - "error_msg": tasks[i].get("error_msg"), - } - else: - cache_info = { - "request_id": tasks[i].request_id, - "device_ids": self.cfg.parallel_config.device_ids.split(","), - "ip": self.cfg.host_ip, - "rdma_ports": self.cfg.disaggregate_info["cache_info"]["rdma"]["rdma_port"], - "transfer_protocol": "rdma", - "dest_block_ids": tasks[i].disaggregate_info["block_tables"], - } - if addr not in temp_cache_info: - temp_cache_info[addr] = [] - - temp_cache_info[addr].append(cache_info) - is_decode = True + if envs.ENABLE_V1_KVCACHE_SCHEDULER: + info = { + "request_id": tasks[i].request_id, + "src_block_ids": tasks[i].block_tables, + "current_id": tasks[i].idx, + "need_prefill_tokens": tasks[i].need_prefill_tokens, + } else: - addr = "prefill" if current_id == -1: - current_id = tasks[i].disaggregate_info["cache_info"]["ipc"]["current_id"] - if envs.ENABLE_V1_KVCACHE_SCHEDULER: - cache_info = { + current_id = dsg_info["cache_info"]["ipc"]["current_id"] + info = { + "request_id": tasks[i].request_id, + "src_block_ids": tasks[i].block_tables, + "current_id": current_id, + } + cache_info.append(info) + + self.logger.debug(f"send_cache_info_to_messager, {cache_info}") + self.engine_worker_queue.put_cache_info(cache_info) + + def send_cache_info_to_prefill(self, 
tasks: List[Request]): + """ + Decode sends the request with allocated block ids to prefill. + + args: + tasks (list): List of tasks. + """ + cache_info = dict() + for i in range(len(tasks)): + dsg_info = tasks[i].disaggregate_info + if dsg_info is None: + self.logger.debug(f"skip send_cache_infos_to_prefill, {tasks[i].request_id}") + continue + self.logger.debug(f"send_cache_infos_to_prefill, {dsg_info}") + + if dsg_info["transfer_protocol"] == "ipc": + info = { + "request_id": tasks[i].request_id, + "device_ids": self.cfg.parallel_config.device_ids.split(","), + "transfer_protocol": "ipc", + "dest_block_ids": dsg_info["block_tables"], + } + if dsg_info["cache_info"]["ipc"]["port"] not in cache_info: + cache_info[dsg_info["cache_info"]["ipc"]["port"]] = [] + cache_info[dsg_info["cache_info"]["ipc"]["port"]].append(info) + else: + if tasks[i].get("error_msg", None) is not None: + info = { "request_id": tasks[i].request_id, - "src_block_ids": tasks[i].block_tables, - "current_id": tasks[i].idx, - "need_prefill_tokens": tasks[i].need_prefill_tokens, + "error_msg": tasks[i].get("error_msg"), } else: - cache_info = { + info = { "request_id": tasks[i].request_id, - "src_block_ids": tasks[i].block_tables, - "current_id": current_id, + "device_ids": self.cfg.parallel_config.device_ids.split(","), + "ip": self.cfg.host_ip, + "rdma_ports": self.cfg.disaggregate_info["cache_info"]["rdma"]["rdma_port"], + "transfer_protocol": "rdma", + "dest_block_ids": dsg_info["block_tables"], } - if addr not in temp_cache_info: - temp_cache_info[addr] = [] - temp_cache_info[addr].append(cache_info) - - if not is_decode and len(temp_cache_info): - for k, v in temp_cache_info.items(): - self.logger.debug(f"send cache info to cachemessager, {v}") - self.engine_worker_queue.put_cache_info(v) - else: - self.logger.debug(f"send cache info to coupled instance, {temp_cache_info}") - if len(temp_cache_info): - for k, v in temp_cache_info.items(): - self.logger.info(f"{k} {v}") - if ":" in str(k): - self._send_message(k, "cache_sync", v) - else: - if k not in self.connect_innode_instances: - self.create_connection(k) - self.connect_innode_instances[k].put_cache_info(v) - - return is_decode + addr = f"{dsg_info['cache_info']['rdma']['ip']}:" + f"{dsg_info['cache_info']['rdma']['port']}" + if addr not in cache_info: + cache_info[addr] = [] + cache_info[addr].append(info) + + self.logger.debug(f"send cache info to prefill, {cache_info}") + if len(cache_info): + for k, v in cache_info.items(): + self.logger.info(f"{k} {v}") + if ":" in str(k): + self._send_message(k, "cache_sync", v) + else: + if k not in self.connect_innode_instances: + self.create_connection(k) + self.connect_innode_instances[k].put_cache_info(v) def _serialize_message(self, msg_type: str, payload) -> bytes: # TODO 压缩 diff --git a/tests/cache_manager/test_cache_transfer_manager.py b/tests/cache_manager/test_cache_transfer_manager.py index 96f0b2ada26..12077564425 100644 --- a/tests/cache_manager/test_cache_transfer_manager.py +++ b/tests/cache_manager/test_cache_transfer_manager.py @@ -25,6 +25,7 @@ class Args: key_cache_shape = "1,1,1,1" value_cache_shape = "" create_cache_tensor = False + splitwise_cache_buffer_block_num = 0 # ========================== diff --git a/tests/e2e/test_ernie_03b_pd_router_v0.py b/tests/e2e/test_ernie_03b_pd_router_v0.py index c8da6adbb67..220e653b08e 100644 --- a/tests/e2e/test_ernie_03b_pd_router_v0.py +++ b/tests/e2e/test_ernie_03b_pd_router_v0.py @@ -111,7 +111,7 @@ def setup_and_run_server(): 
env_prefill["CUDA_VISIBLE_DEVICES"] = "0" env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_prefill["FD_LOG_DIR"] = "log_prefill" - prefill_log_path = "server.log" + prefill_log_path = "prefill.log" prefill_cmd = [ sys.executable, "-m", @@ -161,7 +161,7 @@ def setup_and_run_server(): env_decode["CUDA_VISIBLE_DEVICES"] = "1" env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_decode["FD_LOG_DIR"] = "log_decode" - decode_log_path = "decode_server.log" + decode_log_path = "decode.log" decode_cmd = [ sys.executable, "-m", diff --git a/tests/e2e/test_ernie_03b_pd_router_v1_cpu_buffer.py b/tests/e2e/test_ernie_03b_pd_router_v1_cpu_buffer.py new file mode 100644 index 00000000000..86dd0386506 --- /dev/null +++ b/tests/e2e/test_ernie_03b_pd_router_v1_cpu_buffer.py @@ -0,0 +1,426 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Test splitwise deployment which uses local_scheduler + router, +# the ENABLE_V1_KVCACHE_SCHEDULER is 1, and the decode use cpu buffer +# to receive cache from prefill. + +import json +import os +import shutil +import signal +import subprocess +import sys +import time + +import pytest +import requests +from utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + get_registered_number, +) + +# Read ports from environment variables; use default values if not set +FD_CONNECTOR_PORT = int(os.getenv("FD_CONNECTOR_PORT", 8433)) +FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533)) +FD_RDMA_PORT = int(os.getenv("FD_RDMA_PORT", 8623)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [ + FD_API_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + FD_CACHE_QUEUE_PORT, + FD_CONNECTOR_PORT, + FD_RDMA_PORT, + FD_API_PORT + 1, + FD_ENGINE_QUEUE_PORT + 1, + FD_METRICS_PORT + 1, + FD_CACHE_QUEUE_PORT + 1, + FD_CONNECTOR_PORT + 1, + FD_RDMA_PORT + 1, + FD_ROUTER_PORT, +] + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports(PORTS_TO_CLEAN) + + print("log dir clean ") + if os.path.exists("log_router") and os.path.isdir("log_router"): + shutil.rmtree("log_router") + if os.path.exists("log_prefill") and os.path.isdir("log_prefill"): + shutil.rmtree("log_prefill") + if os.path.exists("log_decode") and os.path.isdir("log_decode"): + shutil.rmtree("log_decode") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-Paddle") + else: + model_path = "baidu/ERNIE-4.5-0.3B-Paddle" + print(f"model_path: {model_path}") + + # get rdma nics + current_dir = os.path.dirname(os.path.abspath(__file__)) + shell_path = os.path.join(current_dir, "utils/get_rdma_nics.sh") + output = 
subprocess.check_output(["bash", shell_path, "gpu"], text=True) + _, rdma_nics = output.split("=") + print(f"shell_path: {shell_path}, rdma_nics: {rdma_nics}") + + # router + print("start router...") + env_router = os.environ.copy() + env_router["FD_LOG_DIR"] = "log_router" + router_log_path = "router.log" + + router_cmd = [ + sys.executable, + "-m", + "fastdeploy.router.launch", + "--port", + str(FD_ROUTER_PORT), + "--splitwise", + ] + + with open(router_log_path, "w") as logfile: + process_router = subprocess.Popen( + router_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_router, + ) + + # prefill实例 + print("start prefill...") + env_prefill = os.environ.copy() + env_prefill["CUDA_VISIBLE_DEVICES"] = "0" + env_prefill["FD_LOG_DIR"] = "log_prefill" + env_prefill["KVCACHE_RDMA_NICS"] = rdma_nics + + prefill_log_path = "prefill.log" + prefill_cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "8192", + "--num-gpu-blocks-override", + "2000", + "--splitwise-role", + "prefill", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(FD_RDMA_PORT), + "--pd-comm-port", + str(FD_CONNECTOR_PORT), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(prefill_log_path, "w") as logfile: + process_prefill = subprocess.Popen( + prefill_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_prefill, + ) + time.sleep(1) + + # decode实例 + print("start decode...") + env_decode = os.environ.copy() + env_decode["CUDA_VISIBLE_DEVICES"] = "1" + env_decode["FD_LOG_DIR"] = "log_decode" + env_decode["KVCACHE_RDMA_NICS"] = rdma_nics + decode_log_path = "decode.log" + decode_cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT + 1), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT + 1), + "--metrics-port", + str(FD_METRICS_PORT + 1), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT + 1), + "--max-model-len", + "8192", + "--splitwise-role", + "decode", + "--cache-transfer-protocol", + "rdma", + "--splitwise-cache-buffer-size", + "2", + "--rdma-comm-ports", + str(FD_RDMA_PORT + 1), + "--pd-comm-port", + str(FD_CONNECTOR_PORT + 1), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(decode_log_path, "w") as logfile: + process_decode = subprocess.Popen( + decode_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_decode, + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(60): + registered_numbers = get_registered_number(f"0.0.0.0:{FD_ROUTER_PORT}") + if registered_numbers["prefill"] >= 1 and registered_numbers["decode"] >= 1: + print("Prefill and decode servers are both online") + break + time.sleep(5) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. 
Cleaning up...") + try: + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean_ports(PORTS_TO_CLEAN) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean_ports(PORTS_TO_CLEAN) + print(f"Prefill server (pid={process_prefill.pid}) terminated") + print(f"Decode server (pid={process_decode.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return f"http://0.0.0.0:{FD_ROUTER_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. + """ + return {"Content-Type": "application/json"} + + +def test_metrics_config(metrics_url): + timeout = 600 + url = metrics_url.replace("metrics", "config-info") + res = requests.get(url, timeout=timeout) + assert res.status_code == 200 + + +def send_request(url, payload, timeout=600): + """ + 发送请求到指定的URL,并返回响应结果。 + """ + headers = { + "Content-Type": "application/json", + } + + try: + res = requests.post(url, headers=headers, json=payload, timeout=timeout) + print("🟢 接收响应中...\n") + return res + except requests.exceptions.Timeout: + print(f"❌ 请求超时(超过 {timeout} 秒)") + return None + except requests.exceptions.RequestException as e: + print(f"❌ 请求失败:{e}") + return None + + +def get_stream_chunks(response): + """解析流式返回,生成chunk List[dict]""" + chunks = [] + + if response.status_code == 200: + for line in response.iter_lines(decode_unicode=True): + if line: + if line.startswith("data: "): + line = line[len("data: ") :] + + if line.strip() == "[DONE]": + break + + try: + chunk = json.loads(line) + chunks.append(chunk) + except Exception as e: + print(f"解析失败: {e}, 行内容: {line}") + else: + print(f"请求失败,状态码: {response.status_code}") + print("返回内容:", response.text) + + return chunks + + +def test_chat_usage_stream(api_url): + """测试流式chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["delta"]["content"] for x in chunks[:-1]]) + print("Decode Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_chat_usage_non_stream(api_url): + """测试非流式chat usage""" + payload = { + 
"model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["message"]["content"] + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_non_chat_usage_stream(api_url): + """测试流式非chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "prompt": "牛顿的三大运动定律是什么?", + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + api_url = api_url.replace("chat/completions", "completions") + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["text"] for x in chunks[:-1]]) + print("Decode Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_non_chat_usage_non_stream(api_url): + """测试非流式非chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "prompt": "牛顿的三大运动定律是什么?", + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + api_url = api_url.replace("chat/completions", "completions") + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["text"] + print("Decode Response:", result) + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" diff --git a/tests/e2e/test_ernie_03b_pd_router_v1_rdma.py b/tests/e2e/test_ernie_03b_pd_router_v1_rdma.py new file mode 100644 index 00000000000..9df153240a4 --- /dev/null +++ b/tests/e2e/test_ernie_03b_pd_router_v1_rdma.py @@ -0,0 +1,424 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Test splitwise deployment: use local_scheduler + router, +# set ENABLE_V1_KVCACHE_SCHEDULER is 1, use rdma to transfer cache. + +import json +import os +import shutil +import signal +import subprocess +import sys +import time + +import pytest +import requests +from utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + get_registered_number, +) + +# Read ports from environment variables; use default values if not set +FD_CONNECTOR_PORT = int(os.getenv("FD_CONNECTOR_PORT", 8433)) +FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533)) +FD_RDMA_PORT = int(os.getenv("FD_RDMA_PORT", 8623)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [ + FD_API_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + FD_CACHE_QUEUE_PORT, + FD_CONNECTOR_PORT, + FD_RDMA_PORT, + FD_API_PORT + 1, + FD_ENGINE_QUEUE_PORT + 1, + FD_METRICS_PORT + 1, + FD_CACHE_QUEUE_PORT + 1, + FD_CONNECTOR_PORT + 1, + FD_RDMA_PORT + 1, + FD_ROUTER_PORT, +] + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports(PORTS_TO_CLEAN) + + print("log dir clean ") + if os.path.exists("log_router") and os.path.isdir("log_router"): + shutil.rmtree("log_router") + if os.path.exists("log_prefill") and os.path.isdir("log_prefill"): + shutil.rmtree("log_prefill") + if os.path.exists("log_decode") and os.path.isdir("log_decode"): + shutil.rmtree("log_decode") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-Paddle") + else: + model_path = "baidu/ERNIE-4.5-0.3B-Paddle" + print(f"model_path: {model_path}") + + # get rdma nics + current_dir = os.path.dirname(os.path.abspath(__file__)) + shell_path = os.path.join(current_dir, "utils/get_rdma_nics.sh") + output = subprocess.check_output(["bash", shell_path, "gpu"], text=True) + _, rdma_nics = output.split("=") + print(f"shell_path: {shell_path}, rdma_nics: {rdma_nics}") + + # router + print("start router...") + env_router = os.environ.copy() + env_router["FD_LOG_DIR"] = "log_router" + router_log_path = "router.log" + + router_cmd = [ + sys.executable, + "-m", + "fastdeploy.router.launch", + "--port", + str(FD_ROUTER_PORT), + "--splitwise", + ] + + with open(router_log_path, "w") as logfile: + process_router = subprocess.Popen( + router_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_router, + ) + + # prefill实例 + print("start prefill...") + env_prefill = os.environ.copy() + env_prefill["CUDA_VISIBLE_DEVICES"] = "0" + env_prefill["FD_LOG_DIR"] = "log_prefill" + env_prefill["KVCACHE_RDMA_NICS"] = rdma_nics + + prefill_log_path = "prefill.log" + prefill_cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "8192", + "--num-gpu-blocks-override", + "2000", + "--splitwise-role", + "prefill", + 
"--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(FD_RDMA_PORT), + "--pd-comm-port", + str(FD_CONNECTOR_PORT), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(prefill_log_path, "w") as logfile: + process_prefill = subprocess.Popen( + prefill_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_prefill, + ) + time.sleep(1) + + # decode实例 + print("start decode...") + env_decode = os.environ.copy() + env_decode["CUDA_VISIBLE_DEVICES"] = "1" + env_decode["FD_LOG_DIR"] = "log_decode" + env_decode["KVCACHE_RDMA_NICS"] = rdma_nics + + decode_log_path = "decode.log" + decode_cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT + 1), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT + 1), + "--metrics-port", + str(FD_METRICS_PORT + 1), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT + 1), + "--max-model-len", + "8192", + "--splitwise-role", + "decode", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(FD_RDMA_PORT + 1), + "--pd-comm-port", + str(FD_CONNECTOR_PORT + 1), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(decode_log_path, "w") as logfile: + process_decode = subprocess.Popen( + decode_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_decode, + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(60): + registered_numbers = get_registered_number(f"0.0.0.0:{FD_ROUTER_PORT}") + if registered_numbers["prefill"] >= 1 and registered_numbers["decode"] >= 1: + print("Prefill and decode servers are both online") + break + time.sleep(5) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") + try: + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean_ports(PORTS_TO_CLEAN) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean_ports(PORTS_TO_CLEAN) + print(f"Prefill server (pid={process_prefill.pid}) terminated") + print(f"Decode server (pid={process_decode.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return f"http://0.0.0.0:{FD_ROUTER_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. 
+ """ + return {"Content-Type": "application/json"} + + +def test_metrics_config(metrics_url): + timeout = 600 + url = metrics_url.replace("metrics", "config-info") + res = requests.get(url, timeout=timeout) + assert res.status_code == 200 + + +def send_request(url, payload, timeout=600): + """ + 发送请求到指定的URL,并返回响应结果。 + """ + headers = { + "Content-Type": "application/json", + } + + try: + res = requests.post(url, headers=headers, json=payload, timeout=timeout) + print("🟢 接收响应中...\n") + return res + except requests.exceptions.Timeout: + print(f"❌ 请求超时(超过 {timeout} 秒)") + return None + except requests.exceptions.RequestException as e: + print(f"❌ 请求失败:{e}") + return None + + +def get_stream_chunks(response): + """解析流式返回,生成chunk List[dict]""" + chunks = [] + + if response.status_code == 200: + for line in response.iter_lines(decode_unicode=True): + if line: + if line.startswith("data: "): + line = line[len("data: ") :] + + if line.strip() == "[DONE]": + break + + try: + chunk = json.loads(line) + chunks.append(chunk) + except Exception as e: + print(f"解析失败: {e}, 行内容: {line}") + else: + print(f"请求失败,状态码: {response.status_code}") + print("返回内容:", response.text) + + return chunks + + +def test_chat_usage_stream(api_url): + """测试流式chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["delta"]["content"] for x in chunks[:-1]]) + print("Decode Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_chat_usage_non_stream(api_url): + """测试非流式chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["message"]["content"] + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_non_chat_usage_stream(api_url): + """测试流式非chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "prompt": "牛顿的三大运动定律是什么?", + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + api_url = 
api_url.replace("chat/completions", "completions") + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["text"] for x in chunks[:-1]]) + print("Decode Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_non_chat_usage_non_stream(api_url): + """测试非流式非chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "prompt": "牛顿的三大运动定律是什么?", + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + api_url = api_url.replace("chat/completions", "completions") + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["text"] + print("Decode Response:", result) + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" diff --git a/tests/e2e/utils/get_rdma_nics.sh b/tests/e2e/utils/get_rdma_nics.sh new file mode 100644 index 00000000000..4fc07a98c9a --- /dev/null +++ b/tests/e2e/utils/get_rdma_nics.sh @@ -0,0 +1,225 @@ +#!/bin/bash +Cur_Dir=$(cd `dirname $0`; pwd) +NICNAME_TYPE=xgbe # 默认检测类型 +type=$1 + +if [ "$ENABLE_EP_DP" == "1" ]; then + gpu_root_port_filename="${Cur_Dir}/gpu_rootport_${DP_RANK}.txt" +else + gpu_root_port_filename="${Cur_Dir}/gpu_rootport.txt" +fi + +function __NEW_GPU_ROOTPORT_FILE__() { + touch ${gpu_root_port_filename} 2>/dev/null + echo "" > ${gpu_root_port_filename} 2>/dev/null + for gpu_bus in $(lspci 2>/dev/null | grep -iE "Communication controller: | controller: NVIDIA" | awk '{print $1}') + do + readlink "/sys/bus/pci/devices/0000:${gpu_bus}" 2>/dev/null | awk -F [/] '{print $6}' >> ${gpu_root_port_filename} + done +} + +function __RM_GPU_ROOTPORT_FILE__() { + rm -rf ${gpu_root_port_filename} 2>/dev/null +} + +function __JUDGE_NIC_TYPE__() { + XGBE_NUM=$(ip a 2>/dev/null | grep -c ": ${NICNAME_TYPE}") + gpu_first=true + xpu_first=true + cpu_first=true + + for (( xgbe_no=0; xgbe_no < XGBE_NUM; xgbe_no++ )) + do + [ ! 
-d "/sys/class/net/${NICNAME_TYPE}${xgbe_no}" ] && continue + + PCI_ADDRESS=$(ethtool -i "${NICNAME_TYPE}${xgbe_no}" 2>/dev/null | awk -F '0000:' '/bus-info/{print $2}') + [ -z "$PCI_ADDRESS" ] && continue + NIC_ROOTPORT=$(readlink "/sys/bus/pci/devices/0000:${PCI_ADDRESS}" 2>/dev/null | awk -F '/' '{print $6}') + + NIC_TYPE="CPU_NIC" + grep -qxF "$NIC_ROOTPORT" ${gpu_root_port_filename} 2>/dev/null && NIC_TYPE="GPU_NIC" + + if [[ "$type" == "gpu" && "$NIC_TYPE" == "GPU_NIC" ]]; then + ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}') + if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP"; then + if $gpu_first; then + printf "KVCACHE_RDMA_NICS=%s" "$ibdev" + gpu_first=false + else + printf ",%s" "$ibdev" + fi + fi + fi + + if [[ "$type" == "xpu" && "$NIC_TYPE" == "GPU_NIC" ]]; then + ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}') + if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP"; then + if $xpu_first; then + printf "KVCACHE_RDMA_NICS=%s,%s" "$ibdev" "$ibdev" + xpu_first=false + else + printf ",%s,%s" "$ibdev" "$ibdev" + fi + fi + fi + + if [[ "$type" == "cpu" ]]; then + for (( xgbe_no=0; xgbe_no < XGBE_NUM; xgbe_no++ )) + do + [ ! -d "/sys/class/net/${NICNAME_TYPE}${xgbe_no}" ] && continue + + PCI_ADDRESS=$(ethtool -i "${NICNAME_TYPE}${xgbe_no}" 2>/dev/null | awk -F '0000:' '/bus-info/{print $2}') + [ -z "$PCI_ADDRESS" ] && continue + + NIC_ROOTPORT=$(readlink "/sys/bus/pci/devices/0000:${PCI_ADDRESS}" 2>/dev/null | awk -F '/' '{print $6}') + grep -qxF "$NIC_ROOTPORT" ${gpu_root_port_filename} 2>/dev/null && continue + + if ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP" && \ + ip a show "${NICNAME_TYPE}${xgbe_no}" | grep -q "inet"; then + printf "KV_CACHE_SOCKET_IFNAME=%s\n" "${NICNAME_TYPE}${xgbe_no}" + return 0 + fi + done + echo "ERROR: No active CPU NIC with IP found!" >&2 + return 1 + fi + + if [[ "$type" == "cpu_ib" && "$NIC_TYPE" == "CPU_NIC" ]]; then + ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}') + if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP" && \ + ip a show "${NICNAME_TYPE}${xgbe_no}" | grep -q "inet "; then + if $cpu_ib_first; then + printf "KVCACHE_RDMA_NICS=%s" "$ibdev" + cpu_ib_first=false + else + printf ",%s" "$ibdev" + fi + fi + fi + + done + + case "$type" in + gpu) ! $gpu_first && printf "\n" ;; + xpu) ! $xpu_first && printf "\n" ;; + cpu) ! $cpu_first && printf "\n" ;; + cpu_ib) ! $cpu_ib_first && printf "\n" ;; + esac +} + +function get_vxpu_nics() { + local topo_output=$(xpu-smi topo -m) + local xpu_info=$(echo "$topo_output" | grep -E '^XPU[0-9]+') + + local nic_mapping=() + while IFS= read -r line; do + if [[ $line =~ NIC([0-9]+):\ +(mlx[0-9_]+) ]]; then + local nic_idx=${BASH_REMATCH[1]} + local nic_name=${BASH_REMATCH[2]} + nic_mapping[$nic_idx]=$nic_name + fi + done < <(echo "$topo_output" | grep -E '^\s*NIC[0-9]+:') + + local nic_count=${#nic_mapping[@]} + + declare -A priority_map=([PIX]=2 [NODE]=1 [SYS]=0) + local optimal_nics=() + + while IFS= read -r line; do + local fields=($line) + local nic_start_index=5 + local max_nics=$(( ${#fields[@]} - nic_start_index )) + local actual_nic_count=$(( max_nics < nic_count ? 
+
+        local best_priority=-1
+        local best_nic=""
+
+        # For this XPU row, keep the NIC whose connection type has the highest
+        # priority (PIX > NODE > SYS)
+        for ((nic_idx=0; nic_idx < actual_nic_count; nic_idx++)); do
+            local conn_type="${fields[$((nic_start_index + nic_idx))]}"
+            local current_priority=${priority_map[$conn_type]:--1}
+            if [[ -n "${nic_mapping[$nic_idx]}" ]] && (( current_priority > best_priority )); then
+                best_priority=$current_priority
+                best_nic="${nic_mapping[$nic_idx]}"
+            fi
+        done
+
+        if [[ -n "$best_nic" ]]; then
+            optimal_nics+=("$best_nic")
+        fi
+    done <<< "$xpu_info"
+
+    local IFS=,
+    export KVCACHE_RDMA_NICS="${optimal_nics[*]}"
+    echo "KVCACHE_RDMA_NICS=${optimal_nics[*]}"
+}
+
+function get_vcpu_nics() {
+    ip -o addr show | awk '$3 == "inet" && $4 ~ /^10\./ {print "KV_CACHE_SOCKET_IFNAME="$2; exit}'
+}
+
+function __main__() {
+    if [[ "$type" == "vxpu" ]]; then
+        get_vxpu_nics
+        return 0
+    fi
+    if [[ "$type" == "vcpu" ]]; then
+        get_vcpu_nics
+        return 0
+    fi
+
+    # Handle bonded interfaces first
+    if [[ "$type" == "cpu" ]]; then
+        for bond in $(ls -d /sys/class/net/bond* 2>/dev/null); do
+            bond_if=$(basename "$bond")
+            if ip link show "$bond_if" | grep -q "state UP" && \
+               ip a show "$bond_if" | grep -q "inet "; then
+                printf "KV_CACHE_SOCKET_IFNAME=%s\n" "$bond_if"
+                return 0
+            fi
+        done
+    fi
+
+    if [[ "$type" == "cpu_ib" ]]; then
+        first=true
+        for bond in $(ls -d /sys/class/net/bond* 2>/dev/null); do
+            bond_if=$(basename "$bond")
+            __NEW_GPU_ROOTPORT_FILE__
+
+            ibdev=$(ibdev2netdev 2>/dev/null | grep -w "$bond_if" | awk '{print $1}')
+            if [ -n "$ibdev" ] && ip link show "$bond_if" | grep -q "state UP" && \
+               ip a show "$bond_if" | grep -q "inet "; then
+                if $first; then
+                    printf "KVCACHE_RDMA_NICS=%s" "$ibdev"
+                    first=false
+                else
+                    printf ",%s" "$ibdev"
+                fi
+            fi
+
+            bondib=$(show_gids 2>/dev/null | grep -w "$bond_if" | awk '{print $1}' | grep "mlx.*bond" | head -1)
+            if [ -n "$bondib" ] && ip link show "$bond_if" | grep -q "state UP" && \
+               ip a show "$bond_if" | grep -q "inet " && $first; then
+                printf "KVCACHE_RDMA_NICS=%s" "$bondib"
+                first=false
+            fi
+
+            __RM_GPU_ROOTPORT_FILE__
+        done
+
+        ! $first && printf "\n"
+        ! $first && return 0
+    fi
+
+    local nic_types=("eth" "ib" "xgbe")
+    for nt in "${nic_types[@]}"; do
+        if ip a | grep -iq "$nt"; then
+            __NEW_GPU_ROOTPORT_FILE__
+            NICNAME_TYPE=$nt
+            __JUDGE_NIC_TYPE__
+            __RM_GPU_ROOTPORT_FILE__
+        fi
+    done
+}
+
+__main__
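+
+# Usage sketch (documentation only; the device names in the comments below are
+# illustrative assumptions, not values this script guarantees):
+#   bash tests/e2e/utils/get_rdma_nics.sh gpu   # prints e.g. KVCACHE_RDMA_NICS=mlx5_0,mlx5_1
+#   bash tests/e2e/utils/get_rdma_nics.sh cpu   # prints e.g. KV_CACHE_SOCKET_IFNAME=xgbe0
+# A launch script can consume the output with, for example:
+#   eval "$(bash tests/e2e/utils/get_rdma_nics.sh gpu)" && export KVCACHE_RDMA_NICS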