Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ These are fundamental asynchronous operations. Many of them are asynchronous ver
- `condy::async_fgetxattr()`
- `condy::async_fsetxattr()`
- `condy::async_socket()`
- `condy::async_uring_cmd()`
- `condy::async_uring_cmd128()`
- `condy::async_cmd_sock()`
- `condy::async_cmd_getsockname()`
- `condy::async_waitid()`
Expand Down Expand Up @@ -126,6 +128,8 @@ These asynchronous operations accept the index of a file registered with the ker
- `condy::async_recv_multishot()`
- `condy::async_shutdown()`
- `condy::async_sync_file_range()`
- `condy::async_uring_cmd()`
- `condy::async_uring_cmd128()`
- `condy::async_cmd_sock()`
- `condy::async_cmd_getsockname()`
- `condy::async_ftruncate()`
Expand Down
45 changes: 45 additions & 0 deletions include/condy/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "condy/awaiter_operations.hpp"
#include "condy/concepts.hpp"
#include "condy/condy_uring.hpp"
#include "condy/cqe_handler.hpp"
#include "condy/helpers.hpp"
#include "condy/provided_buffers.hpp"

Expand Down Expand Up @@ -850,6 +851,50 @@ inline auto async_socket_direct(int domain, int type, int protocol,
protocol, file_index, flags);
}

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
/**
* @brief See io_uring_prep_uring_cmd
* @tparam CQEHandler Custom CQE handler for specific result processing.
* @param cmd_func Function to configure sqe for specific command. Signature:
* void(io_uring_sqe *sqe).
*/
template <CQEHandlerLike CQEHandler = SimpleCQEHandler, FdLike Fd,
typename CmdFunc>
inline auto async_uring_cmd(int cmd_op, Fd fd, CmdFunc &&cmd_func) {
auto prep_func = [cmd_op, fd,
cmd_func = std::forward<CmdFunc>(cmd_func)](Ring *ring) {
auto *sqe = ring->get_sqe();
io_uring_prep_uring_cmd(sqe, cmd_op, fd);
cmd_func(sqe);
return sqe;
};
auto op = build_op_awaiter<CQEHandler>(std::move(prep_func));
return detail::maybe_flag_fixed_fd(std::move(op), fd);
}
#endif

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
/**
* @brief See io_uring_prep_uring_cmd128
* @tparam CQEHandler Custom CQE handler for specific result processing.
* @param cmd_func Function to configure sqe for specific command. Signature:
* void(io_uring_sqe *sqe).
*/
template <CQEHandlerLike CQEHandler = SimpleCQEHandler, FdLike Fd,
typename CmdFunc>
inline auto async_uring_cmd128(int cmd_op, Fd fd, CmdFunc &&cmd_func) {
auto prep_func = [cmd_op, fd,
cmd_func = std::forward<CmdFunc>(cmd_func)](Ring *ring) {
auto *sqe = ring->get_sqe128();
io_uring_prep_uring_cmd128(sqe, cmd_op, fd);
cmd_func(sqe);
return sqe;
};
auto op = build_op_awaiter<CQEHandler>(std::move(prep_func));
return detail::maybe_flag_fixed_fd(std::move(op), fd);
}
#endif

#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
/**
* @brief See io_uring_prep_cmd_sock
Expand Down
131 changes: 131 additions & 0 deletions tests/test_async_operations.4.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "condy/buffers.hpp"
#include "condy/runtime.hpp"
#include "condy/runtime_options.hpp"
#include "condy/sync_wait.hpp"
#include "helpers.hpp"
#include <cerrno>
Expand Down Expand Up @@ -267,6 +268,136 @@ TEST_CASE("test async_operations - test socket - direct") {
condy::sync_wait(func());
}

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
TEST_CASE("test async_operations - test uring_cmd - basic") {
auto my_async_cmd_sock = [](int cmd_op, int fd, int level, int optname,
void *optval, int optlen) {
return condy::async_uring_cmd(cmd_op, fd, [=](io_uring_sqe *sqe) {
sqe->optval = (unsigned long)(uintptr_t)optval;
sqe->optname = optname;
sqe->optlen = optlen;
sqe->level = level;
});
};

int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
REQUIRE(listen_fd >= 0);

Comment thread
wokron marked this conversation as resolved.
auto func = [&]() -> condy::Coro<void> {
int val = 1;
int r = co_await my_async_cmd_sock(SOCKET_URING_OP_SETSOCKOPT,
listen_fd, SOL_SOCKET, SO_REUSEADDR,
&val, sizeof(val));
REQUIRE(r == 0);
};
condy::sync_wait(func());

close(listen_fd);
}
#endif

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
TEST_CASE("test async_operations - test uring_cmd - fixed fd") {
auto my_async_cmd_sock_fixed = [](int cmd_op, int fixed_fd, int level,
int optname, void *optval, int optlen) {
return condy::async_uring_cmd(
cmd_op, condy::fixed(fixed_fd), [=](io_uring_sqe *sqe) {
sqe->optval = (unsigned long)(uintptr_t)optval;
sqe->optname = optname;
sqe->optlen = optlen;
sqe->level = level;
});
};

int listen_fd = create_accept_socket();

Comment thread
wokron marked this conversation as resolved.
auto func = [&]() -> condy::Coro<void> {
auto &fd_table = condy::current_runtime().fd_table();
fd_table.init(1);
int r = co_await condy::async_files_update(&listen_fd, 1, 0);
REQUIRE(r == 1);

int val = 1;
r = co_await my_async_cmd_sock_fixed(SOCKET_URING_OP_SETSOCKOPT, 0,
SOL_SOCKET, SO_REUSEADDR, &val,
sizeof(val));
REQUIRE(r == 0);
};

condy::sync_wait(func());

close(listen_fd);
}
#endif

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
TEST_CASE("test async_operations - test uring_cmd128 - basic") {
// TODO: Use nvme cmd instead of socket cmd
condy::Runtime runtime(
condy::RuntimeOptions().enable_sqe_mixed().enable_cqe_mixed());
auto my_async_cmd_sock = [](int cmd_op, int fd, int level, int optname,
void *optval, int optlen) {
return condy::async_uring_cmd128(cmd_op, fd, [=](io_uring_sqe *sqe) {
sqe->optval = (unsigned long)(uintptr_t)optval;
sqe->optname = optname;
sqe->optlen = optlen;
sqe->level = level;
});
};

int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
REQUIRE(listen_fd >= 0);

Comment thread
wokron marked this conversation as resolved.
auto func = [&]() -> condy::Coro<void> {
int val = 1;
int r = co_await my_async_cmd_sock(SOCKET_URING_OP_SETSOCKOPT,
listen_fd, SOL_SOCKET, SO_REUSEADDR,
&val, sizeof(val));
REQUIRE(r == 0);
};
condy::sync_wait(runtime, func());

close(listen_fd);
}
#endif

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
TEST_CASE("test async_operations - test uring_cmd128 - fixed fd") {
// TODO: Use nvme cmd instead of socket cmd
condy::Runtime runtime(
condy::RuntimeOptions().enable_sqe_mixed().enable_cqe_mixed());
auto my_async_cmd_sock_fixed = [](int cmd_op, int fixed_fd, int level,
int optname, void *optval, int optlen) {
return condy::async_uring_cmd128(
cmd_op, condy::fixed(fixed_fd), [=](io_uring_sqe *sqe) {
sqe->optval = (unsigned long)(uintptr_t)optval;
sqe->optname = optname;
sqe->optlen = optlen;
sqe->level = level;
});
};

int listen_fd = create_accept_socket();

Comment thread
wokron marked this conversation as resolved.
auto func = [&]() -> condy::Coro<void> {
auto &fd_table = condy::current_runtime().fd_table();
fd_table.init(1);
int r = co_await condy::async_files_update(&listen_fd, 1, 0);
REQUIRE(r == 1);

int val = 1;
r = co_await my_async_cmd_sock_fixed(SOCKET_URING_OP_SETSOCKOPT, 0,
SOL_SOCKET, SO_REUSEADDR, &val,
sizeof(val));
REQUIRE(r == 0);
};

condy::sync_wait(runtime, func());

close(listen_fd);
}
#endif

#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
TEST_CASE("test async_operations - test cmd_sock - basic") {
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
Expand Down
Loading