Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/condy/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class Channel<T, N>::PushFinishHandle
// Successfully canceled
assert(result_ == -ENOTRECOVERABLE);
result_ = -ECANCELED;
runtime_->resume_work();
need_resume_ = true;
runtime_->schedule(this);
}
}
Expand Down Expand Up @@ -427,7 +427,7 @@ class Channel<T, N>::PopFinishHandle
// Successfully canceled
assert(result_.first == -ENOTRECOVERABLE);
result_.first = -ECANCELED;
runtime_->resume_work();
need_resume_ = true;
runtime_->schedule(this);
}
}
Expand Down
52 changes: 41 additions & 11 deletions include/condy/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ class Runtime {
// Fast path: if the ring is enabled, we can directly schedule the
// work
tsan_release(work);
schedule_msg_ring_(curr_runtime, work, WorkType::Schedule);
schedule_msg_ring_(curr_runtime,
encode_work(work, WorkType::Schedule));
} else {
// Slow path: if the ring is not enabled, we need to acquire the
// mutex to ensure the work is scheduled before the ring is enabled
Expand All @@ -204,13 +205,33 @@ class Runtime {
if (state == State::Enabled) {
lock.unlock();
tsan_release(work);
schedule_msg_ring_(curr_runtime, work, WorkType::Schedule);
schedule_msg_ring_(curr_runtime,
encode_work(work, WorkType::Schedule));
} else {
global_queue_.push_back(work);
}
}
}

// Internal use only. Schedule a cancel request for the given data.
void cancel(void *data) noexcept {
// Ensure align of 8 for encoding
assert(reinterpret_cast<intptr_t>(data) % 8 == 0);
auto *curr_runtime = detail::Context::current().runtime();
if (curr_runtime == this) {
io_uring_sqe *sqe = ring_.get_sqe();
prep_cancel_(sqe, data);
Comment thread
wokron marked this conversation as resolved.
return;
}

auto state = state_.load();
if (state != State::Enabled) {
return;
}

schedule_msg_ring_(curr_runtime, encode_work(data, WorkType::Cancel));
}
Comment thread
wokron marked this conversation as resolved.

void pend_work() noexcept { pending_works_++; }

void resume_work() noexcept { pending_works_--; }
Expand Down Expand Up @@ -291,15 +312,15 @@ class Runtime {
auto &settings() noexcept { return ring_.settings(); }

private:
void schedule_msg_ring_(Runtime *curr_runtime, WorkInvoker *work,
WorkType type) noexcept {
void schedule_msg_ring_(Runtime *curr_runtime, void *data) noexcept {
int ring_fd = this->ring_.ring()->ring_fd;
if (curr_runtime != nullptr) {
io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
prep_msg_ring_(sqe, work, type);
prep_msg_ring_(ring_fd, sqe, data);
curr_runtime->pend_work();
} else {
io_uring_sqe sqe = {};
prep_msg_ring_(&sqe, work, type);
prep_msg_ring_(ring_fd, &sqe, data);
int r = detail::sync_msg_ring(&sqe);
if (r < 0) {
panic_on(std::format("sync_msg_ring: {}", std::strerror(-r)));
Expand All @@ -319,21 +340,27 @@ class Runtime {
return;
}

schedule_msg_ring_(curr_runtime, nullptr, WorkType::Ignore);
schedule_msg_ring_(curr_runtime,
encode_work(nullptr, WorkType::Ignore));
}

void flush_global_queue_() noexcept {
local_queue_.push_back(std::move(global_queue_));
}

void prep_msg_ring_(io_uring_sqe *sqe, WorkInvoker *work,
WorkType type) noexcept {
auto data = encode_work(work, type);
io_uring_prep_msg_ring(sqe, this->ring_.ring()->ring_fd, 0,
static void prep_msg_ring_(int ring_fd, io_uring_sqe *sqe,
void *data) noexcept {
io_uring_prep_msg_ring(sqe, ring_fd, 0,
reinterpret_cast<uint64_t>(data), 0);
io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Schedule));
}

static void prep_cancel_(io_uring_sqe *sqe, void *data) noexcept {
io_uring_prep_cancel(sqe, data, 0);
Comment thread
wokron marked this conversation as resolved.
io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore));
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
}

void flush_ring_() noexcept {
auto r = ring_.reap_completions(
[this](io_uring_cqe *cqe) { process_cqe_(cqe); });
Expand Down Expand Up @@ -371,6 +398,9 @@ class Runtime {
tsan_acquire(data);
(*work)();
}
} else if (type == WorkType::Cancel) {
io_uring_sqe *sqe = ring_.get_sqe();
prep_cancel_(sqe, data);
} else if (type == WorkType::Common) {
auto *handle = static_cast<OpFinishHandleBase *>(data);
auto op_finish = handle->handle(cqe);
Expand Down
6 changes: 6 additions & 0 deletions include/condy/work_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ enum class WorkType : uint8_t {
Common,
Ignore,
Schedule,
Cancel,

// Add new work types above this line
WorkTypeMax,
};
static_assert(static_cast<uint8_t>(WorkType::WorkTypeMax) <= 8,
"WorkType must fit in 3 bits");

Comment thread
wokron marked this conversation as resolved.
inline std::pair<void *, WorkType> decode_work(void *ptr) noexcept {
intptr_t mask = (1 << 3) - 1;
Expand Down
65 changes: 65 additions & 0 deletions tests/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,70 @@ TEST_CASE("test runtime - allow_exit from other thread") {

runtime.allow_exit();

t1.join();
}

TEST_CASE("test runtime - cancel from other task") {
condy::Runtime runtime(options);

auto cancel_task = [&](void *ptr) -> condy::Coro<void> {
runtime.cancel(ptr);
co_return;
};
auto func = [&]() -> condy::Coro<void> {
__kernel_timespec ts{
.tv_sec = 60ll * 60ll,
.tv_nsec = 0,
};
Comment thread
wokron marked this conversation as resolved.
auto aw =
condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0);
void *ptr = aw.get_handle();
auto t = condy::co_spawn(cancel_task(ptr));
co_await aw;
co_await t;
Comment thread
wokron marked this conversation as resolved.
};
Comment thread
wokron marked this conversation as resolved.

condy::co_spawn(runtime, func()).detach();

runtime.allow_exit();
runtime.run();
}

TEST_CASE("test runtime - cancel from other thread") {
condy::Runtime runtime(options);

std::atomic_bool r1_started = false;
void *ptr = nullptr;

auto notify_task = [&]() -> condy::Coro<void> {
r1_started = true;
r1_started.notify_one();
co_return;
};

auto func = [&]() -> condy::Coro<void> {
__kernel_timespec ts{
.tv_sec = 60ll * 60ll,
.tv_nsec = 0,
};
auto aw =
condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0);
ptr = aw.get_handle();
auto t = condy::co_spawn(runtime, notify_task());
co_await aw;
Comment thread
wokron marked this conversation as resolved.
co_await t;
};
Comment thread
wokron marked this conversation as resolved.

condy::co_spawn(runtime, func()).detach();

std::thread t1([&]() {
runtime.allow_exit();
runtime.run();
});

r1_started.wait(false);

runtime.cancel(ptr);

t1.join();
}
1 change: 1 addition & 0 deletions tests/test_work_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ TEST_CASE("test work_type - encode and decode") {
test_type(condy::WorkType::Common);
test_type(condy::WorkType::Ignore);
test_type(condy::WorkType::Schedule);
test_type(condy::WorkType::Cancel);
}
Loading