From 6ec99c85d26f4fa607589a6387d980b742bb42dd Mon Sep 17 00:00:00 2001 From: wokron Date: Sun, 11 Jan 2026 19:19:33 +0800 Subject: [PATCH 1/2] add link-cp --- examples/CMakeLists.txt | 4 +- examples/fast-cp.cpp | 135 --------------------------- examples/link-cp.cpp | 202 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 137 deletions(-) delete mode 100644 examples/fast-cp.cpp create mode 100644 examples/link-cp.cpp diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 75b89463..d7257237 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,8 +1,8 @@ add_executable(echo-server echo-server.cpp) target_link_libraries(echo-server PRIVATE condy uring) -add_executable(fast-cp fast-cp.cpp) -target_link_libraries(fast-cp PRIVATE condy uring) +add_executable(link-cp link-cp.cpp) +target_link_libraries(link-cp PRIVATE condy uring) add_executable(custom-allocator custom-allocator.cpp) target_link_libraries(custom-allocator PRIVATE condy uring) diff --git a/examples/fast-cp.cpp b/examples/fast-cp.cpp deleted file mode 100644 index 4772fc6f..00000000 --- a/examples/fast-cp.cpp +++ /dev/null @@ -1,135 +0,0 @@ -#include "condy/runtime.hpp" -#include -#include -#include -#include -#include -#include -#include - -constexpr size_t TASK_NUM = 64; -constexpr size_t CHUNK_SIZE = 128 * 1024l; - -condy::Coro<> copy_file_task(off_t &offset, off_t file_size, void *ptr) { - using condy::operators::operator>>; - - auto buffer = condy::buffer(ptr, CHUNK_SIZE); - - auto fixed_buffer = condy::fixed(0, buffer); - auto infd = condy::fixed(0); - auto outfd = condy::fixed(1); - - while (offset < file_size) { - off_t current_offset = offset; - offset += CHUNK_SIZE; - - auto [r1, r2] = - co_await (condy::async_read(infd, fixed_buffer, current_offset) >> - condy::async_write(outfd, fixed_buffer, current_offset)); - if (r1 < 0) { - std::fprintf(stderr, "Read error at offset %lld: %s\n", - (long long)current_offset, std::strerror(-r1)); - exit(1); - } - if (static_cast(r1) < CHUNK_SIZE) { - r2 = co_await condy::async_write( - outfd, condy::fixed(0, condy::buffer(ptr, r1)), current_offset); - } - if (r2 < 0) { - std::fprintf(stderr, "Write error at offset %lld: %s\n", - (long long)current_offset, std::strerror(-r2)); - exit(1); - } - } -} - -condy::Coro<> co_main(const char *infile, const char *outfile) { - using condy::operators::operator&&; - - auto start = std::chrono::high_resolution_clock::now(); - - struct statx statx_buf; - auto [infd, outfd, r] = co_await ( - condy::async_openat(AT_FDCWD, infile, O_RDONLY | O_DIRECT, 0) && - condy::async_openat(AT_FDCWD, outfile, - O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0644) && - condy::async_statx(AT_FDCWD, infile, AT_STATX_SYNC_AS_STAT, STATX_SIZE, - &statx_buf)); - if (infd < 0) { - std::fprintf(stderr, "Failed to open input file '%s': %s\n", infile, - std::strerror(-infd)); - exit(1); - } - if (outfd < 0) { - std::fprintf(stderr, "Failed to open output file '%s': %s\n", outfile, - std::strerror(-outfd)); - exit(1); - } - if (r < 0) { - std::fprintf(stderr, "Failed to stat input file '%s': %s\n", infile, - std::strerror(-r)); - exit(1); - } - - off_t file_size = static_cast(statx_buf.stx_size); - off_t offset = 0; - - void *raw_buffer = - mmap(nullptr, TASK_NUM * CHUNK_SIZE, PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0); - if (raw_buffer == MAP_FAILED) { - std::fprintf(stderr, "Failed to allocate aligned buffer: %s\n", - std::strerror(errno)); - exit(1); - } - - auto &fd_table = condy::current_runtime().fd_table(); - fd_table.init(2); - int fds[2] = {infd, outfd}; - fd_table.update(0, fds, 2); - - auto &buffer_table = condy::current_runtime().buffer_table(); - buffer_table.init(1); - iovec iov{ - .iov_base = raw_buffer, - .iov_len = TASK_NUM * CHUNK_SIZE, - }; - buffer_table.update(0, &iov, 1); - - std::vector> tasks; - for (size_t i = 0; i < TASK_NUM; i++) { - char *buffer = static_cast(raw_buffer) + i * CHUNK_SIZE; - tasks.emplace_back( - condy::co_spawn(copy_file_task(offset, file_size, buffer))); - } - - for (auto &task : tasks) { - co_await std::move(task); - } - - co_await (condy::async_close(condy::fixed(0)) && - condy::async_close(condy::fixed(1))); - - munmap(raw_buffer, TASK_NUM * CHUNK_SIZE); - - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed = end - start; - double mb_copied = static_cast(file_size) / (1024.0 * 1024.0); - std::printf("Copied %.2f MiB in %.2f seconds (%.2f MiB/s)\n", mb_copied, - elapsed.count(), mb_copied / elapsed.count()); - co_return; -} - -int main(int argc, char **argv) noexcept(false) { - if (argc < 3) { - std::fprintf(stderr, "Usage: %s \n", argv[0]); - return 1; - } - - auto options = - condy::RuntimeOptions().sq_size(2 * TASK_NUM).cq_size(4 * TASK_NUM); - condy::Runtime runtime(options); - condy::sync_wait(runtime, co_main(argv[1], argv[2])); - - return 0; -} \ No newline at end of file diff --git a/examples/link-cp.cpp b/examples/link-cp.cpp new file mode 100644 index 00000000..db990e44 --- /dev/null +++ b/examples/link-cp.cpp @@ -0,0 +1,202 @@ +#include "condy/async_operations.hpp" +#include "condy/runtime.hpp" +#include +#include +#include +#include +#include + +static size_t task_num = 64; +static size_t chunk_size = 1024 * 1024l; // 1 MB +static bool use_direct = false; + +condy::Coro copy_file_task(size_t task_id, loff_t &offset, + loff_t file_size, void *buffer) { + using condy::operators::operator>>; + + int buffer_index = static_cast(task_id); + + while (offset < file_size) { + loff_t current_offset = offset; + offset += static_cast(chunk_size); + + auto to_copy = std::min(static_cast(chunk_size), + file_size - current_offset); + auto buf = condy::buffer(buffer, to_copy); + + auto aw1 = condy::async_read( + condy::fixed(0), condy::fixed(buffer_index, buf), current_offset); + auto aw2 = condy::async_write( + condy::fixed(1), condy::fixed(buffer_index, buf), current_offset); + auto [r1, r2] = co_await (std::move(aw1) >> std::move(aw2)); + + if (r1 < 0 || r2 < 0) { + std::fprintf(stderr, "Failed to copy at offset %lld: %d %d\n", + (long long)current_offset, r1, r2); + exit(1); + } + } +} + +condy::Coro do_file_copy(int infd, int outfd, loff_t size) { + size_t buffers_size = task_num * chunk_size; + void *raw_buffer; + if (posix_memalign(&raw_buffer, 4096, buffers_size) != 0) { + std::fprintf(stderr, "Failed to allocate aligned buffers: %d\n", errno); + exit(1); + } + + auto &fd_table = condy::current_runtime().fd_table(); + fd_table.init(2); + int fds[2] = {infd, outfd}; + fd_table.update(0, fds, 2); + + auto &buffer_table = condy::current_runtime().buffer_table(); + buffer_table.init(task_num); + std::vector iovs(task_num); + for (size_t i = 0; i < task_num; i++) { + iovs[i] = { + .iov_base = static_cast(raw_buffer) + i * chunk_size, + .iov_len = chunk_size, + }; + } + buffer_table.update(0, iovs.data(), task_num); + + int r_faddvise = co_await condy::async_fadvise( + infd, 0, static_cast(size), POSIX_FADV_SEQUENTIAL); + if (r_faddvise < 0) { + std::fprintf(stderr, "Failed to fadvise input file: %d\n", r_faddvise); + exit(1); + } + + std::vector> tasks; + tasks.reserve(task_num); + loff_t offset = 0; + for (size_t i = 0; i < task_num; i++) { + tasks.emplace_back(condy::co_spawn( + copy_file_task(i, offset, size, + static_cast(raw_buffer) + i * chunk_size))); + } + for (auto &task : tasks) { + co_await std::move(task); + } + free(raw_buffer); +} + +condy::Coro co_main(const char *infile, const char *outfile) { + using condy::operators::operator&&; + + int flags = 0; + if (use_direct) { + flags |= O_DIRECT; + } + + auto [infd, outfd] = + co_await (condy::async_open(infile, O_RDONLY | flags, 0) && + condy::async_open(outfile, O_WRONLY | O_CREAT | flags, 0644)); + if (infd < 0 || outfd < 0) { + std::fprintf(stderr, "Failed to open file: %d %d\n", infd, outfd); + exit(1); + } + + struct statx statx_buf; + int r_stat = co_await condy::async_statx( + AT_FDCWD, infile, AT_STATX_SYNC_AS_STAT, STATX_SIZE, &statx_buf); + if (r_stat < 0) { + std::fprintf(stderr, "Failed to statx file: %d\n", r_stat); + exit(1); + } + +#if !IO_URING_CHECK_VERSION(2, 6) // >= 2.6 + int r_trunc = co_await condy::async_ftruncate( + outfd, static_cast(statx_buf.stx_size)); + if (r_trunc < 0) { + std::fprintf(stderr, "Failed to truncate file: %d\n", r_trunc); + exit(1); + } +#else + int r_trunc = ftruncate64(outfd, static_cast(statx_buf.stx_size)); + if (r_trunc < 0) { + std::fprintf(stderr, "Failed to truncate file: %d\n", errno); + exit(1); + } +#endif + + std::printf("Copy %lld bytes from %s to %s\n", + (long long)statx_buf.stx_size, infile, outfile); + + auto start = std::chrono::high_resolution_clock::now(); + + co_await do_file_copy(infd, outfd, static_cast(statx_buf.stx_size)); + + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = end - start; + double mbps = + (static_cast(statx_buf.stx_size) / (1024.0 * 1024.0)) / + elapsed.count(); + std::printf("Copied %lld bytes in %.2f seconds (%.2f MB/s)\n", + (long long)statx_buf.stx_size, elapsed.count(), mbps); + + co_await (condy::async_close(infd) && condy::async_close(outfd)); +} + +void usage(const char *progname) { + std::fprintf( + stderr, + "Usage: %s [-hd] [-t ] [-c ] \n" + " -h Show this help message\n" + " -d Use O_DIRECT for file operations\n" + " -t Number of concurrent copy tasks\n" + " -c Size of each copy chunk\n", + progname); +} + +size_t get_chunk_size(const char *arg) { + size_t len = std::strlen(arg); + int suffix = std::tolower(arg[len - 1]); + size_t multiplier = 1; + if (suffix == 'k') { + multiplier = 1024; + len -= 1; + } else if (suffix == 'm') { + multiplier = 1024l * 1024; + len -= 1; + } else if (suffix == 'g') { + multiplier = 1024l * 1024 * 1024; + len -= 1; + } + return std::stoul(std::string(arg, len)) * multiplier; +} + +int main(int argc, char **argv) noexcept(false) { + int opt; + while ((opt = getopt(argc, argv, "ht:c:d")) != -1) { + switch (opt) { + case 't': + task_num = std::stoul(optarg); + break; + case 'c': + chunk_size = get_chunk_size(optarg); + break; + case 'd': + use_direct = true; + break; + case 'h': + usage(argv[0]); + return 0; + default: + usage(argv[0]); + return 1; + } + } + + if (argc - optind < 2) { + usage(argv[0]); + return 1; + } + + auto options = condy::RuntimeOptions().sq_size(task_num * 2); + condy::Runtime runtime(options); + condy::sync_wait(runtime, co_main(argv[optind], argv[optind + 1])); + return 0; +} \ No newline at end of file From ab794d205ed9a62dd1df22bb1f10be380db933f1 Mon Sep 17 00:00:00 2001 From: wokron Date: Sun, 11 Jan 2026 21:20:19 +0800 Subject: [PATCH 2/2] update echo-server --- examples/echo-server.cpp | 107 ++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/examples/echo-server.cpp b/examples/echo-server.cpp index efd69f7e..165e08a1 100644 --- a/examples/echo-server.cpp +++ b/examples/echo-server.cpp @@ -7,92 +7,95 @@ #include #include -void prepare_address(const std::string &host, uint16_t port, - sockaddr_in &addr) { - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - inet_pton(AF_INET, host.c_str(), &addr.sin_addr); -} +constexpr size_t BACKLOG = 128; +constexpr size_t MAX_CONNECTIONS = 1024; +constexpr size_t MAX_MESSAGE_LEN = 2048; -condy::Coro<> handle_client(int client_fd) { - constexpr size_t BUFFER_SIZE = 1024; - char buffer[BUFFER_SIZE]; +condy::Coro session(int client_fd) { + char buffer[MAX_MESSAGE_LEN]; while (true) { - int n = co_await condy::async_read( - client_fd, condy::buffer(buffer, BUFFER_SIZE), 0); + int n = co_await condy::async_recv(condy::fixed(client_fd), + condy::buffer(buffer), 0); if (n <= 0) { if (n < 0) { - std::fprintf(stderr, "Read error: %s\n", std::strerror(-n)); + std::fprintf(stderr, "Read error: %d\n", n); } break; } - n = co_await condy::async_write(client_fd, condy::buffer(buffer, n), 0); + n = co_await condy::async_send(condy::fixed(client_fd), + condy::buffer(buffer, n), 0); if (n < 0) { - std::fprintf(stderr, "Write error: %s\n", std::strerror(-n)); + std::fprintf(stderr, "Write error: %d\n", n); break; } } - co_await condy::async_close(client_fd); - std::printf("Connection closed, fd:%d\n", client_fd); + co_await condy::async_close(condy::fixed(client_fd)); } -condy::Coro<> co_main(const std::string &host, uint16_t port) { +condy::Coro co_main(int server_fd) { + while (true) { + sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + int client_fd = co_await condy::async_accept_direct( + server_fd, (struct sockaddr *)&client_addr, &client_len, 0, + CONDY_FILE_INDEX_ALLOC); + if (client_fd < 0) { + std::fprintf(stderr, "Failed to accept connection: %d\n", + client_fd); + co_return 1; + } + + condy::co_spawn(session(client_fd)).detach(); + } +} + +void prepare_address(const std::string &host, uint16_t port, + sockaddr_in &addr) { + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + inet_pton(AF_INET, host.c_str(), &addr.sin_addr); +} + +int main(int argc, char **argv) noexcept(false) { + if (argc < 3) { + std::fprintf(stderr, "Usage: %s \n", argv[0]); + return 1; + } + + std::string host = argv[1]; + uint16_t port = static_cast(std::stoi(argv[2])); + sockaddr_in server_addr; prepare_address(host, port, server_addr); int server_fd = socket(AF_INET, SOCK_STREAM, 0); if (server_fd < 0) { std::perror("Failed to create socket"); - co_return; + return 1; } if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { std::perror("Failed to bind socket"); - co_await condy::async_close(server_fd); - co_return; + close(server_fd); + return 1; } - if (listen(server_fd, SOMAXCONN) < 0) { + if (listen(server_fd, BACKLOG) < 0) { std::perror("Failed to listen on socket"); - co_await condy::async_close(server_fd); - co_return; + close(server_fd); + return 1; } std::printf("Echo server listening on %s:%d\n", host.c_str(), port); - while (true) { - sockaddr_in client_addr; - socklen_t client_len = sizeof(client_addr); - int client_fd = co_await condy::async_accept( - server_fd, (struct sockaddr *)&client_addr, &client_len, 0); - if (client_fd < 0) { - std::fprintf(stderr, "Failed to accept connection: %s\n", - std::strerror(-client_fd)); - co_return; - } - - std::printf("Accept connection from %s:%d, fd:%d\n", - inet_ntoa(client_addr.sin_addr), - ntohs(client_addr.sin_port), client_fd); + condy::Runtime runtime(condy::RuntimeOptions().sq_size(MAX_CONNECTIONS)); - condy::co_spawn(handle_client(client_fd)).detach(); - } -} - -int main(int argc, char **argv) noexcept(false) { - if (argc < 3) { - std::fprintf(stderr, "Usage: %s \n", argv[0]); - return 1; - } - - std::string host = argv[1]; - uint16_t port = static_cast(std::stoi(argv[2])); + runtime.fd_table().init(MAX_CONNECTIONS); - condy::sync_wait(co_main(host, port)); - return 0; + return condy::sync_wait(runtime, co_main(server_fd)); } \ No newline at end of file