-
Notifications
You must be signed in to change notification settings - Fork 609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce priorities in ThreadPool #2092
Conversation
dali/pipeline/util/thread_pool.h
Outdated
* @brief Adds work to the queue with optional priority. | ||
* The work only gets queued and it will only start after invoking | ||
* `RunAll` (wakes up all threads to complete all remaining works) or | ||
* `RunWork` (wakes up a single thread to complete one work unit). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see RunWork
implemented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it should be DoWorkWithID
dali/pipeline/util/thread_pool.h
Outdated
return a.first < b.first; | ||
} | ||
}; | ||
std::queue<Work> work_queue__; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
dali/pipeline/util/thread_pool.cc
Outdated
|
||
void ThreadPool::DoWorkWithID(Work work, int64_t priority) { | ||
AddWork(std::move(work), priority); | ||
adding_work_ = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not lock protected change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
dali/pipeline/util/thread_pool.cc
Outdated
tl_errors_[i].pop(); | ||
throw std::runtime_error(error); | ||
} | ||
} | ||
} | ||
} | ||
|
||
void ThreadPool::RunAll(bool wait) { | ||
adding_work_ = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not lock protected change.
adding_work_ = true; | ||
} | ||
|
||
void ThreadPool::DoWorkWithID(Work work, int64_t priority) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about:
void ThreadPool::DoWorkWithID(Work work, int64_t priority, bool hold_work = false) {
AddWork(std::move(work), priority, hold_work );
condition_.notify_one();
}
void ThreadPool::AddWork(Work work, int64_t priority, bool hold_work = true) {
std::lock_guard<std::mutex> lock(mutex_);
work_queue_.push({priority, std::move(work)});
work_complete_ = false;
adding_work_ = hold_work ;
}
dali/pipeline/util/thread_pool.cc
Outdated
std::lock_guard<std::mutex> lock(mutex_); | ||
adding_work_ = false; | ||
} | ||
condition_.notify_all(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it can be notify_one()
. Worker threads already have a mechanism to notify a next worker if there is a task pending. In case the number of jobs is smaller than number of workers notify_all()
would wake more threads than needed.
Signed-off-by: Joaquin Anton <janton@nvidia.com>
Signed-off-by: Joaquin Anton <janton@nvidia.com>
Signed-off-by: Joaquin Anton <janton@nvidia.com>
6bf0eeb
to
03aa05d
Compare
!build |
CI MESSAGE: [1455543]: BUILD STARTED |
CI MESSAGE: [1455543]: BUILD FAILED |
Signed-off-by: Joaquin Anton <janton@nvidia.com>
!build |
CI MESSAGE: [1455678]: BUILD STARTED |
CI MESSAGE: [1455678]: BUILD PASSED |
Signed-off-by: Joaquin Anton janton@nvidia.com
Why we need this PR?
What happened in this PR?
Fill relevant points, put NA otherwise. Replace anything inside []
Added a priority queue in ThreadPool
Added a new pattern of using ThreadPool:
AddWork
will add work to the queue but won't start processing it, andRunAll
will wake up all the threads to process the remaining work, which will be picked in order according to the task priorityThread pool
Thread pool implementation
Unit tests and benchmark added
Doxygen
TODO as a follow-up: Modify all uses of ThreadPool in DALI operators to use the new AddWork/RunAll pattern
JIRA TASK: [DALI-1473]