Merged
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,3 +1,3 @@
[build-system]
requires = ["setuptools>=45", "wheel", "cmake"]
build-backend = "setuptools.build_meta"
requires = ["setuptools>=45", "wheel", "cmake", "torch", "pybind11"]
build-backend = "setuptools.build_meta"
31 changes: 30 additions & 1 deletion setup.py
@@ -26,7 +26,11 @@
import shutil
import subprocess
import sys
import sysconfig

import pybind11
import torch
import torch.utils.cpp_extension
from setuptools import Extension, find_packages, setup
from setuptools.command.build_ext import build_ext
from setuptools.command.develop import develop
@@ -36,9 +40,11 @@
GSA_SRC_DIR = os.path.join(ROOT_DIR, "ucm", "csrc", "gsaoffloadops")
PREFETCH_SRC_DIR = os.path.join(ROOT_DIR, "ucm", "csrc", "ucmprefetch")
RETRIEVAL_SRC_DIR = os.path.join(ROOT_DIR, "ucm", "csrc", "esaretrieval")
KVSTAR_RETRIEVAL_SRC_DIR = os.path.join(ROOT_DIR, "ucm", "csrc", "kvstar_retrieve")

GSA_INSTALL_DIR = os.path.join(ROOT_DIR, "ucm", "ucm_sparse")
RETRIEVAL_INSTALL_DIR = os.path.join(ROOT_DIR, "ucm", "ucm_sparse", "retrieval")
KVSTAR_RETRIEVAL_INSTALL_DIR = os.path.join(ROOT_DIR, "ucm", "ucm_sparse")

PLATFORM = os.getenv("PLATFORM")

@@ -73,6 +79,20 @@ def build_cmake(self, ext: CMakeExtension):
f"-DPYTHON3_EXECUTABLE={sys.executable}",
]

torch_cmake_prefix = torch.utils.cmake_prefix_path
pybind11_cmake_dir = pybind11.get_cmake_dir()

cmake_prefix_paths = [torch_cmake_prefix, pybind11_cmake_dir]
cmake_args.append(f"-DCMAKE_PREFIX_PATH={';'.join(cmake_prefix_paths)}")

torch_includes = torch.utils.cpp_extension.include_paths()
python_include = sysconfig.get_path("include")
pybind11_include = pybind11.get_include()

all_includes = torch_includes + [python_include, pybind11_include]
cmake_include_string = ";".join(all_includes)
cmake_args.append(f"-DEXTERNAL_INCLUDE_DIRS={cmake_include_string}")

if _is_cuda():
cmake_args.append("-DRUNTIME_ENVIRONMENT=cuda")
elif _is_npu():
@@ -88,10 +108,11 @@ def build_cmake(self, ext: CMakeExtension):
print(f"[INFO] Building {ext.name} module with CMake")
print(f"[INFO] Source directory: {ext.sourcedir}")
print(f"[INFO] Build directory: {build_dir}")
print(f"[INFO] CMake command: {' '.join(cmake_args)}")

subprocess.check_call(cmake_args, cwd=build_dir)

if ext.name in ["store", "gsa_offload_ops", "esaretrieval"]:
if ext.name in ["store", "gsa_offload_ops", "esaretrieval", "kvstar_retrieve"]:
subprocess.check_call(["make", "-j", "8"], cwd=build_dir)
else:
# For gsa_prefetch, build with cmake --build
@@ -117,6 +138,8 @@ def _copy_so_files(self, ext: CMakeExtension):
search_patterns.extend(["prefetch"])
elif ext.name == "esaretrieval":
search_patterns.extend(["retrieval_backend"])
elif ext.name == "kvstar_retrieve":
search_patterns.extend(["kvstar_retrieve"])

for file in os.listdir(so_search_dir):
if file.endswith(".so") or ".so." in file:
@@ -128,6 +151,9 @@ def _copy_so_files(self, ext: CMakeExtension):
if ext.name == "esaretrieval":
install_dir = RETRIEVAL_INSTALL_DIR
build_install_dir = "ucm/ucm_sparse/retrieval"
elif ext.name == "kvstar_retrieve":
install_dir = KVSTAR_RETRIEVAL_INSTALL_DIR
build_install_dir = "ucm/ucm_sparse/kvstar_retrieve"
else:
install_dir = GSA_INSTALL_DIR
build_install_dir = "ucm/ucm_sparse"
@@ -151,6 +177,9 @@ def _copy_so_files(self, ext: CMakeExtension):
ext_modules.append(CMakeExtension(name="gsa_offload_ops", sourcedir=GSA_SRC_DIR))
ext_modules.append(CMakeExtension(name="gsa_prefetch", sourcedir=PREFETCH_SRC_DIR))
ext_modules.append(CMakeExtension(name="esaretrieval", sourcedir=RETRIEVAL_SRC_DIR))
ext_modules.append(
CMakeExtension(name="kvstar_retrieve", sourcedir=KVSTAR_RETRIEVAL_SRC_DIR)
)

setup(
name="ucm",
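Note: the new build hooks assemble CMAKE_PREFIX_PATH and EXTERNAL_INCLUDE_DIRS from the active Python environment. As a quick standalone sketch (not part of this diff), the same public APIs can be used to print the values that will be injected on a given machine:

```python
# Standalone sketch: print the paths the modified setup.py injects into CMake.
import sysconfig

import pybind11
import torch
import torch.utils.cpp_extension

# Mirrors the -DCMAKE_PREFIX_PATH argument built in build_cmake().
print("CMAKE_PREFIX_PATH:",
      ";".join([torch.utils.cmake_prefix_path, pybind11.get_cmake_dir()]))

# Mirrors the -DEXTERNAL_INCLUDE_DIRS argument.
includes = (torch.utils.cpp_extension.include_paths()
            + [sysconfig.get_path("include"), pybind11.get_include()])
print("EXTERNAL_INCLUDE_DIRS:", ";".join(includes))
```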
111 changes: 111 additions & 0 deletions ucm/csrc/kvstar_retrieve/CMakeLists.txt
@@ -0,0 +1,111 @@
cmake_minimum_required(VERSION 3.18)
project(kvstar_retrieve_clib)

include(FetchContent)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -D_GLIBCXX_USE_CXX11_ABI=0")

if(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64|i686|i386|AMD64")
include(CheckCXXCompilerFlag)

check_cxx_compiler_flag("-mf16c" COMPILER_SUPPORTS_F16C)
check_cxx_compiler_flag("-mavx2" COMPILER_SUPPORTS_AVX2)

if(COMPILER_SUPPORTS_F16C AND COMPILER_SUPPORTS_AVX2)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mf16c -mavx2")
message(STATUS "F16C and AVX2 instruction sets enabled")

add_definitions(-DUSE_F16C=1)
else()
message(STATUS "Compiler does not support F16C/AVX2, using software implementation")
add_definitions(-DUSE_F16C=0)
endif()
else()
message(STATUS "Non-x86 architecture, F16C not applicable")
add_definitions(-DUSE_F16C=0)
endif()

FetchContent_Declare(
fmt
GIT_REPOSITORY https://github.com/fmtlib/fmt.git
GIT_TAG 10.2.1
)

set(FMT_TEST OFF CACHE BOOL "Disable testing for fmt")
FetchContent_MakeAvailable(fmt)

FetchContent_Declare(
spdlog
GIT_REPOSITORY https://github.com/gabime/spdlog.git
GIT_TAG v1.14.1
)

set(SPDLOG_BUILD_TESTS OFF CACHE BOOL "Disable testing for spdlog")

set(SPDLOG_USE_EXTERNAL_FMT ON CACHE BOOL "Use external fmt library")
FetchContent_MakeAvailable(spdlog)

find_package(Torch REQUIRED)
include_directories(${TORCH_INCLUDE_DIRS})
find_package(Python REQUIRED COMPONENTS Interpreter Development)
include_directories(${Python_INCLUDE_DIRS})
find_package(pybind11 REQUIRED)

set(NUMA_INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR}/numa_install)

FetchContent_Declare(
numactl
URL https://github.com/numactl/numactl/releases/download/v2.0.16/numactl-2.0.16.tar.gz
TLS_VERIFY OFF
)

FetchContent_MakeAvailable(numactl)

if(NOT EXISTS "${NUMA_INSTALL_DIR}/lib/libnuma.so")
message(STATUS "Configuring numactl...")
execute_process(
COMMAND ./configure --prefix=${NUMA_INSTALL_DIR}
WORKING_DIRECTORY ${numactl_SOURCE_DIR}
RESULT_VARIABLE numa_configure_result
OUTPUT_VARIABLE numa_configure_output
ERROR_VARIABLE numa_configure_error
)
if(NOT numa_configure_result EQUAL 0)
message(FATAL_ERROR "Failed to configure numactl. \n"
"Result: ${numa_configure_result}\n"
"STDOUT: ${numa_configure_output}\n"
"STDERR: ${numa_configure_error}\n")
endif()

message(STATUS "Building and installing numactl...")
execute_process(
COMMAND make install -j8
WORKING_DIRECTORY ${numactl_SOURCE_DIR}
RESULT_VARIABLE numa_install_result
OUTPUT_VARIABLE numa_install_output
ERROR_VARIABLE numa_install_error
)
if(NOT numa_install_result EQUAL 0)
message(FATAL_ERROR "Failed to build and install numactl. \n"
"Result: ${numa_install_result}\n"
"STDOUT: ${numa_install_output}\n"
"STDERR: ${numa_install_error}\n")
endif()
else()
message(STATUS "Found already built libnuma. Skipping build.")
endif()

add_subdirectory(core)
add_subdirectory(py_intf)

#
set(OUTPUT_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/output)
set_target_properties(kvstar_retrieve PROPERTIES
PREFIX ""
SUFFIX ".so"
LIBRARY_OUTPUT_DIRECTORY "${OUTPUT_ROOT}/lib"
RUNTIME_OUTPUT_DIRECTORY "${OUTPUT_ROOT}/bin"
ARCHIVE_OUTPUT_DIRECTORY "${OUTPUT_ROOT}/lib"
)
@@ -2,19 +2,15 @@ file(GLOB_RECURSE CORE_SRC_FILES "*.cpp" "api/*.cpp" "api/**/*.cpp" "domain/*.cp

add_library(kvstar_retrieve.core STATIC ${CORE_SRC_FILES})

# --- Include paths ---
# ${Torch_INCLUDE_DIRS} is no longer used here.
# Instead, use the EXTERNAL_INCLUDE_DIRS variable injected from setup.py, which already contains every required path.
# This guarantees the compiler sees the correct -I/path/to/include flags at compile time.
target_include_directories(kvstar_retrieve.core PUBLIC
"." "api" "domain" "infra"
${EXTERNAL_INCLUDE_DIRS}
${NUMA_INSTALL_DIR}/include
)

# --- Link libraries ---
target_link_libraries(kvstar_retrieve.core PUBLIC
spdlog::spdlog
fmt::fmt
numa
${NUMA_INSTALL_DIR}/lib/libnuma.so
${Torch_LIBRARIES}
)
)
@@ -12,25 +12,19 @@ SetupParam::SetupParam(const std::vector<int>& cpuNumaIds, const int physicalCor
: cpuNumaIds{cpuNumaIds}, physicalCorePerNuma{physicalCorePerNuma}, allocRatio{allocRatio}, blkRepreSize{blkRepreSize}, deviceType{deviceType},
totalTpSize{totalTpSize}, localRankId{localRankId}
{
// Based on the configuration, determine the thread pool size and the core IDs each thread binds to

int coreNumPerNumaAlloc = static_cast<int>(this->physicalCorePerNuma * this->allocRatio);

// Clear and prepare the perNumaCoreIds container
this->perNumaCoreIds.clear();
this->perNumaCoreIds.reserve(this->cpuNumaIds.size());

// Iterate over all NUMA node IDs assigned to this rank
for (const int numaId : this->cpuNumaIds) {
// Compute the starting core ID of this NUMA node
int startCoreId = numaId * this->physicalCorePerNuma;

// Build the list of core IDs this NUMA node will use
std::vector<int> curNumaCoreIdAlloc(coreNumPerNumaAlloc);

// Generate the core ID sequence, e.g. startCoreId=32, coreNumPerNumaAlloc=24 -> [32, 33, ..., 55]
std::iota(curNumaCoreIdAlloc.begin(), curNumaCoreIdAlloc.end(), startCoreId);

// Append this list to the final result
this->perNumaCoreIds.push_back(curNumaCoreIdAlloc);

KVSTAR_DEBUG("Alloc core ids {} in numa {}.", curNumaCoreIdAlloc, numaId);
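To make the core-assignment arithmetic above concrete, here is the same computation mirrored in Python (a sketch only; the NUMA layout values are illustrative, e.g. 8 NUMA nodes split across TP = 2 gives each rank 4 nodes):

```python
# Python mirror of SetupParam's per-NUMA core-ID assignment (illustrative values).
physical_core_per_numa = 32
alloc_ratio = 0.75
cpu_numa_ids = [4, 5, 6, 7]  # e.g. the second of two TP ranks on an 8-node machine

core_num_per_numa_alloc = int(physical_core_per_numa * alloc_ratio)  # 24

per_numa_core_ids = []
for numa_id in cpu_numa_ids:
    start_core_id = numa_id * physical_core_per_numa
    # std::iota equivalent: consecutive core IDs starting at start_core_id.
    per_numa_core_ids.append(
        list(range(start_core_id, start_core_id + core_num_per_numa_alloc)))

print(per_numa_core_ids[0])  # numa 4 -> [128, 129, ..., 151]
```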
@@ -11,9 +11,8 @@

namespace KVStar {

// Each vLLM TP domain (worker process) owns its own retrieval CLIB instance
struct SetupParam {
std::vector<int> cpuNumaIds; // NUMA IDs this TP rank takes over, e.g. NUMA_NUM = 8, TP = 2 -> each rank gets 4 NUMA nodes
std::vector<int> cpuNumaIds;
int physicalCorePerNuma;
float allocRatio;
size_t blkRepreSize;
@@ -22,7 +21,6 @@ struct SetupParam {
int localRankId;
std::vector<std::vector<int>> perNumaCoreIds;
int threadNum;
// TODO: expose retrieval engine configuration options as needed

SetupParam(const std::vector<int>& cpuNumaIds, const int physicalCorePerNuma, const float allocRatio, const size_t blkRepreSize,
const DeviceType deviceType, const int totalTpSize, const int localRankId);
@@ -7,7 +7,6 @@

namespace KVStar {

// Plain C++ tensor metadata structure; holds no Python objects
struct PlainTensor {
void* data = nullptr;
std::vector<int64_t> shape;
@@ -18,19 +18,16 @@ enum DeviceType {
};

struct RetrieveTask {
// Data members are plain C++
PlainTensor queryGroup;
PlainTensor blkRepre;
std::optional<PlainTensor> dPrunedIndex;

// --- Task metadata and synchronization ---
int topK;
int reqId;
DeviceType deviceType;
size_t allocTaskId;
std::shared_ptr<RetrieveTaskWaiter> waiter; // core of the waiter mechanism
std::shared_ptr<RetrieveTaskWaiter> waiter;

// Constructor
RetrieveTask(
PlainTensor qGroup, PlainTensor bRepre, std::optional<PlainTensor> pIndex,
int tK, int rId, DeviceType devType
@@ -40,9 +37,8 @@ struct RetrieveTask {
topK(tK),
reqId(rId),
deviceType(devType),
allocTaskId(0) {} // taskId is assigned by the Manager
allocTaskId(0) {}

// Default construction and move operations
RetrieveTask() = default;
RetrieveTask(RetrieveTask&& other) noexcept = default;
RetrieveTask& operator=(RetrieveTask&& other) noexcept = default;
@@ -3,7 +3,7 @@

namespace KVStar {
Status RetrieveTaskManager::Setup(const size_t threadNum, const std::vector<int>& cpuNumaIds, const std::vector<std::vector<int>>& bindCoreId) {
// Basic sanity checks

const size_t numaNodeCount = cpuNumaIds.size();
if (numaNodeCount == 0) {
KVSTAR_ERROR("Retrieve task manager get error numa id info {}.", cpuNumaIds);
@@ -24,23 +24,19 @@ Status RetrieveTaskManager::Setup(const size_t threadNum, const std::vector<int>

this->_queues.reserve(threadNum);
for (size_t i = 0; i < threadNum; ++i) {
// 1. Compute which NUMA node index thread i should land on
const size_t numaListIndex = i / threadsPerNuma;

// 2. Compute thread i's index within its NUMA node's core list
const size_t coreListIndex = i % threadsPerNuma;

// Safety check: make sure the inner vectors of bindCoreId are large enough
if (coreListIndex >= bindCoreId[numaListIndex].size()) {
KVSTAR_ERROR("Bind core ids {} can not alloc per numa need alloc threads num {}.", bindCoreId, threadsPerNuma);
return Status::InvalidParam();
}

// 3. Extract the final numaId and coreId
const int targetNumaId = cpuNumaIds[numaListIndex];
const int targetCoreId = bindCoreId[numaListIndex][coreListIndex];

auto& queue = this->_queues.emplace_back(std::make_unique<RetrieveTaskQueue>()); // emplace_back returns a reference to the newly created element
auto& queue = this->_queues.emplace_back(std::make_unique<RetrieveTaskQueue>());
auto status = queue->Setup(targetNumaId, targetCoreId, &this->_failureSet);
if (status.Failure()) {
KVSTAR_ERROR("Init and setup thread id {} in pool failed.", i);
Expand All @@ -51,7 +47,6 @@ Status RetrieveTaskManager::Setup(const size_t threadNum, const std::vector<int>
return Status::OK();
}

// Submitting a single task needs no dispatch
Status RetrieveTaskManager::SubmitSingleTask(RetrieveTask&& task, size_t &taskId)
{
std::unique_lock<std::mutex> lk(this->_mutex);
Expand All @@ -60,20 +55,17 @@ Status RetrieveTaskManager::SubmitSingleTask(RetrieveTask&& task, size_t &taskId
auto [waiter_iter, success1] = this->_waiters.emplace(taskId, std::make_shared<RetrieveTaskWaiter>(taskId, 1));
if (!success1) { return Status::OutOfMemory(); }

// 2. Create and store the result container
auto resultPtr = std::make_shared<TaskResult>();
auto [result_iter, success2] = this->_resultMap.emplace(taskId, resultPtr);
if (!success2) {
this->_waiters.erase(waiter_iter); // roll back on failure
this->_waiters.erase(waiter_iter);
return Status::OutOfMemory();
}

// 3. Attach the waiter to the task
task.allocTaskId = taskId;
task.waiter = waiter_iter->second;
KVSTAR_DEBUG("Set task id to retrieve task waiter success.");

// 4. Pack the task and resultPtr into a WorkItem and push it onto the queue
this->_queues[this->_lastTimeScheduledQueueIdx]->Push({std::move(task), resultPtr});

KVSTAR_DEBUG("Push task and set task scheduled queue idx success, queue idx: {}.", this->_lastTimeScheduledQueueIdx);
@@ -103,7 +95,6 @@ Status RetrieveTaskManager::Wait(const size_t taskId) {
return failure ? Status::Error() : Status::OK();
}

// Implementation of GetResult
Status RetrieveTaskManager::GetResult(size_t taskId, std::shared_ptr<TaskResult>& result) {
std::unique_lock<std::mutex> lk(this->_mutex);
auto it = _resultMap.find(taskId);
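For reference, Setup's placement logic maps thread i to NUMA slot i / threadsPerNuma and core slot i % threadsPerNuma. A small Python sketch of that mapping (illustrative values; it assumes threadsPerNuma is threadNum divided by the NUMA node count, as the surrounding checks suggest):

```python
# Python mirror of RetrieveTaskManager::Setup's thread placement (illustrative values).
cpu_numa_ids = [4, 5, 6, 7]
bind_core_id = [list(range(n * 32, n * 32 + 24)) for n in cpu_numa_ids]
thread_num = 8
threads_per_numa = thread_num // len(cpu_numa_ids)  # 2 threads per NUMA node

for i in range(thread_num):
    numa_list_index = i // threads_per_numa  # which NUMA slot thread i lands on
    core_list_index = i % threads_per_numa   # index inside that node's core list
    target_numa_id = cpu_numa_ids[numa_list_index]
    target_core_id = bind_core_id[numa_list_index][core_list_index]
    print(f"thread {i} -> numa {target_numa_id}, core {target_core_id}")
# thread 0 -> numa 4, core 128; thread 1 -> numa 4, core 129;
# thread 2 -> numa 5, core 160; ...
```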