Skip to content

Commit

Permalink
threadpool: Add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ahodesuka committed Nov 27, 2017
1 parent 5df3f53 commit ee73345
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions src/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ namespace AhoViewer
};
public:
ThreadPool() { init(); }
ThreadPool(size_t nThreads) : ma_n_idle(0), ma_n_threads(nThreads) { init(); resize(nThreads); }
ThreadPool(size_t nThreads) : ma_n_idle(0) { init(); resize(nThreads); }

// the destructor waits for all the functions in the queue to be finished
// the destructor waits for all tasks in the queue to be finished
~ThreadPool()
{
ma_shutdown = true;
Expand Down Expand Up @@ -117,7 +117,7 @@ namespace AhoViewer
// should be called from one thread, otherwise be careful to not interleave, also with interrupt()
void resize(size_t nThreads)
{
if (!ma_kill && !ma_interrupt)
if (!ma_interrupt && !ma_kill)
{
size_t oldNThreads = m_threads.size();

Expand Down Expand Up @@ -154,12 +154,14 @@ namespace AhoViewer
}
}

// Removes remaining tasks and resets the pool back to it's initial state
void kill()
{
interrupt(true);
init();
}

// Waits for remaining tasks to complete and resets the pool back to it's initial state
void wait()
{
interrupt();
Expand All @@ -171,7 +173,7 @@ namespace AhoViewer
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
if (!ma_shutdown && !ma_kill)
if (!ma_interrupt && !ma_kill)
{
auto pck = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
Expand Down Expand Up @@ -201,9 +203,9 @@ namespace AhoViewer
delete _f; // empty the queue
}

// wait for all computing threads to finish and stop all threads
// wait for all computing threads to finish
// may be called asynchronously to not pause the calling thread while waiting
// if kill == true, all queued tasks are removed
// if kill == true, queued tasks are not completed and removed
void interrupt(bool kill = false)
{
if (kill)
Expand All @@ -223,10 +225,11 @@ namespace AhoViewer
m_cond.notify_all(); // stop all waiting threads
}
// wait for the computing threads to finish
std::unique_lock<std::mutex> lock(m_fmutex);
m_fcond.wait(lock, [ this ]
// if ma_shutdown is set the destructor will wait for the queue to finish
std::unique_lock<std::mutex> lock(m_imutex);
m_icond.wait(lock, [ this ]
{
return (ma_n_idle == m_threads.size() && m_queue.empty()) || (ma_kill && ma_interrupt);
return (ma_n_idle == m_threads.size() && m_queue.empty()) || ma_shutdown;
});
}

Expand Down Expand Up @@ -261,13 +264,13 @@ namespace AhoViewer
more_tasks = m_queue.pop(_f);
}

// the queue is empty here, wait for the next command
std::unique_lock<std::mutex> lock(m_mutex);
++ma_n_idle;
{
std::unique_lock<std::mutex> lock(m_fmutex);
m_fcond.notify_one();
std::unique_lock<std::mutex> lock(m_imutex);
m_icond.notify_one();
}
// the queue is empty here, wait for the next command
m_cond.wait(lock, [ this, &_f, &more_tasks, &abort ]
{
more_tasks = m_queue.pop(_f);
Expand All @@ -281,16 +284,15 @@ namespace AhoViewer
m_threads[i].reset(new std::thread(f));
}

std::vector<std::unique_ptr<std::thread>> m_threads;
std::vector<std::shared_ptr<std::atomic<bool>>> m_abort;
Queue<std::function<void()>*> m_queue;
std::vector<std::unique_ptr<std::thread>> m_threads;
std::vector<std::shared_ptr<std::atomic<bool>>> m_abort;
Queue<std::function<void()>*> m_queue;

std::atomic<bool> ma_interrupt, ma_kill, ma_shutdown;
std::atomic<size_t> ma_n_idle;
std::atomic<size_t> ma_n_threads;
std::atomic<bool> ma_interrupt, ma_kill, ma_shutdown;
std::atomic<size_t> ma_n_idle;

std::mutex m_mutex, m_fmutex;
std::condition_variable m_cond, m_fcond;
std::mutex m_mutex, m_imutex;
std::condition_variable m_cond, m_icond;
};
}

Expand Down

0 comments on commit ee73345

Please sign in to comment.