Skip to content

Commit

Permalink
Merge pull request #1899 from STEllAR-GROUP/fixing_1898
Browse files Browse the repository at this point in the history
Fixing a problem with credit splitting as revealed by #1898
  • Loading branch information
sithhell committed Dec 9, 2015
2 parents 694aadd + 5500c3d commit 17168bd
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 2 deletions.
10 changes: 10 additions & 0 deletions hpx/lcos/future.hpp
Expand Up @@ -127,6 +127,16 @@ namespace hpx { namespace lcos { namespace detail

ar.await_future(f);
}
else
{
if(f.is_ready() && f.has_value())
{
value_type const & value =
*hpx::traits::future_access<Future>::
get_shared_state(f)->get_result();
ar << value;
}
}
return;
}

Expand Down
23 changes: 22 additions & 1 deletion hpx/runtime/parcelset/parcelport_impl.hpp
Expand Up @@ -200,8 +200,29 @@ namespace hpx { namespace parcelset
new hpx::serialization::output_archive(
*future_await, 0, 0, 0, 0, &future_await->new_gids_)
);

put_parcel_await(
dest, std::move(p), std::move(f), trigger, archive, future_await);
}

void put_parcel_await(
locality const & dest, parcel p, write_handler_type f, bool trigger
, boost::shared_ptr<hpx::serialization::output_archive> const & archive
, boost::shared_ptr<
hpx::serialization::detail::future_await_container
> const & future_await)
{
future_await->reset();

(*archive) << p;

// We are doing a fixed point iteration until we are sure that the
// serialization process requires nothing more to wait on ...
// Things where we need waiting:
// - (shared_)future<id_type>: when the future wasn't ready yet, we
// need to do another await round for the id splitting
// - id_type: we need to await, if and only if, the credit of the
// needs to split.
if(future_await->has_futures())
{
void (parcelport_impl::*awaiter)(
Expand All @@ -210,7 +231,7 @@ namespace hpx { namespace parcelset
, boost::shared_ptr<
hpx::serialization::detail::future_await_container> const &
)
= &parcelport_impl::put_parcel_impl;
= &parcelport_impl::put_parcel_await;
(*future_await)(
util::bind(
util::one_shot(awaiter), this,
Expand Down
10 changes: 9 additions & 1 deletion hpx/runtime/serialization/detail/future_await_container.hpp
Expand Up @@ -49,7 +49,7 @@ namespace hpx { namespace serialization { namespace detail
// hpx::lcos::local::promise<void>::set_value() might need to acquire
// a lock, as such, we check the our triggering condition inside a
// critical section and trigger the promise outside of it.
bool set_value = true;
bool set_value = false;
{
boost::lock_guard<mutex_type> l(mtx_);
++triggered_futures_;
Expand All @@ -75,6 +75,14 @@ namespace hpx { namespace serialization { namespace detail
);
}

void reset()
{
done_ = false;
num_futures_ = 0;
triggered_futures_ = 0;
promise_ = hpx::lcos::local::promise<void>();
}

bool has_futures()
{
if(num_futures_ == 0)
Expand Down
1 change: 1 addition & 0 deletions tests/regressions/lcos/CMakeLists.txt
Expand Up @@ -18,6 +18,7 @@ set(tests
future_hang_on_get_629
future_hang_on_then_629
future_timed_wait_1025
future_serialization_1898
future_unwrap_878
future_unwrap_1182
ignore_while_locked_1485
Expand Down
88 changes: 88 additions & 0 deletions tests/regressions/lcos/future_serialization_1898.cpp
@@ -0,0 +1,88 @@
// Copyright (c) 2015 Thomas Heller
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_main.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/components.hpp>
#include <hpx/lcos/future.hpp>

#include <hpx/util/lightweight_test.hpp>

struct test_server
: hpx::components::component_base<test_server>
{
test_server() { alive++; }
~test_server() { alive--; }

static boost::atomic<int> alive;
};

typedef hpx::components::component<test_server> server_type;
HPX_REGISTER_COMPONENT(server_type, test_server);

boost::atomic<int> test_server::alive(0);

hpx::id_type test(hpx::future<hpx::id_type> fid)
{
hpx::id_type id = fid.get();
HPX_TEST(hpx::naming::detail::gid_was_split(id.get_gid()));
return id;
}

HPX_PLAIN_ACTION(test);

int main()
{
hpx::id_type loc = hpx::find_here();
{
HPX_TEST(test_server::alive == 0);
hpx::id_type gid = hpx::new_<test_server>(loc).get();
HPX_TEST(test_server::alive == 1);
// HPX_TEST(!hpx::naming::detail::gid_was_split(gid.get_gid()));

auto remote_localities = hpx::find_remote_localities();
for(hpx::id_type loc : remote_localities)
{
{
hpx::future<hpx::id_type> test_fid = hpx::make_ready_future(gid);
hpx::future<hpx::id_type> fid
= hpx::async(test_action(), loc, std::move(test_fid));
HPX_TEST(test_server::alive == 1);

hpx::id_type new_gid = fid.get();
HPX_TEST_NEQ(
hpx::naming::detail::get_credit_from_gid(gid.get_gid())
, hpx::naming::detail::get_credit_from_gid(new_gid.get_gid())
);
}

{
hpx::lcos::local::promise<hpx::id_type> pid;

hpx::future<hpx::id_type> test_fid = pid.get_future();
hpx::future<hpx::id_type> fid
= hpx::async(test_action(), loc, std::move(test_fid));
HPX_TEST(test_server::alive == 1);

hpx::this_thread::yield();

pid.set_value(gid);
HPX_TEST(test_server::alive == 1);

hpx::id_type new_gid = fid.get();
HPX_TEST_NEQ(
hpx::naming::detail::get_credit_from_gid(gid.get_gid())
, hpx::naming::detail::get_credit_from_gid(new_gid.get_gid())
);
}


HPX_TEST(test_server::alive == 1);
}
HPX_TEST(test_server::alive == 1);
}

return 0;
}

0 comments on commit 17168bd

Please sign in to comment.