Fix apex annotation for async dispatch
When an annotated task is passed through the async dispatch, we can strip
off the task 'name' (its annotation) and pass it through to thread creation
so that it is used when the task is initialized and apex gets the right
name. We make use of
traits::get_function_annotation<typename hpx::util::decay<F>::type>::call(f)
to achieve this.
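
For illustration, the annotation is the name attached to a callable with
hpx::util::annotated_function; a minimal sketch of recovering it, assuming a
build with HPX_HAVE_THREAD_DESCRIPTION enabled (the helper annotation_of is
hypothetical):

    // Sketch: recover the user-supplied name from a wrapped callable so it
    // can be forwarded to thread creation (and on to apex).
    #include <hpx/traits/get_function_annotation.hpp>
    #include <hpx/util/annotated_function.hpp>
    #include <hpx/util/decay.hpp>

    template <typename F>
    char const* annotation_of(F const& f)
    {
        // The unspecialized trait yields a null annotation; the
        // specialization for annotated callables returns the given name.
        return hpx::traits::get_function_annotation<
            typename hpx::util::decay<F>::type>::call(f);
    }

    // auto task = hpx::util::annotated_function([] {}, "my_task");
    // annotation_of(task) == "my_task"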
biddisco committed Dec 3, 2019
1 parent ef28317 commit a965665
Showing 8 changed files with 63 additions and 57 deletions.
3 changes: 2 additions & 1 deletion hpx/async_launch_policy_dispatch.hpp
@@ -90,7 +90,8 @@ namespace hpx { namespace detail
std::forward<F>(f), std::forward<Ts>(ts)...));
if (hpx::detail::has_async_policy(policy))
{
-threads::thread_id_type tid = p.apply(pool, policy,
+threads::thread_id_type tid = p.apply(pool,
+"async_launch_policy_dispatch", policy,
policy.priority(), threads::thread_stacksize_default, hint);
if (tid && policy == launch::fork)
{
4 changes: 3 additions & 1 deletion hpx/lcos/detail/future_data.hpp
@@ -864,7 +864,9 @@ namespace hpx { namespace lcos { namespace detail {

// run in a separate thread
virtual threads::thread_id_type apply(
-threads::thread_pool_base* /*pool*/, launch /*policy*/,
+threads::thread_pool_base* /*pool*/,
+const char */*annotation*/,
+launch /*policy*/,
threads::thread_priority /*priority*/,
threads::thread_stacksize /*stacksize*/,
threads::thread_schedule_hint /*schedulehint*/, error_code& /*ec*/)
23 changes: 15 additions & 8 deletions hpx/lcos/local/futures_factory.hpp
@@ -104,7 +104,9 @@ namespace hpx { namespace lcos { namespace local {
protected:
// run in a separate thread
threads::thread_id_type apply(threads::thread_pool_base* pool,
-launch policy, threads::thread_priority priority,
+const char *annotation,
+launch policy,
+threads::thread_priority priority,
threads::thread_stacksize stacksize,
threads::thread_schedule_hint schedulehint,
error_code& ec) override
@@ -119,7 +121,7 @@ namespace hpx { namespace lcos { namespace local {
return threads::register_thread_nullary(pool,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
-util::thread_description(f_, "task_object::apply"),
+util::thread_description(f_, annotation),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost,
threads::thread_schedule_hint(
@@ -247,6 +249,7 @@ namespace hpx { namespace lcos { namespace local {
protected:
// run in a separate thread
threads::thread_id_type apply(threads::thread_pool_base* pool,
+const char *annotation,
launch policy, threads::thread_priority priority,
threads::thread_stacksize stacksize,
threads::thread_schedule_hint schedulehint,
@@ -262,7 +265,7 @@ namespace hpx { namespace lcos { namespace local {
parallel::execution::post(*exec_,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
-schedulehint);
+schedulehint, annotation);
return threads::invalid_thread_id;
}
else if (policy == launch::fork)
@@ -271,7 +274,7 @@ namespace hpx { namespace lcos { namespace local {
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(
-this->f_, "task_object::apply"),
+this->f_, annotation),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost,
threads::thread_schedule_hint(
@@ -284,7 +287,7 @@ namespace hpx { namespace lcos { namespace local {
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(
-this->f_, "task_object::apply"),
+this->f_, annotation),
threads::pending, false, priority, schedulehint,
stacksize, ec);
return threads::invalid_thread_id;
@@ -777,7 +780,9 @@ namespace hpx { namespace lcos { namespace local {
}

// asynchronous execution
-threads::thread_id_type apply(launch policy = launch::async,
+threads::thread_id_type apply(
+const char *annotation = "futures_factory::apply",
+launch policy = launch::async,
threads::thread_priority priority =
threads::thread_priority_default,
threads::thread_stacksize stacksize =
@@ -786,11 +791,13 @@ namespace hpx { namespace lcos { namespace local {
threads::thread_schedule_hint(),
error_code& ec = throws) const
{
-return apply(threads::detail::get_self_or_default_pool(), policy,
+return apply(threads::detail::get_self_or_default_pool(),
+annotation, policy,
priority, stacksize, schedulehint, ec);
}

threads::thread_id_type apply(threads::thread_pool_base* pool,
+const char *annotation = "futures_factory::apply",
launch policy = launch::async,
threads::thread_priority priority =
threads::thread_priority_default,
@@ -808,7 +815,7 @@ namespace hpx { namespace lcos { namespace local {
return threads::invalid_thread_id;
}
return task_->apply(
-pool, policy, priority, stacksize, schedulehint, ec);
+pool, annotation, policy, priority, stacksize, schedulehint, ec);
}

// This is the same as get_future, except that it moves the
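With this change an annotation can be passed to futures_factory::apply ahead
of the launch policy, defaulting to "futures_factory::apply" when omitted. A
usage sketch against the new signature (the task body and the name "my_task"
are illustrative):

    // Sketch: launch a task with an explicit annotation via the extended
    // futures_factory::apply overload shown above.
    lcos::local::futures_factory<int()> p([]() { return 42; });

    threads::thread_id_type tid = p.apply(
        "my_task",                          // annotation picked up by apex
        launch::async,
        threads::thread_priority_default,
        threads::thread_stacksize_default);

    hpx::future<int> f = p.get_future();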
68 changes: 31 additions & 37 deletions hpx/runtime/threads/executors/guided_pool_executor.hpp
@@ -112,18 +112,16 @@ namespace hpx { namespace threads { namespace executors
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...));

if (hp_sync_ &&
-executor_.get_priority() == hpx::threads::thread_priority_high) {
-p.apply(
-hpx::launch::sync,
-executor_.get_priority(),
+executor_.get_priority() == hpx::threads::thread_priority_high)
+{
+p.apply("guided async", hpx::launch::sync, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
}
-else {
-p.apply(
-hpx::launch::async,
-executor_.get_priority(),
+else
+{
+p.apply("guided async", hpx::launch::async, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
@@ -174,18 +172,16 @@ namespace hpx { namespace threads { namespace executors
);

if (hp_sync_ &&
-executor_.get_priority() == hpx::threads::thread_priority_high) {
-p.apply(
-hpx::launch::sync,
-executor_.get_priority(),
+executor_.get_priority() == hpx::threads::thread_priority_high)
+{
+p.apply("guided then", hpx::launch::sync, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
}
-else {
-p.apply(
-hpx::launch::async,
-executor_.get_priority(),
+else
+{
+p.apply("guided then", hpx::launch::async, executor_.get_priority(),
executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
@@ -440,18 +436,17 @@ namespace hpx { namespace threads { namespace executors
);

if (hp_sync_ &&
-pool_executor_.get_priority() == hpx::threads::thread_priority_high) {
-p.apply(
-hpx::launch::sync,
-pool_executor_.get_priority(),
+pool_executor_.get_priority() ==
+hpx::threads::thread_priority_high)
+{
+p.apply("guided async", hpx::launch::sync, pool_executor_.get_priority(),
pool_executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
}
-else {
-p.apply(
-hpx::launch::async,
-pool_executor_.get_priority(),
+else
+{
+p.apply("guided async", hpx::launch::async, pool_executor_.get_priority(),
pool_executor_.get_stacksize(),
threads::thread_schedule_hint(
thread_schedule_hint_mode_numa, domain));
@@ -503,19 +498,18 @@ namespace hpx { namespace threads { namespace executors
future<typename util::detail::invoke_deferred_result<F, Ts...>::type>
async_execute(F && f, Ts &&... ts)
{
-if (guided_) return guided_exec_.async_execute(
-std::forward<F>(f), std::forward<Ts>(ts)...);
-else {
-typedef typename util::detail::invoke_deferred_result<F, Ts...>::type
-result_type;
-
-lcos::local::futures_factory<result_type()> p(
-pool_exec_,
-util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...)
-);
-p.apply(
-hpx::launch::async,
-pool_exec_.get_priority(),
+if (guided_)
+return guided_exec_.async_execute(
+std::forward<F>(f), std::forward<Ts>(ts)...);
+else
+{
+typedef typename util::detail::invoke_deferred_result<F,
+Ts...>::type result_type;
+
+lcos::local::futures_factory<result_type()> p(pool_exec_,
+util::deferred_call(
+std::forward<F>(f), std::forward<Ts>(ts)...));
+p.apply("guided async", hpx::launch::async, pool_exec_.get_priority(),
pool_exec_.get_stacksize(),
threads::thread_schedule_hint()
);
@@ -54,7 +54,7 @@ namespace hpx { namespace threads {
lcos::local::futures_factory<result_type()> p(
std::forward<Executor>(exec),
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...));
-p.apply();
+p.apply(hpx::traits::get_function_annotation<typename hpx::util::decay<F>::type>::call(f));
return p.get_future();
}

@@ -105,7 +105,8 @@ namespace hpx { namespace threads {
{
exec.add(
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...),
"hpx::parallel::execution::post", threads::pending, true,
hpx::traits::get_function_annotation<typename hpx::util::decay<F>::type>::call(f),
threads::pending, true,
exec.get_stacksize(), threads::thread_schedule_hint(), throws);
}
///////////////////////////////////////////////////////////////////////////
@@ -115,11 +116,12 @@ namespace hpx { namespace threads {
hpx::traits::is_threads_executor<Executor>::value &&
std::is_same<typename hpx::util::decay<Hint>::type,
hpx::threads::thread_schedule_hint>::value>::type
-post(Executor&& exec, F&& f, Hint&& hint, Ts&&... ts)
+post(Executor&& exec, F&& f, Hint&& hint, const char *annotation, Ts&&... ts)
{
exec.add(
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...),
"hpx::parallel::execution::post", threads::pending, true,
annotation,
threads::pending, true,
exec.get_stacksize(), std::forward<Hint>(hint), throws);
}

8 changes: 4 additions & 4 deletions libs/resource_partitioner/examples/async_customization.cpp
@@ -114,7 +114,7 @@ struct test_async_executor
lcos::local::futures_factory<result_type()> p(executor_,
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...));

-p.apply(launch::async, threads::thread_priority_default,
+p.apply("custom", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
@@ -152,7 +152,7 @@ struct test_async_executor
util::deferred_call(std::forward<F>(f),
std::forward<Future>(predecessor), std::forward<Ts>(ts)...));

-p.apply(launch::async, threads::thread_priority_default,
+p.apply("custom then", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
@@ -212,7 +212,7 @@ struct test_async_executor
predecessor),
std::forward<Ts>(ts)...));

-p.apply(launch::async, threads::thread_priority_default,
+p.apply("custom then", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
@@ -260,7 +260,7 @@ struct test_async_executor
util::deferred_call(std::forward<F>(f),
std::forward<util::tuple<InnerFutures...>>(predecessor)));

-p.apply(launch::async, threads::thread_priority_default,
+p.apply("custom async", launch::async, threads::thread_priority_default,
threads::thread_stacksize_default);

return p.get_future();
2 changes: 1 addition & 1 deletion src/lcos/detail/future_data.cpp
@@ -59,7 +59,7 @@ namespace hpx { namespace lcos { namespace detail
policy = launch::async;

// launch a new thread executing the given function
-threads::thread_id_type tid = p.apply(
+threads::thread_id_type tid = p.apply("run_on_completed_on_new_thread",
policy, threads::thread_priority_boost,
threads::thread_stacksize_current,
threads::thread_schedule_hint());
2 changes: 1 addition & 1 deletion src/util/backtrace/backtrace.cpp
@@ -414,7 +414,7 @@ namespace hpx { namespace util {
stack_trace::get_symbols, &frames_.front(), frames_.size()));

error_code ec(lightweight);
-threads::thread_id_type tid = p.apply(
+threads::thread_id_type tid = p.apply("backtrace",
launch::fork, threads::thread_priority_default,
threads::thread_stacksize_medium,
threads::thread_schedule_hint(),
