Skip to content

Commit

Permalink
Fixing a problem with credit splitting as revealed by #1898
Browse files Browse the repository at this point in the history
We currently run into a situation where we would need to wait on futures before
actual serialization after the first round of awaiting has been completed. In
addition, the future itself doesn't give its held value the chance to be awaited.
  • Loading branch information
sithhell committed Dec 8, 2015
1 parent 8417f14 commit b9ac626
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 2 deletions.
10 changes: 10 additions & 0 deletions hpx/lcos/future.hpp
Expand Up @@ -127,6 +127,16 @@ namespace hpx { namespace lcos { namespace detail

ar.await_future(f);
}
else
{
if(f.is_ready() && f.has_value())
{
value_type const & value =
*hpx::traits::future_access<Future>::
get_shared_state(f)->get_result();
ar << value;
}
}
return;
}

Expand Down
22 changes: 21 additions & 1 deletion hpx/runtime/parcelset/parcelport_impl.hpp
Expand Up @@ -200,8 +200,28 @@ namespace hpx { namespace parcelset
new hpx::serialization::output_archive(
*future_await, 0, 0, 0, 0, &future_await->new_gids_)
);

put_parcel_await(dest, std::move(p), std::move(f), trigger, archive, future_await);
}

void put_parcel_await(
locality const & dest, parcel p, write_handler_type f, bool trigger
, boost::shared_ptr<hpx::serialization::output_archive> const & archive
, boost::shared_ptr<
hpx::serialization::detail::future_await_container
> const & future_await)
{
future_await->reset();

(*archive) << p;

// We are doing a fixed point iteration until we are sure that the
// serialization process requires nothing more to wait on ...
// Things where we need waiting:
// - (shared_)future<id_type>: when the future wasn't ready yet, we
// need to do another await round for the id splitting
// - id_type: we need to await, if and only if, the credit of the
// needs to split.
if(future_await->has_futures())
{
void (parcelport_impl::*awaiter)(
Expand All @@ -210,7 +230,7 @@ namespace hpx { namespace parcelset
, boost::shared_ptr<
hpx::serialization::detail::future_await_container> const &
)
= &parcelport_impl::put_parcel_impl;
= &parcelport_impl::put_parcel_await;
(*future_await)(
util::bind(
util::one_shot(awaiter), this,
Expand Down
10 changes: 9 additions & 1 deletion hpx/runtime/serialization/detail/future_await_container.hpp
Expand Up @@ -49,7 +49,7 @@ namespace hpx { namespace serialization { namespace detail
// hpx::lcos::local::promise<void>::set_value() might need to acquire
// a lock, as such, we check the our triggering condition inside a
// critical section and trigger the promise outside of it.
bool set_value = true;
bool set_value = false;
{
boost::lock_guard<mutex_type> l(mtx_);
++triggered_futures_;
Expand All @@ -75,6 +75,14 @@ namespace hpx { namespace serialization { namespace detail
);
}

void reset()
{
done_ = false;
num_futures_ = 0;
triggered_futures_ = 0;
promise_ = hpx::lcos::local::promise<void>();
}

bool has_futures()
{
if(num_futures_ == 0)
Expand Down
1 change: 1 addition & 0 deletions tests/regressions/lcos/CMakeLists.txt
Expand Up @@ -18,6 +18,7 @@ set(tests
future_hang_on_get_629
future_hang_on_then_629
future_timed_wait_1025
future_serialization_1898
future_unwrap_878
future_unwrap_1182
ignore_while_locked_1485
Expand Down
86 changes: 86 additions & 0 deletions tests/regressions/lcos/future_serialization_1898.cpp
@@ -0,0 +1,86 @@
// Copyright (c) 2015 Thomas Heller
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_main.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/components.hpp>
#include <hpx/lcos/future.hpp>

#include <hpx/util/lightweight_test.hpp>

struct test_server
: hpx::components::component_base<test_server>
{
test_server() { alive++; }
~test_server() { alive--; }

static boost::atomic<int> alive;
};

typedef hpx::components::component<test_server> server_type;
HPX_REGISTER_COMPONENT(server_type, test_server);

boost::atomic<int> test_server::alive(0);

hpx::id_type test(hpx::future<hpx::id_type> fid)
{
hpx::id_type id = fid.get();
HPX_TEST(hpx::naming::detail::gid_was_split(id.get_gid()));
return id;
}

HPX_PLAIN_ACTION(test);

int main()
{
hpx::id_type loc = hpx::find_here();
{
HPX_TEST(test_server::alive == 0);
hpx::id_type gid = hpx::new_<test_server>(loc).get();
HPX_TEST(test_server::alive == 1);
// HPX_TEST(!hpx::naming::detail::gid_was_split(gid.get_gid()));

auto remote_localities = hpx::find_remote_localities();
for(hpx::id_type loc : remote_localities)
{
{
hpx::future<hpx::id_type> test_fid = hpx::make_ready_future(gid);
hpx::future<hpx::id_type> fid = hpx::async(test_action(), loc, std::move(test_fid));
HPX_TEST(test_server::alive == 1);

hpx::id_type new_gid = fid.get();
HPX_TEST_NEQ(
hpx::naming::detail::get_credit_from_gid(gid.get_gid())
, hpx::naming::detail::get_credit_from_gid(new_gid.get_gid())
);
}

{
hpx::lcos::local::promise<hpx::id_type> pid;

hpx::future<hpx::id_type> test_fid = pid.get_future();
hpx::future<hpx::id_type> fid = hpx::async(test_action(), loc, std::move(test_fid));
HPX_TEST(test_server::alive == 1);

hpx::this_thread::yield();

pid.set_value(gid);
HPX_TEST(test_server::alive == 1);

hpx::id_type new_gid = fid.get();
HPX_TEST_NEQ(
hpx::naming::detail::get_credit_from_gid(gid.get_gid())
, hpx::naming::detail::get_credit_from_gid(new_gid.get_gid())
);
}


HPX_TEST(test_server::alive == 1);
}
HPX_TEST(test_server::alive == 1);
}

return 0;
}

0 comments on commit b9ac626

Please sign in to comment.