Merge pull request #2506 from STEllAR-GROUP/fix_background_handling
Fixing background work invocations
sithhell committed Feb 17, 2017
2 parents f3f4a41 + 2284dda commit 10aaacc
Showing 3 changed files with 161 additions and 50 deletions.
186 changes: 142 additions & 44 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
@@ -28,6 +28,7 @@
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <utility>

namespace hpx { namespace threads { namespace detail
@@ -287,6 +288,112 @@ namespace hpx { namespace threads { namespace detail
std::int64_t const max_busy_loop_count_;
};

template <typename SchedulingPolicy>
thread_id_type create_background_thread(SchedulingPolicy& scheduler,
scheduling_callbacks& callbacks, std::shared_ptr<bool>& background_running,
std::size_t num_thread, std::int64_t& idle_loop_count)
{
thread_id_type background_thread;
background_running.reset(new bool(true));
thread_init_data background_init(
[&, background_running](thread_state_ex_enum) -> thread_result_type
{
while(*background_running)
{
if (callbacks.background_())
{
// We update the idle_loop_count only if background_running is
// still true. If it is false, this thread has already been
// given back to the scheduler.
if (*background_running)
idle_loop_count = 0;
}
hpx::this_thread::suspend(hpx::threads::pending,
"background_work");
}
return std::make_pair(terminated, nullptr);
},
hpx::util::thread_description("background_work"),
0,
thread_priority_critical,
num_thread,
std::size_t(-1),
&scheduler);

scheduler.create_thread(background_init, &background_thread, suspended,
true, hpx::throws, num_thread);
HPX_ASSERT(background_thread);
// We can now set the state to pending
background_thread->set_state(pending);
return background_thread;
}

// This function tries to invoke the background work thread. It returns
// false when we need to give the background thread back to the scheduler
// and create a new one that will be executed inside the scheduling_loop;
// it returns true otherwise.
template <typename SchedulingPolicy>
bool call_background_thread(thread_id_type& background_thread,
thread_data*& next_thrd, SchedulingPolicy& scheduler, std::size_t num_thread,
bool running)
{
if (HPX_UNLIKELY(background_thread))
{
thread_state state = background_thread->get_state();
thread_state_enum state_val = state.state();

if (HPX_LIKELY(pending == state_val))
{
{
// tries to set state to active (only if state is still
// the same as 'state')
detail::switch_status thrd_stat (background_thread.get(), state);
if (HPX_LIKELY(thrd_stat.is_valid() &&
thrd_stat.get_previous() == pending))
{
thrd_stat = (*background_thread)();

thread_data *next = thrd_stat.get_next_thread();
if (next != nullptr && next != background_thread.get())
{
if (next_thrd == nullptr)
{
next_thrd = next;
}
else
{
scheduler.SchedulingPolicy::schedule_thread(
next, num_thread);
}
}
}
thrd_stat.store_state(state);
state_val = state.state();

if (HPX_LIKELY(state_val == pending_boost))
{
background_thread->set_state(pending);
}
else if(terminated == state_val)
{
std::int64_t busy_count = 0;
scheduler.destroy_thread(background_thread.get(), busy_count);
background_thread.reset();
}
else if(suspended == state_val)
{
return false;
}
}
return true;
}
// This should never be reached ... we should only deal with pending
// here.
HPX_ASSERT(false);
}
return true;
}

template <typename SchedulingPolicy>
void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
scheduling_counters& counters, scheduling_callbacks& params)
@@ -314,27 +421,15 @@ namespace hpx { namespace threads { namespace detail
thread_data* thrd = nullptr;
thread_data* next_thrd = nullptr;

thread_init_data background_init(
[&](thread_state_ex_enum) -> thread_result_type
{
while(true)
{
if (params.background_())
idle_loop_count = 0;
hpx::this_thread::suspend(hpx::threads::pending,
"background_work");
}
return std::make_pair(terminated, nullptr);
},
hpx::util::thread_description("background_work"));

std::shared_ptr<bool> background_running = nullptr;
thread_id_type background_thread = nullptr;

if ((scheduler.get_scheduler_mode() & policies::do_background_work) &&
num_thread < params.max_background_threads_ &&
!params.background_.empty())
{
background_thread.reset(
new thread_data(background_init, nullptr, pending));
background_thread = create_background_thread(scheduler, params,
background_running, num_thread, idle_loop_count);
}

while (true) {
@@ -555,22 +650,26 @@ namespace hpx { namespace threads { namespace detail
}
}

// let our background threads terminate
if (background_running)
{
*background_running = running;
}
// do background work in parcel layer and in agas
if (HPX_UNLIKELY(background_thread))
if (!call_background_thread(background_thread, next_thrd, scheduler,
num_thread, running))
{
thread_result_type background_result = (*background_thread)();
if (background_result.second.get() != nullptr)
{
if (next_thrd == nullptr)
{
next_thrd = background_result.second.get();
}
else if(background_result.second != background_thread)
{
scheduler.SchedulingPolicy::schedule_thread(
background_result.second.get(), num_thread);
}
}
// Let the current background thread terminate as soon as
// possible and give it back to the scheduler.
*background_running = false;
scheduler.schedule_thread(
background_thread.get(), num_thread,
thread_priority_critical);
// Create a new one to replace the current thread so that we
// avoid deadlock situations if all background threads are
// blocked.
background_thread = create_background_thread(scheduler, params,
background_running, num_thread, idle_loop_count);
}

// call back into invoking context
@@ -587,21 +686,20 @@ namespace hpx { namespace threads { namespace detail
busy_loop_count = 0;

// do background work in parcel layer and in agas
if (HPX_UNLIKELY(background_thread))
if (!call_background_thread(background_thread, next_thrd, scheduler,
num_thread, running))
{
thread_result_type background_result = (*background_thread)();
if (background_result.second.get() != nullptr)
{
if (next_thrd == nullptr)
{
next_thrd = background_result.second.get();
}
else if(background_result.second != background_thread)
{
scheduler.SchedulingPolicy::schedule_thread(
background_result.second.get(), num_thread);
}
}
// Let the current background thread terminate as soon as
// possible and give it back to the scheduler.
*background_running = false;
scheduler.schedule_thread(
background_thread.get(), num_thread,
thread_priority_critical);
// Create a new one to replace the current thread so that we
// avoid deadlock situations if all background threads are
// blocked.
background_thread = create_background_thread(scheduler, params,
background_running, num_thread, idle_loop_count);
}
}
else if ((scheduler.get_scheduler_mode() & policies::fast_idle_mode) ||
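The core of the change in scheduling_loop.hpp is the shared background_running flag: each worker may own a background HPX thread that repeatedly invokes the background callback and suspends, and when call_background_thread sees that thread suspended, the worker flips the flag, hands the old thread back to the scheduler, and installs a replacement so background work (parcel layer, AGAS) does not stall. The following standalone C++ sketch illustrates that handoff pattern under simplifying assumptions; background_task, make_background_task and run_once are illustrative stand-ins, not HPX APIs.

#include <functional>
#include <iostream>
#include <memory>

struct background_task
{
    std::shared_ptr<bool> running;     // shared "keep going" flag
    std::function<bool()> work;        // returns true if it made progress

    // One "time slice": run the callback once if we are still live, then
    // hand control back (standing in for this_thread::suspend above).
    void run_once() const
    {
        if (*running && work())
        {
            // progress was made; a real scheduler would reset its
            // idle_loop_count here, as create_background_thread does
        }
    }
};

background_task make_background_task(std::function<bool()> work)
{
    return background_task{std::make_shared<bool>(true), std::move(work)};
}

int main()
{
    auto task = make_background_task(
        [] { std::cout << "background work\n"; return true; });

    for (int iteration = 0; iteration != 3; ++iteration)
    {
        task.run_once();

        // Stand-in for the 'suspended' branch in call_background_thread:
        // the task blocked, so retire it and create a fresh one rather
        // than letting background progress stall.
        bool task_blocked = (iteration == 1);
        if (task_blocked)
        {
            *task.running = false;   // let the old task wind down on its own
            task = make_background_task(
                [] { std::cout << "replacement work\n"; return true; });
        }
    }
}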
24 changes: 18 additions & 6 deletions src/runtime/components/server/runtime_support_server.cpp
@@ -587,7 +587,9 @@ namespace hpx { namespace components { namespace server
// it hands over the token to machine nr.i.
threads::threadmanager_base& tm = appl.get_thread_manager();

for (std::size_t k = 0; tm.get_thread_count() > 1; ++k)
for (std::size_t k = 0;
tm.get_thread_count() > std::size_t(1 + hpx::get_os_thread_count());
++k)
{
util::detail::yield_k(k, "runtime_support::dijkstra_termination");
}
@@ -623,7 +625,9 @@ namespace hpx { namespace components { namespace server
applier::applier& appl = hpx::applier::get_applier();
threads::threadmanager_base& tm = appl.get_thread_manager();

for (std::size_t k = 0; tm.get_thread_count() > 1; ++k)
for (std::size_t k = 0;
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
util::detail::yield_k(k,
"runtime_support::dijkstra_termination_detection");
@@ -682,7 +686,9 @@ namespace hpx { namespace components { namespace server
applier::applier& appl = hpx::applier::get_applier();
threads::threadmanager_base& tm = appl.get_thread_manager();

for (std::size_t k = 0; tm.get_thread_count() > 1; ++k)
for (std::size_t k = 0;
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
util::detail::yield_k(k,
"runtime_support::send_dijkstra_termination_token");
@@ -756,7 +762,9 @@ namespace hpx { namespace components { namespace server
applier::applier& appl = hpx::applier::get_applier();
threads::threadmanager_base& tm = appl.get_thread_manager();

for (std::size_t k = 0; tm.get_thread_count() > 1; ++k)
for (std::size_t k = 0;
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
util::detail::yield_k(k,
"runtime_support::dijkstra_termination_detection");
@@ -1085,7 +1093,9 @@ namespace hpx { namespace components { namespace server

stopped_ = true;

for (std::size_t k = 0; tm.get_thread_count() > 1; ++k)
for (std::size_t k = 0;
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
// let thread-manager clean up threads
cleanup_threads(tm, l);
@@ -1106,7 +1116,9 @@ namespace hpx { namespace components { namespace server
// now we have to wait for all threads to be aborted
start_time = t.elapsed();

for (std::size_t k = 0; tm.get_thread_count() > 1; ++k)
for (std::size_t k = 0;
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
// abort all suspended threads
tm.abort_all_suspended_threads();
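The runtime_support_server.cpp hunks adjust every termination/shutdown wait loop for the fact that each worker OS thread may now keep a persistent background HPX thread alive: the loops can no longer drain the thread count down to 1, only down to 1 + hpx::get_os_thread_count(). Below is a minimal sketch of that wait pattern, assuming plain standard-library stand-ins for tm.get_thread_count(), hpx::get_os_thread_count() and util::detail::yield_k; none of the names are HPX APIs.

#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

// thread_count: stand-in for tm.get_thread_count()
// os_threads:   stand-in for hpx::get_os_thread_count()
void wait_for_quiescence(std::function<std::size_t()> thread_count,
    std::size_t os_threads)
{
    // the calling thread plus one background thread per worker stay alive
    std::size_t const residual = 1 + os_threads;

    for (std::size_t k = 0; thread_count() > residual; ++k)
    {
        // progressive back-off, standing in for util::detail::yield_k(k, ...)
        if (k < 16)
            std::this_thread::yield();
        else
            std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

int main()
{
    // toy usage: 4 workers, a count that starts above the residual of 5
    std::size_t fake_count = 6;
    wait_for_quiescence([&] { return fake_count--; }, 4);
}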
1 change: 1 addition & 0 deletions src/runtime/threads/detail/thread_pool.cpp
@@ -23,6 +23,7 @@
#include <hpx/util/logging.hpp>
#include <hpx/util/hardware/timestamp.hpp>
#include <hpx/util/high_resolution_clock.hpp>
#include <hpx/util/thread_specific_ptr.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/atomic.hpp>
