Skip to content

Commit

Permalink
Fixed sequential executor test
Browse files Browse the repository at this point in the history
- This also applies similar changes to all related spots, needs a thorough code review.
  Essentially, after calling execute_deferred() the future may have become ready, thus
  it needs not to be handled by facilities like dataflow(), when_all(), etc.
  • Loading branch information
hkaiser committed Jun 18, 2015
1 parent e642922 commit 1fcbdb4
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 193 deletions.
130 changes: 69 additions & 61 deletions hpx/lcos/local/dataflow.hpp
Expand Up @@ -313,44 +313,48 @@ namespace hpx { namespace lcos { namespace local
void await_range(std::size_t depth, TupleIter iter,
Iter next, Iter end)
{
void (dataflow_frame::*f)(
std::size_t, TupleIter, Iter, Iter
) = &dataflow_frame::await_range_respawn;

for (/**/; next != end; ++next)
{
if (!next->is_ready())
{
void (dataflow_frame::*f)(
std::size_t, TupleIter, Iter, Iter
) = &dataflow_frame::await_range_respawn;

typedef
typename std::iterator_traits<
Iter
>::value_type
future_type;
typedef
typename traits::future_traits<
future_type
>::type
future_result_type;

boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data
= lcos::detail::get_shared_state(*next);
typedef
typename std::iterator_traits<
Iter
>::value_type
future_type;
typedef
typename traits::future_traits<
future_type
>::type
future_result_type;

boost::intrusive_ptr<dataflow_frame> this_(this);
boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data
= lcos::detail::get_shared_state(*next);

if (!next_future_data->is_ready())
{
next_future_data->execute_deferred();
next_future_data->set_on_completed(
boost::bind(
f
, std::move(this_)
, ++depth
, std::move(iter)
, std::move(next)
, std::move(end)
)
);
return;

// execute_deferred might have made the future ready
if (!next_future_data->is_ready())
{
boost::intrusive_ptr<dataflow_frame> this_(this);
next_future_data->set_on_completed(
boost::bind(
f
, std::move(this_)
, ++depth
, std::move(iter)
, std::move(next)
, std::move(end)
)
);
return;
}
}
}

Expand Down Expand Up @@ -403,38 +407,42 @@ namespace hpx { namespace lcos { namespace local
future_type & f_ =
boost::fusion::deref(iter);

if(!f_.is_ready())
{
void (dataflow_frame::*f)(
std::size_t, Iter,
boost::mpl::true_, boost::mpl::false_
) = &dataflow_frame::await_next_respawn;
typedef
typename traits::future_traits<
future_type
>::type
future_result_type;

typedef
typename traits::future_traits<
future_type
>::type
future_result_type;
boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data
= lcos::detail::get_shared_state(f_);

boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data
= lcos::detail::get_shared_state(f_);
if(!next_future_data->is_ready())
{
next_future_data->execute_deferred();

boost::intrusive_ptr<dataflow_frame> this_(this);
// execute_deferred might have made the future ready
if (!next_future_data->is_ready())
{
void (dataflow_frame::*f)(
std::size_t, Iter,
boost::mpl::true_, boost::mpl::false_
) = &dataflow_frame::await_next_respawn;

next_future_data->execute_deferred();
next_future_data->set_on_completed(
hpx::util::bind(
f
, std::move(this_)
, ++depth
, std::move(iter)
, boost::mpl::true_()
, boost::mpl::false_()
)
);
return;
boost::intrusive_ptr<dataflow_frame> this_(this);
next_future_data->set_on_completed(
hpx::util::bind(
f
, std::move(this_)
, ++depth
, std::move(iter)
, boost::mpl::true_()
, boost::mpl::false_()
)
);
return;
}
}

typedef boost::mpl::bool_<
Expand Down
8 changes: 5 additions & 3 deletions hpx/lcos/local/packaged_continuation.hpp
Expand Up @@ -368,13 +368,15 @@ namespace hpx { namespace lcos { namespace detail
// bind an on_completed handler to this future which will invoke
// the continuation
boost::intrusive_ptr<continuation> this_(this);
void (continuation::*cb)(shared_state_ptr const&, threads::executor&) =
&continuation::async;
void (continuation::*cb)(
shared_state_ptr const&, threads::executor&
) = &continuation::async;

shared_state_ptr const& state =
lcos::detail::get_shared_state(future);
state->execute_deferred();
state->set_on_completed(util::bind(cb, std::move(this_), state, boost::ref(sched)));
state->set_on_completed(util::bind(cb, std::move(this_),
state, boost::ref(sched)));
}

protected:
Expand Down
47 changes: 29 additions & 18 deletions hpx/lcos/wait_all.hpp
Expand Up @@ -187,6 +187,9 @@ namespace hpx { namespace lcos
future_type
>::type future_result_type;

void (wait_all_frame::*f)(TupleIter, Iter, Iter) =
&wait_all_frame::await_range;

for (/**/; next != end; ++next)
{
boost::intrusive_ptr<
Expand All @@ -195,17 +198,20 @@ namespace hpx { namespace lcos

if (!next_future_data->is_ready())
{
// Attach a continuation to this future which will
// re-evaluate it and continue to the next element
// in the sequence (if any).
void (wait_all_frame::*f)(TupleIter, Iter, Iter) =
&wait_all_frame::await_range;

next_future_data->execute_deferred();
next_future_data->set_on_completed(util::bind(
f, this, std::move(iter),
std::move(next), std::move(end)));
return;

// execute_deferred might have made the future ready
if (!next_future_data->is_ready())
{
// Attach a continuation to this future which will
// re-evaluate it and continue to the next element
// in the sequence (if any).
next_future_data->set_on_completed(
util::bind(
f, this, std::move(iter),
std::move(next), std::move(end)));
return;
}
}
}

Expand Down Expand Up @@ -250,15 +256,20 @@ namespace hpx { namespace lcos

if (!next_future_data->is_ready())
{
// Attach a continuation to this future which will
// re-evaluate it and continue to the next argument
// (if any).
void (wait_all_frame::*f)(TupleIter, true_, false_) =
&wait_all_frame::await_next;

next_future_data->execute_deferred();
next_future_data->set_on_completed(hpx::util::bind(
f, this, std::move(iter), true_(), false_()));

// execute_deferred might have made the future ready
if (!next_future_data->is_ready())
{
// Attach a continuation to this future which will
// re-evaluate it and continue to the next argument
// (if any).
void (wait_all_frame::*f)(TupleIter, true_, false_) =
&wait_all_frame::await_next;

next_future_data->set_on_completed(hpx::util::bind(
f, this, std::move(iter), true_(), false_()));
}
}
else
{
Expand Down
22 changes: 14 additions & 8 deletions hpx/lcos/wait_some.hpp
Expand Up @@ -191,20 +191,26 @@ namespace hpx { namespace lcos
template <typename SharedState>
void operator()(SharedState& shared_state) const
{
std::size_t counter = wait_.count_.load(boost::memory_order_seq_cst);
if (counter < wait_.needed_count_ && !shared_state->is_ready()) {
std::size_t counter =
wait_.count_.load(boost::memory_order_seq_cst);
if (counter < wait_.needed_count_ && !shared_state->is_ready())
{
// handle future only if not enough futures are ready yet
// also, do not touch any futures which are already ready

shared_state->execute_deferred();
shared_state->set_on_completed(Callback(callback_));
}
else {
if (wait_.count_.fetch_add(1) + 1 == wait_.needed_count_)

// execute_deferred might have made the future ready
if (!shared_state->is_ready())
{
wait_.goal_reached_on_calling_thread_ = true;
shared_state->set_on_completed(Callback(callback_));
return;
}
}

if (wait_.count_.fetch_add(1) + 1 == wait_.needed_count_)
{
wait_.goal_reached_on_calling_thread_ = true;
}
}

template <typename Sequence_>
Expand Down
83 changes: 49 additions & 34 deletions hpx/lcos/when_all.hpp
Expand Up @@ -220,28 +220,36 @@ namespace hpx { namespace lcos
template <typename TupleIter, typename Iter>
void await_range(TupleIter iter, Iter next, Iter end)
{
for (/**/; next != end; ++next)
{
if (!next->is_ready())
{
void (when_all_frame::*f)(TupleIter, Iter, Iter) =
&when_all_frame::await_range;
typedef typename std::iterator_traits<Iter>::value_type
future_type;
typedef typename traits::future_traits<future_type>::type
future_result_type;

typedef typename std::iterator_traits<Iter>::value_type
future_type;
typedef typename traits::future_traits<future_type>::type
future_result_type;
void (when_all_frame::*f)(TupleIter, Iter, Iter) =
&when_all_frame::await_range;

boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data = lcos::detail::get_shared_state(*next);
for (/**/; next != end; ++next)
{
boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data = lcos::detail::get_shared_state(*next);

boost::intrusive_ptr<when_all_frame> this_(this);
if (!next_future_data->is_ready())
{
next_future_data->execute_deferred();
next_future_data->set_on_completed(util::bind(
f, this_, std::move(iter),
std::move(next), std::move(end)));
return;

// execute_deferred might have made the future ready
if (!next_future_data->is_ready())
{
// Attach a continuation to this future which will
// re-evaluate it and continue to the next element
// in the sequence (if any).
boost::intrusive_ptr<when_all_frame> this_(this);
next_future_data->set_on_completed(util::bind(
f, std::move(this_), std::move(iter),
std::move(next), std::move(end)));
return;
}
}
}

Expand Down Expand Up @@ -274,27 +282,34 @@ namespace hpx { namespace lcos
using boost::mpl::true_;

future_type& f_ = boost::fusion::deref(iter);
if (!f_.is_ready())
{
// Attach a continuation to this future which will
// re-evaluate it and continue to the next argument
// (if any).
void (when_all_frame::*f)(TupleIter, true_, false_) =
&when_all_frame::await_next;

typedef typename traits::future_traits<future_type>::type
future_result_type;
typedef typename traits::future_traits<future_type>::type
future_result_type;

boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data = lcos::detail::get_shared_state(f_);
boost::intrusive_ptr<
lcos::detail::future_data<future_result_type>
> next_future_data = lcos::detail::get_shared_state(f_);

boost::intrusive_ptr<when_all_frame> this_(this);
if (!next_future_data->is_ready())
{
next_future_data->execute_deferred();
next_future_data->set_on_completed(hpx::util::bind(
f, this_, std::move(iter), true_(), false_()));

return;
// execute_deferred might have made the future ready
if (!next_future_data->is_ready())
{
// Attach a continuation to this future which will
// re-evaluate it and continue to the next argument
// (if any).
void (when_all_frame::*f)(TupleIter, true_, false_) =
&when_all_frame::await_next;

boost::intrusive_ptr<when_all_frame> this_(this);
next_future_data->set_on_completed(
hpx::util::bind(
f, std::move(this_), std::move(iter),
true_(), false_()));
return;
}
}

typedef typename boost::fusion::result_of::next<TupleIter>::type
Expand Down

0 comments on commit 1fcbdb4

Please sign in to comment.