Skip to content

Commit

Permalink
spawn race condition fix, closes #167.
Browse files Browse the repository at this point in the history
  • Loading branch information
klemens-morgenstern committed Mar 31, 2024
1 parent 904b141 commit 61bf8d4
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
12 changes: 0 additions & 12 deletions include/boost/cobalt/detail/forward_cancellation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ struct forward_cancellation
}
};

struct forward_dispatch_cancellation
{
asio::cancellation_signal &cancel_signal;
executor exec;

forward_dispatch_cancellation(asio::cancellation_signal &cancel_signal,
executor exec) : cancel_signal(cancel_signal), exec(exec) {}
void operator()(asio::cancellation_type ct) const
{
asio::dispatch(exec, [this, ct]{cancel_signal.emit(ct);});
}
};

}

Expand Down
16 changes: 12 additions & 4 deletions include/boost/cobalt/detail/spawn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ struct async_initiate_spawn
#else
auto alloc = asio::get_associated_allocator(h);
#endif
auto recs = allocate_unique<detail::task_receiver<T>>(alloc, std::move(rec));
auto recs = std::allocate_shared<detail::task_receiver<T>>(alloc, std::move(rec));

auto sl = asio::get_associated_cancellation_slot(h);
if (sl.is_connected())
sl.template emplace<detail::forward_dispatch_cancellation>(recs->promise->signal, exec);
sl.assign(
[exec, recs](asio::cancellation_type ct)
{
asio::dispatch(exec, [recs, ct] {recs->cancel(ct);});
});

auto p = recs.get();

Expand Down Expand Up @@ -97,15 +101,19 @@ struct async_initiate_spawn
#else
auto alloc = asio::get_associated_allocator(h);
#endif
auto recs = allocate_unique<detail::task_receiver<void>>(alloc, std::move(a.receiver_));
auto recs = std::allocate_shared<detail::task_receiver<void>>(alloc, std::move(a.receiver_));

if (recs->done)
return asio::dispatch(asio::get_associated_immediate_executor(h, exec),
asio::append(std::forward<Handler>(h), recs->exception));

auto sl = asio::get_associated_cancellation_slot(h);
if (sl.is_connected())
sl.template emplace<detail::forward_dispatch_cancellation>(recs->promise->signal, exec);
sl.assign(
[exec, recs](asio::cancellation_type ct)
{
asio::dispatch(exec, [recs, ct] {recs->cancel(ct);});
});

auto p = recs.get();

Expand Down
6 changes: 6 additions & 0 deletions include/boost/cobalt/detail/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ struct task_receiver : task_value_holder<T>
done = true;
}

void cancel(asio::cancellation_type ct) const
{
if (!done)
promise->signal.emit(ct);
}

task_receiver() = default;
task_receiver(task_receiver && lhs)
: task_value_holder<T>(std::move(lhs)),
Expand Down
11 changes: 11 additions & 0 deletions test/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,5 +312,16 @@ CO_TEST_CASE(move_only)
co_await task_move_only_test();
}

BOOST_AUTO_TEST_CASE(cancel_task_)
{
asio::thread_pool ctx{1};
asio::cancellation_signal sig;
cobalt::spawn(ctx, test0(), asio::bind_cancellation_slot(sig.slot(), asio::detached));
sig.emit(asio::cancellation_type::all);
ctx.join();
}




BOOST_AUTO_TEST_SUITE_END();

0 comments on commit 61bf8d4

Please sign in to comment.