diff --git a/ucm/integration/vllm/uc_connector.py b/ucm/integration/vllm/uc_connector.py index b6657e90..b4710993 100644 --- a/ucm/integration/vllm/uc_connector.py +++ b/ucm/integration/vllm/uc_connector.py @@ -139,13 +139,11 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole): * self.num_layers * (1 if self.is_mla else self.num_head * self.total_tp_size * 2) ) - config["transferIoSize"] = config_base * ( - 1 if self.is_mla else self.num_head - ) + config["io_size"] = config_base * (1 if self.is_mla else self.num_head) logger.info( - "kv_block_size = %d, transferIoSize = %d,", + "kv_block_size = %d, io_size = %d,", config["kv_block_size"], - config["transferIoSize"], + config["io_size"], ) logger.info("init UCConnectorImpl, connector: %s", name) self.connector = UcmConnectorFactory.create_connector(name, config) diff --git a/ucm/store/connector/nfsstore_connector.py b/ucm/store/connector/nfsstore_connector.py index e89a17f4..7b4cab73 100644 --- a/ucm/store/connector/nfsstore_connector.py +++ b/ucm/store/connector/nfsstore_connector.py @@ -39,30 +39,31 @@ class NfsTask(Task): class UcmNfsStore(UcmKVStoreBase): def __init__(self, config: Dict): super().__init__(config) + self.store = ucmnfsstore.NFSStore() storage_backends = [ path for path in config["storage_backends"].split(":") if path ] block_size = int(config["kv_block_size"]) transfer_enable = True if config["role"] == "worker" else False - param = ucmnfsstore.NfsStore.Config( + param = ucmnfsstore.NFSStore.Config( storage_backends, block_size, transfer_enable ) if transfer_enable: param.transferDeviceId = config["device"] - param.transferIoSize = config["transferIoSize"] - ret = ucmnfsstore.Setup(param) + param.transferIoSize = config["io_size"] + ret = self.store.Setup(param) if ret != 0: msg = f"Failed to initialize ucmnfsstore, errcode: {ret}." 
raise RuntimeError(msg) def cc_store(self) -> int: - return ucmnfsstore.CCStoreImpl() + return self.store.CCStoreImpl() def create(self, block_ids: List[str]) -> List[int]: - return ucmnfsstore.AllocBatch(block_ids) + return self.store.AllocBatch(block_ids) def lookup(self, block_ids: List[str]) -> List[bool]: - return ucmnfsstore.LookupBatch(block_ids) + return self.store.LookupBatch(block_ids) def prefetch(self, block_ids: List[str]) -> None: pass @@ -72,7 +73,7 @@ def load( ) -> Task: dst_tensor_ptr = [t.data_ptr() for t in dst_tensor] dst_tensor_size = [t.numel() * t.element_size() for t in dst_tensor] - task_id = ucmnfsstore.LoadToDevice( + task_id = self.store.LoadToDevice( block_ids, offset, dst_tensor_ptr, dst_tensor_size ) return NfsTask(task_id=task_id) @@ -82,16 +83,16 @@ def dump( ) -> Task: src_tensor_ptr = [t.data_ptr() for t in src_tensor] src_tensor_size = [t.numel() * t.element_size() for t in src_tensor] - task_id = ucmnfsstore.DumpFromDevice( + task_id = self.store.DumpFromDevice( block_ids, offset, src_tensor_ptr, src_tensor_size ) return NfsTask(task_id=task_id) def wait(self, task: Task) -> int: - return ucmnfsstore.Wait(task.task_id) + return self.store.Wait(task.task_id) def commit(self, block_ids: List[str], is_success: bool = True) -> None: - ucmnfsstore.CommitBatch(block_ids, is_success) + self.store.CommitBatch(block_ids, is_success) def check(self, task: Task) -> Tuple[int, bool]: - return ucmnfsstore.Check(task.task_id) + return self.store.Check(task.task_id) diff --git a/ucm/store/dramstore/cc/api/dramstore.cc b/ucm/store/dramstore/cc/api/dramstore.cc index ff4dd0b3..56b4350f 100644 --- a/ucm/store/dramstore/cc/api/dramstore.cc +++ b/ucm/store/dramstore/cc/api/dramstore.cc @@ -27,7 +27,7 @@ namespace UC { -class DramStoreImpl : public DramStore { +class DRAMStoreImpl : public DRAMStore { public: int32_t Setup(const size_t ioSize, const size_t capacity, const int32_t deviceId) { return -1; } int32_t Alloc(const std::string& block) 
override { return -1; } @@ -47,9 +47,9 @@ class DramStoreImpl : public DramStore { int32_t Check(const size_t task, bool& finish) override { return -1; } }; -int32_t DramStore::Setup(const Config& config) +int32_t DRAMStore::Setup(const Config& config) { - auto impl = new (std::nothrow) DramStoreImpl(); + auto impl = new (std::nothrow) DRAMStoreImpl(); if (!impl) { UC_ERROR("Out of memory."); return Status::OutOfMemory().Underlying(); diff --git a/ucm/store/dramstore/cc/api/dramstore.h b/ucm/store/dramstore/cc/api/dramstore.h index 291ec2aa..1dc97573 100644 --- a/ucm/store/dramstore/cc/api/dramstore.h +++ b/ucm/store/dramstore/cc/api/dramstore.h @@ -28,7 +28,7 @@ namespace UC { -class DramStore : public CCStore { +class DRAMStore : public CCStore { public: struct Config { size_t ioSize; @@ -41,8 +41,8 @@ class DramStore : public CCStore { }; public: - DramStore() : impl_{nullptr} {} - ~DramStore() override + DRAMStore() : impl_{nullptr} {} + ~DRAMStore() override { if (this->impl_) { delete this->impl_; } } @@ -73,7 +73,7 @@ class DramStore : public CCStore { } private: - DramStore* impl_; + DRAMStore* impl_; }; } // namespace UC diff --git a/ucm/store/dramstore/cpy/dramstore.py.cc b/ucm/store/dramstore/cpy/dramstore.py.cc index 7ce9be74..18dff5d0 100644 --- a/ucm/store/dramstore/cpy/dramstore.py.cc +++ b/ucm/store/dramstore/cpy/dramstore.py.cc @@ -23,80 +23,72 @@ * */ #include "api/dramstore.h" #include -#include "template/singleton.h" namespace py = pybind11; -using StoreImpl = UC::DramStore; namespace UC { -inline void* CCStoreImpl() { return Singleton::Instance(); } -inline int32_t Setup(const StoreImpl::Config& config) -{ - return ((StoreImpl*)CCStoreImpl())->Setup(config); -} -inline int32_t Alloc(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Alloc(block); } -inline bool Lookup(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Lookup(block); } -inline void Commit(const std::string& block, const bool success) -{ - return 
((StoreImpl*)CCStoreImpl())->Commit(block, success); -} -inline py::list AllocBatch(const py::list& blocks) -{ - py::list results; - for (auto& block : blocks) { results.append(Alloc(block.cast())); } - return results; -} -inline py::list LookupBatch(const py::list& blocks) -{ - py::list founds; - for (auto& block : blocks) { founds.append(Lookup(block.cast())); } - return founds; -} -inline void CommitBatch(const py::list& blocks, const bool success) -{ - for (auto& block : blocks) { Commit(block.cast(), success); } -} -inline int32_t Wait(const size_t task) { return ((StoreImpl*)CCStoreImpl())->Wait(task); } -inline py::tuple Check(const size_t task) -{ - auto finish = false; - auto ret = ((StoreImpl*)CCStoreImpl())->Check(task, finish); - return py::make_tuple(ret, finish); -} -size_t Submit(const py::list& blockIds, const py::list& offsets, const py::list& addresses, - const py::list& lengths, const CCStore::Task::Type type, - const CCStore::Task::Location location, const std::string& brief) -{ - CCStore::Task task{type, location, brief}; - auto blockId = blockIds.begin(); - auto offset = offsets.begin(); - auto address = addresses.begin(); - auto length = lengths.begin(); - while ((blockId != blockIds.end()) && (offset != offsets.end()) && - (address != addresses.end()) && (length != lengths.end())) { - auto ret = task.Append(blockId->cast(), offset->cast(), - address->cast(), length->cast()); - if (ret != 0) { return CCStore::invalidTaskId; } - blockId++; - offset++; - address++; - length++; +class DRAMStorePy : public DRAMStore { +public: + void* CCStoreImpl() { return this; } + py::list AllocBatch(const py::list& blocks) + { + py::list results; + for (auto& block : blocks) { results.append(this->Alloc(block.cast())); } + return results; } - return ((StoreImpl*)CCStoreImpl())->Submit(std::move(task)); -} -inline size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses, - const py::list& lengths) -{ - return Submit(blockIds, 
offsets, addresses, lengths, CCStore::Task::Type::LOAD, - CCStore::Task::Location::DEVICE, "Dram::H2D"); -} -inline size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses, - const py::list& lengths) -{ - return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, - CCStore::Task::Location::DEVICE, "Dram::D2H"); -} + py::list LookupBatch(const py::list& blocks) + { + py::list founds; + for (auto& block : blocks) { founds.append(this->Lookup(block.cast())); } + return founds; + } + void CommitBatch(const py::list& blocks, const bool success) + { + for (auto& block : blocks) { this->Commit(block.cast(), success); } + } + py::tuple CheckPy(const size_t task) + { + auto finish = false; + auto ret = this->Check(task, finish); + return py::make_tuple(ret, finish); + } + size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD, + CCStore::Task::Location::DEVICE, "DRAM::H2D"); + } + size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, + CCStore::Task::Location::DEVICE, "DRAM::D2H"); + } + +private: + size_t SubmitPy(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths, const CCStore::Task::Type type, + const CCStore::Task::Location location, const std::string& brief) + { + CCStore::Task task{type, location, brief}; + auto blockId = blockIds.begin(); + auto offset = offsets.begin(); + auto address = addresses.begin(); + auto length = lengths.begin(); + while ((blockId != blockIds.end()) && (offset != offsets.end()) && + (address != addresses.end()) && (length != lengths.end())) { + auto ret = task.Append(blockId->cast(), offset->cast(), + address->cast(), 
length->cast()); + if (ret != 0) { return CCStore::invalidTaskId; } + blockId++; + offset++; + address++; + length++; + } + return this->Submit(std::move(task)); + } +}; } // namespace UC @@ -106,22 +98,24 @@ PYBIND11_MODULE(ucmdramstore, module) module.attr("version") = UC_VAR_PROJECT_VERSION; module.attr("commit_id") = UC_VAR_GIT_COMMIT_ID; module.attr("build_type") = UC_VAR_BUILD_TYPE; - auto store = module.def_submodule("DramStore"); - auto config = py::class_(store, "Config"); + auto store = py::class_(module, "DRAMStore"); + auto config = py::class_(store, "Config"); config.def(py::init(), py::arg("ioSize"), py::arg("capacity")); - config.def_readwrite("ioSize", &StoreImpl::Config::ioSize); - config.def_readwrite("capacity", &StoreImpl::Config::capacity); - config.def_readwrite("deviceId", &StoreImpl::Config::deviceId); - module.def("CCStoreImpl", &UC::CCStoreImpl); - module.def("Setup", &UC::Setup); - module.def("Alloc", &UC::Alloc); - module.def("AllocBatch", &UC::AllocBatch); - module.def("Lookup", &UC::Lookup); - module.def("LookupBatch", &UC::LookupBatch); - module.def("Load", &UC::Load); - module.def("Dump", &UC::Dump); - module.def("Wait", &UC::Wait); - module.def("Check", &UC::Check); - module.def("Commit", &UC::Commit); - module.def("CommitBatch", &UC::CommitBatch); + config.def_readwrite("ioSize", &UC::DRAMStorePy::Config::ioSize); + config.def_readwrite("capacity", &UC::DRAMStorePy::Config::capacity); + config.def_readwrite("deviceId", &UC::DRAMStorePy::Config::deviceId); + store.def(py::init<>()); + store.def("CCStoreImpl", &UC::DRAMStorePy::CCStoreImpl); + store.def("Setup", &UC::DRAMStorePy::Setup); + store.def("Alloc", py::overload_cast(&UC::DRAMStorePy::Alloc)); + store.def("AllocBatch", &UC::DRAMStorePy::AllocBatch); + store.def("Lookup", py::overload_cast(&UC::DRAMStorePy::Lookup)); + store.def("LookupBatch", &UC::DRAMStorePy::LookupBatch); + store.def("Load", &UC::DRAMStorePy::Load); + store.def("Dump", &UC::DRAMStorePy::Dump); + 
store.def("Wait", &UC::DRAMStorePy::Wait); + store.def("Check", &UC::DRAMStorePy::CheckPy); + store.def("Commit", + py::overload_cast(&UC::DRAMStorePy::Commit)); + store.def("CommitBatch", &UC::DRAMStorePy::CommitBatch); } diff --git a/ucm/store/infra/logger/flux/flux_logger.cc b/ucm/store/infra/logger/flux/flux_logger.cc index 056b115e..067d5516 100644 --- a/ucm/store/infra/logger/flux/flux_logger.cc +++ b/ucm/store/infra/logger/flux/flux_logger.cc @@ -108,8 +108,8 @@ class FluxLogger : public ILogger { void Log(Level&& lv, SourceLocation&& loc, std::string&& msg) override { using namespace std::chrono; - static const size_t pid = static_cast(getpid()); - static thread_local const size_t tid = syscall(SYS_gettid); + static const auto pid = getpid(); + static thread_local const auto tid = syscall(SYS_gettid); static thread_local seconds lastSec{0}; static thread_local char datetime[32]; auto systemNow = system_clock::now(); diff --git a/ucm/store/localstore/cpy/localstore.py.cc b/ucm/store/localstore/cpy/localstore.py.cc index 03dde8b0..5c4514ab 100644 --- a/ucm/store/localstore/cpy/localstore.py.cc +++ b/ucm/store/localstore/cpy/localstore.py.cc @@ -23,106 +23,99 @@ * */ #include "localstore.h" #include -#include "template/singleton.h" namespace py = pybind11; -using StoreImpl = UC::LocalStore; namespace UC { -inline void* CCStoreImpl() { return Singleton::Instance(); } -inline int32_t Setup(const StoreImpl::Config& config) -{ - return ((StoreImpl*)CCStoreImpl())->Setup(config); -} -inline int32_t Alloc(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Alloc(block); } -inline bool Lookup(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Lookup(block); } -inline void Commit(const std::string& block, const bool success) -{ - return ((StoreImpl*)CCStoreImpl())->Commit(block, success); -} -inline py::list AllocBatch(const py::list& blocks) -{ - py::list results; - for (auto& block : blocks) { results.append(Alloc(block.cast())); } - return 
results; -} -inline py::list LookupBatch(const py::list& blocks) -{ - py::list founds; - for (auto& block : blocks) { founds.append(Lookup(block.cast())); } - return founds; -} -inline void CommitBatch(const py::list& blocks, const bool success) -{ - for (auto& block : blocks) { Commit(block.cast(), success); } -} -inline int32_t Wait(const size_t task) { return ((StoreImpl*)CCStoreImpl())->Wait(task); } -inline py::tuple Check(const size_t task) -{ - auto finish = false; - auto ret = ((StoreImpl*)CCStoreImpl())->Check(task, finish); - return py::make_tuple(ret, finish); -} -size_t Submit(const py::list& blockIds, const py::list& offsets, const py::list& addresses, - const py::list& lengths, const CCStore::Task::Type type, - const CCStore::Task::Location location, const std::string& brief) -{ - CCStore::Task task{type, location, brief}; - auto blockId = blockIds.begin(); - auto offset = offsets.begin(); - auto address = addresses.begin(); - auto length = lengths.begin(); - while ((blockId != blockIds.end()) && (offset != offsets.end()) && - (address != addresses.end()) && (length != lengths.end())) { - auto ret = task.Append(blockId->cast(), offset->cast(), - address->cast(), length->cast()); - if (ret != 0) { return CCStore::invalidTaskId; } - blockId++; - offset++; - address++; - length++; +class LocalStorePy : public LocalStore { +public: + void* CCStoreImpl() { return this; } + py::list AllocBatch(const py::list& blocks) + { + py::list results; + for (auto& block : blocks) { results.append(this->Alloc(block.cast())); } + return results; } - return ((StoreImpl*)CCStoreImpl())->Submit(std::move(task)); -} -inline size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses, - const py::list& lengths) -{ - return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD, - CCStore::Task::Location::DEVICE, "Local::S2D"); -} -inline size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses, - 
const py::list& lengths) -{ - return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, - CCStore::Task::Location::DEVICE, "Local::D2S"); -} + py::list LookupBatch(const py::list& blocks) + { + py::list founds; + for (auto& block : blocks) { founds.append(this->Lookup(block.cast())); } + return founds; + } + void CommitBatch(const py::list& blocks, const bool success) + { + for (auto& block : blocks) { this->Commit(block.cast(), success); } + } + py::tuple CheckPy(const size_t task) + { + auto finish = false; + auto ret = this->Check(task, finish); + return py::make_tuple(ret, finish); + } + size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD, + CCStore::Task::Location::DEVICE, "LOCAL::S2D"); + } + size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, + CCStore::Task::Location::DEVICE, "LOCAL::D2S"); + } + +private: + size_t SubmitPy(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths, const CCStore::Task::Type type, + const CCStore::Task::Location location, const std::string& brief) + { + CCStore::Task task{type, location, brief}; + auto blockId = blockIds.begin(); + auto offset = offsets.begin(); + auto address = addresses.begin(); + auto length = lengths.begin(); + while ((blockId != blockIds.end()) && (offset != offsets.end()) && + (address != addresses.end()) && (length != lengths.end())) { + auto ret = task.Append(blockId->cast(), offset->cast(), + address->cast(), length->cast()); + if (ret != 0) { return CCStore::invalidTaskId; } + blockId++; + offset++; + address++; + length++; + } + return this->Submit(std::move(task)); + } +}; } // namespace UC 
-PYBIND11_MODULE(ucmdramstore, module) +PYBIND11_MODULE(ucmlocalstore, module) { module.attr("project") = UC_VAR_PROJECT_NAME; module.attr("version") = UC_VAR_PROJECT_VERSION; module.attr("commit_id") = UC_VAR_GIT_COMMIT_ID; module.attr("build_type") = UC_VAR_BUILD_TYPE; - auto store = module.def_submodule("LocalStore"); - auto config = py::class_(store, "Config"); + auto store = py::class_(module, "LocalStore"); + auto config = py::class_(store, "Config"); config.def(py::init(), py::arg("ioSize"), py::arg("capacity")); - config.def_readwrite("ioSize", &StoreImpl::Config::ioSize); - config.def_readwrite("capacity", &StoreImpl::Config::capacity); - config.def_readwrite("backend", &StoreImpl::Config::backend); - config.def_readwrite("deviceId", &StoreImpl::Config::deviceId); - module.def("CCStoreImpl", &UC::CCStoreImpl); - module.def("Setup", &UC::Setup); - module.def("Alloc", &UC::Alloc); - module.def("AllocBatch", &UC::AllocBatch); - module.def("Lookup", &UC::Lookup); - module.def("LookupBatch", &UC::LookupBatch); - module.def("Load", &UC::Load); - module.def("Dump", &UC::Dump); - module.def("Wait", &UC::Wait); - module.def("Check", &UC::Check); - module.def("Commit", &UC::Commit); - module.def("CommitBatch", &UC::CommitBatch); + config.def_readwrite("ioSize", &UC::LocalStorePy::Config::ioSize); + config.def_readwrite("capacity", &UC::LocalStorePy::Config::capacity); + config.def_readwrite("backend", &UC::LocalStorePy::Config::backend); + config.def_readwrite("deviceId", &UC::LocalStorePy::Config::deviceId); + store.def("CCStoreImpl", &UC::LocalStorePy::CCStoreImpl); + store.def("Setup", &UC::LocalStorePy::Setup); + store.def("Alloc", py::overload_cast(&UC::LocalStorePy::Alloc)); + store.def("AllocBatch", &UC::LocalStorePy::AllocBatch); + store.def("Lookup", py::overload_cast(&UC::LocalStorePy::Lookup)); + store.def("LookupBatch", &UC::LocalStorePy::LookupBatch); + store.def("Load", &UC::LocalStorePy::Load); + store.def("Dump", &UC::LocalStorePy::Dump); + 
store.def("Wait", &UC::LocalStorePy::Wait); + store.def("Check", &UC::LocalStorePy::CheckPy); + store.def("Commit", + py::overload_cast(&UC::LocalStorePy::Commit)); + store.def("CommitBatch", &UC::LocalStorePy::CommitBatch); } diff --git a/ucm/store/nfsstore/cc/api/nfsstore.cc b/ucm/store/nfsstore/cc/api/nfsstore.cc index beeec1a4..1adfbf9f 100644 --- a/ucm/store/nfsstore/cc/api/nfsstore.cc +++ b/ucm/store/nfsstore/cc/api/nfsstore.cc @@ -29,7 +29,7 @@ namespace UC { -class NfsStoreImpl : public NfsStore { +class NFSStoreImpl : public NFSStore { public: int32_t Setup(const Config& config) { @@ -101,7 +101,7 @@ class NfsStoreImpl : public NfsStore { { std::string buildType = UC_VAR_BUILD_TYPE; if (buildType.empty()) { buildType = "Release"; } - UC_INFO("NfsStore-{}({}).", UC_VAR_GIT_COMMIT_ID, buildType); + UC_INFO("NFSStore-{}({}).", UC_VAR_GIT_COMMIT_ID, buildType); UC_INFO("Set UC::StorageBackends to {}.", config.storageBackends); UC_INFO("Set UC::BlockSize to {}.", config.kvcacheBlockSize); UC_INFO("Set UC::TransferEnable to {}.", config.transferEnable); @@ -117,9 +117,9 @@ class NfsStoreImpl : public NfsStore { TsfTaskManager transMgr_; }; -int32_t NfsStore::Setup(const Config& config) +int32_t NFSStore::Setup(const Config& config) { - auto impl = new (std::nothrow) NfsStoreImpl(); + auto impl = new (std::nothrow) NFSStoreImpl(); if (!impl) { UC_ERROR("Out of memory."); return Status::OutOfMemory().Underlying(); diff --git a/ucm/store/nfsstore/cc/api/nfsstore.h b/ucm/store/nfsstore/cc/api/nfsstore.h index bc9c8ebb..b6bd4ba6 100644 --- a/ucm/store/nfsstore/cc/api/nfsstore.h +++ b/ucm/store/nfsstore/cc/api/nfsstore.h @@ -29,7 +29,7 @@ namespace UC { -class NfsStore : public CCStore { +class NFSStore : public CCStore { public: struct Config { std::vector storageBackends; @@ -51,8 +51,8 @@ class NfsStore : public CCStore { }; public: - NfsStore() : impl_{nullptr} {} - ~NfsStore() override + NFSStore() : impl_{nullptr} {} + ~NFSStore() override { if (this->impl_) { 
delete this->impl_; } } @@ -83,7 +83,7 @@ class NfsStore : public CCStore { } private: - NfsStore* impl_; + NFSStore* impl_; }; }; // namespace UC diff --git a/ucm/store/nfsstore/cpy/nfsstore.py.cc b/ucm/store/nfsstore/cpy/nfsstore.py.cc index 66ca5eee..24c8d1b9 100644 --- a/ucm/store/nfsstore/cpy/nfsstore.py.cc +++ b/ucm/store/nfsstore/cpy/nfsstore.py.cc @@ -24,92 +24,84 @@ #include "nfsstore.h" #include #include -#include "template/singleton.h" namespace py = pybind11; -using StoreImpl = UC::NfsStore; namespace UC { -inline void* CCStoreImpl() { return Singleton::Instance(); } -inline int32_t Setup(const StoreImpl::Config& config) -{ - return ((StoreImpl*)CCStoreImpl())->Setup(config); -} -inline int32_t Alloc(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Alloc(block); } -inline bool Lookup(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Lookup(block); } -inline void Commit(const std::string& block, const bool success) -{ - return ((StoreImpl*)CCStoreImpl())->Commit(block, success); -} -inline py::list AllocBatch(const py::list& blocks) -{ - py::list results; - for (auto& block : blocks) { results.append(Alloc(block.cast())); } - return results; -} -inline py::list LookupBatch(const py::list& blocks) -{ - py::list founds; - for (auto& block : blocks) { founds.append(Lookup(block.cast())); } - return founds; -} -inline void CommitBatch(const py::list& blocks, const bool success) -{ - for (auto& block : blocks) { Commit(block.cast(), success); } -} -inline int32_t Wait(const size_t task) { return ((StoreImpl*)CCStoreImpl())->Wait(task); } -inline py::tuple Check(const size_t task) -{ - auto finish = false; - auto ret = ((StoreImpl*)CCStoreImpl())->Check(task, finish); - return py::make_tuple(ret, finish); -} -size_t Submit(const py::list& blockIds, const py::list& offsets, const py::list& addresses, - const py::list& lengths, const CCStore::Task::Type type, - const CCStore::Task::Location location, const std::string& brief) -{ - 
CCStore::Task task{type, location, brief}; - auto blockId = blockIds.begin(); - auto offset = offsets.begin(); - auto address = addresses.begin(); - auto length = lengths.begin(); - while ((blockId != blockIds.end()) && (offset != offsets.end()) && - (address != addresses.end()) && (length != lengths.end())) { - auto ret = task.Append(blockId->cast(), offset->cast(), - address->cast(), length->cast()); - if (ret != 0) { return CCStore::invalidTaskId; } - blockId++; - offset++; - address++; - length++; +class NFSStorePy : public NFSStore { +public: + void* CCStoreImpl() { return this; } + py::list AllocBatch(const py::list& blocks) + { + py::list results; + for (auto& block : blocks) { results.append(this->Alloc(block.cast())); } + return results; } - return ((StoreImpl*)CCStoreImpl())->Submit(std::move(task)); -} -inline size_t LoadToDevice(const py::list& blockIds, const py::list& offsets, - const py::list& addresses, const py::list& lengths) -{ - return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD, - CCStore::Task::Location::DEVICE, "NFS::S2D"); -} -inline size_t LoadToHost(const py::list& blockIds, const py::list& offsets, - const py::list& addresses, const py::list& lengths) -{ - return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD, - CCStore::Task::Location::HOST, "NFS::S2H"); -} -inline size_t DumpFromDevice(const py::list& blockIds, const py::list& offsets, - const py::list& addresses, const py::list& lengths) -{ - return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, - CCStore::Task::Location::DEVICE, "NFS::D2S"); -} -inline size_t DumpFromHost(const py::list& blockIds, const py::list& offsets, - const py::list& addresses, const py::list& lengths) -{ - return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, - CCStore::Task::Location::HOST, "NFS::H2S"); -} + py::list LookupBatch(const py::list& blocks) + { + py::list founds; + for (auto& block : blocks) { 
founds.append(this->Lookup(block.cast())); } + return founds; + } + void CommitBatch(const py::list& blocks, const bool success) + { + for (auto& block : blocks) { this->Commit(block.cast(), success); } + } + py::tuple CheckPy(const size_t task) + { + auto finish = false; + auto ret = this->Check(task, finish); + return py::make_tuple(ret, finish); + } + size_t LoadToDevice(const py::list& blockIds, const py::list& offsets, + const py::list& addresses, const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD, + CCStore::Task::Location::DEVICE, "NFS::S2D"); + } + size_t LoadToHost(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD, + CCStore::Task::Location::HOST, "NFS::S2H"); + } + size_t DumpFromDevice(const py::list& blockIds, const py::list& offsets, + const py::list& addresses, const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, + CCStore::Task::Location::DEVICE, "NFS::D2S"); + } + size_t DumpFromHost(const py::list& blockIds, const py::list& offsets, + const py::list& addresses, const py::list& lengths) + { + return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP, + CCStore::Task::Location::HOST, "NFS::H2S"); + } + +private: + size_t SubmitPy(const py::list& blockIds, const py::list& offsets, const py::list& addresses, + const py::list& lengths, const CCStore::Task::Type type, + const CCStore::Task::Location location, const std::string& brief) + { + CCStore::Task task{type, location, brief}; + auto blockId = blockIds.begin(); + auto offset = offsets.begin(); + auto address = addresses.begin(); + auto length = lengths.begin(); + while ((blockId != blockIds.end()) && (offset != offsets.end()) && + (address != addresses.end()) && (length != lengths.end())) { + 
auto ret = task.Append(blockId->cast(), offset->cast(), + address->cast(), length->cast()); + if (ret != 0) { return CCStore::invalidTaskId; } + blockId++; + offset++; + address++; + length++; + } + return this->Submit(std::move(task)); + } +}; } // namespace UC @@ -119,29 +111,30 @@ PYBIND11_MODULE(ucmnfsstore, module) module.attr("version") = UC_VAR_PROJECT_VERSION; module.attr("commit_id") = UC_VAR_GIT_COMMIT_ID; module.attr("build_type") = UC_VAR_BUILD_TYPE; - auto store = module.def_submodule("NfsStore"); - auto config = py::class_(store, "Config"); + auto store = py::class_(module, "NFSStore"); + auto config = py::class_(store, "Config"); config.def(py::init&, const size_t, const bool>(), py::arg("storageBackends"), py::arg("kvcacheBlockSize"), py::arg("transferEnable")); - config.def_readwrite("storageBackends", &StoreImpl::Config::storageBackends); - config.def_readwrite("kvcacheBlockSize", &StoreImpl::Config::kvcacheBlockSize); - config.def_readwrite("transferEnable", &StoreImpl::Config::transferEnable); - config.def_readwrite("transferDeviceId", &StoreImpl::Config::transferDeviceId); - config.def_readwrite("transferStreamNumber", &StoreImpl::Config::transferStreamNumber); - config.def_readwrite("transferIoSize", &StoreImpl::Config::transferIoSize); - config.def_readwrite("transferBufferNumber", &StoreImpl::Config::transferBufferNumber); - module.def("CCStoreImpl", &UC::CCStoreImpl); - module.def("Setup", &UC::Setup); - module.def("Alloc", &UC::Alloc); - module.def("AllocBatch", &UC::AllocBatch); - module.def("Lookup", &UC::Lookup); - module.def("LookupBatch", &UC::LookupBatch); - module.def("LoadToDevice", &UC::LoadToDevice); - module.def("LoadToHost", &UC::LoadToHost); - module.def("DumpFromDevice", &UC::DumpFromDevice); - module.def("DumpFromHost", &UC::DumpFromHost); - module.def("Wait", &UC::Wait); - module.def("Check", &UC::Check); - module.def("Commit", &UC::Commit); - module.def("CommitBatch", &UC::CommitBatch); + 
config.def_readwrite("storageBackends", &UC::NFSStorePy::Config::storageBackends); + config.def_readwrite("kvcacheBlockSize", &UC::NFSStorePy::Config::kvcacheBlockSize); + config.def_readwrite("transferEnable", &UC::NFSStorePy::Config::transferEnable); + config.def_readwrite("transferDeviceId", &UC::NFSStorePy::Config::transferDeviceId); + config.def_readwrite("transferStreamNumber", &UC::NFSStorePy::Config::transferStreamNumber); + config.def_readwrite("transferIoSize", &UC::NFSStorePy::Config::transferIoSize); + config.def_readwrite("transferBufferNumber", &UC::NFSStorePy::Config::transferBufferNumber); + store.def(py::init<>()); + store.def("CCStoreImpl", &UC::NFSStorePy::CCStoreImpl); + store.def("Setup", &UC::NFSStorePy::Setup); + store.def("Alloc", py::overload_cast(&UC::NFSStorePy::Alloc)); + store.def("AllocBatch", &UC::NFSStorePy::AllocBatch); + store.def("Lookup", py::overload_cast(&UC::NFSStorePy::Lookup)); + store.def("LookupBatch", &UC::NFSStorePy::LookupBatch); + store.def("LoadToDevice", &UC::NFSStorePy::LoadToDevice); + store.def("LoadToHost", &UC::NFSStorePy::LoadToHost); + store.def("DumpFromDevice", &UC::NFSStorePy::DumpFromDevice); + store.def("DumpFromHost", &UC::NFSStorePy::DumpFromHost); + store.def("Wait", &UC::NFSStorePy::Wait); + store.def("Check", &UC::NFSStorePy::CheckPy); + store.def("Commit", py::overload_cast(&UC::NFSStorePy::Commit)); + store.def("CommitBatch", &UC::NFSStorePy::CommitBatch); } diff --git a/ucm/store/test/e2e/nfsstore_embed.py b/ucm/store/test/e2e/nfsstore_embed.py index 3b8e37be..e247271a 100644 --- a/ucm/store/test/e2e/nfsstore_embed.py +++ b/ucm/store/test/e2e/nfsstore_embed.py @@ -28,8 +28,9 @@ import torch import torch_npu -from connector.nfsstore_connector import UcmNfsStore -from connector.ucmstore import UcmKVStoreBase + +from ucm.store.connector.nfsstore_connector import UcmNfsStore +from ucm.store.connector.ucmstore import UcmKVStoreBase def setup_store(storage_backends, block_size, device_id, io_size) -> 
UcmKVStoreBase: diff --git a/ucm/store/test/e2e/nfsstore_fetch.py b/ucm/store/test/e2e/nfsstore_fetch.py index 5eced78c..8cfc8482 100644 --- a/ucm/store/test/e2e/nfsstore_fetch.py +++ b/ucm/store/test/e2e/nfsstore_fetch.py @@ -28,8 +28,9 @@ import torch import torch_npu -from connector.nfsstore_connector import UcmNfsStore -from connector.ucmstore import UcmKVStoreBase + +from ucm.store.connector.nfsstore_connector import UcmNfsStore +from ucm.store.connector.ucmstore import UcmKVStoreBase def setup_store(storage_backends, block_size, device_id, io_size) -> UcmKVStoreBase: diff --git a/ucm/ucm_sparse/gsa.py b/ucm/ucm_sparse/gsa.py index 596fb35a..ae6fb158 100644 --- a/ucm/ucm_sparse/gsa.py +++ b/ucm/ucm_sparse/gsa.py @@ -437,13 +437,13 @@ def __init__(self, vllm_config: VllmConfig, role: UcmSparseRole): * self.layer_num * (1 if self.use_mla else self.num_head * self.total_tp_size * 2) ) - transferIoSize = config_base * (1 if self.use_mla else self.num_head) + io_size = config_base * (1 if self.use_mla else self.num_head) nfs_config = { "storage_backends": "./ucm/data/" + str(self.rank), "kv_block_size": kv_block_size, "device": self.rank, "role": "worker", - "transferIoSize": transferIoSize, + "io_size": io_size, } self.connector = UcmConnectorFactory.create_connector( "UcmNfsStore", nfs_config