Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions include/condy/concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,13 @@ concept PrepFuncLike = requires(T prep_func, Ring *ring) {

template <typename T>
concept CQEHandlerLike = requires(T handler, io_uring_cqe *cqe) {
typename std::decay_t<T>::ReturnType;
{ handler.handle_cqe(cqe) } noexcept -> std::same_as<void>;
{
handler.extract_result()
} noexcept -> std::same_as<typename std::decay_t<T>::ReturnType>;
{ handler(cqe) } noexcept;
};
Comment thread
wokron marked this conversation as resolved.

template <typename T>
concept BufferRingLike = requires(T br, io_uring_cqe *cqe) {
typename std::decay_t<T>::ReturnType;
{ br.bgid() } -> std::same_as<uint16_t>;
{
br.handle_finish(cqe)
} -> std::same_as<typename std::decay_t<T>::ReturnType>;
{ br.handle_finish(cqe) };
};

template <typename T>
Expand Down
73 changes: 19 additions & 54 deletions include/condy/cqe_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
* @file cqe_handler.hpp
* @brief Definitions of CQE handlers
* @details This file defines a series of CQE handlers, which are responsible
* for processing the completion of asynchronous operations. Each handler
* defines a `handle_cqe` method to process the CQE and an `extract_result`
* method to retrieve the result of the operation.
* for processing the completion of asynchronous operations.
*/

#pragma once
Expand Down Expand Up @@ -45,43 +43,27 @@ inline bool check_cqe32([[maybe_unused]] io_uring_cqe *cqe) {
* @return int32_t The result of the operation, which is the value of `cqe->res`
* for the corresponding CQE.
*/
class SimpleCQEHandler {
public:
using ReturnType = int32_t;

void handle_cqe(io_uring_cqe *cqe) noexcept { res_ = cqe->res; }

ReturnType extract_result() noexcept { return res_; }

private:
int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set
struct SimpleCQEHandler {
int32_t operator()(io_uring_cqe *cqe) noexcept { return cqe->res; }
};

/**
* @brief A CQE handler that returns the selected buffers based on the result of
* the CQE.
* @tparam Br The buffer ring type
* @return std::pair<int32_t, typename Br::ReturnType> A pair containing the
* result of the operation (the value of `cqe->res`) and the selected buffers.
* @return std::pair<int32_t, BufferType> A pair containing the
* result of the operation (the value of `cqe->res`) and the selected buffer,
* whose type is determined by the buffer ring.
*/
template <BufferRingLike Br> class SelectBufferCQEHandler {
public:
using ReturnType = std::pair<int32_t, typename Br::ReturnType>;

SelectBufferCQEHandler(Br *buffers) : buffers_(buffers) {}

void handle_cqe(io_uring_cqe *cqe) noexcept {
res_ = cqe->res;
buffer_ = buffers_->handle_finish(cqe);
auto operator()(io_uring_cqe *cqe) noexcept {
return std::make_pair(cqe->res, buffers_->handle_finish(cqe));
}

ReturnType extract_result() noexcept { return {res_, std::move(buffer_)}; }

private:
using BufferType = typename Br::ReturnType;

int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set
BufferType buffer_ = {};
Br *buffers_;
};

Expand All @@ -91,22 +73,12 @@ template <BufferRingLike Br> class SelectBufferCQEHandler {
* @return std::pair<int32_t, uint64_t> A pair containing the status and result
* of the NVMe command.
*/
class NVMePassthruCQEHandler {
public:
using ReturnType = std::pair<int32_t, uint64_t>;

void handle_cqe(io_uring_cqe *cqe) noexcept {
struct NVMePassthruCQEHandler {
std::pair<int32_t, uint64_t> operator()(io_uring_cqe *cqe) noexcept {
assert(detail::check_cqe32(cqe) &&
"Expected big CQE for NVMe passthrough");
res_ = cqe->res;
nvme_result_ = cqe->big_cqe[0];
return {cqe->res, cqe->big_cqe[0]};
}

ReturnType extract_result() noexcept { return {res_, nvme_result_}; }

private:
int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set
uint64_t nvme_result_ = 0;
};

#if !IO_URING_CHECK_VERSION(2, 12) // >= 2.12
Expand Down Expand Up @@ -138,25 +110,18 @@ struct TxTimestampResult {
* @return std::pair<int32_t, TxTimestampResult> Result of the TX timestamp
* operation.
*/
class TxTimestampCQEHandler {
public:
using ReturnType = std::pair<int32_t, TxTimestampResult>;

void handle_cqe(io_uring_cqe *cqe) noexcept {
struct TxTimestampCQEHandler {
std::pair<int32_t, TxTimestampResult>
operator()(io_uring_cqe *cqe) noexcept {
assert(detail::check_cqe32(cqe) &&
"Expected big CQE for TX timestamp operations");
res_ = cqe->res;
result_.tstype =
TxTimestampResult result;
result.tstype =
static_cast<int>(cqe->flags >> IORING_TIMESTAMP_TYPE_SHIFT);
result_.hwts = cqe->flags & IORING_CQE_F_TSTAMP_HW;
result_.ts = *reinterpret_cast<io_timespec *>(cqe + 1);
result.hwts = cqe->flags & IORING_CQE_F_TSTAMP_HW;
result.ts = *reinterpret_cast<io_timespec *>(cqe + 1);
return {cqe->res, result};
}

ReturnType extract_result() noexcept { return {res_, result_}; }

private:
int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set
TxTimestampResult result_;
};
#endif

Expand Down
29 changes: 11 additions & 18 deletions include/condy/finish_handles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@ namespace condy {
template <CQEHandlerLike CQEHandler, typename Receiver>
class OpFinishHandle : public OpFinishHandleBase {
public:
using ReturnType = typename CQEHandler::ReturnType;

template <typename... Args>
OpFinishHandle(Receiver receiver, Args &&...args)
: cqe_handler_(std::forward<Args>(args)...),
receiver_(std::move(receiver)) {
OpFinishHandle(CQEHandler cqe_handler, Receiver receiver)
: cqe_handler_(std::move(cqe_handler)), receiver_(std::move(receiver)) {
this->handle_func_ = handle_static_;
}

Expand Down Expand Up @@ -64,8 +60,7 @@ class OpFinishHandle : public OpFinishHandleBase {
protected:
void finish_(io_uring_cqe *cqe) noexcept {
stop_callback_.reset();
cqe_handler_.handle_cqe(cqe);
std::move(receiver_)(cqe_handler_.extract_result());
std::move(receiver_)(cqe_handler_(cqe));
}

CQEHandler cqe_handler_;
Expand All @@ -76,10 +71,10 @@ class OpFinishHandle : public OpFinishHandleBase {
template <CQEHandlerLike CQEHandler, typename Func, typename Receiver>
class MultiShotOpFinishHandle : public OpFinishHandle<CQEHandler, Receiver> {
public:
template <typename... Args>
MultiShotOpFinishHandle(Receiver receiver, Func func, Args &&...args)
: OpFinishHandle<CQEHandler, Receiver>(std::move(receiver),
std::forward<Args>(args)...),
MultiShotOpFinishHandle(CQEHandler cqe_handler, Receiver receiver,
Func func)
: OpFinishHandle<CQEHandler, Receiver>(std::move(cqe_handler),
std::move(receiver)),
func_(std::move(func)) {
this->handle_func_ = handle_static_;
}
Expand All @@ -93,8 +88,7 @@ class MultiShotOpFinishHandle : public OpFinishHandle<CQEHandler, Receiver> {
bool handle_impl_(io_uring_cqe *cqe) noexcept
/* fake override */ {
if (cqe->flags & IORING_CQE_F_MORE) {
this->cqe_handler_.handle_cqe(cqe);
func_(this->cqe_handler_.extract_result());
func_(this->cqe_handler_(cqe));
return false;
} else {
this->finish_(cqe);
Expand All @@ -109,10 +103,9 @@ class MultiShotOpFinishHandle : public OpFinishHandle<CQEHandler, Receiver> {
template <CQEHandlerLike CQEHandler, typename Func, typename Receiver>
class ZeroCopyOpFinishHandle : public OpFinishHandle<CQEHandler, Receiver> {
public:
template <typename... Args>
ZeroCopyOpFinishHandle(Receiver receiver, Func func, Args &&...args)
: OpFinishHandle<CQEHandler, Receiver>(std::move(receiver),
std::forward<Args>(args)...),
ZeroCopyOpFinishHandle(CQEHandler cqe_handler, Receiver receiver, Func func)
: OpFinishHandle<CQEHandler, Receiver>(std::move(cqe_handler),
std::move(receiver)),
free_func_(std::move(func)) {
this->handle_func_ = handle_static_;
}
Expand Down
9 changes: 3 additions & 6 deletions include/condy/op_states.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
namespace condy {
namespace detail {

template <typename Handle, PrepFuncLike Func, typename Receiver>
class OpSenderOperationState {
template <typename Handle, PrepFuncLike Func> class OpSenderOperationState {
public:
template <typename... HandleArgs>
OpSenderOperationState(Func prep_func, Receiver receiver,
HandleArgs &&...handle_args)
OpSenderOperationState(Func prep_func, HandleArgs &&...handle_args)
: prep_func_(std::move(prep_func)),
finish_handle_(std::move(receiver),
std::forward<HandleArgs>(handle_args)...) {}
finish_handle_(std::forward<HandleArgs>(handle_args)...) {}

OpSenderOperationState(OpSenderOperationState &&) = delete;
OpSenderOperationState &operator=(OpSenderOperationState &&) = delete;
Expand Down
20 changes: 7 additions & 13 deletions include/condy/provided_buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ namespace detail {

class BundledProvidedBufferQueue {
public:
using ReturnType = BufferInfo;

BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags)
: capacity_(std::bit_ceil(capacity)), buf_lens_(capacity_, 0) {
auto &context = detail::Context::current();
Expand Down Expand Up @@ -137,18 +135,18 @@ class BundledProvidedBufferQueue {
public:
uint16_t bgid() const noexcept { return bgid_; }

ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
BufferInfo handle_finish(io_uring_cqe *cqe) noexcept {
assert(cqe != nullptr);
int32_t res = cqe->res;
uint32_t flags = cqe->flags;

if (!(flags & IORING_CQE_F_BUFFER)) {
return ReturnType{0, 0};
return BufferInfo{0, 0};
}

assert(res > 0);

ReturnType result = {
BufferInfo result = {
.bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT),
.num_buffers = 0,
};
Expand Down Expand Up @@ -209,7 +207,7 @@ class ProvidedBufferQueue : public detail::BundledProvidedBufferQueue {
ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0)
: BundledProvidedBufferQueue(capacity, flags) {}

ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
BufferInfo handle_finish(io_uring_cqe *cqe) noexcept {
assert(cqe != nullptr);
auto result = BundledProvidedBufferQueue::handle_finish(cqe);
assert(result.num_buffers <= 1);
Expand Down Expand Up @@ -285,8 +283,6 @@ namespace detail {

class BundledProvidedBufferPool {
public:
using ReturnType = std::vector<ProvidedBuffer>;

BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
unsigned int flags)
: num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size) {
Expand Down Expand Up @@ -361,7 +357,7 @@ class BundledProvidedBufferPool {
public:
uint16_t bgid() const noexcept { return bgid_; }

ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
std::vector<ProvidedBuffer> handle_finish(io_uring_cqe *cqe) noexcept {
assert(cqe != nullptr);
int32_t res = cqe->res;
uint32_t flags = cqe->flags;
Expand Down Expand Up @@ -464,8 +460,6 @@ inline void ProvidedBuffer::reset() noexcept {
*/
class ProvidedBufferPool : public detail::BundledProvidedBufferPool {
public:
using ReturnType = ProvidedBuffer;

/**
* @brief Construct a new ProvidedBufferPool object in current Runtime.
* @param num_buffers Number of buffers to allocate in the pool.
Expand All @@ -478,11 +472,11 @@ class ProvidedBufferPool : public detail::BundledProvidedBufferPool {
: BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}

public:
ReturnType handle_finish(io_uring_cqe *cqe) noexcept {
ProvidedBuffer handle_finish(io_uring_cqe *cqe) noexcept {
assert(cqe != nullptr);
auto buffers = BundledProvidedBufferPool::handle_finish(cqe);
if (buffers.empty()) {
return ReturnType();
return ProvidedBuffer();
}
assert(buffers.size() == 1);
return std::move(buffers[0]);
Expand Down
19 changes: 10 additions & 9 deletions include/condy/sender_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,31 @@ namespace condy {

template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc, typename... Args>
auto build_op_sender(PrepFunc &&prep_func, Args &&...args) {
return OpSender<std::decay_t<PrepFunc>, CQEHandler, std::decay_t<Args>...>(
std::forward<PrepFunc>(prep_func), std::forward<Args>(args)...);
return OpSender<std::decay_t<PrepFunc>, CQEHandler>(
std::forward<PrepFunc>(prep_func),
CQEHandler(std::forward<Args>(args)...));
}
Comment thread
wokron marked this conversation as resolved.

template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc,
typename MultiShotFunc, typename... Args>
auto build_multishot_op_sender(PrepFunc &&func, MultiShotFunc &&multishot_func,
Args &&...handler_args) {
return MultiShotOpSender<std::decay_t<PrepFunc>, CQEHandler,
std::decay_t<MultiShotFunc>,
std::decay_t<Args>...>(
std::decay_t<MultiShotFunc>>(
std::forward<PrepFunc>(func),
std::forward<MultiShotFunc>(multishot_func),
std::forward<Args>(handler_args)...);
CQEHandler(std::forward<Args>(handler_args)...),
std::forward<MultiShotFunc>(multishot_func));
Comment thread
wokron marked this conversation as resolved.
}

template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc, typename FreeFunc,
typename... Args>
auto build_zero_copy_op_sender(PrepFunc &&func, FreeFunc &&free_func,
Args &&...handler_args) {
return ZeroCopyOpSender<std::decay_t<PrepFunc>, CQEHandler,
std::decay_t<FreeFunc>, std::decay_t<Args>...>(
std::forward<PrepFunc>(func), std::forward<FreeFunc>(free_func),
std::forward<Args>(handler_args)...);
std::decay_t<FreeFunc>>(
std::forward<PrepFunc>(func),
CQEHandler(std::forward<Args>(handler_args)...),
std::forward<FreeFunc>(free_func));
Comment thread
wokron marked this conversation as resolved.
}

namespace detail {
Expand Down
Loading
Loading