Skip to content

Commit

Permalink
Add sync waiting functions
Browse files Browse the repository at this point in the history
  • Loading branch information
alandefreitas committed Jan 9, 2022
1 parent 12dd8b0 commit 2a68b40
Show file tree
Hide file tree
Showing 26 changed files with 1,108 additions and 70 deletions.
35 changes: 35 additions & 0 deletions docs/waiting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Waiting for futures

This library includes a number of helper functions to wait for the result of future tasks.

# Await

Usually use the functions `get()` or `wait()` to wait for the result of future types. This is especially important for deferred futures, which might not start executing until we start waiting for their results.

The free-function [await] can also be used to wait for results. This syntactic sugar makes waiting more similar to other common programming languages.

# Waiting for conjunction

The [await] can also be used for conjunctions. In this case, the function returns a tuple, which might be convenient with structured binding declarations.

We can also wait for a future conjunction with [wait_for_all]. This function waits for all results in a range or tuple of futures without owning them. This is convenient when we need to synchronously wait for a set of futures.

!!! warning "[wait_for_all] != [when_all]"

[wait_for_all] should not be confused with the future adaptor [when_all], which generates a future type

While [when_all] is an asynchronous operation in a task graph, [wait_for_all] is an indication that the task graph should end at that point as an alternative to `wait()`

# Waiting for disjunctions

We can also wait for a future disjunction with [wait_for_any]. This function waits for any result in a range or tuple of futures without owning them.

!!! warning "[wait_for_any] != [when_any]"

[wait_for_any] should not be confused with the future adaptor [when_any], which generates a future type

While [when_any] is an asynchronous operation in a task graph, [wait_for_any] is an indication that the task graph should end at that point as an alternative to `wait()`

It's important to note that waiting for disjunctions is a little more complex than waiting for conjunctions of future. While we can wait for a conjunction by waiting for each of its elements, we need set up and wait for a notification from any of the futures in order to know which future has finished first in a sequence of futures.

--8<-- "docs/references.md"
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ nav:
- Launching tasks: launching.md
- Stoppable futures: stoppable_futures.md
- Shared futures: shared_futures.md
- Waiting: waiting.md
- Future Adaptors:
- Continuations: continuations.md
- Conjunctions: conjunctions.md
Expand Down
52 changes: 36 additions & 16 deletions source/futures/adaptor/when_any.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include <futures/adaptor/detail/traits/is_tuple.h>
#include <futures/adaptor/detail/tuple_algorithm.h>
#include <futures/adaptor/when_any_result.h>
#include <futures/futures/traits/to_future.h>
#include <futures/futures/async.h>
#include <futures/futures/traits/to_future.h>

namespace futures {
/** \addtogroup adaptors Adaptors
Expand Down Expand Up @@ -689,15 +689,34 @@ namespace futures {
/// The logic for setting notifiers for futures with and without lazy continuations is almost the
/// same.
///
/// The task is the same but the first goes to a continuation and the later goes into a new thread.
/// Unfortunately, we do need a new thread an not only a new task because we are not sure there's
/// room in the executor for that.
/// The task is the same but the difference is:
/// 1) the notification task is a continuation if the future supports continuations, and
/// 2) the notification task goes into a new new thread if the future does not support continuations.
///
/// @note Unfortunately, we need a new thread an not only a new task in some executor whenever the
/// task doesn't support continuations because we cannot be sure there's room in the executor for
/// the notification task.
///
/// This might be counter intuitive, as one could assume there's going to be room for the notifications
/// as soon as the ongoing tasks are running. However, there are a few situations where this might happen:
///
/// 1) The current tasks we are waiting for have not been launched yet and the executor is busy with
/// tasks that need cancellation to stop
/// 2) Some of the tasks we are waiting for are running and some are enqueued. The running tasks finish
/// but we don't hear about it because the enqueued tasks come before the notification.
/// 3) All tasks we are waiting for have no support for continuations. The executor has no room for the
/// notifier because of some parallel tasks happening in the executor and we never hear about a future
/// getting ready.
///
/// So although this is an edge case, we cannot assume there's room for the notifications in the
/// executor.
///
template <class SettingLazyContinuables> void maybe_set_up_notifiers_common() {
constexpr bool setting_lazy = SettingLazyContinuables::value;
constexpr bool setting_thread = !SettingLazyContinuables::value;
constexpr bool setting_notifiers_as_continuations = SettingLazyContinuables::value;
constexpr bool setting_notifiers_as_new_threads = !SettingLazyContinuables::value;

// Never do that more than once. Also check
if constexpr (setting_thread) {
if constexpr (setting_notifiers_as_new_threads) {
if (thread_notifiers_set) {
return;
}
Expand All @@ -711,15 +730,15 @@ namespace futures {

// Initialize the variable the notifiers need to set
// Any of the notifiers will set the same variable
const bool init_ready =
(setting_lazy && (!thread_notifiers_set)) || (setting_thread && (!lazy_notifiers_set));
const bool init_ready = (setting_notifiers_as_continuations && (!thread_notifiers_set)) ||
(setting_notifiers_as_new_threads && (!lazy_notifiers_set));
if (init_ready) {
ready_notified = false;
}

// Check if there are threads to set up
const bool no_compatible_futures =
(setting_thread && all_lazy_continuable()) || (setting_lazy && lazy_continuable_size() == 0);
const bool no_compatible_futures = (setting_notifiers_as_new_threads && all_lazy_continuable()) ||
(setting_notifiers_as_continuations && lazy_continuable_size() == 0);
if (no_compatible_futures) {
return;
}
Expand Down Expand Up @@ -831,9 +850,11 @@ namespace futures {

// Launch the notification task for each future
if constexpr (sequence_is_range) {
if constexpr (is_lazy_continuable_v<typename sequence_type::value_type> && setting_thread) {
if constexpr (is_lazy_continuable_v<typename sequence_type::value_type> &&
setting_notifiers_as_new_threads) {
return;
} else if constexpr (not is_lazy_continuable_v<typename sequence_type::value_type> && setting_lazy) {
} else if constexpr (not is_lazy_continuable_v<typename sequence_type::value_type> &&
setting_notifiers_as_continuations) {
return;
} else {
// Ensure we have one notifier allocated for each task
Expand Down Expand Up @@ -1093,9 +1114,8 @@ namespace futures {
}
} else /* if constexpr (input_is_invocable) */ {
static_assert(input_is_invocable);
std::transform(first, last, std::back_inserter(v), [](auto &&f) {
return ::futures::async(std::forward<decltype(f)>(f));
});
std::transform(first, last, std::back_inserter(v),
[](auto &&f) { return ::futures::async(std::forward<decltype(f)>(f)); });
}

return when_any_future<sequence_type>(std::move(v));
Expand Down
7 changes: 6 additions & 1 deletion source/futures/executor/default_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ namespace futures {
///
/// \return Reference to the default execution context for @ref async
inline default_execution_context_type &default_execution_context() {
static asio::thread_pool pool(hardware_concurrency());
#ifdef FUTURES_DEFAULT_THREAD_POOL_SIZE
const std::size_t default_thread_pool_size = FUTURES_DEFAULT_THREAD_POOL_SIZE;
#else
const std::size_t default_thread_pool_size = hardware_concurrency();
#endif
static asio::thread_pool pool(default_thread_pool_size);
return pool;
}

Expand Down
9 changes: 1 addition & 8 deletions source/futures/futures/await.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,9 @@ namespace futures {
/** \addtogroup futures Futures
* @{
*/
/** \addtogroup launch Launch
* @{
*/
/** \addtogroup launch-policies Launch Policies
* @{
*/

/// \brief Very simple version syntax sugar for types that pass the Future concept: future.wait() / future.get()
///
/// This syntax is most useful for cases where we are immediately requesting the future result
template <typename Future
#ifndef FUTURES_DOXYGEN
Expand All @@ -32,8 +27,6 @@ namespace futures {
}

/** @} */
/** @} */
/** @} */
} // namespace futures

#endif // FUTURES_AWAIT_H
52 changes: 51 additions & 1 deletion source/futures/futures/basic_future.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

#include <futures/futures/detail/continuations_source.h>
#include <futures/futures/detail/shared_state.h>
#include <futures/futures/traits/is_future.h>
#include <futures/futures/detail/throw_exception.h>

#include <futures/futures/stop_token.h>
#include <futures/futures/traits/is_future.h>

namespace futures {
/** \addtogroup futures Futures
Expand Down Expand Up @@ -236,19 +238,26 @@ namespace futures {
public:
/// \name Public types
/// @{

using value_type = T;

using is_shared = Shared;
using is_lazy_continuable = LazyContinuable;
using is_stoppable = Stoppable;
static constexpr bool is_shared_v = Shared::value;
static constexpr bool is_lazy_continuable_v = LazyContinuable::value;
static constexpr bool is_stoppable_v = Stoppable::value;

using lazy_continuations_base =
detail::lazy_continuations_base<LazyContinuable, basic_future<T, Shared, LazyContinuable, Stoppable>, T>;
using stop_token_base =
detail::stop_token_base<Stoppable, basic_future<T, Shared, LazyContinuable, Stoppable>, T>;

friend lazy_continuations_base;
friend stop_token_base;

using notify_when_ready_handle = detail::shared_state_base::notify_when_ready_handle;

/// @}

/// \name Shared state counterparts
Expand Down Expand Up @@ -457,6 +466,38 @@ namespace futures {
/// For safety, all futures join at destruction
void detach() { join_ = false; }

/// \brief Notify this condition variable when the future is ready
notify_when_ready_handle notify_when_ready(std::condition_variable_any & cv) {
if (!state_) {
detail::throw_exception<future_uninitialized>();
}
return state_->notify_when_ready(cv);
}

/// \brief Cancel request to notify this condition variable when the future is ready
void unnotify_when_ready(notify_when_ready_handle h) {
if (!state_) {
detail::throw_exception<future_uninitialized>();
}
return state_->unnotify_when_ready(h);
}

/// \brief Get a reference to the mutex in the underlying shared state
std::mutex &mutex() {
if (!state_) {
detail::throw_exception<future_uninitialized>();
}
return state_->mutex();
}

/// \brief Checks if the shared state is ready
[[nodiscard]] bool is_ready(std::unique_lock<std::mutex>& lk) const {
if (!valid()) {
throw std::future_error(std::future_errc::no_state);
}
return state_->is_ready(lk);
}

private:
/// \name Private Functions
/// @{
Expand Down Expand Up @@ -503,6 +544,15 @@ namespace futures {
template <typename... Args> struct is_future<const basic_future<Args...> &> : std::true_type {};
/// @}

/// \name Define basic_future as a kind of future
/// @{
template <typename... Args> struct has_ready_notifier<basic_future<Args...>> : std::true_type {};
template <typename... Args> struct has_ready_notifier<basic_future<Args...> &> : std::true_type {};
template <typename... Args> struct has_ready_notifier<basic_future<Args...> &&> : std::true_type {};
template <typename... Args> struct has_ready_notifier<const basic_future<Args...>> : std::true_type {};
template <typename... Args> struct has_ready_notifier<const basic_future<Args...> &> : std::true_type {};
/// @}

/// \name Define basic_futures as supporting lazy continuations
/// @{
template <class T, class SH, class L, class ST> struct is_shared_future<basic_future<T, SH, L, ST>> : SH {};
Expand Down

0 comments on commit 2a68b40

Please sign in to comment.