Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding component base class which ties a component instance to a given executor #2494

Merged
merged 1 commit into from Feb 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 12 additions & 32 deletions examples/quickstart/component_with_executor.cpp
@@ -1,4 +1,4 @@
// Copyright (c) 2014 Hartmut Kaiser
// Copyright (c) 2014-2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand All @@ -7,17 +7,25 @@
#include <hpx/include/components.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/iostreams.hpp>
#include <hpx/include/thread_executors.hpp>
#include <hpx/include/parallel_executors.hpp>

#include <utility>

///////////////////////////////////////////////////////////////////////////////
// Define a base component which exposes the required interface
struct hello_world_server
: hpx::components::component_base<hello_world_server>
: hpx::components::executor_component<
hpx::parallel::local_priority_queue_executor,
hpx::components::component_base<hello_world_server> >
{
typedef hpx::parallel::local_priority_queue_executor executor_type;
typedef hpx::components::executor_component<
executor_type, hpx::components::component_base<hello_world_server>
> base_type;

// run on all available cores
hello_world_server()
: sched_(hpx::get_num_worker_threads()) // run on all available cores
: base_type(executor_type(hpx::get_num_worker_threads()))
{}

void print() const
Expand All @@ -26,34 +34,6 @@ struct hello_world_server
}

HPX_DEFINE_COMPONENT_ACTION(hello_world_server, print, print_action);

///////////////////////////////////////////////////////////////////////////
// wrap given function into a nullary function as expected by the executor
static void func(hpx::threads::thread_function_type f)
{
f(hpx::threads::wait_signaled);
}

/// This is the default hook implementation for schedule_thread which
/// forwards to the default scheduler.
static void schedule_thread(hpx::naming::address::address_type lva,
hpx::threads::thread_init_data& data,
hpx::threads::thread_state_enum initial_state)
{
hpx::util::thread_description desc(&hello_world_server::func);
#ifdef HPX_HAVE_THREAD_DESCRIPTION
desc = data.description;
#endif

hpx::get_lva<hello_world_server>::call(lva)->sched_.add(
hpx::util::bind(
hpx::util::one_shot(&hello_world_server::func),
std::move(data.func)),
desc, initial_state);
}

private:
hpx::threads::executors::local_priority_queue_executor sched_;
};

typedef hpx::components::component<hello_world_server> server_type;
Expand Down
1 change: 1 addition & 0 deletions hpx/include/components.hpp
Expand Up @@ -41,6 +41,7 @@
#include <hpx/runtime/components/server/simple_component_base.hpp>

#include <hpx/runtime/components/server/locking_hook.hpp>
#include <hpx/runtime/components/server/executor_component.hpp>
#include <hpx/runtime/components/server/migration_support.hpp>

#include <hpx/runtime/components/copy_component.hpp>
Expand Down
98 changes: 98 additions & 0 deletions hpx/runtime/components/server/executor_component.hpp
@@ -0,0 +1,98 @@
// Copyright (c) 2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RUNTIME_COMPONENTS_SERVER_EXECUTOR_COMPONENT_FEB_09_2017_0839PM)
#define HPX_RUNTIME_COMPONENTS_SERVER_EXECUTOR_COMPONENT_FEB_09_2017_0839PM

#include <hpx/config.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>
#include <hpx/runtime/get_lva.hpp>
#include <hpx/runtime/naming/address.hpp>
#include <hpx/runtime/threads/executors.hpp>
#include <hpx/runtime/threads/thread_data_fwd.hpp>
#include <hpx/runtime/threads/thread_enums.hpp>
#include <hpx/traits/is_launch_policy.hpp>
#include <hpx/util/annotated_function.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/thread_description.hpp>

#include <type_traits>
#include <utility>

namespace hpx { namespace components
{
// This is a base class which allows to associate the execution of all
// actions for a particular component instance with a given executor.
template <typename Executor, typename BaseComponent>
struct executor_component : BaseComponent
{
private:
typedef BaseComponent base_type;
typedef Executor executor_type;
typedef typename base_type::this_component_type this_component_type;

public:
template <typename ... Arg>
executor_component(executor_type const& exec, Arg &&... arg)
: base_type(std::forward<Arg>(arg)...),
exec_(exec)
{}

///////////////////////////////////////////////////////////////////////
// wrap given function into a nullary function as expected by the
// executor
static void execute(hpx::threads::thread_function_type const& f)
{
f(hpx::threads::wait_signaled);
}

/// This is the default hook implementation for schedule_thread which
/// forwards to the executor instance associated with this component.
template <typename Executor_ = Executor>
static typename std::enable_if<
traits::is_threads_executor<Executor_>::value
>::type
schedule_thread(hpx::naming::address::address_type lva,
hpx::threads::thread_init_data& data,
hpx::threads::thread_state_enum initial_state)
{
hpx::util::thread_description desc(&executor_component::execute);
#ifdef HPX_HAVE_THREAD_DESCRIPTION
desc = data.description;
#endif
hpx::get_lva<executor_component>::call(lva)->exec_.add(
hpx::util::deferred_call(&executor_component::execute,
std::move(data.func)),
desc, initial_state);
}

template <typename Executor_ = Executor>
static typename std::enable_if<
!traits::is_threads_executor<Executor_>::value
>::type
schedule_thread(hpx::naming::address::address_type lva,
hpx::threads::thread_init_data& data,
hpx::threads::thread_state_enum initial_state)
{
hpx::util::thread_description desc(&executor_component::execute);
#ifdef HPX_HAVE_THREAD_DESCRIPTION
desc = data.description;
#endif
hpx::parallel::executor_traits<executor_type>::async_execute(
hpx::get_lva<executor_component>::call(lva)->exec_,
hpx::util::deferred_call(
hpx::util::annotated_function(
&executor_component::execute, desc.get_description()
),
std::move(data.func)));
}

protected:
executor_type exec_;
};
}}

#endif