Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ucm/store/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ add_subdirectory(nfsstore)
add_subdirectory(dramstore)
add_subdirectory(localstore)
add_subdirectory(mooncakestore)
add_subdirectory(task)
add_subdirectory(test)
9 changes: 9 additions & 0 deletions ucm/store/device/ascend/ascend_device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ class AscendDevice : public IBufferedDevice {
}
return Status::OK();
}
// Copies `count` bytes from host memory `src` to device memory `dst` through
// aclrtMemcpy with ACL_MEMCPY_HOST_TO_DEVICE (the non-Async counterpart of the
// aclrtMemcpyAsync call used by H2DAsync). `count` is passed twice; presumably
// the first is the destination capacity and the second the copy length —
// confirm against the ACL API reference.
// Returns whatever Status ASCEND_API yields for the call.
Status H2DSync(std::byte* dst, const std::byte* src, const size_t count) override
{
return ASCEND_API(aclrtMemcpy, dst, count, src, count, ACL_MEMCPY_HOST_TO_DEVICE);
}
// Copies `count` bytes from device memory `src` to host memory `dst` through
// aclrtMemcpy with ACL_MEMCPY_DEVICE_TO_HOST. `count` is passed twice;
// presumably the first is the destination capacity and the second the copy
// length — confirm against the ACL API reference.
// Returns whatever Status ASCEND_API yields for the call.
Status D2HSync(std::byte* dst, const std::byte* src, const size_t count) override
{
return ASCEND_API(aclrtMemcpy, dst, count, src, count, ACL_MEMCPY_DEVICE_TO_HOST);
}
Status H2DAsync(std::byte* dst, const std::byte* src, const size_t count) override
{
return ASCEND_API(aclrtMemcpyAsync, dst, count, src, count, ACL_MEMCPY_HOST_TO_DEVICE,
Expand All @@ -111,6 +119,7 @@ class AscendDevice : public IBufferedDevice {
return ASCEND_API(aclrtLaunchCallback, Trampoline, (void*)c, ACL_CALLBACK_NO_BLOCK,
this->stream_);
}
// Calls aclrtSynchronizeStream on this device's `stream_` and returns the
// resulting Status.
Status Synchronized() override { return ASCEND_API(aclrtSynchronizeStream, this->stream_); }

protected:
std::shared_ptr<std::byte> MakeBuffer(const size_t size) override
Expand Down
12 changes: 12 additions & 0 deletions ucm/store/device/cuda/cuda_device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ class CudaDevice : public IBufferedDevice {
}
return status;
}
// Copies `count` bytes from host memory `src` to device memory `dst` with
// cudaMemcpy (cudaMemcpyHostToDevice), the non-Async counterpart of the
// cudaMemcpyAsync call used by H2DAsync.
// Returns whatever Status CUDA_API yields for the call.
// Declared with `override` only, matching the other overrides in this class;
// `virtual` is implied and was redundant.
Status H2DSync(std::byte* dst, const std::byte* src, const size_t count) override
{
    return CUDA_API(cudaMemcpy, dst, src, count, cudaMemcpyHostToDevice);
}
// Copies `count` bytes from device memory `src` to host memory `dst` with
// cudaMemcpy (cudaMemcpyDeviceToHost).
// Returns whatever Status CUDA_API yields for the call.
// Declared with `override` only, matching the other overrides in this class;
// `virtual` is implied and was redundant.
Status D2HSync(std::byte* dst, const std::byte* src, const size_t count) override
{
    return CUDA_API(cudaMemcpy, dst, src, count, cudaMemcpyDeviceToHost);
}
Status H2DAsync(std::byte* dst, const std::byte* src, const size_t count) override
{
return CUDA_API(cudaMemcpyAsync, dst, src, count, cudaMemcpyHostToDevice,
Expand All @@ -100,6 +108,10 @@ class CudaDevice : public IBufferedDevice {
if (status.Failure()) { delete c; }
return status;
}
// Calls cudaStreamSynchronize on this device's `stream_` (cast to
// cudaStream_t) and returns the resulting Status.
Status Synchronized() override
{
return CUDA_API(cudaStreamSynchronize, (cudaStream_t)this->stream_);
}

protected:
std::shared_ptr<std::byte> MakeBuffer(const size_t size) override
Expand Down
3 changes: 3 additions & 0 deletions ucm/store/device/idevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ class IDevice {
virtual ~IDevice() = default;
virtual Status Setup() = 0;
virtual std::shared_ptr<std::byte> GetBuffer(const size_t size) = 0;
// Copies `count` bytes from host `src` to device `dst`. The CUDA and Ascend
// backends implement this with their non-async memcpy call.
virtual Status H2DSync(std::byte* dst, const std::byte* src, const size_t count) = 0;
// Copies `count` bytes from device `src` to host `dst`; counterpart of H2DSync.
virtual Status D2HSync(std::byte* dst, const std::byte* src, const size_t count) = 0;
// Host-to-device copy of `count` bytes issued through the backend's async
// memcpy call. NOTE(review): completion is presumably observed through
// AppendCallback or Synchronized — confirm with the implementations.
virtual Status H2DAsync(std::byte* dst, const std::byte* src, const size_t count) = 0;
// Device-to-host counterpart of H2DAsync.
virtual Status D2HAsync(std::byte* dst, const std::byte* src, const size_t count) = 0;
// Hands `cb` to the backend's work queue or stream. NOTE(review): the bool
// argument presumably reports whether the preceding work succeeded — confirm.
virtual Status AppendCallback(std::function<void(bool)> cb) = 0;
// Synchronizes with the device's queued work. The CUDA and Ascend backends
// implement this as a stream synchronize on their `stream_`.
virtual Status Synchronized() = 0;

protected:
virtual std::shared_ptr<std::byte> MakeBuffer(const size_t size) = 0;
Expand Down
22 changes: 21 additions & 1 deletion ucm/store/device/simu/simu_device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* */
#include "ibuffered_device.h"
#include "logger/logger.h"
#include "thread/latch.h"
#include "thread/thread_pool.h"

namespace UC {
Expand All @@ -38,7 +39,19 @@ class SimuDevice : public IBufferedDevice {
{
auto status = IBufferedDevice::Setup();
if (status.Failure()) { return status; }
if (!this->backend_.Setup([](auto& task) { task(); })) { return Status::Error(); }
if (!this->backend_.SetWorkerFn([](auto& task, const auto&) { task(); }).Run()) {
return Status::Error();
}
return Status::OK();
}
// Simulated host-to-device transfer: the `count` bytes starting at `src` are
// copied element by element into `dst` on the calling thread.
// Always reports Status::OK().
Status H2DSync(std::byte* dst, const std::byte* src, const size_t count) override
{
    std::copy_n(src, count, dst);
    return Status::OK();
}
// Simulated device-to-host transfer: the `count` bytes starting at `src` are
// copied element by element into `dst` on the calling thread.
// Always reports Status::OK().
Status D2HSync(std::byte* dst, const std::byte* src, const size_t count) override
{
    std::copy_n(src, count, dst);
    return Status::OK();
}
Status H2DAsync(std::byte* dst, const std::byte* src, const size_t count) override
Expand All @@ -64,6 +77,13 @@ class SimuDevice : public IBufferedDevice {
this->backend_.Push([=] { cb(true); });
return Status::OK();
}
// Blocks the caller until a marker task pushed onto `backend_` has executed.
// A one-count Latch is released by that marker task and the caller waits on it.
// NOTE(review): this drains earlier work only if `backend_` runs tasks in FIFO
// order on a single worker — confirm against the pool configuration.
// Always returns Status::OK().
// Declared with `override`, like the other IDevice overrides in this class, so
// the compiler checks the signature against the base declaration.
Status Synchronized() override
{
    Latch waiter{1};
    // Only `waiter` is captured, and by reference: it lives on this stack frame
    // for as long as Wait() below is blocked.
    this->backend_.Push([&waiter] { waiter.Done(nullptr); });
    waiter.Wait();
    return Status::OK();
}

protected:
std::shared_ptr<std::byte> MakeBuffer(const size_t size) override
Expand Down
2 changes: 1 addition & 1 deletion ucm/store/dramstore/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
file(GLOB_RECURSE UCMSTORE_DRAM_CC_SOURCE_FILES "./cc/*.cc")
add_library(dramstore STATIC ${UCMSTORE_DRAM_CC_SOURCE_FILES})
target_include_directories(dramstore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cc)
target_link_libraries(dramstore PUBLIC storeinfra)
target_link_libraries(dramstore PUBLIC storeinfra storetask)

file(GLOB_RECURSE UCMSTORE_DRAM_CPY_SOURCE_FILES "./cpy/*.cc")
pybind11_add_module(ucmdramstore ${UCMSTORE_DRAM_CPY_SOURCE_FILES})
Expand Down
19 changes: 9 additions & 10 deletions ucm/store/dramstore/cpy/dramstore.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,30 @@ class DRAMStorePy : public DRAMStore {
size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD,
CCStore::Task::Location::DEVICE, "DRAM::H2D");
return this->SubmitPy(blockIds, offsets, addresses, lengths, Task::Type::LOAD,
Task::Location::DEVICE, "DRAM::H2D");
}
size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP,
CCStore::Task::Location::DEVICE, "DRAM::D2H");
return this->SubmitPy(blockIds, offsets, addresses, lengths, Task::Type::DUMP,
Task::Location::DEVICE, "DRAM::D2H");
}

private:
size_t SubmitPy(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths, const CCStore::Task::Type type,
const CCStore::Task::Location location, const std::string& brief)
const py::list& lengths, Task::Type&& type, Task::Location&& location,
std::string&& brief)
{
CCStore::Task task{type, location, brief};
Task task{std::move(type), std::move(location), std::move(brief)};
auto blockId = blockIds.begin();
auto offset = offsets.begin();
auto address = addresses.begin();
auto length = lengths.begin();
while ((blockId != blockIds.end()) && (offset != offsets.end()) &&
(address != addresses.end()) && (length != lengths.end())) {
auto ret = task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
if (ret != 0) { return CCStore::invalidTaskId; }
task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
blockId++;
offset++;
address++;
Expand Down
12 changes: 8 additions & 4 deletions ucm/store/infra/file/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,25 @@ Status File::Access(const std::string& path, const int32_t mode)
}

Status File::Read(const std::string& path, const size_t offset, const size_t length,
uintptr_t address)
uintptr_t address, const bool directIo)
{
FileImpl file{path};
Status status = Status::OK();
if ((status = file.Open(IFile::OpenFlag::READ_ONLY)).Failure()) { return status; }
auto flags = directIo ? IFile::OpenFlag::READ_ONLY | IFile::OpenFlag::DIRECT
: IFile::OpenFlag::READ_ONLY;
if ((status = file.Open(flags)).Failure()) { return status; }
if ((status = file.Read((void*)address, length, offset)).Failure()) { return status; }
return status;
}

Status File::Write(const std::string& path, const size_t offset, const size_t length,
const uintptr_t address)
const uintptr_t address, const bool directIo)
{
FileImpl file{path};
Status status = Status::OK();
if ((status = file.Open(IFile::OpenFlag::WRITE_ONLY)).Failure()) { return status; }
auto flags = directIo ? IFile::OpenFlag::WRITE_ONLY | IFile::OpenFlag::DIRECT
: IFile::OpenFlag::WRITE_ONLY;
if ((status = file.Open(flags)).Failure()) { return status; }
if ((status = file.Write((const void*)address, length, offset)).Failure()) { return status; }
return status;
}
Expand Down
4 changes: 2 additions & 2 deletions ucm/store/infra/file/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class File {
static Status Rename(const std::string& path, const std::string& newName);
static Status Access(const std::string& path, const int32_t mode);
static Status Read(const std::string& path, const size_t offset, const size_t length,
uintptr_t address);
uintptr_t address, const bool directIo = false);
static Status Write(const std::string& path, const size_t offset, const size_t length,
const uintptr_t address);
const uintptr_t address, const bool directIo = false);
static void MUnmap(void* addr, size_t size);
static void ShmUnlink(const std::string& path);
};
Expand Down
19 changes: 17 additions & 2 deletions ucm/store/infra/thread/latch.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>

namespace UC {
Expand All @@ -34,8 +35,22 @@ class Latch {
public:
// Creates a latch whose counter starts at `expected` (0 when omitted).
explicit Latch(const size_t expected = 0) : counter_{expected} {}
// Adds one to the counter. The increment is performed on `counter_` directly,
// without taking `mutex_`.
void Up() { ++this->counter_; }
size_t Done() { return --this->counter_; }
void Notify() { this->cv_.notify_all(); }
// Counts the latch down by one. The call that moves the counter from 1 to 0
// runs `finish` (when non-empty) and then wakes every thread blocked on `cv_`.
// Calls made while the counter is already 0 do nothing.
//
// The whole transition runs under `mutex_`, the mutex Wait() locks. If the
// counter were decremented outside the lock, a waiter could observe 0, return
// and destroy the Latch (e.g. one living on the waiter's stack) while this
// function was still about to lock `mutex_` and signal `cv_`.
// Because `finish` runs with `mutex_` held, it must not block on this Latch.
// noexcept: an exception escaping `finish` terminates the program.
void Done(std::function<void(void)> finish) noexcept
{
    std::lock_guard<std::mutex> lg(this->mutex_);
    auto counter = this->counter_.load(std::memory_order_acquire);
    // Up() changes the counter without taking the mutex, so the decrement still
    // has to be a CAS; a failed CAS refreshes `counter` with the current value.
    while (counter > 0) {
        const auto desired = counter - 1;
        if (!this->counter_.compare_exchange_weak(counter, desired, std::memory_order_acq_rel)) {
            continue;
        }
        if (desired == 0) {
            if (finish) { finish(); }
            this->cv_.notify_all();
        }
        return;
    }
}
void Wait()
{
std::unique_lock<std::mutex> lk(this->mutex_);
Expand Down
51 changes: 35 additions & 16 deletions ucm/store/infra/thread/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@

namespace UC {

template <class Task>
template <class Task, class WorkerArgs = void*>
class ThreadPool {
using WorkerInitFn = std::function<bool(void)>;
using WorkerFn = std::function<void(Task&)>;
using WorkerExitFn = std::function<void(void)>;
using WorkerInitFn = std::function<bool(WorkerArgs&)>;
using WorkerFn = std::function<void(Task&, const WorkerArgs&)>;
using WorkerExitFn = std::function<void(WorkerArgs&)>;

public:
ThreadPool() = default;
Expand All @@ -54,14 +54,31 @@ class ThreadPool {
if (w.joinable()) { w.join(); }
}
}
bool Setup(
WorkerFn&& fn, WorkerInitFn&& initFn = [] { return true; }, WorkerExitFn&& exitFn = [] {},
const size_t nWorker = 1) noexcept
ThreadPool& SetWorkerFn(WorkerFn&& fn)
{
this->initFn_ = initFn;
this->fn_ = fn;
this->exitFn_ = exitFn;
std::list<std::promise<bool>> start(nWorker);
this->fn_ = std::move(fn);
return *this;
}
// Installs the per-worker initialisation hook (moved from `fn`) and returns
// *this so the setters can be chained. Worker() calls the hook once with that
// worker's WorkerArgs before reporting startup; its bool result becomes the
// startup result. With no hook installed, startup is treated as successful.
ThreadPool& SetWorkerInitFn(WorkerInitFn&& fn)
{
this->initFn_ = std::move(fn);
return *this;
}
// Installs the per-worker exit hook (moved from `fn`) and returns *this so the
// setters can be chained. Worker() calls the hook, when one is set, with that
// worker's WorkerArgs after its task loop has ended.
ThreadPool& SetWorkerExitFn(WorkerExitFn&& fn)
{
this->exitFn_ = std::move(fn);
return *this;
}
// Sets how many workers Run() should start (the member defaults to 1) and
// returns *this so the setters can be chained. Run() returns false when this
// value is 0.
ThreadPool& SetNWorker(const size_t nWorker)
{
this->nWorker_ = nWorker;
return *this;
}
bool Run()
{
if (this->nWorker_ == 0) { return false; }
if (!this->fn_) { return false; }
std::list<std::promise<bool>> start(this->nWorker_);
std::list<std::future<bool>> fut;
for (auto& s : start) {
fut.push_back(s.get_future());
Expand Down Expand Up @@ -89,24 +106,26 @@ class ThreadPool {
private:
void Worker(std::promise<bool>& started) noexcept
{
auto success = this->initFn_();
WorkerArgs args = nullptr;
auto success = true;
if (this->initFn_) { success = this->initFn_(args); }
started.set_value(success);
while (success) {
Task task;
std::unique_lock<std::mutex> lk(this->mtx_);
this->cv_.wait(lk, [this] { return this->stop_ || !this->taskQ_.empty(); });
if (this->stop_) { break; }
if (this->taskQ_.empty()) { continue; }
task = std::move(this->taskQ_.front());
auto task = std::make_shared<Task>(std::move(this->taskQ_.front()));
this->taskQ_.pop_front();
lk.unlock();
this->fn_(task);
this->fn_(*task, args);
}
this->exitFn_();
if (this->exitFn_) { this->exitFn_(args); }
}

private:
bool stop_{false};
size_t nWorker_{1};
std::list<std::thread> workers_;
WorkerInitFn initFn_;
WorkerFn fn_;
Expand Down
2 changes: 1 addition & 1 deletion ucm/store/localstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ target_include_directories(localstore PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/cc/api
${CMAKE_CURRENT_SOURCE_DIR}/cc/domain
)
target_link_libraries(localstore PUBLIC storeinfra)
target_link_libraries(localstore PUBLIC storeinfra storetask)

file(GLOB_RECURSE UCMSTORE_LOCAL_CPY_SOURCE_FILES "./cpy/*.cc")
pybind11_add_module(ucmlocalstore ${UCMSTORE_LOCAL_CPY_SOURCE_FILES})
Expand Down
19 changes: 9 additions & 10 deletions ucm/store/localstore/cpy/localstore.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,30 @@ class LocalStorePy : public LocalStore {
size_t Load(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::LOAD,
CCStore::Task::Location::DEVICE, "LOCAL::S2D");
return this->SubmitPy(blockIds, offsets, addresses, lengths, Task::Type::LOAD,
Task::Location::DEVICE, "LOCAL::S2D");
}
size_t Dump(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths)
{
return this->SubmitPy(blockIds, offsets, addresses, lengths, CCStore::Task::Type::DUMP,
CCStore::Task::Location::DEVICE, "LOCAL::D2S");
return this->SubmitPy(blockIds, offsets, addresses, lengths, Task::Type::DUMP,
Task::Location::DEVICE, "LOCAL::D2S");
}

private:
size_t SubmitPy(const py::list& blockIds, const py::list& offsets, const py::list& addresses,
const py::list& lengths, const CCStore::Task::Type type,
const CCStore::Task::Location location, const std::string& brief)
const py::list& lengths, Task::Type&& type, Task::Location&& location,
std::string&& brief)
{
CCStore::Task task{type, location, brief};
Task task{std::move(type), std::move(location), std::move(brief)};
auto blockId = blockIds.begin();
auto offset = offsets.begin();
auto address = addresses.begin();
auto length = lengths.begin();
while ((blockId != blockIds.end()) && (offset != offsets.end()) &&
(address != addresses.end()) && (length != lengths.end())) {
auto ret = task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
if (ret != 0) { return CCStore::invalidTaskId; }
task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
blockId++;
offset++;
address++;
Expand Down
2 changes: 1 addition & 1 deletion ucm/store/nfsstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ target_include_directories(nfsstore PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/cc/api
${CMAKE_CURRENT_SOURCE_DIR}/cc/domain
)
target_link_libraries(nfsstore PUBLIC storeinfra storedevice)
target_link_libraries(nfsstore PUBLIC storeinfra storedevice storetask)

file(GLOB_RECURSE UCMSTORE_NFS_CPY_SOURCE_FILES "./cpy/*.cc")
pybind11_add_module(ucmnfsstore ${UCMSTORE_NFS_CPY_SOURCE_FILES})
Expand Down
Loading