From a965665cea5d3efdee40465106fb62dcd4ac1753 Mon Sep 17 00:00:00 2001 From: John Biddiscombe Date: Tue, 3 Dec 2019 11:33:43 +0100 Subject: [PATCH] Fix apex annotation for async dispatch When an annotated task is passed though the async dispatch, we can strip off the task 'name' or annotation and pass it through to the thread creation so that it is used when the task is initialized and apex get the right name. We make use of traits::get_function_annotation::type>::call(f) to achieve this. --- hpx/async_launch_policy_dispatch.hpp | 3 +- hpx/lcos/detail/future_data.hpp | 4 +- hpx/lcos/local/futures_factory.hpp | 23 ++++--- .../executors/guided_pool_executor.hpp | 68 +++++++++---------- .../parallel/executors/thread_execution.hpp | 10 +-- .../examples/async_customization.cpp | 8 +-- src/lcos/detail/future_data.cpp | 2 +- src/util/backtrace/backtrace.cpp | 2 +- 8 files changed, 63 insertions(+), 57 deletions(-) diff --git a/hpx/async_launch_policy_dispatch.hpp b/hpx/async_launch_policy_dispatch.hpp index 11b7902a79e4..f46f829c323c 100644 --- a/hpx/async_launch_policy_dispatch.hpp +++ b/hpx/async_launch_policy_dispatch.hpp @@ -90,7 +90,8 @@ namespace hpx { namespace detail std::forward(f), std::forward(ts)...)); if (hpx::detail::has_async_policy(policy)) { - threads::thread_id_type tid = p.apply(pool, policy, + threads::thread_id_type tid = p.apply(pool, + "async_launch_policy_dispatch", policy, policy.priority(), threads::thread_stacksize_default, hint); if (tid && policy == launch::fork) { diff --git a/hpx/lcos/detail/future_data.hpp b/hpx/lcos/detail/future_data.hpp index bef95da7e28e..51feaf2eef8a 100644 --- a/hpx/lcos/detail/future_data.hpp +++ b/hpx/lcos/detail/future_data.hpp @@ -864,7 +864,9 @@ namespace hpx { namespace lcos { namespace detail { // run in a separate thread virtual threads::thread_id_type apply( - threads::thread_pool_base* /*pool*/, launch /*policy*/, + threads::thread_pool_base* /*pool*/, + const char */*annotation*/, + launch /*policy*/, threads::thread_priority /*priority*/, threads::thread_stacksize /*stacksize*/, threads::thread_schedule_hint /*schedulehint*/, error_code& /*ec*/) diff --git a/hpx/lcos/local/futures_factory.hpp b/hpx/lcos/local/futures_factory.hpp index 1bb34c7cb97a..2682313f5037 100644 --- a/hpx/lcos/local/futures_factory.hpp +++ b/hpx/lcos/local/futures_factory.hpp @@ -104,7 +104,9 @@ namespace hpx { namespace lcos { namespace local { protected: // run in a separate thread threads::thread_id_type apply(threads::thread_pool_base* pool, - launch policy, threads::thread_priority priority, + const char *annotation, + launch policy, + threads::thread_priority priority, threads::thread_stacksize stacksize, threads::thread_schedule_hint schedulehint, error_code& ec) override @@ -119,7 +121,7 @@ namespace hpx { namespace lcos { namespace local { return threads::register_thread_nullary(pool, util::deferred_call( &base_type::run_impl, std::move(this_)), - util::thread_description(f_, "task_object::apply"), + util::thread_description(f_, annotation), threads::pending_do_not_schedule, true, threads::thread_priority_boost, threads::thread_schedule_hint( @@ -247,6 +249,7 @@ namespace hpx { namespace lcos { namespace local { protected: // run in a separate thread threads::thread_id_type apply(threads::thread_pool_base* pool, + const char *annotation, launch policy, threads::thread_priority priority, threads::thread_stacksize stacksize, threads::thread_schedule_hint schedulehint, @@ -262,7 +265,7 @@ namespace hpx { namespace lcos { namespace local { parallel::execution::post(*exec_, util::deferred_call( &base_type::run_impl, std::move(this_)), - schedulehint); + schedulehint, annotation); return threads::invalid_thread_id; } else if (policy == launch::fork) @@ -271,7 +274,7 @@ namespace hpx { namespace lcos { namespace local { util::deferred_call( &base_type::run_impl, std::move(this_)), util::thread_description( - this->f_, "task_object::apply"), + this->f_, annotation), threads::pending_do_not_schedule, true, threads::thread_priority_boost, threads::thread_schedule_hint( @@ -284,7 +287,7 @@ namespace hpx { namespace lcos { namespace local { util::deferred_call( &base_type::run_impl, std::move(this_)), util::thread_description( - this->f_, "task_object::apply"), + this->f_, annotation), threads::pending, false, priority, schedulehint, stacksize, ec); return threads::invalid_thread_id; @@ -777,7 +780,9 @@ namespace hpx { namespace lcos { namespace local { } // asynchronous execution - threads::thread_id_type apply(launch policy = launch::async, + threads::thread_id_type apply( + const char *annotation = "futures_factory::apply", + launch policy = launch::async, threads::thread_priority priority = threads::thread_priority_default, threads::thread_stacksize stacksize = @@ -786,11 +791,13 @@ namespace hpx { namespace lcos { namespace local { threads::thread_schedule_hint(), error_code& ec = throws) const { - return apply(threads::detail::get_self_or_default_pool(), policy, + return apply(threads::detail::get_self_or_default_pool(), + annotation, policy, priority, stacksize, schedulehint, ec); } threads::thread_id_type apply(threads::thread_pool_base* pool, + const char *annotation = "futures_factory::apply", launch policy = launch::async, threads::thread_priority priority = threads::thread_priority_default, @@ -808,7 +815,7 @@ namespace hpx { namespace lcos { namespace local { return threads::invalid_thread_id; } return task_->apply( - pool, policy, priority, stacksize, schedulehint, ec); + pool, annotation, policy, priority, stacksize, schedulehint, ec); } // This is the same as get_future, except that it moves the diff --git a/hpx/runtime/threads/executors/guided_pool_executor.hpp b/hpx/runtime/threads/executors/guided_pool_executor.hpp index 5daa5a7a5968..8bf6f1129961 100644 --- a/hpx/runtime/threads/executors/guided_pool_executor.hpp +++ b/hpx/runtime/threads/executors/guided_pool_executor.hpp @@ -112,18 +112,16 @@ namespace hpx { namespace threads { namespace executors util::deferred_call(std::forward(f), std::forward(ts)...)); if (hp_sync_ && - executor_.get_priority() == hpx::threads::thread_priority_high) { - p.apply( - hpx::launch::sync, - executor_.get_priority(), + executor_.get_priority() == hpx::threads::thread_priority_high) + { + p.apply("guided async", hpx::launch::sync, executor_.get_priority(), executor_.get_stacksize(), threads::thread_schedule_hint( thread_schedule_hint_mode_numa, domain)); } - else { - p.apply( - hpx::launch::async, - executor_.get_priority(), + else + { + p.apply("guided async", hpx::launch::async, executor_.get_priority(), executor_.get_stacksize(), threads::thread_schedule_hint( thread_schedule_hint_mode_numa, domain)); @@ -174,18 +172,16 @@ namespace hpx { namespace threads { namespace executors ); if (hp_sync_ && - executor_.get_priority() == hpx::threads::thread_priority_high) { - p.apply( - hpx::launch::sync, - executor_.get_priority(), + executor_.get_priority() == hpx::threads::thread_priority_high) + { + p.apply("guided then", hpx::launch::sync, executor_.get_priority(), executor_.get_stacksize(), threads::thread_schedule_hint( thread_schedule_hint_mode_numa, domain)); } - else { - p.apply( - hpx::launch::async, - executor_.get_priority(), + else + { + p.apply("guided then", hpx::launch::async, executor_.get_priority(), executor_.get_stacksize(), threads::thread_schedule_hint( thread_schedule_hint_mode_numa, domain)); @@ -440,18 +436,17 @@ namespace hpx { namespace threads { namespace executors ); if (hp_sync_ && - pool_executor_.get_priority() == hpx::threads::thread_priority_high) { - p.apply( - hpx::launch::sync, - pool_executor_.get_priority(), + pool_executor_.get_priority() == + hpx::threads::thread_priority_high) + { + p.apply("guided async", hpx::launch::sync, pool_executor_.get_priority(), pool_executor_.get_stacksize(), threads::thread_schedule_hint( thread_schedule_hint_mode_numa, domain)); } - else { - p.apply( - hpx::launch::async, - pool_executor_.get_priority(), + else + { + p.apply("guided async", hpx::launch::async, pool_executor_.get_priority(), pool_executor_.get_stacksize(), threads::thread_schedule_hint( thread_schedule_hint_mode_numa, domain)); @@ -503,19 +498,18 @@ namespace hpx { namespace threads { namespace executors future::type> async_execute(F && f, Ts &&... ts) { - if (guided_) return guided_exec_.async_execute( - std::forward(f), std::forward(ts)...); - else { - typedef typename util::detail::invoke_deferred_result::type - result_type; - - lcos::local::futures_factory p( - pool_exec_, - util::deferred_call(std::forward(f), std::forward(ts)...) - ); - p.apply( - hpx::launch::async, - pool_exec_.get_priority(), + if (guided_) + return guided_exec_.async_execute( + std::forward(f), std::forward(ts)...); + else + { + typedef typename util::detail::invoke_deferred_result::type result_type; + + lcos::local::futures_factory p(pool_exec_, + util::deferred_call( + std::forward(f), std::forward(ts)...)); + p.apply("guided async", hpx::launch::async, pool_exec_.get_priority(), pool_exec_.get_stacksize(), threads::thread_schedule_hint() ); diff --git a/libs/execution/include/hpx/parallel/executors/thread_execution.hpp b/libs/execution/include/hpx/parallel/executors/thread_execution.hpp index c910d4682028..c82e9af644c1 100644 --- a/libs/execution/include/hpx/parallel/executors/thread_execution.hpp +++ b/libs/execution/include/hpx/parallel/executors/thread_execution.hpp @@ -54,7 +54,7 @@ namespace hpx { namespace threads { lcos::local::futures_factory p( std::forward(exec), util::deferred_call(std::forward(f), std::forward(ts)...)); - p.apply(); + p.apply(hpx::traits::get_function_annotation::type>::call(f)); return p.get_future(); } @@ -105,7 +105,8 @@ namespace hpx { namespace threads { { exec.add( util::deferred_call(std::forward(f), std::forward(ts)...), - "hpx::parallel::execution::post", threads::pending, true, + hpx::traits::get_function_annotation::type>::call(f), + threads::pending, true, exec.get_stacksize(), threads::thread_schedule_hint(), throws); } /////////////////////////////////////////////////////////////////////////// @@ -115,11 +116,12 @@ namespace hpx { namespace threads { hpx::traits::is_threads_executor::value && std::is_same::type, hpx::threads::thread_schedule_hint>::value>::type - post(Executor&& exec, F&& f, Hint&& hint, Ts&&... ts) + post(Executor&& exec, F&& f, Hint&& hint, const char *annotation, Ts&&... ts) { exec.add( util::deferred_call(std::forward(f), std::forward(ts)...), - "hpx::parallel::execution::post", threads::pending, true, + annotation, + threads::pending, true, exec.get_stacksize(), std::forward(hint), throws); } diff --git a/libs/resource_partitioner/examples/async_customization.cpp b/libs/resource_partitioner/examples/async_customization.cpp index 5bac8a84f017..eb2eb06d923b 100644 --- a/libs/resource_partitioner/examples/async_customization.cpp +++ b/libs/resource_partitioner/examples/async_customization.cpp @@ -114,7 +114,7 @@ struct test_async_executor lcos::local::futures_factory p(executor_, util::deferred_call(std::forward(f), std::forward(ts)...)); - p.apply(launch::async, threads::thread_priority_default, + p.apply("custom", launch::async, threads::thread_priority_default, threads::thread_stacksize_default); return p.get_future(); @@ -152,7 +152,7 @@ struct test_async_executor util::deferred_call(std::forward(f), std::forward(predecessor), std::forward(ts)...)); - p.apply(launch::async, threads::thread_priority_default, + p.apply("custom then", launch::async, threads::thread_priority_default, threads::thread_stacksize_default); return p.get_future(); @@ -212,7 +212,7 @@ struct test_async_executor predecessor), std::forward(ts)...)); - p.apply(launch::async, threads::thread_priority_default, + p.apply("custom then", launch::async, threads::thread_priority_default, threads::thread_stacksize_default); return p.get_future(); @@ -260,7 +260,7 @@ struct test_async_executor util::deferred_call(std::forward(f), std::forward>(predecessor))); - p.apply(launch::async, threads::thread_priority_default, + p.apply("custom async", launch::async, threads::thread_priority_default, threads::thread_stacksize_default); return p.get_future(); diff --git a/src/lcos/detail/future_data.cpp b/src/lcos/detail/future_data.cpp index 2bcb527dd042..bd2bf9eef299 100644 --- a/src/lcos/detail/future_data.cpp +++ b/src/lcos/detail/future_data.cpp @@ -59,7 +59,7 @@ namespace hpx { namespace lcos { namespace detail policy = launch::async; // launch a new thread executing the given function - threads::thread_id_type tid = p.apply( + threads::thread_id_type tid = p.apply("run_on_completed_on_new_thread", policy, threads::thread_priority_boost, threads::thread_stacksize_current, threads::thread_schedule_hint()); diff --git a/src/util/backtrace/backtrace.cpp b/src/util/backtrace/backtrace.cpp index db787b1110f5..bf54d6e33f94 100644 --- a/src/util/backtrace/backtrace.cpp +++ b/src/util/backtrace/backtrace.cpp @@ -414,7 +414,7 @@ namespace hpx { namespace util { stack_trace::get_symbols, &frames_.front(), frames_.size())); error_code ec(lightweight); - threads::thread_id_type tid = p.apply( + threads::thread_id_type tid = p.apply("backtrace", launch::fork, threads::thread_priority_default, threads::thread_stacksize_medium, threads::thread_schedule_hint(),