Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions ucm/integration/vllm/uc_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,11 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
* self.num_layers
* (1 if self.is_mla else self.num_head * self.total_tp_size * 2)
)
config["transferIoSize"] = config_base * (
1 if self.is_mla else self.num_head
)
config["io_size"] = config_base * (1 if self.is_mla else self.num_head)
logger.info(
"kv_block_size = %d, transferIoSize = %d,",
"kv_block_size = %d, io_size = %d,",
config["kv_block_size"],
config["transferIoSize"],
config["io_size"],
)
logger.info("init UCConnectorImpl, connector: %s", name)
self.connector = UcmConnectorFactory.create_connector(name, config)
Expand Down
23 changes: 12 additions & 11 deletions ucm/store/connector/nfsstore_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,31 @@ class NfsTask(Task):
class UcmNfsStore(UcmKVStoreBase):
def __init__(self, config: Dict):
super().__init__(config)
self.store = ucmnfsstore.NFSStore()
storage_backends = [
path for path in config["storage_backends"].split(":") if path
]
block_size = int(config["kv_block_size"])
transfer_enable = True if config["role"] == "worker" else False
param = ucmnfsstore.NfsStore.Config(
param = ucmnfsstore.NFSStore.Config(
storage_backends, block_size, transfer_enable
)
if transfer_enable:
param.transferDeviceId = config["device"]
param.transferIoSize = config["transferIoSize"]
ret = ucmnfsstore.Setup(param)
param.transferIoSize = config["io_size"]
ret = self.store.Setup(param)
if ret != 0:
msg = f"Failed to initialize ucmnfsstore, errcode: {ret}."
raise RuntimeError(msg)

def cc_store(self) -> int:
return ucmnfsstore.CCStoreImpl()
return self.store.CCStoreImpl()

def create(self, block_ids: List[str]) -> List[int]:
return ucmnfsstore.AllocBatch(block_ids)
return self.store.AllocBatch(block_ids)

def lookup(self, block_ids: List[str]) -> List[bool]:
return ucmnfsstore.LookupBatch(block_ids)
return self.store.LookupBatch(block_ids)

def prefetch(self, block_ids: List[str]) -> None:
pass
Expand All @@ -72,7 +73,7 @@ def load(
) -> Task:
dst_tensor_ptr = [t.data_ptr() for t in dst_tensor]
dst_tensor_size = [t.numel() * t.element_size() for t in dst_tensor]
task_id = ucmnfsstore.LoadToDevice(
task_id = self.store.LoadToDevice(
block_ids, offset, dst_tensor_ptr, dst_tensor_size
)
return NfsTask(task_id=task_id)
Expand All @@ -82,16 +83,16 @@ def dump(
) -> Task:
src_tensor_ptr = [t.data_ptr() for t in src_tensor]
src_tensor_size = [t.numel() * t.element_size() for t in src_tensor]
task_id = ucmnfsstore.DumpFromDevice(
task_id = self.store.DumpFromDevice(
block_ids, offset, src_tensor_ptr, src_tensor_size
)
return NfsTask(task_id=task_id)

def wait(self, task: Task) -> int:
return ucmnfsstore.Wait(task.task_id)
return self.store.Wait(task.task_id)

def commit(self, block_ids: List[str], is_success: bool = True) -> None:
ucmnfsstore.CommitBatch(block_ids, is_success)
self.store.CommitBatch(block_ids, is_success)

def check(self, task: Task) -> Tuple[int, bool]:
return ucmnfsstore.Check(task.task_id)
return self.store.Check(task.task_id)
6 changes: 3 additions & 3 deletions ucm/store/dramstore/cc/api/dramstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

namespace UC {

class DramStoreImpl : public DramStore {
class DRAMStoreImpl : public DRAMStore {
public:
int32_t Setup(const size_t ioSize, const size_t capacity, const int32_t deviceId) { return -1; }
int32_t Alloc(const std::string& block) override { return -1; }
Expand All @@ -47,9 +47,9 @@ class DramStoreImpl : public DramStore {
int32_t Check(const size_t task, bool& finish) override { return -1; }
};

int32_t DramStore::Setup(const Config& config)
int32_t DRAMStore::Setup(const Config& config)
{
auto impl = new (std::nothrow) DramStoreImpl();
auto impl = new (std::nothrow) DRAMStoreImpl();
if (!impl) {
UC_ERROR("Out of memory.");
return Status::OutOfMemory().Underlying();
Expand Down
8 changes: 4 additions & 4 deletions ucm/store/dramstore/cc/api/dramstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

namespace UC {

class DramStore : public CCStore {
class DRAMStore : public CCStore {
public:
struct Config {
size_t ioSize;
Expand All @@ -41,8 +41,8 @@ class DramStore : public CCStore {
};

public:
DramStore() : impl_{nullptr} {}
~DramStore() override
DRAMStore() : impl_{nullptr} {}
~DRAMStore() override
{
if (this->impl_) { delete this->impl_; }
}
Expand Down Expand Up @@ -73,7 +73,7 @@ class DramStore : public CCStore {
}

private:
DramStore* impl_;
DRAMStore* impl_;
};

} // namespace UC
Expand Down
164 changes: 79 additions & 85 deletions ucm/store/dramstore/cpy/dramstore.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,80 +23,72 @@
* */
#include "api/dramstore.h"
#include <pybind11/pybind11.h>
#include "template/singleton.h"

namespace py = pybind11;
using StoreImpl = UC::DramStore;

namespace UC {

inline void* CCStoreImpl() { return Singleton<StoreImpl>::Instance(); }
inline int32_t Setup(const StoreImpl::Config& config)
{
return ((StoreImpl*)CCStoreImpl())->Setup(config);
}
inline int32_t Alloc(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Alloc(block); }
inline bool Lookup(const std::string& block) { return ((StoreImpl*)CCStoreImpl())->Lookup(block); }
inline void Commit(const std::string& block, const bool success)
{
return ((StoreImpl*)CCStoreImpl())->Commit(block, success);
}
inline py::list AllocBatch(const py::list& blocks)
{
py::list results;
for (auto& block : blocks) { results.append(Alloc(block.cast<std::string>())); }
return results;
}
inline py::list LookupBatch(const py::list& blocks)
{
py::list founds;
for (auto& block : blocks) { founds.append(Lookup(block.cast<std::string>())); }
return founds;
}
inline void CommitBatch(const py::list& blocks, const bool success)
{
for (auto& block : blocks) { Commit(block.cast<std::string>(), success); }
}
inline int32_t Wait(const size_t task) { return ((StoreImpl*)CCStoreImpl())->Wait(task); }
inline py::tuple Check(const size_t task)
{
auto finish = false;
auto ret = ((StoreImpl*)CCStoreImpl())->Check(task, finish);
return py::make_tuple(ret, finish);
}
size_t Submit(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths, const CCStore::Task::Type type,
const CCStore::Task::Location location, const std::string& brief)
{
CCStore::Task task{type, location, brief};
auto blockId = blockIds.begin();
auto offset = offsets.begin();
auto address = addresses.begin();
auto length = lengths.begin();
while ((blockId != blockIds.end()) && (offset != offsets.end()) &&
(address != addresses.end()) && (length != lengths.end())) {
auto ret = task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
if (ret != 0) { return CCStore::invalidTaskId; }
blockId++;
offset++;
address++;
length++;
class DRAMStorePy : public DRAMStore {
public:
void* CCStoreImpl() { return this; }
py::list AllocBatch(const py::list& blocks)
{
py::list results;
for (auto& block : blocks) { results.append(this->Alloc(block.cast<std::string>())); }
return results;
}
return ((StoreImpl*)CCStoreImpl())->Submit(std::move(task));
}
inline size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD,
CCStore::Task::Location::DEVICE, "Dram::H2D");
}
inline size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return Submit(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP,
CCStore::Task::Location::DEVICE, "Dram::D2H");
}
py::list LookupBatch(const py::list& blocks)
{
py::list founds;
for (auto& block : blocks) { founds.append(this->Lookup(block.cast<std::string>())); }
return founds;
}
void CommitBatch(const py::list& blocks, const bool success)
{
for (auto& block : blocks) { this->Commit(block.cast<std::string>(), success); }
}
py::tuple CheckPy(const size_t task)
{
auto finish = false;
auto ret = this->Check(task, finish);
return py::make_tuple(ret, finish);
}
size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD,
CCStore::Task::Location::DEVICE, "DRAM::H2D");
}
size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP,
CCStore::Task::Location::DEVICE, "DRAM::D2H");
}

private:
size_t SubmitPy(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths, const CCStore::Task::Type type,
const CCStore::Task::Location location, const std::string& brief)
{
CCStore::Task task{type, location, brief};
auto blockId = blockIds.begin();
auto offset = offsets.begin();
auto address = addresses.begin();
auto length = lengths.begin();
while ((blockId != blockIds.end()) && (offset != offsets.end()) &&
(address != addresses.end()) && (length != lengths.end())) {
auto ret = task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
if (ret != 0) { return CCStore::invalidTaskId; }
blockId++;
offset++;
address++;
length++;
}
return this->Submit(std::move(task));
}
};

} // namespace UC

Expand All @@ -106,22 +98,24 @@ PYBIND11_MODULE(ucmdramstore, module)
module.attr("version") = UC_VAR_PROJECT_VERSION;
module.attr("commit_id") = UC_VAR_GIT_COMMIT_ID;
module.attr("build_type") = UC_VAR_BUILD_TYPE;
auto store = module.def_submodule("DramStore");
auto config = py::class_<StoreImpl::Config>(store, "Config");
auto store = py::class_<UC::DRAMStorePy>(module, "DRAMStore");
auto config = py::class_<UC::DRAMStorePy::Config>(store, "Config");
config.def(py::init<const size_t, const size_t>(), py::arg("ioSize"), py::arg("capacity"));
config.def_readwrite("ioSize", &StoreImpl::Config::ioSize);
config.def_readwrite("capacity", &StoreImpl::Config::capacity);
config.def_readwrite("deviceId", &StoreImpl::Config::deviceId);
module.def("CCStoreImpl", &UC::CCStoreImpl);
module.def("Setup", &UC::Setup);
module.def("Alloc", &UC::Alloc);
module.def("AllocBatch", &UC::AllocBatch);
module.def("Lookup", &UC::Lookup);
module.def("LookupBatch", &UC::LookupBatch);
module.def("Load", &UC::Load);
module.def("Dump", &UC::Dump);
module.def("Wait", &UC::Wait);
module.def("Check", &UC::Check);
module.def("Commit", &UC::Commit);
module.def("CommitBatch", &UC::CommitBatch);
config.def_readwrite("ioSize", &UC::DRAMStorePy::Config::ioSize);
config.def_readwrite("capacity", &UC::DRAMStorePy::Config::capacity);
config.def_readwrite("deviceId", &UC::DRAMStorePy::Config::deviceId);
store.def(py::init<>());
store.def("CCStoreImpl", &UC::DRAMStorePy::CCStoreImpl);
store.def("Setup", &UC::DRAMStorePy::Setup);
store.def("Alloc", py::overload_cast<const std::string&>(&UC::DRAMStorePy::Alloc));
store.def("AllocBatch", &UC::DRAMStorePy::AllocBatch);
store.def("Lookup", py::overload_cast<const std::string&>(&UC::DRAMStorePy::Lookup));
store.def("LookupBatch", &UC::DRAMStorePy::LookupBatch);
store.def("Load", &UC::DRAMStorePy::Load);
store.def("Dump", &UC::DRAMStorePy::Dump);
store.def("Wait", &UC::DRAMStorePy::Wait);
store.def("Check", &UC::DRAMStorePy::Check);
store.def("Commit",
py::overload_cast<const std::string&, const bool>(&UC::DRAMStorePy::Commit));
store.def("CommitBatch", &UC::DRAMStorePy::CommitBatch);
}
4 changes: 2 additions & 2 deletions ucm/store/infra/logger/flux/flux_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class FluxLogger : public ILogger {
void Log(Level&& lv, SourceLocation&& loc, std::string&& msg) override
{
using namespace std::chrono;
static const size_t pid = static_cast<size_t>(getpid());
static thread_local const size_t tid = syscall(SYS_gettid);
static const auto pid = getpid();
static thread_local const auto tid = syscall(SYS_gettid);
static thread_local seconds lastSec{0};
static thread_local char datetime[32];
auto systemNow = system_clock::now();
Expand Down
Loading