Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential zero-copy for primarynamespace::bulk_service_async et.al. #1543

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 0 additions & 17 deletions hpx/runtime/agas/primary_namespace.hpp
Expand Up @@ -46,23 +46,6 @@ struct primary_namespace
{
this->base_type::service_non_blocking(this->get_gid(), req, priority);
}

std::vector<response> bulk_service(
std::vector<request> const& reqs
, threads::thread_priority priority = threads::thread_priority_default
, error_code& ec = throws
)
{
return this->base_type::bulk_service(this->get_gid(), reqs, priority, ec);
}

void bulk_service_non_blocking(
std::vector<request> const& reqs
, threads::thread_priority priority = threads::thread_priority_default
)
{
this->base_type::bulk_service_non_blocking(this->get_gid(), reqs, priority);
}
};

}}
Expand Down
14 changes: 8 additions & 6 deletions hpx/runtime/agas/stubs/primary_namespace.hpp
Expand Up @@ -21,6 +21,10 @@ struct HPX_EXPORT primary_namespace
typedef server::primary_namespace server_type;
typedef server::primary_namespace server_component_type;

typedef util::function_nonser<
void(boost::system::error_code const&, parcelset::parcel const&)
> parcel_sent_func;

///////////////////////////////////////////////////////////////////////////
template <typename Result>
static lcos::future<Result> service_async(
Expand All @@ -41,8 +45,7 @@ struct HPX_EXPORT primary_namespace
static void service_non_blocking(
naming::id_type const& gid
, request const& req
, util::function_nonser<void(boost::system::error_code const&,
parcelset::parcel const&)> const& f
, parcel_sent_func const& cb
, threads::thread_priority priority = threads::thread_priority_default
);

Expand All @@ -60,6 +63,7 @@ struct HPX_EXPORT primary_namespace
static lcos::future<std::vector<response> > bulk_service_async(
naming::id_type const& gid
, std::vector<request> const& reqs
, parcel_sent_func const& cb
, threads::thread_priority priority = threads::thread_priority_default
);

Expand All @@ -69,6 +73,7 @@ struct HPX_EXPORT primary_namespace
static void bulk_service_non_blocking(
naming::id_type const& gid
, std::vector<request> const& reqs
, parcel_sent_func const& cb
, threads::thread_priority priority = threads::thread_priority_default
);

Expand All @@ -77,10 +82,7 @@ struct HPX_EXPORT primary_namespace
, std::vector<request> const& reqs
, threads::thread_priority priority = threads::thread_priority_default
, error_code& ec = throws
)
{
return bulk_service_async(gid, reqs, priority).get(ec);
}
);

static naming::gid_type get_service_instance(naming::gid_type const& dest)
{
Expand Down
37 changes: 26 additions & 11 deletions src/runtime/agas/addressing_service.cpp
Expand Up @@ -2893,6 +2893,12 @@ void addressing_service::send_refcnt_requests(
}
#endif

template <typename T>
void keep_data_alive(boost::shared_ptr<T>)
{
// do nothing, just keep argument alive
}

void addressing_service::send_refcnt_requests_non_blocking(
addressing_service::mutex_type::scoped_lock& l
, error_code& ec
Expand Down Expand Up @@ -2925,7 +2931,8 @@ void addressing_service::send_refcnt_requests_non_blocking(

// collect all requests for each locality
typedef std::map<naming::id_type, std::vector<request> > requests_type;
requests_type requests;
boost::shared_ptr<requests_type> requests =
boost::make_shared<requests_type>();

for (refcnt_requests_type::const_reference e : *p)
{
Expand All @@ -2938,15 +2945,17 @@ void addressing_service::send_refcnt_requests_non_blocking(
stubs::primary_namespace::get_service_instance(raw)
, naming::id_type::unmanaged);

requests[target].push_back(req);
(*requests)[target].push_back(req);
}

// send requests to all locality
requests_type::const_iterator end = requests.end();
for (requests_type::const_iterator it = requests.begin(); it != end; ++it)
requests_type::const_iterator end = requests->end();
for (requests_type::const_iterator it = requests->begin(); it != end; ++it)
{
stubs::primary_namespace::bulk_service_non_blocking(
(*it).first, (*it).second, action_priority_);
(*it).first, (*it).second,
util::bind(&keep_data_alive<requests_type>, requests),
action_priority_);
}

if (&ec != &throws)
Expand Down Expand Up @@ -2989,7 +2998,8 @@ addressing_service::send_refcnt_requests_async(

// collect all requests for each locality
typedef std::map<naming::id_type, std::vector<request> > requests_type;
requests_type requests;
boost::shared_ptr<requests_type> requests =
boost::make_shared<requests_type>();

std::vector<hpx::future<std::vector<response> > > lazy_results;
for (refcnt_requests_type::const_reference e : *p)
Expand All @@ -3003,16 +3013,21 @@ addressing_service::send_refcnt_requests_async(
stubs::primary_namespace::get_service_instance(raw)
, naming::id_type::unmanaged);

requests[target].push_back(req);
(*requests)[target].push_back(req);
}

// send requests to all locality
requests_type::const_iterator end = requests.end();
for (requests_type::const_iterator it = requests.begin(); it != end; ++it)
requests_type::const_iterator end = requests->end();
for (requests_type::const_iterator it = requests->begin(); it != end; ++it)
{
lazy_results.push_back(
// make sure requests vector goes out of scope only after the
// parcel-send operation is complete
future<std::vector<response> > f =
stubs::primary_namespace::bulk_service_async(
(*it).first, (*it).second, action_priority_));
(*it).first, (*it).second,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though the requests are stored in a shared pointer which is kept alive by the callback function, the requests are copied here. The actual requests that are part of the asynchronous communication are not being held alive, by the bound callback function.

util::bind(&keep_data_alive<requests_type>, requests),
action_priority_);
lazy_results.push_back(std::move(f));
}

return lazy_results;
Expand Down
25 changes: 20 additions & 5 deletions src/runtime/agas/stubs/primary_namespace_stubs.cpp
Expand Up @@ -63,13 +63,12 @@ template lcos::future<std::pair<naming::id_type, naming::address> >
void primary_namespace::service_non_blocking(
naming::id_type const& gid
, request const& req
, util::function_nonser<void(boost::system::error_code const&,
parcelset::parcel const&)> const& f
, parcel_sent_func const& cb
, threads::thread_priority priority
)
{
typedef server_type::service_action action_type;
hpx::apply_p_cb<action_type>(gid, priority, f, req);
hpx::apply_p_cb<action_type>(gid, priority, cb, req);
}

void primary_namespace::service_non_blocking(
Expand All @@ -82,28 +81,44 @@ void primary_namespace::service_non_blocking(
hpx::apply_p<action_type>(gid, priority, req);
}

std::vector<response> primary_namespace::bulk_service(
naming::id_type const& gid
, std::vector<request> const& reqs
, threads::thread_priority priority
, error_code& ec
)
{
typedef server_type::bulk_service_action action_type;

lcos::packaged_action<action_type> p;
p.apply_p(launch::async, gid, priority, reqs);
return p.get_future().get(ec);
}

lcos::future<std::vector<response> >
primary_namespace::bulk_service_async(
naming::id_type const& gid
, std::vector<request> const& reqs
, parcel_sent_func const& cb
, threads::thread_priority priority
)
{
typedef server_type::bulk_service_action action_type;

lcos::packaged_action<action_type> p;
p.apply_p(launch::async, gid, priority, reqs);
p.apply_p_cb(launch::async, gid, priority, cb, reqs);
return p.get_future();
}

void primary_namespace::bulk_service_non_blocking(
naming::id_type const& gid
, std::vector<request> const& reqs
, parcel_sent_func const& cb
, threads::thread_priority priority
)
{
typedef server_type::bulk_service_action action_type;
hpx::apply_p<action_type>(gid, priority, reqs);
hpx::apply_p_cb<action_type>(gid, priority, cb, reqs);
}

}}}
Expand Down