Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default pool rename #2926

Merged
merged 4 commits into from Oct 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions hpx/runtime/resource/detail/partitioner.hpp
Expand Up @@ -174,6 +174,14 @@ namespace hpx { namespace resource { namespace detail
std::size_t expand_pool(std::string const& pool_name,
util::function_nonser<void(std::size_t)> const& add_pu);

void set_default_pool_name(const std::string &name) {
initial_thread_pools_[0].pool_name_ = name;
}

const std::string &get_default_pool_name() const {
return initial_thread_pools_[0].pool_name_;
}

private:
////////////////////////////////////////////////////////////////////////
void fill_topology_vectors();
Expand Down
5 changes: 5 additions & 0 deletions hpx/runtime/resource/partitioner.hpp
Expand Up @@ -184,6 +184,11 @@ namespace hpx { namespace resource
HPX_EXPORT void create_thread_pool(std::string const& name,
scheduler_function scheduler_creation);

// allow the default pool to be renamed to something else
HPX_EXPORT void set_default_pool_name(std::string const& name);

HPX_EXPORT const std::string & get_default_pool_name() const;

///////////////////////////////////////////////////////////////////////
// Functions to add processing units to thread pools via
// the pu/core/numa_domain API
Expand Down
22 changes: 14 additions & 8 deletions src/runtime/resource/detail/detail_partitioner.cpp
Expand Up @@ -367,7 +367,7 @@ namespace hpx { namespace resource { namespace detail
// exclusively if dynamic pools are enabled.
// Also, by default, the first PU is always exclusive
// (to avoid deadlocks).
add_resource(p, "default",
add_resource(p, get_default_pool_name(),
first || !(mode_ & mode_allow_dynamic_pools));
first = false;
}
Expand All @@ -378,11 +378,12 @@ namespace hpx { namespace resource { namespace detail
std::unique_lock<mutex_type> l(mtx_);

// @TODO allow empty pools
if (get_pool_data("default").num_threads_ == 0)
if (get_pool_data(get_default_pool_name()).num_threads_ == 0)
{
l.unlock();
throw_runtime_error("partitioner::setup_pools",
"Default pool has no threads assigned. Please rerun with "
"Default pool " + get_default_pool_name()
+ " has no threads assigned. Please rerun with "
"--hpx:threads=X and check the pool thread assignment");
}

Expand Down Expand Up @@ -553,9 +554,10 @@ namespace hpx { namespace resource { namespace detail

std::unique_lock<mutex_type> l(mtx_);

if (pool_name == "default")
if (pool_name==get_default_pool_name())
{
initial_thread_pools_[0] = detail::init_pool_data("default", sched);
initial_thread_pools_[0] = detail::init_pool_data(
get_default_pool_name(), sched);
return;
}

Expand Down Expand Up @@ -597,10 +599,10 @@ namespace hpx { namespace resource { namespace detail

std::unique_lock<mutex_type> l(mtx_);

if (pool_name == "default")
if (pool_name==get_default_pool_name())
{
initial_thread_pools_[0] = detail::init_pool_data(
"default", std::move(scheduler_creation));
get_default_pool_name(), std::move(scheduler_creation));
return;
}

Expand Down Expand Up @@ -1048,9 +1050,13 @@ namespace hpx { namespace resource { namespace detail
std::size_t partitioner::get_pool_index(
std::string const& pool_name) const
{
// the default pool is always index 0, it may be renamed
// but the user can always ask for "default"
if (pool_name == "default") {
return 0;
}
{
std::lock_guard<mutex_type> l(mtx_);

std::size_t num_pools = initial_thread_pools_.size();
for (std::size_t i = 0; i < num_pools; i++)
{
Expand Down
9 changes: 9 additions & 0 deletions src/runtime/resource/partitioner.cpp
Expand Up @@ -237,6 +237,15 @@ namespace hpx { namespace resource
partitioner_.create_thread_pool(name, scheduler_creation);
}

void partitioner::set_default_pool_name(std::string const& name)
{
partitioner_.set_default_pool_name(name);
}

const std::string & partitioner::get_default_pool_name() const {
return partitioner_.get_default_pool_name();
}

void partitioner::add_resource(pu const& p, std::string const& pool_name,
bool exclusive, std::size_t num_threads /*= 1*/)
{
Expand Down
40 changes: 6 additions & 34 deletions src/runtime/threads/threadmanager.cpp
Expand Up @@ -359,12 +359,12 @@ namespace hpx { namespace threads
// make sure the first thread-pool that gets instantiated is the default one
if (i == 0)
{
if (name != "default")
if (name != rp.get_default_pool_name())
{
throw std::invalid_argument("Trying to instantiate pool " +
name +
" as first thread pool, but first thread pool must be "
"named default");
"named " + rp.get_default_pool_name());
}
}

Expand Down Expand Up @@ -801,46 +801,18 @@ namespace hpx { namespace threads
return *pools_[0];
}

// threadmanager::scheduler_type& threadmanager::get_scheduler(
// std::string const& pool_name) const
// {
// // if the given pool_name is default, we don't need to look for it
// if (pool_name == "default")
// {
// return default_scheduler();
// }
//
// // don't start at begin() since the first one is the default, start one
// // further
// auto pool =
// std::find_if(
// ++pools_.begin(), pools_.end(),
// [&pool_name](pool_type const& itp) -> bool
// {
// return (itp->get_pool_name() == pool_name);
// });
//
// if (pool != pools_.end())
// {
// return pool->get_scheduler();
// }
//
// throw std::invalid_argument(
// "the resource partitioner does not own a thread pool named \""
// + pool_name + "\". \n");
// }

detail::thread_pool_base& threadmanager::get_pool(
std::string const& pool_name) const
{
// if the given pool_name is default, we don't need to look for it
if (pool_name == "default")
// we must always return pool 0
if (pool_name == "default" ||
pool_name == resource::get_partitioner().get_default_pool_name())
{
return default_pool();
}

// don't start at begin() since the first one is the default,
// start one further
// now check the other pools - no need to check pool 0 again, so ++begin
auto pool = std::find_if(
++pools_.begin(), pools_.end(),
[&pool_name](pool_type const& itp) -> bool
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/resource/CMakeLists.txt
Expand Up @@ -4,11 +4,13 @@
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests
named_pool_executor
resource_partitioner
throttle
used_pus
)

set(named_pool_executor_PARAMETERS THREADS_PER_LOCALITY 4)
set(resource_partitioner_PARAMETERS THREADS_PER_LOCALITY 4)
set(throttle_PARAMETERS THREADS_PER_LOCALITY 4)
set(used_pus_PARAMETERS THREADS_PER_LOCALITY 4)
Expand Down
150 changes: 150 additions & 0 deletions tests/unit/resource/named_pool_executor.cpp
@@ -0,0 +1,150 @@
// Copyright (c) 2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// Simple test verifying basic resource partitioner
// pool and executor

#include <hpx/hpx_init.hpp>
#include <hpx/include/resource_partitioner.hpp>
#include <hpx/include/parallel_execution.hpp>
#include <hpx/runtime/threads/executors/pool_executor.hpp>
#include <hpx/include/threads.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <cstddef>
#include <string>
#include <utility>
#include <vector>
#include <iostream>

const int max_threads = 4;

// dummy function we will call using async
void dummy_task(std::size_t n, std::string text)
{
for (std::size_t i(0); i < n; ++i)
{
std::cout << text << " iteration " << i << "\n";
}
}

int hpx_main(int argc, char* argv[])
{
HPX_TEST_EQ(std::size_t(4), hpx::resource::get_num_threads());
HPX_TEST_EQ(std::size_t(4), hpx::resource::get_num_thread_pools());
HPX_TEST_EQ(std::size_t(0), hpx::resource::get_pool_index("default"));
HPX_TEST_EQ(std::size_t(0), hpx::resource::get_pool_index("pool-0"));

for (int i=0; i<max_threads; ++i) {
std::string pool_name = "pool-"+std::to_string(i);
HPX_TEST_EQ(pool_name , hpx::resource::get_pool_name(i));
HPX_TEST_EQ(std::size_t(1), hpx::resource::get_num_threads(i));
}

// setup executors for different task priorities on the pools
// segfaults or exceptions in any of the following will cause
// the test to fail
hpx::threads::scheduled_executor exec_0_hp =
hpx::threads::executors::pool_executor("default",
hpx::threads::thread_priority_high);

hpx::threads::scheduled_executor exec_0 =
hpx::threads::executors::pool_executor("default",
hpx::threads::thread_priority_default);

std::vector<hpx::future<void>> lotsa_futures;

// use executors to schedule work on pools
lotsa_futures.push_back(
hpx::async(exec_0_hp, &dummy_task, 3, "HP default")
);

lotsa_futures.push_back(
hpx::async(exec_0, &dummy_task, 3, "Normal default")
);

std::vector<hpx::threads::scheduled_executor> execs;
std::vector<hpx::threads::scheduled_executor> execs_hp;
//
for (int i=0; i<max_threads; ++i) {
std::string pool_name = "pool-"+std::to_string(i);
execs.push_back(
hpx::threads::executors::pool_executor(pool_name,
hpx::threads::thread_priority_default)
);
execs_hp.push_back(
hpx::threads::executors::pool_executor(pool_name,
hpx::threads::thread_priority_high)
);
}

for (int i=0; i<max_threads; ++i) {
std::string pool_name = "pool-"+std::to_string(i);
lotsa_futures.push_back(
hpx::async(execs[i], &dummy_task, 3, pool_name + " normal")
);
lotsa_futures.push_back(
hpx::async(execs_hp[i], &dummy_task, 3, pool_name + " HP")
);
}

// check that the default executor still works
hpx::parallel::execution::default_executor large_stack_executor(
hpx::threads::thread_stacksize_large);

lotsa_futures.push_back(
hpx::async(large_stack_executor, &dummy_task, 3, "true default + large stack")
);

// just wait until everything is done
when_all(lotsa_futures).get();

return hpx::finalize();
}

// this test must be run with 4 threads
int main(int argc, char* argv[])
{
std::vector<std::string> cfg = {
"hpx.os_threads=" + std::to_string(max_threads)
};

// create the resource partitioner
hpx::resource::partitioner rp(argc, argv, std::move(cfg));

// before adding pools - set the default pool name to "pool-0"
rp.set_default_pool_name("pool-0");

// create N pools
for (int i=0; i<max_threads; i++) {
std::string pool_name = "pool-"+std::to_string(i);
rp.create_thread_pool(pool_name,
hpx::resource::scheduling_policy::local_priority_fifo);
}

// add one PU to each pool
int thread_count = 0;
for (const hpx::resource::numa_domain& d : rp.numa_domains())
{
for (const hpx::resource::core& c : d.cores())
{
for (const hpx::resource::pu& p : c.pus())
{
if (thread_count < max_threads)
{
std::string pool_name = "pool-" + std::to_string(thread_count);
std::cout << "Added pu " << thread_count
<< " to " << pool_name << "\n";
rp.add_resource(p, pool_name);
thread_count++;
}
}
}
}

// now run the test
HPX_TEST_EQ(hpx::init(), 0);
return hpx::util::report_errors();
}