Skip to content

Commit

Permalink
Replace spin-lock with conditional variable for SleepTask (#3433)
Browse files Browse the repository at this point in the history
Using spin-lock when threads are waiting could sometimes result in hangs on systems with heavy load. In this PR we replace spin-lock based `SleepTask` with the one based on the standard conditional variable. The performance could be slightly better or worse comparing to what we currently have, however, this change assures no hangs will happen.
  • Loading branch information
sh1ng committed May 21, 2023
1 parent f0221d6 commit eb770bc
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 14 deletions.
28 changes: 21 additions & 7 deletions src/core/parallel/job_idle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "parallel/api.h"
#include "parallel/job_idle.h"
#include "parallel/thread_pool.h"
#include <condition_variable>
#include <mutex>
namespace dt {


Expand All @@ -37,6 +39,12 @@ Job_Idle::Job_Idle() {
}


Job_Idle::~Job_Idle() {
delete current_sleep_task_;
delete previous_sleep_task_;
}


ThreadTask* Job_Idle::get_next_task(size_t) {
return current_sleep_task_;
}
Expand Down Expand Up @@ -74,7 +82,7 @@ void Job_Idle::awaken_and_run(ThreadJob* job, size_t nthreads) {
n_threads_running_ += nth;
saved_exception_ = nullptr;

previous_sleep_task_->wake_up(nth, job);
previous_sleep_task_->wake_up(job);
thpool->workers_[0]->run_in_main_thread(job);
}

Expand Down Expand Up @@ -137,7 +145,6 @@ bool Job_Idle::is_running() const noexcept {




//------------------------------------------------------------------------------
// SleepTask
//------------------------------------------------------------------------------
Expand All @@ -149,25 +156,33 @@ SleepTask::SleepTask(Job_Idle* idle_job)

void SleepTask::execute() {
parent_->remove_running_thread();
semaphore_.wait();
{
std::unique_lock<std::mutex> lk(cv_m_);
cv_.wait(lk,[&]{return job_ != nullptr;});
}
xassert(job_);
thpool->assign_job_to_current_thread(job_);
}


void SleepTask::wake_up(int nth, ThreadJob* next_job) {
job_ = next_job;
semaphore_.signal(nth);
void SleepTask::wake_up(ThreadJob* next_job) {
{
std::lock_guard<std::mutex> lk(cv_m_);
job_ = next_job;
}
cv_.notify_all();
}


void SleepTask::fall_asleep() {
std::lock_guard<std::mutex> lk(cv_m_);
// Clear `job_` indicating that we no longer run in a parallel region.
job_ = nullptr;
}


void SleepTask::abort_current_job() {
std::lock_guard<std::mutex> lk(cv_m_);
if (job_) {
job_->abort_execution();
}
Expand All @@ -180,5 +195,4 @@ bool SleepTask::is_sleeping() const noexcept {




} // namespace dt
14 changes: 8 additions & 6 deletions src/core/parallel/job_idle.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <mutex> // std::mutex
#include "parallel/semaphore.h" // LightweightSemaphore
#include "parallel/thread_job.h" // ThreadJob, ThreadTask
#include <condition_variable>

namespace dt {

class SleepTask;
Expand Down Expand Up @@ -96,6 +98,7 @@ class Job_Idle : public ThreadJob {

public:
Job_Idle();
~Job_Idle() override;

ThreadTask* get_next_task(size_t thread_index) override;

Expand Down Expand Up @@ -128,25 +131,24 @@ class Job_Idle : public ThreadJob {
};



class SleepTask : public ThreadTask {
private:
Job_Idle* const parent_;
ThreadJob* job_;
LightweightSemaphore semaphore_;
Job_Idle* const parent_;
ThreadJob* job_;
std::condition_variable cv_;
std::mutex cv_m_;

public:
SleepTask(Job_Idle*);
void execute() override;

void wake_up(int nthreads, ThreadJob* next_job);
void wake_up(ThreadJob* next_job);
void fall_asleep();
void abort_current_job();
bool is_sleeping() const noexcept;
};




} // namespace dt
#endif
1 change: 1 addition & 0 deletions src/core/parallel/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ size_t get_hardware_concurrency() noexcept {
set_nthreads,
dt::doc_options_nthreads
);

}
#endif

Expand Down
2 changes: 1 addition & 1 deletion src/core/parallel/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ThreadPool
size_t num_threads_requested_;

// Scheduler used to manage sleep/awake cycle of the workers in the pool.
// See definition in thread_worker.h
// See definition in job_idle.h
Job_Idle idle_job_;

// Mutex which can be used to guard operations that must be protected
Expand Down

0 comments on commit eb770bc

Please sign in to comment.