Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix apex annotation for async dispatch #4257

Merged
merged 8 commits into from
Dec 6, 2019
12 changes: 7 additions & 5 deletions hpx/async_launch_policy_dispatch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ namespace hpx { namespace detail
std::forward<F>(f), std::forward<Ts>(ts)...));
if (hpx::detail::has_async_policy(policy))
{
threads::thread_id_type tid = p.apply(pool, policy,
threads::thread_id_type tid = p.apply(pool,
"async_launch_policy_dispatch", policy,
policy.priority(), threads::thread_stacksize_default, hint);
if (tid && policy == launch::fork)
{
Expand Down Expand Up @@ -147,8 +148,8 @@ namespace hpx { namespace detail
lcos::local::futures_factory<result_type()> p(util::deferred_call(
std::forward<F>(f), std::forward<Ts>(ts)...));

p.apply(pool, policy, policy.priority(),
threads::thread_stacksize_default, hint);
p.apply(pool, "async_launch_policy_dispatch::call", policy,
policy.priority(), threads::thread_stacksize_default, hint);
return p.get_future();
}

Expand Down Expand Up @@ -181,8 +182,9 @@ namespace hpx { namespace detail
std::forward<F>(f), std::forward<Ts>(ts)...));

// make sure this thread is executed last
threads::thread_id_type tid = p.apply(pool, policy,
policy.priority(), threads::thread_stacksize_default, hint);
threads::thread_id_type tid = p.apply(pool,
"async_launch_policy_dispatch::call", policy, policy.priority(),
threads::thread_stacksize_default, hint);
threads::thread_id_type tid_self = threads::get_self_id();
if (tid && tid_self &&
get_thread_id_data(tid)->get_scheduler_base() ==
Expand Down
4 changes: 3 additions & 1 deletion hpx/lcos/detail/future_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,9 @@ namespace hpx { namespace lcos { namespace detail {

// run in a separate thread
virtual threads::thread_id_type apply(
threads::thread_pool_base* /*pool*/, launch /*policy*/,
threads::thread_pool_base* /*pool*/,
const char */*annotation*/,
launch /*policy*/,
threads::thread_priority /*priority*/,
threads::thread_stacksize /*stacksize*/,
threads::thread_schedule_hint /*schedulehint*/, error_code& /*ec*/)
Expand Down
23 changes: 15 additions & 8 deletions hpx/lcos/local/futures_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ namespace hpx { namespace lcos { namespace local {
protected:
// run in a separate thread
threads::thread_id_type apply(threads::thread_pool_base* pool,
launch policy, threads::thread_priority priority,
const char *annotation,
launch policy,
threads::thread_priority priority,
threads::thread_stacksize stacksize,
threads::thread_schedule_hint schedulehint,
error_code& ec) override
Expand All @@ -119,7 +121,7 @@ namespace hpx { namespace lcos { namespace local {
return threads::register_thread_nullary(pool,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
util::thread_description(f_, annotation),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost,
threads::thread_schedule_hint(
Expand Down Expand Up @@ -247,6 +249,7 @@ namespace hpx { namespace lcos { namespace local {
protected:
// run in a separate thread
threads::thread_id_type apply(threads::thread_pool_base* pool,
const char *annotation,
launch policy, threads::thread_priority priority,
threads::thread_stacksize stacksize,
threads::thread_schedule_hint schedulehint,
Expand All @@ -262,7 +265,7 @@ namespace hpx { namespace lcos { namespace local {
parallel::execution::post(*exec_,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
schedulehint);
schedulehint, annotation);
return threads::invalid_thread_id;
}
else if (policy == launch::fork)
Expand All @@ -271,7 +274,7 @@ namespace hpx { namespace lcos { namespace local {
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(
this->f_, "task_object::apply"),
this->f_, annotation),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost,
threads::thread_schedule_hint(
Expand All @@ -284,7 +287,7 @@ namespace hpx { namespace lcos { namespace local {
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(
this->f_, "task_object::apply"),
this->f_, annotation),
threads::pending, false, priority, schedulehint,
stacksize, ec);
return threads::invalid_thread_id;
Expand Down Expand Up @@ -777,7 +780,9 @@ namespace hpx { namespace lcos { namespace local {
}

// asynchronous execution
threads::thread_id_type apply(launch policy = launch::async,
threads::thread_id_type apply(
const char *annotation = "futures_factory::apply",
launch policy = launch::async,
threads::thread_priority priority =
threads::thread_priority_default,
threads::thread_stacksize stacksize =
Expand All @@ -786,11 +791,13 @@ namespace hpx { namespace lcos { namespace local {
threads::thread_schedule_hint(),
error_code& ec = throws) const
{
return apply(threads::detail::get_self_or_default_pool(), policy,
return apply(threads::detail::get_self_or_default_pool(),
annotation, policy,
priority, stacksize, schedulehint, ec);
}

threads::thread_id_type apply(threads::thread_pool_base* pool,
const char *annotation = "futures_factory::apply",
launch policy = launch::async,
threads::thread_priority priority =
threads::thread_priority_default,
Expand All @@ -808,7 +815,7 @@ namespace hpx { namespace lcos { namespace local {
return threads::invalid_thread_id;
}
return task_->apply(
pool, policy, priority, stacksize, schedulehint, ec);
pool, annotation, policy, priority, stacksize, schedulehint, ec);
}

// This is the same as get_future, except that it moves the
Expand Down
4 changes: 2 additions & 2 deletions hpx/lcos/local/packaged_continuation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ namespace hpx { namespace lcos { namespace detail
}

hpx::intrusive_ptr<continuation> this_(this);
hpx::util::thread_description desc(
hpx::util::thread_description desc(f_,
"hpx::parallel::execution::parallel_executor::post");

parallel::execution::detail::post_policy_dispatch<
Expand Down Expand Up @@ -422,7 +422,7 @@ namespace hpx { namespace lcos { namespace detail
}

hpx::intrusive_ptr<continuation> this_(this);
hpx::util::thread_description desc(
hpx::util::thread_description desc(f_,
"hpx::parallel::execution::parallel_executor::post");

parallel::execution::detail::post_policy_dispatch<
Expand Down
68 changes: 31 additions & 37 deletions hpx/runtime/threads/executors/guided_pool_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,16 @@ namespace hpx { namespace threads { namespace executors
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...));

if (hp_sync_ &&
executor_.get_priority() == hpx::threads::thread_priority_high) {
p.apply(
hpx::launch::sync,
executor_.get_priority(),
executor_.get_priority() == hpx::threads::thread_priority_high)
{
p.apply("guided async", hpx::launch::sync, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
}
else {
p.apply(
hpx::launch::async,
executor_.get_priority(),
else
{
p.apply("guided async", hpx::launch::async, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
Expand Down Expand Up @@ -174,18 +172,16 @@ namespace hpx { namespace threads { namespace executors
);

if (hp_sync_ &&
executor_.get_priority() == hpx::threads::thread_priority_high) {
p.apply(
hpx::launch::sync,
executor_.get_priority(),
executor_.get_priority() == hpx::threads::thread_priority_high)
{
p.apply("guided then", hpx::launch::sync, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
}
else {
p.apply(
hpx::launch::async,
executor_.get_priority(),
else
{
p.apply("guided then", hpx::launch::async, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
Expand Down Expand Up @@ -440,18 +436,17 @@ namespace hpx { namespace threads { namespace executors
);

if (hp_sync_ &&
pool_executor_.get_priority() == hpx::threads::thread_priority_high) {
p.apply(
hpx::launch::sync,
pool_executor_.get_priority(),
pool_executor_.get_priority() ==
hpx::threads::thread_priority_high)
{
p.apply("guided async", hpx::launch::sync, pool_executor_.get_priority(),
pool_executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
}
else {
p.apply(
hpx::launch::async,
pool_executor_.get_priority(),
else
{
p.apply("guided async", hpx::launch::async, pool_executor_.get_priority(),
pool_executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
Expand Down Expand Up @@ -503,19 +498,18 @@ namespace hpx { namespace threads { namespace executors
future<typename util::detail::invoke_deferred_result<F, Ts...>::type>
async_execute(F && f, Ts &&... ts)
{
if (guided_) return guided_exec_.async_execute(
std::forward<F>(f), std::forward<Ts>(ts)...);
else {
typedef typename util::detail::invoke_deferred_result<F, Ts...>::type
result_type;

lcos::local::futures_factory<result_type()> p(
pool_exec_,
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...)
);
p.apply(
hpx::launch::async,
pool_exec_.get_priority(),
if (guided_)
return guided_exec_.async_execute(
std::forward<F>(f), std::forward<Ts>(ts)...);
else
{
typedef typename util::detail::invoke_deferred_result<F,
Ts...>::type result_type;

lcos::local::futures_factory<result_type()> p(pool_exec_,
util::deferred_call(
std::forward<F>(f), std::forward<Ts>(ts)...));
p.apply("guided async", hpx::launch::async, pool_exec_.get_priority(),
pool_exec_.get_stacksize(),
threads::thread_schedule_hint()
);
Expand Down
2 changes: 1 addition & 1 deletion libs/affinity/tests/unit/parse_affinity_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

#include <hpx/hpx_init.hpp>

#include <hpx/affinity/parse_affinity_options.hpp>
#include <hpx/include/threads.hpp>
#include <hpx/runtime/threads/policies/parse_affinity_options.hpp>
#include <hpx/testing.hpp>

#include <algorithm>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ namespace hpx { namespace threads {
typedef typename util::detail::invoke_deferred_result<F, Ts...>::type
result_type;

char const* annotation = hpx::traits::get_function_annotation<
typename hpx::util::decay<F>::type>::call(f);
lcos::local::futures_factory<result_type()> p(
std::forward<Executor>(exec),
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...));
p.apply();
p.apply(annotation);
return p.get_future();
}

Expand Down Expand Up @@ -103,10 +105,12 @@ namespace hpx { namespace threads {
hpx::traits::is_threads_executor<Executor>::value>::type
post(Executor&& exec, F&& f, Ts&&... ts)
{
char const* annotation = hpx::traits::get_function_annotation<
typename hpx::util::decay<F>::type>::call(f);
exec.add(
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...),
"hpx::parallel::execution::post", threads::pending, true,
exec.get_stacksize(), threads::thread_schedule_hint(), throws);
annotation, threads::pending, true, exec.get_stacksize(),
threads::thread_schedule_hint(), throws);
}
///////////////////////////////////////////////////////////////////////////
// post()
Expand All @@ -115,12 +119,13 @@ namespace hpx { namespace threads {
hpx::traits::is_threads_executor<Executor>::value &&
std::is_same<typename hpx::util::decay<Hint>::type,
hpx::threads::thread_schedule_hint>::value>::type
post(Executor&& exec, F&& f, Hint&& hint, Ts&&... ts)
post(
Executor&& exec, F&& f, Hint&& hint, const char* annotation, Ts&&... ts)
{
exec.add(
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...),
"hpx::parallel::execution::post", threads::pending, true,
exec.get_stacksize(), std::forward<Hint>(hint), throws);
annotation, threads::pending, true, exec.get_stacksize(),
std::forward<Hint>(hint), throws);
}

///////////////////////////////////////////////////////////////////////////
Expand Down
8 changes: 4 additions & 4 deletions libs/resource_partitioner/examples/async_customization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ struct test_async_executor
lcos::local::futures_factory<result_type()> p(executor_,
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...));

p.apply(launch::async, threads::thread_priority_default,
p.apply("custom", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
Expand Down Expand Up @@ -152,7 +152,7 @@ struct test_async_executor
util::deferred_call(std::forward<F>(f),
std::forward<Future>(predecessor), std::forward<Ts>(ts)...));

p.apply(launch::async, threads::thread_priority_default,
p.apply("custom then", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
Expand Down Expand Up @@ -212,7 +212,7 @@ struct test_async_executor
predecessor),
std::forward<Ts>(ts)...));

p.apply(launch::async, threads::thread_priority_default,
p.apply("custom then", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
Expand Down Expand Up @@ -260,7 +260,7 @@ struct test_async_executor
util::deferred_call(std::forward<F>(f),
std::forward<util::tuple<InnerFutures...>>(predecessor)));

p.apply(launch::async, threads::thread_priority_default,
p.apply("custom async", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
Expand Down
2 changes: 1 addition & 1 deletion src/lcos/detail/future_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace hpx { namespace lcos { namespace detail
policy = launch::async;

// launch a new thread executing the given function
threads::thread_id_type tid = p.apply(
threads::thread_id_type tid = p.apply("run_on_completed_on_new_thread",
policy, threads::thread_priority_boost,
threads::thread_stacksize_current,
threads::thread_schedule_hint());
Expand Down
2 changes: 1 addition & 1 deletion src/util/backtrace/backtrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ namespace hpx { namespace util {
stack_trace::get_symbols, &frames_.front(), frames_.size()));

error_code ec(lightweight);
threads::thread_id_type tid = p.apply(
threads::thread_id_type tid = p.apply("backtrace",
launch::fork, threads::thread_priority_default,
threads::thread_stacksize_medium,
threads::thread_schedule_hint(),
Expand Down
Loading