diff --git a/hpx/lcos/local/dataflow.hpp b/hpx/lcos/local/dataflow.hpp index 295ca88007e1..0df6036d5df6 100644 --- a/hpx/lcos/local/dataflow.hpp +++ b/hpx/lcos/local/dataflow.hpp @@ -313,44 +313,48 @@ namespace hpx { namespace lcos { namespace local void await_range(std::size_t depth, TupleIter iter, Iter next, Iter end) { + void (dataflow_frame::*f)( + std::size_t, TupleIter, Iter, Iter + ) = &dataflow_frame::await_range_respawn; + for (/**/; next != end; ++next) { - if (!next->is_ready()) - { - void (dataflow_frame::*f)( - std::size_t, TupleIter, Iter, Iter - ) = &dataflow_frame::await_range_respawn; - - typedef - typename std::iterator_traits< - Iter - >::value_type - future_type; - typedef - typename traits::future_traits< - future_type - >::type - future_result_type; - - boost::intrusive_ptr< - lcos::detail::future_data - > next_future_data - = lcos::detail::get_shared_state(*next); + typedef + typename std::iterator_traits< + Iter + >::value_type + future_type; + typedef + typename traits::future_traits< + future_type + >::type + future_result_type; - boost::intrusive_ptr this_(this); + boost::intrusive_ptr< + lcos::detail::future_data + > next_future_data + = lcos::detail::get_shared_state(*next); + if (!next_future_data->is_ready()) + { next_future_data->execute_deferred(); - next_future_data->set_on_completed( - boost::bind( - f - , std::move(this_) - , ++depth - , std::move(iter) - , std::move(next) - , std::move(end) - ) - ); - return; + + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + boost::intrusive_ptr this_(this); + next_future_data->set_on_completed( + boost::bind( + f + , std::move(this_) + , ++depth + , std::move(iter) + , std::move(next) + , std::move(end) + ) + ); + return; + } } } @@ -403,38 +407,42 @@ namespace hpx { namespace lcos { namespace local future_type & f_ = boost::fusion::deref(iter); - if(!f_.is_ready()) - { - void (dataflow_frame::*f)( - std::size_t, Iter, - boost::mpl::true_, boost::mpl::false_ - ) = &dataflow_frame::await_next_respawn; + typedef + typename traits::future_traits< + future_type + >::type + future_result_type; - typedef - typename traits::future_traits< - future_type - >::type - future_result_type; + boost::intrusive_ptr< + lcos::detail::future_data + > next_future_data + = lcos::detail::get_shared_state(f_); - boost::intrusive_ptr< - lcos::detail::future_data - > next_future_data - = lcos::detail::get_shared_state(f_); + if(!next_future_data->is_ready()) + { + next_future_data->execute_deferred(); - boost::intrusive_ptr this_(this); + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + void (dataflow_frame::*f)( + std::size_t, Iter, + boost::mpl::true_, boost::mpl::false_ + ) = &dataflow_frame::await_next_respawn; - next_future_data->execute_deferred(); - next_future_data->set_on_completed( - hpx::util::bind( - f - , std::move(this_) - , ++depth - , std::move(iter) - , boost::mpl::true_() - , boost::mpl::false_() - ) - ); - return; + boost::intrusive_ptr this_(this); + next_future_data->set_on_completed( + hpx::util::bind( + f + , std::move(this_) + , ++depth + , std::move(iter) + , boost::mpl::true_() + , boost::mpl::false_() + ) + ); + return; + } } typedef boost::mpl::bool_< diff --git a/hpx/lcos/local/packaged_continuation.hpp b/hpx/lcos/local/packaged_continuation.hpp index c7fd84c6f652..a5cc9a8ad02e 100644 --- a/hpx/lcos/local/packaged_continuation.hpp +++ b/hpx/lcos/local/packaged_continuation.hpp @@ -368,13 +368,15 @@ namespace hpx { namespace lcos { namespace detail // bind an on_completed handler to this future which will invoke // the continuation boost::intrusive_ptr this_(this); - void (continuation::*cb)(shared_state_ptr const&, threads::executor&) = - &continuation::async; + void (continuation::*cb)( + shared_state_ptr const&, threads::executor& + ) = &continuation::async; shared_state_ptr const& state = lcos::detail::get_shared_state(future); state->execute_deferred(); - state->set_on_completed(util::bind(cb, std::move(this_), state, boost::ref(sched))); + state->set_on_completed(util::bind(cb, std::move(this_), + state, boost::ref(sched))); } protected: diff --git a/hpx/lcos/wait_all.hpp b/hpx/lcos/wait_all.hpp index dbd97ac09859..92e3c34e2be8 100644 --- a/hpx/lcos/wait_all.hpp +++ b/hpx/lcos/wait_all.hpp @@ -187,6 +187,9 @@ namespace hpx { namespace lcos future_type >::type future_result_type; + void (wait_all_frame::*f)(TupleIter, Iter, Iter) = + &wait_all_frame::await_range; + for (/**/; next != end; ++next) { boost::intrusive_ptr< @@ -195,17 +198,20 @@ namespace hpx { namespace lcos if (!next_future_data->is_ready()) { - // Attach a continuation to this future which will - // re-evaluate it and continue to the next element - // in the sequence (if any). - void (wait_all_frame::*f)(TupleIter, Iter, Iter) = - &wait_all_frame::await_range; - next_future_data->execute_deferred(); - next_future_data->set_on_completed(util::bind( - f, this, std::move(iter), - std::move(next), std::move(end))); - return; + + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + // Attach a continuation to this future which will + // re-evaluate it and continue to the next element + // in the sequence (if any). + next_future_data->set_on_completed( + util::bind( + f, this, std::move(iter), + std::move(next), std::move(end))); + return; + } } } @@ -250,15 +256,20 @@ namespace hpx { namespace lcos if (!next_future_data->is_ready()) { - // Attach a continuation to this future which will - // re-evaluate it and continue to the next argument - // (if any). - void (wait_all_frame::*f)(TupleIter, true_, false_) = - &wait_all_frame::await_next; - next_future_data->execute_deferred(); - next_future_data->set_on_completed(hpx::util::bind( - f, this, std::move(iter), true_(), false_())); + + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + // Attach a continuation to this future which will + // re-evaluate it and continue to the next argument + // (if any). + void (wait_all_frame::*f)(TupleIter, true_, false_) = + &wait_all_frame::await_next; + + next_future_data->set_on_completed(hpx::util::bind( + f, this, std::move(iter), true_(), false_())); + } } else { diff --git a/hpx/lcos/wait_some.hpp b/hpx/lcos/wait_some.hpp index 92d50b0b0a38..1832887ad3d1 100644 --- a/hpx/lcos/wait_some.hpp +++ b/hpx/lcos/wait_some.hpp @@ -191,20 +191,26 @@ namespace hpx { namespace lcos template void operator()(SharedState& shared_state) const { - std::size_t counter = wait_.count_.load(boost::memory_order_seq_cst); - if (counter < wait_.needed_count_ && !shared_state->is_ready()) { + std::size_t counter = + wait_.count_.load(boost::memory_order_seq_cst); + if (counter < wait_.needed_count_ && !shared_state->is_ready()) + { // handle future only if not enough futures are ready yet // also, do not touch any futures which are already ready - shared_state->execute_deferred(); - shared_state->set_on_completed(Callback(callback_)); - } - else { - if (wait_.count_.fetch_add(1) + 1 == wait_.needed_count_) + + // execute_deferred might have made the future ready + if (!shared_state->is_ready()) { - wait_.goal_reached_on_calling_thread_ = true; + shared_state->set_on_completed(Callback(callback_)); + return; } } + + if (wait_.count_.fetch_add(1) + 1 == wait_.needed_count_) + { + wait_.goal_reached_on_calling_thread_ = true; + } } template diff --git a/hpx/lcos/when_all.hpp b/hpx/lcos/when_all.hpp index 818acfb7aae0..885d85a45708 100644 --- a/hpx/lcos/when_all.hpp +++ b/hpx/lcos/when_all.hpp @@ -220,28 +220,36 @@ namespace hpx { namespace lcos template void await_range(TupleIter iter, Iter next, Iter end) { - for (/**/; next != end; ++next) - { - if (!next->is_ready()) - { - void (when_all_frame::*f)(TupleIter, Iter, Iter) = - &when_all_frame::await_range; + typedef typename std::iterator_traits::value_type + future_type; + typedef typename traits::future_traits::type + future_result_type; - typedef typename std::iterator_traits::value_type - future_type; - typedef typename traits::future_traits::type - future_result_type; + void (when_all_frame::*f)(TupleIter, Iter, Iter) = + &when_all_frame::await_range; - boost::intrusive_ptr< - lcos::detail::future_data - > next_future_data = lcos::detail::get_shared_state(*next); + for (/**/; next != end; ++next) + { + boost::intrusive_ptr< + lcos::detail::future_data + > next_future_data = lcos::detail::get_shared_state(*next); - boost::intrusive_ptr this_(this); + if (!next_future_data->is_ready()) + { next_future_data->execute_deferred(); - next_future_data->set_on_completed(util::bind( - f, this_, std::move(iter), - std::move(next), std::move(end))); - return; + + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + // Attach a continuation to this future which will + // re-evaluate it and continue to the next element + // in the sequence (if any). + boost::intrusive_ptr this_(this); + next_future_data->set_on_completed(util::bind( + f, std::move(this_), std::move(iter), + std::move(next), std::move(end))); + return; + } } } @@ -274,27 +282,34 @@ namespace hpx { namespace lcos using boost::mpl::true_; future_type& f_ = boost::fusion::deref(iter); - if (!f_.is_ready()) - { - // Attach a continuation to this future which will - // re-evaluate it and continue to the next argument - // (if any). - void (when_all_frame::*f)(TupleIter, true_, false_) = - &when_all_frame::await_next; - typedef typename traits::future_traits::type - future_result_type; + typedef typename traits::future_traits::type + future_result_type; - boost::intrusive_ptr< - lcos::detail::future_data - > next_future_data = lcos::detail::get_shared_state(f_); + boost::intrusive_ptr< + lcos::detail::future_data + > next_future_data = lcos::detail::get_shared_state(f_); - boost::intrusive_ptr this_(this); + if (!next_future_data->is_ready()) + { next_future_data->execute_deferred(); - next_future_data->set_on_completed(hpx::util::bind( - f, this_, std::move(iter), true_(), false_())); - return; + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + // Attach a continuation to this future which will + // re-evaluate it and continue to the next argument + // (if any). + void (when_all_frame::*f)(TupleIter, true_, false_) = + &when_all_frame::await_next; + + boost::intrusive_ptr this_(this); + next_future_data->set_on_completed( + hpx::util::bind( + f, std::move(this_), std::move(iter), + true_(), false_())); + return; + } } typedef typename boost::fusion::result_of::next::type diff --git a/hpx/lcos/when_any.hpp b/hpx/lcos/when_any.hpp index bc45478376b4..81bfebe964f8 100644 --- a/hpx/lcos/when_any.hpp +++ b/hpx/lcos/when_any.hpp @@ -216,29 +216,42 @@ namespace hpx { namespace lcos template void operator()(Future& future) const { - std::size_t index = when_.index_.load(boost::memory_order_seq_cst); - if (index == when_any_result::index_error()) { - if (!future.is_ready()) { - // handle future only if not enough futures are ready yet - // also, do not touch any futures which are already ready - - typedef - typename lcos::detail::shared_state_ptr_for::type - shared_state_ptr; - - shared_state_ptr const& shared_state = - lcos::detail::get_shared_state(future); + std::size_t index = + when_.index_.load(boost::memory_order_seq_cst); + if (index == when_any_result::index_error()) + { + typedef typename lcos::detail::shared_state_ptr_for< + Future + >::type shared_state_ptr; + + shared_state_ptr const& shared_state = + lcos::detail::get_shared_state(future); + + if (!shared_state->is_ready()) + { + // handle future only if not enough futures are ready + // yet also, do not touch any futures which are already + // ready + shared_state->execute_deferred(); - shared_state->set_on_completed(util::bind( - &when_any::on_future_ready, when_.shared_from_this(), - idx_, threads::get_self_id())); - } - else { - if (when_.index_.compare_exchange_strong(index, idx_)) + + // execute_deferred might have made the future ready + if (!shared_state->is_ready()) { - when_.goal_reached_on_calling_thread_ = true; + shared_state->set_on_completed( + util::bind( + &when_any::on_future_ready, + when_.shared_from_this(), + idx_, threads::get_self_id())); + ++idx_; + return; } } + + if (when_.index_.compare_exchange_strong(index, idx_)) + { + when_.goal_reached_on_calling_thread_ = true; + } } ++idx_; } diff --git a/hpx/lcos/when_each.hpp b/hpx/lcos/when_each.hpp index 1c98665016d2..40a3544baec6 100644 --- a/hpx/lcos/when_each.hpp +++ b/hpx/lcos/when_each.hpp @@ -156,29 +156,37 @@ namespace hpx { namespace lcos template void await_range(TupleIter iter, Iter next, Iter end) { - for(/**/; next != end; ++next) - { - if(!next->is_ready()) - { - void (when_each_frame::*f)(TupleIter, Iter, Iter) = - &when_each_frame::await_range; + typedef typename std::iterator_traits::value_type + future_type; + typedef typename traits::future_traits::type + future_result_type; - typedef typename std::iterator_traits::value_type - future_type; + void (when_each_frame::*f)(TupleIter, Iter, Iter) = + &when_each_frame::await_range; - typedef typename traits::future_traits::type - future_result_type; - - boost::intrusive_ptr< - lcos::detail::future_data - > next_future_data = lcos::detail::get_shared_state(*next); + for(/**/; next != end; ++next) + { + boost::intrusive_ptr< + lcos::detail::future_data + > next_future_data = lcos::detail::get_shared_state(*next); - boost::intrusive_ptr this_(this); + if (!next_future_data->is_ready()) + { next_future_data->execute_deferred(); - next_future_data->set_on_completed(util::bind( - f, this_, std::move(iter), - std::move(next), std::move(end))); - return; + + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + // Attach a continuation to this future which will + // re-evaluate it and continue to the next argument + // (if any). + boost::intrusive_ptr this_(this); + next_future_data->set_on_completed( + util::bind( + f, std::move(this_), std::move(iter), + std::move(next), std::move(end))); + return; + } } f_(std::move(*next)); @@ -215,32 +223,38 @@ namespace hpx { namespace lcos typedef typename util::decay_unwrap< typename boost::fusion::result_of::deref::type >::type future_type; + typedef typename traits::future_traits::type + future_result_type; using boost::mpl::false_; using boost::mpl::true_; future_type& fut = boost::fusion::deref(iter); - if (!fut.is_ready()) - { - // Attach a continuation to this future which will - // re-evaluate it and continue to the next argument - // (if any). - void (when_each_frame::*f)(TupleIter, true_, false_) = - &when_each_frame::await_next; - - typedef typename traits::future_traits::type - future_result_type; - boost::intrusive_ptr< - lcos::detail::future_data - > next_future_data = lcos::detail::get_shared_state(fut); + boost::intrusive_ptr< + lcos::detail::future_data + > next_future_data = lcos::detail::get_shared_state(fut); - boost::intrusive_ptr this_(this); + if (!next_future_data->is_ready()) + { next_future_data->execute_deferred(); - next_future_data->set_on_completed(hpx::util::bind( - f, this_, std::move(iter), true_(), false_())); - return; + // execute_deferred might have made the future ready + if (!next_future_data->is_ready()) + { + // Attach a continuation to this future which will + // re-evaluate it and continue to the next argument + // (if any). + void (when_each_frame::*f)(TupleIter, true_, false_) = + &when_each_frame::await_next; + + boost::intrusive_ptr this_(this); + next_future_data->set_on_completed( + hpx::util::bind( + f, std::move(this_), std::move(iter), + true_(), false_())); + return; + } } f_(std::move(fut)); diff --git a/hpx/lcos/when_some.hpp b/hpx/lcos/when_some.hpp index df7ce5031f8e..bc05da15df00 100644 --- a/hpx/lcos/when_some.hpp +++ b/hpx/lcos/when_some.hpp @@ -309,31 +309,41 @@ namespace hpx { namespace lcos template void operator()(Future& future) const { - std::size_t counter = when_.count_.load(boost::memory_order_seq_cst); + std::size_t counter = + when_.count_.load(boost::memory_order_seq_cst); if (counter < when_.needed_count_) { if (!future.is_ready()) { - // handle future only if not enough futures are ready yet - // also, do not touch any futures which are already ready + // handle future only if not enough futures are ready + // yet also, do not touch any futures which are already + // ready - typedef - typename lcos::detail::shared_state_ptr_for::type - shared_state_ptr; + typedef typename lcos::detail::shared_state_ptr_for< + Future + >::type shared_state_ptr; shared_state_ptr const& shared_state = lcos::detail::get_shared_state(future); shared_state->execute_deferred(); - shared_state->set_on_completed(util::bind( - &when_some::on_future_ready, when_.shared_from_this(), - idx_, threads::get_self_id())); - } - else { - when_.lazy_values_.indices.push_back(idx_); - if (when_.count_.fetch_add(1) + 1 == when_.needed_count_) + + // execute_deferred might have made the future ready + if (!shared_state->is_ready()) { - when_.goal_reached_on_calling_thread_ = true; + shared_state->set_on_completed( + util::bind( + &when_some::on_future_ready, + when_.shared_from_this(), + idx_, threads::get_self_id())); + ++idx_; + return; } } + + when_.lazy_values_.indices.push_back(idx_); + if (when_.count_.fetch_add(1) + 1 == when_.needed_count_) + { + when_.goal_reached_on_calling_thread_ = true; + } } ++idx_; }