Skip to content

Commit

Permalink
Inline execution of scoped tasks, if possible
Browse files Browse the repository at this point in the history
- flyby: adding stack size detection on Windows
- flyby: fixing scheduler hint pass through to various functions
- flyby: specialize cache_line_data for items that don't require padding
- flyby: fixing HPX_WITH_SWAP_CONTEXT_EMULATION on Windows
  • Loading branch information
hkaiser committed Dec 3, 2019
1 parent a759ebe commit 516db64
Show file tree
Hide file tree
Showing 27 changed files with 928 additions and 116 deletions.
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1251,8 +1251,6 @@ endif()

if(WIN32)
if(MSVC)
enable_language(ASM_MASM)

hpx_add_target_compile_option(-Ox PUBLIC CONFIGURATIONS Release)

# even VS2017 has an ICE when compiling with -Ob2
Expand Down
37 changes: 22 additions & 15 deletions examples/quickstart/fibonacci_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
#include <hpx/include/async.hpp>
#include <hpx/include/util.hpp>

#include <hpx/parallel/executors/scoped_parallel_executor.hpp>

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
//[fibonacci
Expand All @@ -25,21 +29,20 @@ std::uint64_t fibonacci(std::uint64_t n)
if (n < 2)
return n;

// Invoking the Fibonacci algorithm twice is inefficient.
// However, we intentionally demonstrate it this way to create some
// heavy workload.

hpx::future<std::uint64_t> n1 = hpx::async(fibonacci, n - 1);
hpx::future<std::uint64_t> n2 = hpx::async(fibonacci, n - 2);
hpx::parallel::execution::scoped_parallel_executor exec;
hpx::future<std::uint64_t> n1 = hpx::async(exec, fibonacci, n - 1);
std::uint64_t n2 = fibonacci(n - 2);

return n1.get() + n2.get(); // wait for the Futures to return their values
return n1.get() + n2; // wait for the future to return its value
}
//fibonacci]

///////////////////////////////////////////////////////////////////////////////
//[hpx_main
int hpx_main(hpx::program_options::variables_map& vm)
{
hpx::threads::add_scheduler_mode(hpx::threads::policies::fast_idle_mode);

// extract command line argument, i.e. fib(N)
std::uint64_t n = vm["n-value"].as<std::uint64_t>();

Expand All @@ -53,7 +56,7 @@ int hpx_main(hpx::program_options::variables_map& vm)
hpx::util::format_to(std::cout, fmt, n, r, t.elapsed());
}

return hpx::finalize(); // Handles HPX shutdown
return hpx::finalize(); // Handles HPX shutdown
}
//hpx_main]

Expand All @@ -62,16 +65,20 @@ int hpx_main(hpx::program_options::variables_map& vm)
int main(int argc, char* argv[])
{
// Configure application-specific options
hpx::program_options::options_description
desc_commandline("Usage: " HPX_APPLICATION_STRING " [options]");
hpx::program_options::options_description desc_commandline(
"Usage: " HPX_APPLICATION_STRING " [options]");

// clang-format off
desc_commandline.add_options()
( "n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function")
;
("n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function");
// clang-format on

// use LIFO scheduler
std::vector<std::string> cfg = {"--hpx:queuing=local-priority-lifo"};

// Initialize and run HPX
return hpx::init(desc_commandline, argc, argv);
return hpx::init(desc_commandline, argc, argv, cfg);
}
//main]
4 changes: 2 additions & 2 deletions hpx/async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ namespace hpx { namespace detail
call(F && f, Ts &&... ts)
{
parallel::execution::parallel_executor exec;
return parallel::execution::async_execute(
exec, std::forward<F>(f), std::forward<Ts>(ts)...);
return exec.async_execute(
std::forward<F>(f), std::forward<Ts>(ts)...);
}
};

Expand Down
14 changes: 13 additions & 1 deletion hpx/async_launch_policy_dispatch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,24 @@ namespace hpx { namespace detail
// make sure this thread is executed last
// yield_to
hpx::this_thread::suspend(threads::pending, tid,
"async_launch_policy_dispatch<fork>");
"async_launch_policy_dispatch<launch>");
}
}
return p.get_future();
}

// Convenience overload for callers that pass a scheduling hint but no
// explicit thread pool. It forwards all arguments to the call() overload
// taking (policy, pool, hint, f, ts...), using the pool returned by
// threads::detail::get_self_or_default_pool().
//
// Participates in overload resolution only if F is deferred-invocable
// with Ts...; returns a future for the result of that invocation.
template <typename F, typename... Ts>
HPX_FORCEINLINE static typename std::enable_if<
traits::detail::is_deferred_invocable<F, Ts...>::value,
hpx::future<typename util::detail::invoke_deferred_result<F,
Ts...>::type>>::type
call(launch policy, threads::thread_schedule_hint hint, F&& f,
Ts&&... ts)
{
return call(policy, threads::detail::get_self_or_default_pool(),
hint, std::forward<F>(f), std::forward<Ts>(ts)...);
}

template <typename F, typename... Ts>
HPX_FORCEINLINE static typename std::enable_if<
traits::detail::is_deferred_invocable<F, Ts...>::value,
Expand Down
2 changes: 1 addition & 1 deletion hpx/lcos/detail/future_data.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2017 Hartmut Kaiser
// Copyright (c) 2007-2019 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down
128 changes: 88 additions & 40 deletions hpx/lcos/local/futures_factory.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2018 Hartmut Kaiser
// Copyright (c) 2007-2019 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -7,6 +7,7 @@
#ifndef HPX_LCOS_LOCAL_FUTURES_FACTORY_HPP
#define HPX_LCOS_LOCAL_FUTURES_FACTORY_HPP

#include <hpx/assertion.hpp>
#include <hpx/config.hpp>
#include <hpx/allocator_support/allocator_deleter.hpp>
#include <hpx/allocator_support/internal_allocator.hpp>
Expand All @@ -18,6 +19,7 @@
#include <hpx/memory/intrusive_ptr.hpp>
#include <hpx/runtime/get_worker_thread_num.hpp>
#include <hpx/runtime/launch_policy.hpp>
#include <hpx/runtime/threads/detail/execute_thread.hpp>
#include <hpx/runtime/threads/thread_data_fwd.hpp>
#include <hpx/runtime/threads/thread_helpers.hpp>
#include <hpx/traits/future_access.hpp>
Expand All @@ -33,6 +35,7 @@
#include <utility>

namespace hpx { namespace lcos { namespace local {

///////////////////////////////////////////////////////////////////////
namespace detail {
template <typename Result, typename F, typename Executor,
Expand All @@ -47,26 +50,31 @@ namespace hpx { namespace lcos { namespace local {
typedef typename Base::init_no_addref init_no_addref;

// The callable wrapped by this task.
F f_;
// True once the thread created for this task has been pushed onto the
// launching thread's list of children; every constructor starts with
// false. get_result_void() checks this flag to decide whether it may
// execute children inline instead of just waiting.
bool runs_as_child_;

// Construct from a copy of the callable.
task_object(F const& f)
: f_(f)
, runs_as_child_(false)
{
}

// Construct by moving the callable into the task.
task_object(F&& f)
: f_(std::move(f))
, runs_as_child_(false)
{
}

// As above (copying f), but forwards the init_no_addref tag to the base
// class.
task_object(init_no_addref no_addref, F const& f)
: base_type(no_addref)
, f_(f)
, runs_as_child_(false)
{
}

// As above (moving f), but forwards the init_no_addref tag to the base
// class.
task_object(init_no_addref no_addref, F&& f)
: base_type(no_addref)
, f_(std::move(f))
, runs_as_child_(false)
{
}

Expand All @@ -75,6 +83,40 @@ namespace hpx { namespace lcos { namespace local {
return do_run_impl(std::is_void<Result>());
}

// overload get_result_void to be able to run child tasks, if needed
//
// If this task was registered as a child (runs_as_child_) and the calling
// thread has scoped children, the children are popped and executed inline
// for as long as this task's state is still 'empty'. Afterwards the call
// is delegated to the base class implementation, which yields the result.
//
// NOTE(review): 'self' is dereferenced unconditionally; this presumably
// relies on the function being called from an HPX thread whenever
// runs_as_child_ is set - confirm that get_self_id_data() cannot return
// nullptr here.
util::unused_type* get_result_void(error_code& ec = throws) override
{
if (runs_as_child_)
{
threads::thread_data* self = threads::get_self_id_data();
if (self->has_scoped_children())
{
auto state =
this->state_.load(std::memory_order_acquire);
while (state == this->empty)
{
// this thread would block on the future
threads::thread_data* child = self->pop_child();

// we should not run out of children
// NOTE(review): this is only guarded by an assertion; if
// HPX_ASSERT is compiled out, a null child would be passed
// on to execute_thread() below - confirm this cannot happen.
HPX_ASSERT(child != nullptr);

// execute the child inline
if (!threads::detail::execute_thread(child))
{
// the executed task did not run to completion
// (put it back so that it can be picked up again later)
self->push_child(child);
}

// recheck our state
state =
this->state_.load(std::memory_order_acquire);
}
}
}
// the state is no longer 'empty' (or no children were run inline); let
// the base class produce the result
return this->base_type::get_result_void(ec);
}

private:
void do_run_impl(/*is_void=*/std::false_type)
{
Expand Down Expand Up @@ -116,15 +158,43 @@ namespace hpx { namespace lcos { namespace local {

if (policy == launch::fork)
{
return threads::register_thread_nullary(pool,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost,
threads::thread_schedule_hint(
static_cast<std::int16_t>(get_worker_thread_num())),
stacksize, ec);
schedulehint.mode =
threads::thread_schedule_hint_mode_thread;
schedulehint.hint =
static_cast<std::int16_t>(get_worker_thread_num());
threads::thread_id_type id =
threads::register_thread_nullary(pool,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost, schedulehint,
stacksize, ec);

if (schedulehint.runs_as_child)
{
runs_as_child_ = true;
threads::get_self_id_data()->push_child(
threads::get_thread_id_data(id));
}
return id;
}

if (schedulehint.runs_as_child)
{
threads::thread_id_type id =
threads::register_thread_nullary(pool,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
threads::pending, true, priority, schedulehint,
stacksize, ec);

runs_as_child_ = true;
threads::get_self_id_data()->push_child(
threads::get_thread_id_data(id));

return id;
}

threads::register_thread_nullary(pool,
Expand Down Expand Up @@ -228,7 +298,6 @@ namespace hpx { namespace lcos { namespace local {

task_object(init_no_addref no_addref, F&& f)
: base_type(no_addref, std::move(f))
, exec_(nullptr)
{
}

Expand All @@ -252,43 +321,22 @@ namespace hpx { namespace lcos { namespace local {
threads::thread_schedule_hint schedulehint,
error_code& ec) override
{
this->check_started();

typedef typename Base::future_base_type future_base_type;
future_base_type this_(this);

if (exec_)
{
this->check_started();

typedef typename Base::future_base_type future_base_type;
future_base_type this_(this);

parallel::execution::post(*exec_,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
schedulehint);
return threads::invalid_thread_id;
}
else if (policy == launch::fork)
{
return threads::register_thread_nullary(pool,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(
this->f_, "task_object::apply"),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost,
threads::thread_schedule_hint(
static_cast<std::int16_t>(get_worker_thread_num())),
stacksize, ec);
}
else
{
threads::register_thread_nullary(pool,
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(
this->f_, "task_object::apply"),
threads::pending, false, priority, schedulehint,
stacksize, ec);
return threads::invalid_thread_id;
}

return this->base_type::apply(
pool, policy, priority, stacksize, schedulehint, ec);
}
};

Expand Down
19 changes: 19 additions & 0 deletions hpx/runtime/threads/detail/execute_thread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2019 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RUNTIME_THREADS_DETAIL_EXECUTE_THREAD_DEC_01_2019_0126PM)
#define HPX_RUNTIME_THREADS_DETAIL_EXECUTE_THREAD_DEC_01_2019_0126PM

#include <hpx/config.hpp>
#include <hpx/runtime/threads/thread_data.hpp>

namespace hpx { namespace threads { namespace detail {

// Execute the given HPX thread directly from the calling context.
//
// NOTE(review): the implementation is not part of this header. The caller
// in futures_factory.hpp treats a 'false' return value as "the executed
// task did not run to completion" and re-queues the thread - confirm
// against the definition.
HPX_API_EXPORT bool execute_thread(thread_data* thrd);

}}} // namespace hpx::threads::detail

#endif
10 changes: 8 additions & 2 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,13 @@ namespace hpx { namespace threads { namespace detail
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++counters.executed_threads_;
#endif
scheduler.SchedulingPolicy::destroy_thread(thrd, busy_loop_count);
// if this thread has run as a child we don't destroy it
// here, the parent will do that
if (!thrd->runs_as_child())
{
scheduler.SchedulingPolicy::destroy_thread(
thrd, busy_loop_count);
}
}
}

Expand Down Expand Up @@ -894,7 +900,7 @@ namespace hpx { namespace threads { namespace detail
policies::fast_idle_mode))
{
// speed up idle suspend if no work was stolen
idle_loop_count -= params.max_idle_loop_count_ / 256;
idle_loop_count -= params.max_idle_loop_count_ / 1024;
added = std::size_t(-1);
}

Expand Down

0 comments on commit 516db64

Please sign in to comment.