Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/condy/awaiters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class OpAwaiterBase {
prep_func_(sqe);
sqe->flags |= static_cast<uint8_t>(flags);
io_uring_sqe_set_data(
sqe, encode_work(&finish_handle_.get(), Handle::work_type));
sqe, encode_work(&finish_handle_.get(), WorkType::Common));
}

protected:
Expand Down
9 changes: 5 additions & 4 deletions include/condy/concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ concept HandleLike = requires(T handle, Invoker *invoker) {
};

template <typename T>
concept OpFinishHandleLike = HandleLike<T> && requires(T handle) {
{ handle.invoke() } -> std::same_as<void>;
{ handle.set_result(0, 0) } -> std::same_as<void>;
};
concept OpFinishHandleLike =
HandleLike<T> && requires(T handle, io_uring_cqe *cqe) {
{ handle.invoke() } -> std::same_as<void>;
{ handle.handle_cqe(cqe) } -> std::same_as<typename T::Action>;
};

template <typename T>
concept AwaiterLike = requires(T awaiter) {
Expand Down
174 changes: 109 additions & 65 deletions include/condy/finish_handles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,32 @@
#include <variant>
#include <vector>

// Cancel method for finish handles
#define DEFINE_CANCEL_METHOD_() \
void cancel() { \
auto *ring = detail::Context::current().ring(); \
io_uring_sqe *sqe = ring->get_sqe(); \
io_uring_prep_cancel(sqe, encode_work(this, work_type), 0); \
io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore)); \
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); \
}

namespace condy {

class Ring;

class OpFinishHandle : public InvokerAdapter<OpFinishHandle, WorkInvoker> {
public:
static constexpr WorkType work_type = WorkType::Common;
using ReturnType = int32_t;
struct Action {
bool queue_work;
bool op_finish;
};
using HandleCQEFunc = Action (*)(void *, io_uring_cqe *);

DEFINE_CANCEL_METHOD_();
void cancel() {
auto *ring = detail::Context::current().ring();
io_uring_sqe *sqe = ring->get_sqe();
io_uring_prep_cancel(sqe, this, 0);
io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore));
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
}

Action handle_cqe(io_uring_cqe *cqe) { return handle_func_(this, cqe); }

void set_result(int32_t res, uint32_t flags) {
res_ = res;
flags_ = flags;
Action handle_cqe_impl(io_uring_cqe *cqe) {
res_ = cqe->res;
return {.queue_work = true, .op_finish = true};
}

void invoke() {
Expand All @@ -57,93 +59,120 @@ class OpFinishHandle : public InvokerAdapter<OpFinishHandle, WorkInvoker> {

void set_invoker(Invoker *invoker) { invoker_ = invoker; }

protected:
int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set
uint32_t flags_ = 0;
Invoker *invoker_ = nullptr;
};

class ExtendOpFinishHandle : public OpFinishHandle {
public:
using ExtendFunc = void (*)(void *, int32_t);

void invoke_extend(int32_t res) {
assert(extend_func_ != nullptr);
extend_func_(this, res);
private:
static Action handle_cqe_static_(void *data, io_uring_cqe *cqe) {
auto *self = static_cast<OpFinishHandle *>(data);
return self->handle_cqe_impl(cqe);
}

protected:
ExtendFunc extend_func_ = nullptr;
HandleCQEFunc handle_func_ = handle_cqe_static_;
Invoker *invoker_ = nullptr;
int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set
};

template <typename Func, OpFinishHandleLike HandleBase>
class MultiShotMixin : public HandleBase {
public:
static constexpr WorkType work_type = WorkType::MultiShot;

template <typename... Args>
MultiShotMixin(Func func, Args &&...args)
: HandleBase(std::forward<Args>(args)...), func_(std::move(func)) {
this->extend_func_ = [](void *data, int32_t) {
auto *self = static_cast<MultiShotMixin<Func, HandleBase> *>(data);
self->invoke_multishot_();
};
this->handle_func_ = handle_cqe_static_;
}

DEFINE_CANCEL_METHOD_();
OpFinishHandle::Action
handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ {
if (cqe->flags & IORING_CQE_F_MORE) {
HandleBase::handle_cqe_impl(cqe);
func_(HandleBase::extract_result());
return {.queue_work = false, .op_finish = false};
} else {
HandleBase::handle_cqe_impl(cqe);
return {.queue_work = true, .op_finish = true};
}
}

private:
void invoke_multishot_() { func_(HandleBase::extract_result()); }
static OpFinishHandle::Action handle_cqe_static_(void *data,
io_uring_cqe *cqe) {
auto *self = static_cast<MultiShotMixin *>(data);
return self->handle_cqe_impl(cqe);
}

private:
protected:
Func func_;
};

template <typename MultiShotFunc>
using MultiShotOpFinishHandle =
MultiShotMixin<MultiShotFunc, ExtendOpFinishHandle>;
using MultiShotOpFinishHandle = MultiShotMixin<MultiShotFunc, OpFinishHandle>;

template <typename Func, OpFinishHandleLike HandleBase>
class ZeroCopyMixin : public HandleBase {
public:
static constexpr WorkType work_type = WorkType::ZeroCopy;

template <typename... Args>
ZeroCopyMixin(Func func, Args &&...args)
: HandleBase(std::forward<Args>(args)...), free_func_(std::move(func)) {
this->func_ = [](void *data) {
auto *self = static_cast<ZeroCopyMixin<Func, HandleBase> *>(data);
self->invoke();
};
this->extend_func_ = [](void *data, int32_t res) {
auto *self = static_cast<ZeroCopyMixin<Func, HandleBase> *>(data);
self->invoke_notify_(res);
};
this->func_ = invoke_static_;
this->handle_func_ = handle_cqe_static_;
}

void invoke() /* fake override */ {
assert(this->invoker_ != nullptr);
(*this->invoker_)();
resumed_ = true;
if (notified_) {
free_func_(notify_res_);
delete this;
}
// Invocation of free_func_ should be delayed until the operation is
// finished since user may adjust the behavior of free_func_ based on
// the result of the operation.
maybe_free_();
}

DEFINE_CANCEL_METHOD_();
OpFinishHandle::Action
handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ {
if (cqe->flags & IORING_CQE_F_MORE) {
HandleBase::handle_cqe_impl(cqe);
return {.queue_work = true, .op_finish = false};
} else {
if (cqe->flags & IORING_CQE_F_NOTIF) {
notify_(cqe->res);
return {.queue_work = false, .op_finish = true};
} else {
// Only one cqe means the operation is finished without
// notification. This is rare but possible.
// https://github.com/axboe/liburing/issues/1462
notify_(0);
HandleBase::handle_cqe_impl(cqe);
return {.queue_work = true, .op_finish = true};
}
}
}

private:
void invoke_notify_(int32_t res) {
notify_res_ = res;
notified_ = true;
if (resumed_) {
void maybe_free_() {
if (resumed_ && notified_) {
free_func_(notify_res_);
delete this;
}
}

private:
void notify_(int32_t res) {
assert(res != -ENOTRECOVERABLE);
notify_res_ = res;
notified_ = true;
maybe_free_();
}

static void invoke_static_(void *data) {
auto *self = static_cast<ZeroCopyMixin *>(data);
self->invoke();
}

static OpFinishHandle::Action handle_cqe_static_(void *data,
io_uring_cqe *cqe) {
auto *self = static_cast<ZeroCopyMixin *>(data);
return self->handle_cqe_impl(cqe);
}

protected:
Func free_func_;
int32_t notify_res_ = -ENOTRECOVERABLE;
// Use these flags to handle race between invoke and notify
Expand All @@ -152,7 +181,7 @@ class ZeroCopyMixin : public HandleBase {
};

template <typename FreeFunc>
using ZeroCopyOpFinishHandle = ZeroCopyMixin<FreeFunc, ExtendOpFinishHandle>;
using ZeroCopyOpFinishHandle = ZeroCopyMixin<FreeFunc, OpFinishHandle>;

template <BufferRingLike Br, OpFinishHandleLike HandleBase>
class SelectBufferMixin : public HandleBase {
Expand All @@ -161,7 +190,16 @@ class SelectBufferMixin : public HandleBase {

template <typename... Args>
SelectBufferMixin(Br *buffers, Args &&...args)
: HandleBase(std::forward<Args>(args)...), buffers_(buffers) {}
: HandleBase(std::forward<Args>(args)...), buffers_(buffers) {
this->handle_func_ = handle_cqe_static_;
}

OpFinishHandle::Action
handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ {
HandleBase::handle_cqe_impl(cqe);
this->flags_ = cqe->flags;
return {.queue_work = true, .op_finish = true};
}

ReturnType extract_result() {
int res = this->res_;
Expand All @@ -170,6 +208,14 @@ class SelectBufferMixin : public HandleBase {
}

private:
static OpFinishHandle::Action handle_cqe_static_(void *data,
io_uring_cqe *cqe) {
auto *self = static_cast<SelectBufferMixin *>(data);
return self->handle_cqe_impl(cqe);
}

protected:
uint32_t flags_ = 0;
Br *buffers_;
};

Expand All @@ -178,7 +224,7 @@ using SelectBufferOpFinishHandle = SelectBufferMixin<Br, OpFinishHandle>;

template <typename MultiShotFunc, BufferRingLike Br>
using MultiShotSelectBufferOpFinishHandle =
MultiShotMixin<MultiShotFunc, SelectBufferMixin<Br, ExtendOpFinishHandle>>;
MultiShotMixin<MultiShotFunc, SelectBufferMixin<Br, OpFinishHandle>>;

template <bool Cancel, HandleLike Handle> class RangedParallelFinishHandle {
public:
Expand Down Expand Up @@ -437,6 +483,4 @@ class WhenAnyFinishHandle : public ParallelAnyFinishHandle<Handles...> {
}
};

#undef DEFINE_CANCEL_METHOD_

} // namespace condy
30 changes: 5 additions & 25 deletions include/condy/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,35 +361,15 @@ class Runtime {
tsan_acquire(data);
local_queue_.push_back(work);
}
} else if (type == WorkType::MultiShot) {
auto *handle = static_cast<ExtendOpFinishHandle *>(data);
handle->set_result(cqe->res, cqe->flags);
if (cqe->flags & IORING_CQE_F_MORE) {
handle->invoke_extend(0); // res not used here
} else {
} else if (type == WorkType::Common) {
auto *handle = static_cast<OpFinishHandle *>(data);
auto action = handle->handle_cqe(cqe);
if (action.op_finish) {
pending_works_--;
local_queue_.push_back(handle);
}
} else if (type == WorkType::ZeroCopy) {
auto *handle = static_cast<ExtendOpFinishHandle *>(data);
if (cqe->flags & IORING_CQE_F_MORE) {
handle->set_result(cqe->res, cqe->flags);
if (action.queue_work) {
local_queue_.push_back(handle);
} else {
pending_works_--;
if (cqe->flags & IORING_CQE_F_NOTIF) {
handle->invoke_extend(cqe->res);
} else {
handle->set_result(cqe->res, cqe->flags);
local_queue_.push_back(handle);
handle->invoke_extend(0);
}
}
} else if (type == WorkType::Common) {
auto *handle = static_cast<OpFinishHandle *>(data);
handle->set_result(cqe->res, cqe->flags);
pending_works_--;
local_queue_.push_back(handle);
} else {
assert(false && "Invalid work type");
}
Expand Down
2 changes: 0 additions & 2 deletions include/condy/work_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ enum class WorkType : uint8_t {
Notify,
SendFd,
Schedule,
MultiShot,
ZeroCopy,
};

inline std::pair<void *, WorkType> decode_work(void *ptr) {
Expand Down
Loading
Loading