Skip to content

Commit

Permalink
Ensure that continuables that are resolved immediately are always sym…
Browse files Browse the repository at this point in the history
…metrically transferable
  • Loading branch information
Rogiel authored and Naios committed Sep 12, 2023
1 parent c7f5b1c commit 23a724c
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 2 deletions.
25 changes: 23 additions & 2 deletions include/continuable/detail/other/coroutines.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ class awaitable {
/// A cache which is used to pass the result of the continuation
/// to the coroutine.
result_t result_;
/// Enumeration that represents the suspension state of the awaitable.
enum class state : std::uint8_t {
suspended,
pending,
resolved,
};
/// An atomic that specifies whether the awaitable has suspended or not.
/// Allows to perform symmetric transfer on continuables that are
/// immediately resolved.
std::atomic<state> state_{state::pending};

public:
explicit constexpr awaitable(Continuable&& continuable)
Expand All @@ -117,16 +127,27 @@ class awaitable {

/// Suspend the current context
// TODO Convert this to an r-value function once possible
void await_suspend(coroutine_handle<> h) {
bool await_suspend(coroutine_handle<> h) {
assert(result_.is_empty());
// Forward every result to the current awaitable
std::move(continuable_)
.next([h, this](auto&&... args) mutable {
assert(result_.is_empty());
result_ = result_t::from(std::forward<decltype(args)>(args)...);
h.resume();

// If true, it means that the promise was suspended (i.e., the
// awaitable await_suspend method has already returned). That
// means we must call the resume coroutine from the continuation
// chain.
if (state_.exchange(state::resolved, std::memory_order_acq_rel) ==
state::suspended) {
return h.resume();
}
})
.done();

return state_.exchange(state::suspended, std::memory_order_acq_rel) !=
state::resolved;
}

/// Resume the coroutine represented by the handle
Expand Down
72 changes: 72 additions & 0 deletions test/unit-test/multi/test-continuable-await.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,78 @@ TYPED_TEST(single_dimension_tests, are_awaitable_with_cancellation_from_coro) {
ASSERT_ASYNC_CANCELLATION(resolve_coro_canceled(supply))
}

template <typename S>
cti::continuable<> test_symmetric_transfer(S&& supplier) {
// If symmetric transfer is not working properly, large
// loops will quickly cause stack overflows.
for (size_t index = 0; index < 10000; index++) {
co_await supplier();
}
co_return;
}

TYPED_TEST(single_dimension_tests, are_symmetric_transferable) {
auto const& supply = [&]() {
return cti::make_continuable<int>([](auto&& promise) {
promise.set_value(0);
});
};

ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}

TYPED_TEST(single_dimension_tests, are_symmetric_transferable_type_erased) {
auto const& supply = [&]() -> cti::continuable<int> {
return cti::make_continuable<int>([](auto&& promise) {
promise.set_value(0);
});
};

ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}

TYPED_TEST(single_dimension_tests,
are_symmetric_transferable_using_make_ready) {
auto const& supply = [&]() {
return cti::make_ready_continuable<int>(0);
};

ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}

TYPED_TEST(single_dimension_tests,
are_symmetric_transferable_using_type_erased_make_ready) {
auto const& supply = [&]() -> cti::continuable<int> {
return cti::make_ready_continuable<int>(0);
};

ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}

TYPED_TEST(single_dimension_tests, are_symmetric_transferable_using_type_erased_from_thread) {
auto const& supply = [&]() -> cti::continuable<int> {
return cti::make_continuable<int>([](auto&& promise) {
std::async(std::launch::async, std::forward<decltype(promise)>(promise), 0);
});
};

ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}

TYPED_TEST(single_dimension_tests, are_symmetric_transferable_except) {
size_t count = 0;
auto const& supply = [&]() -> cti::continuable<int> {
// NOTE: The symmetric transfer loop does 10000 iterations.
if(++count == 5000) {
return cti::make_exceptional_continuable<int>(
std::make_exception_ptr(std::runtime_error("Failed")));
}
return cti::make_ready_continuable<int>(0);
};

ASSERT_ASYNC_EXCEPTION_COMPLETION(test_symmetric_transfer(supply));
}

# endif // CONTINUABLE_WITH_NO_EXCEPTIONS

#endif // CONTINUABLE_HAS_EXPERIMENTAL_COROUTINE

0 comments on commit 23a724c

Please sign in to comment.