From be57e237a4c403b5cc1c729b37b3ea9079f708ba Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Fri, 29 May 2026 17:09:50 +0000 Subject: [PATCH] Fix lost bytes on cancelled io --- src/fibers/fiber.cpp | 1 - src/fibers/tests/io-cancel-test.cpp | 81 +++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 src/fibers/tests/io-cancel-test.cpp diff --git a/src/fibers/fiber.cpp b/src/fibers/fiber.cpp index 233f962..67ad3b9 100644 --- a/src/fibers/fiber.cpp +++ b/src/fibers/fiber.cpp @@ -1438,7 +1438,6 @@ void FiberScheduler::poll(int fd, uint32_t events, uint64_t * triggeredEvents, I void FiberScheduler::cancelIo(IoFuture * future) noexcept { - future->result = nullptr; enqueueIo( nullptr, [=](io_uring_sqe * sqe) noexcept diff --git a/src/fibers/tests/io-cancel-test.cpp b/src/fibers/tests/io-cancel-test.cpp new file mode 100644 index 0000000..1780f0d --- /dev/null +++ b/src/fibers/tests/io-cancel-test.cpp @@ -0,0 +1,81 @@ +#include + +#include + +#include + +#include + +namespace silk +{ + +// Repeatedly read then immediately cancel. +// Make sure no data is lost. +TEST(IoCancel, cancelMustNotDropDeliveredBytes) +{ + static constexpr uint64_t TOTAL = 4096; + + struct Params + { + int readFd; + int writeFd; + + static int fiberMain(Params * p) noexcept + { + std::string expected(TOTAL, '\0'); + for (uint64_t i = 0; i < TOTAL; ++i) + { + expected[i] = static_cast(i & 0xFF); + } + + EXPECT_EQ(::write(p->writeFd, expected.data(), TOTAL), static_cast(TOTAL)); + ::close(p->writeFd); + + std::string got; + for (;;) + { + char buf[64] = {}; + uint64_t bytes_read = 0; + FiberScheduler::IoFuture future; + iovec iov{buf, sizeof(buf)}; + FiberScheduler::read(p->readFd, &iov, 1, 0, &bytes_read, &future); + future.cancel(); + if (future.wait() == 0) + { + // Read won (likely). + if (bytes_read == 0) + { + break; + } + got.append(buf, bytes_read); + } + else + { + // Cancel won. + // It's unlikely to happen consistently, + // but to keep the test independent of kernel internals, read to make progress. + if (!FiberScheduler::read(p->readFd, buf, sizeof(buf), 0, &bytes_read)) + { + if (bytes_read == 0) + { + break; + } + got.append(buf, bytes_read); + } + } + } + + EXPECT_EQ(got, expected); + return 0; + } + }; + + int fds[2]; + ASSERT_EQ(::pipe(fds), 0); + + EXPECT_EQ(FiberScheduler::run(Params::fiberMain, Params{fds[0], fds[1]}), 0); + + ::close(fds[0]); +} + +} // namespace silk