Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to specify number of threads, from threadpool, to use for the task #17

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions include/pthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ pthreadpool_t pthreadpool_create(size_t threads_count);
*
* @returns The number of threads in the thread pool.
*/
size_t pthreadpool_get_max_threads_count(pthreadpool_t threadpool);

/*
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we include the why?
E.g.
API to cap the number of threads used to do work, rather than those
currently available in the runtime's threadpool.

    • This is useful to counter potential performance degradation
    • of using more threads than optimal for the device and use-case
    • such as the OS scheduling threads to run on smaller cores
    • at the cost of threading overhead.

Also indicate what happens if num_threads > # threads in pool

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I need to do add that.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the same pattern (including @param and @returns tag) as the other functions, make sure both pthreadpool_set_max_num_threads and pthreadpool_get_max_num_threads are documented

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to use pthreadpool_get_threads_count/pthreadpool_set_threads_count for the new API functions and add a new function pthreadpool_get_max_threads_count to return the number of threads in the thread pool (what pthreadpool_get_threads_count does now)

* API to enable doing work with fewer threads than available in
* threadpool.
* This API takes in a pointer to threadpool object and sets number
* of threads to use with that threadpool.
* Purpose of this is to ameliorate some perf degradation observed
* due to OS mapping a given set of threads to fewer cores.
*
* @param threadpool threadpool object.
* @param num_threads num threads to use for the subsequent tasks
* submitted using the threadpool object.
*/
void pthreadpool_set_threads_count(pthreadpool_t threadpool, size_t num_threads);

/*
* API to get number of threads to be used via threadpool. This number
* can be different from the size of the threadpool. It may have been set
* by pthreadpool_set_threads_count.
*
* @param threadpool threadpool object.
* @returns number of threads used by threadpool.
*/
size_t pthreadpool_get_threads_count(pthreadpool_t threadpool);

/**
Expand Down
42 changes: 21 additions & 21 deletions src/fastpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath(
const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -77,7 +77,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath(
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -113,7 +113,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath(
const pthreadpool_task_1d_tile_1d_t task = (pthreadpool_task_1d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -155,7 +155,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath(
const pthreadpool_task_2d_t task = (pthreadpool_task_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -201,7 +201,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath(
const pthreadpool_task_2d_tile_1d_t task = (pthreadpool_task_2d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -251,7 +251,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath(
const pthreadpool_task_2d_tile_2d_t task = (pthreadpool_task_2d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -313,7 +313,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -366,7 +366,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath(
const pthreadpool_task_3d_t task = (pthreadpool_task_3d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -419,7 +419,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath(
const pthreadpool_task_3d_tile_1d_t task = (pthreadpool_task_3d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -476,7 +476,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath(
const pthreadpool_task_3d_tile_2d_t task = (pthreadpool_task_3d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -546,7 +546,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -607,7 +607,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath(
const pthreadpool_task_4d_t task = (pthreadpool_task_4d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -668,7 +668,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath(
const pthreadpool_task_4d_tile_1d_t task = (pthreadpool_task_4d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -733,7 +733,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath(
const pthreadpool_task_4d_tile_2d_t task = (pthreadpool_task_4d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -810,7 +810,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -878,7 +878,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath(
const pthreadpool_task_5d_t task = (pthreadpool_task_5d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -946,7 +946,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath(
const pthreadpool_task_5d_tile_1d_t task = (pthreadpool_task_5d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1019,7 +1019,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath(
const pthreadpool_task_5d_tile_2d_t task = (pthreadpool_task_5d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1095,7 +1095,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath(
const pthreadpool_task_6d_t task = (pthreadpool_task_6d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1171,7 +1171,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath(
const pthreadpool_task_6d_tile_1d_t task = (pthreadpool_task_6d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1252,7 +1252,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath(
const pthreadpool_task_6d_tile_2d_t task = (pthreadpool_task_6d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down
26 changes: 18 additions & 8 deletions src/gcd.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ static void thread_main(void* arg, size_t thread_index) {
}
}

struct pthreadpool* pthreadpool_create(size_t threads_count) {
if (threads_count == 0) {
struct pthreadpool* pthreadpool_create(size_t max_threads_count) {
if (max_threads_count == 0) {
int threads = 1;
size_t sizeof_threads = sizeof(threads);
if (sysctlbyname("hw.logicalcpu_max", &threads, &sizeof_threads, NULL, 0) != 0) {
Expand All @@ -54,25 +54,34 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return NULL;
}

threads_count = (size_t) threads;
max_threads_count = (size_t) threads;
}

struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
struct pthreadpool* threadpool = pthreadpool_allocate(max_threads_count);
if (threadpool == NULL) {
return NULL;
}
threadpool->threads_count = fxdiv_init_size_t(threads_count);
for (size_t tid = 0; tid < threads_count; tid++) {
threadpool->max_threads_count = fxdiv_init_size_t(max_threads_count);
pthreadpool_store_relaxed_size_t(&threadpool->threads_count, max_threads_count);
for (size_t tid = 0; tid < max_threads_count; tid++) {
threadpool->threads[tid].thread_number = tid;
}

/* Thread pool with a single thread computes everything on the caller thread. */
if (threads_count > 1) {
if (max_threads_count > 1) {
threadpool->execution_semaphore = dispatch_semaphore_create(1);
}
return threadpool;
}

void pthreadpool_set_threads_count(struct pthreadpool* threadpool, size_t num_threads) {
dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER);
const struct fxdiv_divisor_size_t max_threads_count = threadpool->max_threads_count;
const size_t num_threads_to_use = min(max_threads_count.value, num_threads);
pthreadpool_store_relaxed_size_t(&threadpool->threads_count, num_threads_to_use);
dispatch_semaphore_signal(threadpool->execution_semaphore);
}

PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
struct pthreadpool* threadpool,
thread_function_t thread_function,
Expand All @@ -98,7 +107,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const struct fxdiv_divisor_size_t threads_count =
fxdiv_init_size_t(pthreadpool_load_relaxed_size_t(&threadpool->threads_count));

if (params_size != 0) {
memcpy(&threadpool->params, params, params_size);
Expand Down
8 changes: 4 additions & 4 deletions src/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@


PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate(
size_t threads_count)
size_t max_threads_count)
{
assert(threads_count >= 1);
assert(max_threads_count >= 1);

const size_t threadpool_size = sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info);
const size_t threadpool_size = sizeof(struct pthreadpool) + max_threads_count * sizeof(struct thread_info);
struct pthreadpool* threadpool = NULL;
#if defined(__ANDROID__)
/*
Expand Down Expand Up @@ -55,7 +55,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_deallocate(
{
assert(threadpool != NULL);

const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->threads_count.value * sizeof(struct thread_info);
const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->max_threads_count.value * sizeof(struct thread_info);
memset(threadpool, 0, threadpool_size);

#ifdef _WIN32
Expand Down
Loading