Adding flags to scheduler allowing to control thread stealing and idle back-off #3768

Merged: 8 commits, Apr 8, 2019
@@ -600,8 +600,8 @@ if(HPX_WITH_THREAD_CUMULATIVE_COUNTS)
 endif()
 
 hpx_option(HPX_WITH_THREAD_STEALING_COUNTS BOOL
-  "Enable keeping track of counts of thread stealing incidents in the schedulers (default: ON)"
-  ON CATEGORY "Thread Manager" ADVANCED)
+  "Enable keeping track of counts of thread stealing incidents in the schedulers (default: OFF)"
+  OFF CATEGORY "Thread Manager" ADVANCED)
 
 if(HPX_WITH_THREAD_STEALING_COUNTS)
   hpx_add_config_define(HPX_HAVE_THREAD_STEALING_COUNTS)
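
With the default flipped to OFF, the stealing counters cost nothing unless explicitly re-enabled at configure time with `-DHPX_WITH_THREAD_STEALING_COUNTS=ON`. The option turns into the `HPX_HAVE_THREAD_STEALING_COUNTS` define; a minimal sketch of how such a define typically gates the bookkeeping (struct and counter names invented, not HPX's actual code):

```cpp
#include <cstdint>

// Hedged sketch: bookkeeping compiled in or out by the define produced
// from the CMake option above.
struct steal_stats
{
    std::int64_t stolen = 0;

    void on_steal()
    {
#if defined(HPX_HAVE_THREAD_STEALING_COUNTS)
        ++stolen;    // only counted when the option is ON
#endif
    }
};
```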
@@ -1383,14 +1383,18 @@ if(HPX_WITH_COMPILER_WARNINGS)
   hpx_add_compile_flag(-W3 LANGUAGES C CXX)
   # According to the ifort Windows manual, W3 isn't supported
   hpx_add_compile_flag(-W1 LANGUAGES Fortran)
-  # Boost.Lockfree triggers 'warning C4307: '+' : integral constant overflow'
-  # which is benign
-  hpx_add_compile_flag(-wd4307)
 
   # MSVC2012/2013 are overeager to report 'qualifier applied to function type
   # has no meaning; ignored'
   hpx_add_compile_flag(-wd4180)
 
+  # Boost.Lockfree triggers 'warning C4307: '+' : integral constant overflow'
+  # which is benign
+  hpx_add_compile_flag(-wd4307)
+
+  # object allocated on the heap may not be aligned
+  hpx_add_compile_flag(-wd4316)
+
   # max symbol length exceeded
   hpx_add_compile_flag(-wd4503)
 
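The newly silenced C4316 concerns over-aligned types created with plain `new`, which only guarantees `max_align_t` alignment before C++17 aligned allocation. A minimal shape of the warning (type name invented):

```cpp
// Minimal reproduction of MSVC C4316: an over-aligned type allocated on
// the heap may land on an under-aligned address pre-C++17.
struct alignas(64) cache_block
{
    double values[8];
};

int main()
{
    cache_block* p = new cache_block;    // C4316 under MSVC, pre-C++17
    delete p;
    return 0;
}
```
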
@@ -1435,6 +1439,12 @@ if(HPX_WITH_COMPILER_WARNINGS)
   hpx_add_compile_flag_if_available(-Wformat=2)
   hpx_add_compile_flag_if_available(-Wno-format-nonliteral)
 
+  # enable C++17 style alignment on allocations for pre-C++17
+  hpx_add_compile_flag_if_available(-faligned-new)
+
+  # prevent warnings about over-aligned types for pre-C++17
+  hpx_add_compile_flag_if_available(-Wno-aligned-new)
+
   # Self initialization is dangerous
   hpx_add_compile_flag_if_available(-Winit-self)
 
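These two flags are the GCC/Clang counterpart of the C4316 suppression above: -faligned-new opts pre-C++17 builds into C++17-style aligned allocation, and -Wno-aligned-new quiets the related warnings. A small check of the effect, assuming compilation with e.g. `-std=c++14 -faligned-new`:

```cpp
#include <cstdint>
#include <cstdio>

// Sketch: reports whether an over-aligned heap object actually received
// its requested 64-byte alignment.
struct alignas(64) wide
{
    double d[8];
};

int main()
{
    wide* p = new wide;
    bool const ok = reinterpret_cast<std::uintptr_t>(p) % 64 == 0;
    std::printf("64-byte aligned: %s\n", ok ? "yes" : "no");
    delete p;
    return 0;
}
```
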
@@ -10,6 +10,7 @@
 #include <hpx/include/components.hpp>
 #include <hpx/include/actions.hpp>
 #include <hpx/include/threads.hpp>
+#include <hpx/include/util.hpp>
 
 #include <atomic>
 
@@ -223,7 +223,10 @@ namespace example {
         }
 
         bool numa_sensitive() const override { return true; }
-        virtual bool has_thread_stealing() const override { return true; }
+        virtual bool has_thread_stealing(std::size_t) const override
+        {
+            return true;
+        }
 
         static std::string get_scheduler_name()
         {
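
has_thread_stealing now takes the worker-thread index, so a scheduler can answer per thread rather than globally; the example scheduler ignores the argument and keeps stealing enabled everywhere. A hedged sketch of what the parameter enables (policy and constant invented for illustration):

```cpp
#include <cstddef>

// Invented policy: allow stealing everywhere except on the first core
// of each (assumed) 8-core NUMA domain.
constexpr std::size_t cores_per_domain = 8;

bool has_thread_stealing(std::size_t num_thread)
{
    return num_thread % cores_per_domain != 0;
}
```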
@@ -837,9 +840,8 @@ namespace example {
         }
 
         /// Return the next thread to be executed, return false if none available
-        virtual bool get_next_thread(std::size_t thread_num,
-            bool running, std::int64_t& idle_loop_count,
-            threads::thread_data*& thrd) override
+        virtual bool get_next_thread(std::size_t thread_num, bool running,
+            threads::thread_data*& thrd, bool /*enable_stealing*/) override
         {
             // LOG_CUSTOM_MSG("get_next_thread " << " queue "
             //     << decnumber(thread_num));
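
This signature swap is the heart of the PR: get_next_thread no longer tracks idle_loop_count itself; the scheduling loop decides when stealing is permitted and passes the decision in. A hedged sketch of the caller's side (Scheduler stands for any type implementing the updated interface):

```cpp
#include <cstddef>

// Sketch: derive the stealing decision from the per-thread policy, then
// hand it to get_next_thread.
template <typename Scheduler, typename Thread>
bool try_get_next_thread(
    Scheduler& sched, std::size_t num_thread, bool running, Thread*& thrd)
{
    bool const enable_stealing = sched.has_thread_stealing(num_thread);
    return sched.get_next_thread(num_thread, running, thrd, enable_stealing);
}
```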
@@ -1288,12 +1290,14 @@ namespace example {
         /// manager to allow for maintenance tasks to be executed in the
         /// scheduler. Returns true if the OS thread calling this function
         /// has to be terminated (i.e. no more work has to be done).
-        virtual bool wait_or_add_new(std::size_t thread_num,
-            bool running, std::int64_t& idle_loop_count) override
+        virtual bool wait_or_add_new(std::size_t thread_num, bool running,
+            std::int64_t& idle_loop_count, bool /*enable_stealing*/,
+            std::size_t& added) override
         {
-            std::size_t added = 0;
             bool result = true;
 
+            added = 0;
+
             if (thread_num == std::size_t(-1)) {
                 HPX_THROW_EXCEPTION(bad_parameter,
                     "shared_priority_queue_scheduler_example::wait_or_add_new",
@@ -1310,46 +1314,45 @@ namespace example {
                 // set the preferred queue for this domain, if applicable
                 std::size_t q_index = q_lookup_[thread_num];
                 // get next task, steal it from another domain
-                result = hp_queues_[dom].wait_or_add_new(q_index, running,
-                    idle_loop_count, added);
+                result =
+                    hp_queues_[dom].wait_or_add_new(q_index, running, added) &&
+                    result;
                 if (0 != added) return result;
             }
 
             // try a normal priority task
-            if (!result) {
-                for (std::size_t d=0; d<num_domains_; ++d) {
-                    std::size_t dom = (domain_num+d) % num_domains_;
-                    // set the preferred queue for this domain, if applicable
-                    std::size_t q_index = q_lookup_[thread_num];
-                    // get next task, steal it from another domain
-                    result = np_queues_[dom].wait_or_add_new(q_index, running,
-                        idle_loop_count, added);
-                    if (0 != added) return result;
-                }
+            for (std::size_t d=0; d<num_domains_; ++d) {
+                std::size_t dom = (domain_num+d) % num_domains_;
+                // set the preferred queue for this domain, if applicable
+                std::size_t q_index = q_lookup_[thread_num];
+                // get next task, steal it from another domain
+                result =
+                    np_queues_[dom].wait_or_add_new(q_index, running, added) &&
+                    result;
+                if (0 != added) return result;
             }
 
             // low priority task
-            if (!result) {
 #ifdef JB_LP_STEALING
-                for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) {
-                    std::size_t dom = d % num_domains_;
-                    // set the preferred queue for this domain, if applicable
-                    std::size_t q_index = (dom==domain_num) ?
-                        q_lookup_[thread_num] :
-                        lp_lookup_[(counters_[dom]++ %
-                            lp_queues_[dom].num_cores)];
+            for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) {
+                std::size_t dom = d % num_domains_;
+                // set the preferred queue for this domain, if applicable
+                std::size_t q_index = (dom==domain_num) ?
+                    q_lookup_[thread_num] :
+                    lp_lookup_[(counters_[dom]++ %
+                        lp_queues_[dom].num_cores)];
 
-                    result = lp_queues_[dom].wait_or_add_new(q_index, running,
-                        idle_loop_count, added);
-                    if (0 != added) return result;
-                }
-#else
-                // no cross domain stealing for LP queues
-                result = lp_queues_[domain_num].wait_or_add_new(0, running,
-                    idle_loop_count, added);
-                if (0 != added) return result;
-#endif
-            }
+                result = lp_queues_[dom].wait_or_add_new(
+                    q_index, running, added);
+                if (0 != added) return result;
+            }
+#else
+            // no cross domain stealing for LP queues
+            result =
+                lp_queues_[domain_num].wait_or_add_new(0, running, added) &&
+                result;
+            if (0 != added) return result;
+#endif
 
             return result;
         }
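
Promoting added to an out-parameter lets the thread pool observe whether the call staged any work and drive idle back-off from that. A hedged sketch of a caller (the back-off policy here is invented for illustration; it is not the PR's exact code):

```cpp
#include <cstddef>
#include <cstdint>

// Sketch: back off only while wait_or_add_new finds nothing; reset the
// back-off as soon as work shows up. Returns true if the worker thread
// should terminate, mirroring the documented contract above.
template <typename Scheduler>
bool maintain(Scheduler& sched, std::size_t num_thread, bool running,
    std::int64_t& idle_loop_count, bool enable_stealing)
{
    std::size_t added = 0;
    bool const terminate = sched.wait_or_add_new(
        num_thread, running, idle_loop_count, enable_stealing, added);

    if (added == 0)
        ++idle_loop_count;      // still idle: deepen the back-off
    else
        idle_loop_count = 0;    // work arrived: start fresh

    return terminate;
}
```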
@@ -1398,7 +1401,8 @@ namespace example {

         // create queue sets for each numa domain
         for (std::size_t i=0; i<num_domains_; ++i) {
-            int queues = (std::max)(q_counts_[i] / cores_per_queue_.high_priority,
+            std::size_t queues = (std::max)(
+                q_counts_[i] / cores_per_queue_.high_priority,
                 std::size_t(1));
             hp_queues_[i].init(
                 q_counts_[i], queues, max_queue_thread_count_);
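
The int to std::size_t change matters because (std::max) deduces a single template parameter, so mixing an int expression with std::size_t(1) fails to compile. In isolation:

```cpp
#include <algorithm>
#include <cstddef>

int main()
{
    std::size_t cores = 6, per_queue = 4;
    // Both arguments must have the same type for std::max's deduction;
    // the parentheses around (std::max) dodge any max() macro, as the
    // HPX source does.
    std::size_t queues = (std::max)(cores / per_queue, std::size_t(1));
    return static_cast<int>(queues);    // 1
}
```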
@@ -34,6 +34,7 @@
 #include <hpx/util/thread_aware_timer.hpp>
 #include <hpx/util/unique_function.hpp>
 #include <hpx/util/unwrap.hpp>
+#include <hpx/util/yield_while.hpp>
 #include <hpx/util/zip_iterator.hpp>
 
 #endif
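
hpx::util::yield_while, newly included here, repeatedly suspends the calling HPX thread while a predicate holds. A minimal usage sketch:

```cpp
#include <atomic>
#include <hpx/util/yield_while.hpp>

// Spin-with-yield until another thread flips the flag.
void wait_for(std::atomic<bool>& done)
{
    hpx::util::yield_while([&done] { return !done.load(); });
}
```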
@@ -46,9 +46,11 @@ namespace hpx { namespace lcos { namespace local
         /// releasing all waiting threads as soon as the last \a thread
         /// entered this function.
         void wait();
+
         /// The function \a count_up will increase the number of \a threads
         /// to be waited for in the \a wait function.
         void count_up();
+
         /// The function \a reset will reset the number of \a threads
         /// as given by the function parameter \a number_of_threads.
         /// The newly arriving \a threads executing the function
@@ -59,7 +61,7 @@ namespace hpx { namespace lcos { namespace local
         /// The function \a reset can be executed while previous \a threads
         /// are still executing \a wait after they have been woken up.
         /// Thus \a total_ can not be reset to \a barrier_flag which
-        /// will break the comparision condition under the function \a wait.
+        /// will break the comparison condition under the function \a wait.
         void reset(std::size_t number_of_threads);
 
     private:
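
For reference, a hedged sketch of how the members documented above compose:

```cpp
#include <cstddef>
#include <hpx/lcos/local/barrier.hpp>

// Grow a barrier as workers register, then rearm it for the next round.
void register_worker(hpx::lcos::local::barrier& b)
{
    b.count_up();    // one more thread will reach wait()
}

void rearm(hpx::lcos::local::barrier& b, std::size_t n)
{
    b.reset(n);      // restart the barrier for n threads
}
```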
@@ -14,6 +14,7 @@
 #include <hpx/lcos/local/spinlock.hpp>
 #include <hpx/runtime/threads/thread_enums.hpp>
 #include <hpx/util/assert_owns_lock.hpp>
+#include <hpx/util/cache_aligned_data.hpp>
 #include <hpx/util/register_locks.hpp>
 #include <hpx/util/steady_clock.hpp>
 #include <hpx/util/unlock_guard.hpp>
@@ -37,34 +38,34 @@ namespace hpx { namespace lcos { namespace local
     public:
         void notify_one(error_code& ec = throws)
         {
-            std::unique_lock<mutex_type> l(mtx_);
-            cond_.notify_one(std::move(l), ec);
+            std::unique_lock<mutex_type> l(mtx_.data_);
+            cond_.data_.notify_one(std::move(l), ec);
         }
 
         void notify_all(error_code& ec = throws)
         {
-            std::unique_lock<mutex_type> l(mtx_);
-            cond_.notify_all(std::move(l), ec);
+            std::unique_lock<mutex_type> l(mtx_.data_);
+            cond_.data_.notify_all(std::move(l), ec);
         }
 
         void wait(std::unique_lock<mutex>& lock, error_code& ec = throws)
         {
             HPX_ASSERT_OWNS_LOCK(lock);
             util::ignore_while_checking<std::unique_lock<mutex> > il(&lock);
-            std::unique_lock<mutex_type> l(mtx_);
+            std::unique_lock<mutex_type> l(mtx_.data_);
             util::unlock_guard<std::unique_lock<mutex> > unlock(lock);
             // The following ensures that the inner lock will be unlocked
             // before the outer to avoid deadlock (fixes issue #3608)
             std::lock_guard<std::unique_lock<mutex_type> > unlock_next(
                 l, std::adopt_lock);
 
-            cond_.wait(l, ec);
+            cond_.data_.wait(l, ec);
 
             // We need to ignore our internal mutex for the user provided lock
             // being able to be reacquired without a lock held during suspension
             // error. We can't use RAII here since the guard object would get
             // destructed before the unlock_guard.
-            hpx::util::ignore_lock(&mtx_);
+            hpx::util::ignore_lock(&mtx_.data_);
         }
 
         template <class Predicate>
@@ -86,21 +87,21 @@ namespace hpx { namespace lcos { namespace local
             HPX_ASSERT_OWNS_LOCK(lock);
 
             util::ignore_while_checking<std::unique_lock<mutex> > il(&lock);
-            std::unique_lock<mutex_type> l(mtx_);
+            std::unique_lock<mutex_type> l(mtx_.data_);
             util::unlock_guard<std::unique_lock<mutex> > unlock(lock);
             // The following ensures that the inner lock will be unlocked
             // before the outer to avoid deadlock (fixes issue #3608)
             std::lock_guard<std::unique_lock<mutex_type> > unlock_next(
                 l, std::adopt_lock);
 
             threads::thread_state_ex_enum const reason =
-                cond_.wait_until(l, abs_time, ec);
+                cond_.data_.wait_until(l, abs_time, ec);
 
             // We need to ignore our internal mutex for the user provided lock
             // being able to be reacquired without a lock held during suspension
             // error. We can't use RAII here since the guard object would get
             // destructed before the unlock_guard.
-            hpx::util::ignore_lock(&mtx_);
+            hpx::util::ignore_lock(&mtx_.data_);
 
             if (ec) return cv_status::error;

@@ -140,8 +141,8 @@ namespace hpx { namespace lcos { namespace local
         }
 
     private:
-        mutable mutex_type mtx_;
-        detail::condition_variable cond_;
+        mutable util::cache_line_data<mutex_type> mtx_;
+        util::cache_line_data<detail::condition_variable> cond_;
     };

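All the .data_ indirections above exist so the mutex and the condition variable end up on separate cache lines; contention on mtx_ then no longer invalidates the line holding cond_. A sketch of the idea behind util::cache_line_data (not HPX's exact definition):

```cpp
// Sketch: force each wrapped member onto its own cache line. Two adjacent
// members of this type can no longer false-share, assuming 64-byte lines.
template <typename T>
struct alignas(64) cache_line_data
{
    T data_;
};
```
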
class condition_variable_any
@@ -152,14 +153,14 @@ namespace hpx { namespace lcos { namespace local
     public:
         void notify_one(error_code& ec = throws)
         {
-            std::unique_lock<mutex_type> l(mtx_);
-            cond_.notify_one(std::move(l), ec);
+            std::unique_lock<mutex_type> l(mtx_.data_);
+            cond_.data_.notify_one(std::move(l), ec);
         }
 
         void notify_all(error_code& ec = throws)
         {
-            std::unique_lock<mutex_type> l(mtx_);
-            cond_.notify_all(std::move(l), ec);
+            std::unique_lock<mutex_type> l(mtx_.data_);
+            cond_.data_.notify_all(std::move(l), ec);
         }
 
         template <class Lock>
@@ -168,20 +169,20 @@ namespace hpx { namespace lcos { namespace local
             HPX_ASSERT_OWNS_LOCK(lock);
 
             util::ignore_all_while_checking ignore_lock;
-            std::unique_lock<mutex_type> l(mtx_);
+            std::unique_lock<mutex_type> l(mtx_.data_);
             util::unlock_guard<Lock> unlock(lock);
             // The following ensures that the inner lock will be unlocked
             // before the outer to avoid deadlock (fixes issue #3608)
             std::lock_guard<std::unique_lock<mutex_type> > unlock_next(
                 l, std::adopt_lock);
 
-            cond_.wait(l, ec);
+            cond_.data_.wait(l, ec);
 
             // We need to ignore our internal mutex for the user provided lock
             // being able to be reacquired without a lock held during suspension
             // error. We can't use RAII here since the guard object would get
             // destructed before the unlock_guard.
-            hpx::util::ignore_lock(&mtx_);
+            hpx::util::ignore_lock(&mtx_.data_);
         }
 
         template <class Lock, class Predicate>
@@ -203,21 +204,21 @@ namespace hpx { namespace lcos { namespace local
             HPX_ASSERT_OWNS_LOCK(lock);
 
             util::ignore_all_while_checking ignore_lock;
-            std::unique_lock<mutex_type> l(mtx_);
+            std::unique_lock<mutex_type> l(mtx_.data_);
             util::unlock_guard<Lock> unlock(lock);
             // The following ensures that the inner lock will be unlocked
             // before the outer to avoid deadlock (fixes issue #3608)
             std::lock_guard<std::unique_lock<mutex_type> > unlock_next(
                 l, std::adopt_lock);
 
             threads::thread_state_ex_enum const reason =
-                cond_.wait_until(l, abs_time, ec);
+                cond_.data_.wait_until(l, abs_time, ec);
 
             // We need to ignore our internal mutex for the user provided lock
             // being able to be reacquired without a lock held during suspension
             // error. We can't use RAII here since the guard object would get
             // destructed before the unlock_guard.
-            hpx::util::ignore_lock(&mtx_);
+            hpx::util::ignore_lock(&mtx_.data_);
 
             if (ec) return cv_status::error;

@@ -256,8 +257,8 @@ namespace hpx { namespace lcos { namespace local
         }
 
     private:
-        mutable mutex_type mtx_;
-        detail::condition_variable cond_;
+        mutable util::cache_line_data<mutex_type> mtx_;
+        util::cache_line_data<detail::condition_variable> cond_;
     };
 }}}
