-
Notifications
You must be signed in to change notification settings - Fork 509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MOD-6572 replace semaphore with wait in jobq #4446
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #4446 +/- ##
=======================================
Coverage 86.25% 86.26%
=======================================
Files 190 190
Lines 34842 34842
=======================================
+ Hits 30053 30056 +3
+ Misses 4789 4786 -3 ☔ View full report in Codecov by Sentry. |
wait and drain are modified accordingly.
sembm om dim - 768
rename priority_queue->num_threads_working to num_threads_not_idle uncomment LOG remove script
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
@@ -19,7 +19,7 @@ int ConcurrentSearch_CreatePool(int numThreads) { | |||
} | |||
int poolId = array_len(threadpools_g); | |||
threadpools_g = array_append(threadpools_g, redisearch_thpool_create(numThreads, DEFAULT_PRIVILEGED_THREADS_NUM, | |||
LogCallback)); | |||
LogCallback, "coord")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider passing the pool name as a parameter. I would at least add a comment about why we chose this name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 rows above the array explictly initialized to size 1 with a comment "Only used by the coordinator, so 1 is enough"
deps/thpool/thpool.c
Outdated
|
||
/* Initialize threads if needed */ | ||
redisearch_thpool_verify_init(thpool_p); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving into priority_queue_push_chain
to avoid duplications
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's the priority queue's responsibility to initialize the threads.
- I added a wrapper function
priority_queue_push_chain_init_threads
to the threadpool section priority_queue_push_chain_unsafe
: replacedredisearch_thpool_t *
argument that was used only to get the priority queue, with apriority_queue *
…lback, triggred by redis from the main thread remove thpool_init. not in use renaming of thpool structs' members remove priority_push_chain and introduce redisearch_thpool_push_chain_init_threads that encapsulate both safe push to the queue and threads verify init pthread_varrier: add volatile to cycle test_multithread: replace algo, datatype loop with an automatic generation of a function for each permutation (algo, datatype) as Test_burst_threads_sanity class attribute Thanks @GuyAv46
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very well done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job!
- Please copy the description of the PR into a confluence page (if hadn't already)
- Most of my comments are renaming suggestions for better readability
- Unit tests for this delicate code are great, let's try to make the most out of them by adding more asserts
@@ -2365,7 +2365,7 @@ void Indexes_UpgradeLegacyIndexes() { | |||
|
|||
void Indexes_ScanAndReindex() { | |||
if (!reindexPool) { | |||
reindexPool = redisearch_thpool_create(1, DEFAULT_PRIVILEGED_THREADS_NUM, LogCallback); | |||
reindexPool = redisearch_thpool_create(1, DEFAULT_PRIVILEGED_THREADS_NUM, LogCallback, "reindex"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that the function name suggests it does reindex, but the more accurate name would be "background indexing" IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the thepool name is limited to 10 charchters. Also i think that the fact that it's a thpool suggests that it's done in the bg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't the limitation arbitrary? we can increase that if we like no?
"indexing" is also fine I guess
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it's limited by prctl(PR_SET_NAME)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://man7.org/linux/man-pages/man2/prctl.2.html
PR_SET_NAME (since Linux 2.6.9)
Set the name of the calling thread, using the value in the
location pointed to by (char *) arg2. The name can be up
to 16 bytes long, including the terminating null byte.
redisearch_thpool_terminate_reset_threads->redisearch_thpool_terminate_threads redisearch_thpool_num_threads_working->redisearch_thpool_num_jobs_in_progress new: redisearch_thpool_is_initialized thpool.c: * Align all structs name to camelCase * redisearch_thpool_push_chain_init_threads->redisearch_thpool_push_chain_verify_init_threads * num_threads_working->num_jobs_in_progress * priority_queue_unprocessed_jobs->priority_queue_num_incomplete_jobs * THREAD_DEAD->THREAD_TERMINATE_ASAP * change_state_job->admin_job_change_state add thpool state verification in cpp-test TestTerminateWhenEmpty
Successfully created backport PR for |
* replace semaphore with wait in jobq * reset jobq run flag in verify init * add bm script * update script * update num_wrking under the jobq lock wait and drain are modified accordingly. * inistialize num_working in redisearch_thpool_verify_init * remove logs * signal one if we insert one job, otherwise broadcast sembm om dim - 768 * print every 50K queries * cleanups * move n_vectors and vecs_file to main * fix typo * script cleanups * rename priority_queue->cond to has_jobs rename priority_queue->num_threads_working to num_threads_not_idle uncomment LOG remove script * fix identations * Update deps/thpool/thpool.c Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * remove thcount_lock Thcount used to lock the code block in thread_do where we realize the state is THPOOL_TERMINATE_WHEN_EMPTY and the jobq, and want to inform the other threads. However, this lock was not enough to ensure that thread won't go to sleep after the signal has already been sent. for example, once the queue is empty, for thread1 both state = THPOOL_TERMINATE_WHEN_EMPTY and jobq is empty, switch to thread2: try to pull a job from the queue, is_empty & should run both true, back to thread 1: change should_run and broadcast all other threads switch to thread2: sleep on the cond we need to make sure that both code block are protected by the same mutex: 1. thread_do changes the thpool state and signal the other threads 2. jobq_pull check if the jobq is empty and should_run and sleep on the cond. we lock these code blocks with jobqueues_rwmutex. thcount_lock is now redundant and the variable it used to guard can be atomic instead. * temp push script to run BM on ssh * remove num_threads_not_idle cond var - this condition variable was an approximate way to signal drain that all threads are idle. However, drain is more likly to wakeup due to timeout, since idealy, all threads are working as long as there are jobs in jobq. As for wait function, the threads were signaling that all are ideal before checking the queue length, so again, it doesn't indicate the real state of the queue (num_working might be 0, but there are still jobs in the queue) the condition variable was replaced with a sleeping and waking up after 100 ms. also, num working is increased if we pulled a job, and decreased when the job is done. change back to num_threads_working added description to has_jobs and should_run remove error checking from redisearch_thpool_create add num_threads_alive to stats remove unused priority_queue_len added a test to test terminate when empty added a util to create any class of thpool tests * add seed to query in poc script * remove script * remove lock from redisearch_thpool_num_threads_working unify if (job_p) in thread_do remove unused var from sleep_and_set in test_cpp_thpool * add volatile to should_run * refactor thpool * add admin priority queue *add threads manager to send admin jobs to threads *not lazy gc init to avoid bg intialization * little fix * add thpool name remove threads array from thpool struct * format thpool file * ensure job is running in redisearch_thpool_terminate_reset_threads change thpool state to THPOOL_UNINITIALIZED in redisearch_thpool_terminate_when_empty wait for num working threads to be 0 in redisearch_thpool_terminate_pause_threads thpool cpp tests: TestTerminateWhenEmpty: test recreating the threads after temination new test: TestPauseResume * add pthread_barrier implmntation if not defined * update thpool.h * thpool.h format * Don't intialize gc thpool. the gc thpool is intizlied by the timercallback, triggred by redis from the main thread remove thpool_init. not in use renaming of thpool structs' members remove priority_push_chain and introduce redisearch_thpool_push_chain_init_threads that encapsulate both safe push to the queue and threads verify init pthread_varrier: add volatile to cycle test_multithread: replace algo, datatype loop with an automatic generation of a function for each permutation (algo, datatype) as Test_burst_threads_sanity class attribute Thanks @GuyAv46 * Update tests/pytests/test_multithread.py Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * Update tests/pytests/test_multithread.py Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * Update tests/pytests/test_multithread.py Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * improve documnetation * remove cast * thpool API: redisearch_thpool_terminate_reset_threads->redisearch_thpool_terminate_threads redisearch_thpool_num_threads_working->redisearch_thpool_num_jobs_in_progress new: redisearch_thpool_is_initialized thpool.c: * Align all structs name to camelCase * redisearch_thpool_push_chain_init_threads->redisearch_thpool_push_chain_verify_init_threads * num_threads_working->num_jobs_in_progress * priority_queue_unprocessed_jobs->priority_queue_num_incomplete_jobs * THREAD_DEAD->THREAD_TERMINATE_ASAP * change_state_job->admin_job_change_state add thpool state verification in cpp-test TestTerminateWhenEmpty * love thpool at the only once in redisearch_thpool_terminate_threads --------- Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> (cherry picked from commit 3946f69)
* MOD-6572 replace semaphore with wait in jobq (#4446) * replace semaphore with wait in jobq * reset jobq run flag in verify init * add bm script * update script * update num_wrking under the jobq lock wait and drain are modified accordingly. * inistialize num_working in redisearch_thpool_verify_init * remove logs * signal one if we insert one job, otherwise broadcast sembm om dim - 768 * print every 50K queries * cleanups * move n_vectors and vecs_file to main * fix typo * script cleanups * rename priority_queue->cond to has_jobs rename priority_queue->num_threads_working to num_threads_not_idle uncomment LOG remove script * fix identations * Update deps/thpool/thpool.c Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * remove thcount_lock Thcount used to lock the code block in thread_do where we realize the state is THPOOL_TERMINATE_WHEN_EMPTY and the jobq, and want to inform the other threads. However, this lock was not enough to ensure that thread won't go to sleep after the signal has already been sent. for example, once the queue is empty, for thread1 both state = THPOOL_TERMINATE_WHEN_EMPTY and jobq is empty, switch to thread2: try to pull a job from the queue, is_empty & should run both true, back to thread 1: change should_run and broadcast all other threads switch to thread2: sleep on the cond we need to make sure that both code block are protected by the same mutex: 1. thread_do changes the thpool state and signal the other threads 2. jobq_pull check if the jobq is empty and should_run and sleep on the cond. we lock these code blocks with jobqueues_rwmutex. thcount_lock is now redundant and the variable it used to guard can be atomic instead. * temp push script to run BM on ssh * remove num_threads_not_idle cond var - this condition variable was an approximate way to signal drain that all threads are idle. However, drain is more likly to wakeup due to timeout, since idealy, all threads are working as long as there are jobs in jobq. As for wait function, the threads were signaling that all are ideal before checking the queue length, so again, it doesn't indicate the real state of the queue (num_working might be 0, but there are still jobs in the queue) the condition variable was replaced with a sleeping and waking up after 100 ms. also, num working is increased if we pulled a job, and decreased when the job is done. change back to num_threads_working added description to has_jobs and should_run remove error checking from redisearch_thpool_create add num_threads_alive to stats remove unused priority_queue_len added a test to test terminate when empty added a util to create any class of thpool tests * add seed to query in poc script * remove script * remove lock from redisearch_thpool_num_threads_working unify if (job_p) in thread_do remove unused var from sleep_and_set in test_cpp_thpool * add volatile to should_run * refactor thpool * add admin priority queue *add threads manager to send admin jobs to threads *not lazy gc init to avoid bg intialization * little fix * add thpool name remove threads array from thpool struct * format thpool file * ensure job is running in redisearch_thpool_terminate_reset_threads change thpool state to THPOOL_UNINITIALIZED in redisearch_thpool_terminate_when_empty wait for num working threads to be 0 in redisearch_thpool_terminate_pause_threads thpool cpp tests: TestTerminateWhenEmpty: test recreating the threads after temination new test: TestPauseResume * add pthread_barrier implmntation if not defined * update thpool.h * thpool.h format * Don't intialize gc thpool. the gc thpool is intizlied by the timercallback, triggred by redis from the main thread remove thpool_init. not in use renaming of thpool structs' members remove priority_push_chain and introduce redisearch_thpool_push_chain_init_threads that encapsulate both safe push to the queue and threads verify init pthread_varrier: add volatile to cycle test_multithread: replace algo, datatype loop with an automatic generation of a function for each permutation (algo, datatype) as Test_burst_threads_sanity class attribute Thanks @GuyAv46 * Update tests/pytests/test_multithread.py Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * Update tests/pytests/test_multithread.py Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * Update tests/pytests/test_multithread.py Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * improve documnetation * remove cast * thpool API: redisearch_thpool_terminate_reset_threads->redisearch_thpool_terminate_threads redisearch_thpool_num_threads_working->redisearch_thpool_num_jobs_in_progress new: redisearch_thpool_is_initialized thpool.c: * Align all structs name to camelCase * redisearch_thpool_push_chain_init_threads->redisearch_thpool_push_chain_verify_init_threads * num_threads_working->num_jobs_in_progress * priority_queue_unprocessed_jobs->priority_queue_num_incomplete_jobs * THREAD_DEAD->THREAD_TERMINATE_ASAP * change_state_job->admin_job_change_state add thpool state verification in cpp-test TestTerminateWhenEmpty * love thpool at the only once in redisearch_thpool_terminate_threads --------- Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> (cherry picked from commit 3946f69) * in test_cpp_thpool:TestPauseResume Update the current jobs in progress used in the while condition to verify the job is pulled --------- Co-authored-by: meiravgri <109056284+meiravgri@users.noreply.github.com> Co-authored-by: meiravgri <meirav.grimberg@redis.com>
This PR significantly modifies the threadpool's pulling mechanism, setting the stage for more flexible runtime configurations.
Changing pulling synchronization machanism
Previously, the threadpool was synchronized using a semaphore-based waiting mechanism for job retrieval. This PR replaces that mechanism with a condition variable.
The revised job pulling mechanism operates as follows:
2. Threads enter a waiting state on a condition variable when the job queue is both empty and running.
3. Upon adding a new job to the queue:
Configure threadpool at runtime
Threadpool state
can be either
initialized
- hasn_threads
valid and ready to pull threads,or
uninitialized
- some or all of the threads may have exited.Threadpool state is set to
uninitialized
whenterminate_when_empty
ordestroy
are called.Removed
threads_all_idle
condition variabledrain
andwait
are internally implemented with a busy wait.calling
wait()
is equivalent to callingdrain(threshold = 0, yieldCB = nullptr)
Thread state
A thread can be in one of three states:
running
,terminate_when_empty
, ordead
.The thread state can be configured by pushing a
change_state_job
to the admin priority queue.Pause and resume
When pausing the threadpool, the jobq state is changed to
paused
. pause function will return when there are no more jobs in progress (i.enum_working_threads
== 0)Intialization
The new design assumes that the threadpool is initialized by the main thread.
All threadpool initialization is lazy and occurs upon the first push to the queue.
Since the GC pushes to the jobq from the bg, it breaks this assumption. Hence, the GC thpool initialized upon module startup.
Esthetic changes
Add name to the threadpool
The threadpool name is used to set the thepool's threads names.
a thread name is
<thpool_name>-<rand_id>.
As the maximum number of threads per thpool is 10,000,
rand_id
is a random number between 0 to 9,999.Remove
thpool->threads
arrayThis array was unnecessary and has been removed.