Skip to content

Commit

Permalink
Use pthreadpool object to store max number of threads to use
Browse files Browse the repository at this point in the history
Summary:
This commits changes API to set max num threads. It applies the limit to
the pthreadpool object.

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
kimishpatel committed Nov 9, 2021
1 parent 9ef46d3 commit f5d213f
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 73 deletions.
2 changes: 1 addition & 1 deletion include/pthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ size_t pthreadpool_get_threads_count(pthreadpool_t threadpool);
* Purpose of this is to ameliorate some perf degradation observed
* due to OS mapping a given set of threads to fewer cores.
*/
void pthreadpool_set_max_num_threads(size_t num_threads);
void pthreadpool_set_max_num_threads(struct pthreadpool* threadpool, size_t num_threads);
size_t pthreadpool_get_max_num_threads();

/**
Expand Down
15 changes: 8 additions & 7 deletions src/gcd.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return threadpool;
}

void pthreadpool_set_max_num_threads(size_t num_threads) {
max_num_threads = num_threads;
}

size_t pthreadpool_get_max_num_threads() {
return max_num_threads;
void pthreadpool_set_max_num_threads(struct pthreadpool* threadpool, size_t num_threads) {
pthread_mutex_lock(&threadpool->execution_mutex);
const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const size_t num_threads_to_use_ = min(threads_count.value, num_threads);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use_);
pthread_mutex_unlock(&threadpool->execution_mutex);
}

PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
Expand Down Expand Up @@ -109,8 +109,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const struct fxdiv_result_size_tsize_t threads_count =
fxdiv_init_size_t(min(threadpool->threads_count.value, pthreadpool_get_max_num_threads()));
fxdiv_init_size_t(min(threadpool->threads_count.value, num_threads_to_use);

if (params_size != 0) {
memcpy(&threadpool->params, params, params_size);
Expand Down
42 changes: 21 additions & 21 deletions src/portable-api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,7 @@ void pthreadpool_parallelize_1d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || range <= 1) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1274,7 +1274,7 @@ void pthreadpool_parallelize_1d_with_uarch(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || range <= 1) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1325,7 +1325,7 @@ void pthreadpool_parallelize_1d_tile_1d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || range <= tile) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1368,7 +1368,7 @@ void pthreadpool_parallelize_2d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i | range_j) <= 1) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1413,7 +1413,7 @@ void pthreadpool_parallelize_2d_tile_1d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1462,7 +1462,7 @@ void pthreadpool_parallelize_2d_tile_2d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1516,7 +1516,7 @@ void pthreadpool_parallelize_2d_tile_2d_with_uarch(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1578,7 +1578,7 @@ void pthreadpool_parallelize_3d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k) <= 1) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1627,7 +1627,7 @@ void pthreadpool_parallelize_3d_tile_1d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1680,7 +1680,7 @@ void pthreadpool_parallelize_3d_tile_2d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1738,7 +1738,7 @@ void pthreadpool_parallelize_3d_tile_2d_with_uarch(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1804,7 +1804,7 @@ void pthreadpool_parallelize_4d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l) <= 1) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1859,7 +1859,7 @@ void pthreadpool_parallelize_4d_tile_1d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1918,7 +1918,7 @@ void pthreadpool_parallelize_4d_tile_2d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -1981,7 +1981,7 @@ void pthreadpool_parallelize_4d_tile_2d_with_uarch(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -2052,7 +2052,7 @@ void pthreadpool_parallelize_5d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m) <= 1) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -2111,7 +2111,7 @@ void pthreadpool_parallelize_5d_tile_1d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -2174,7 +2174,7 @@ void pthreadpool_parallelize_5d_tile_2d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l && range_m <= tile_m)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -2238,7 +2238,7 @@ void pthreadpool_parallelize_6d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m | range_n) <= 1) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -2301,7 +2301,7 @@ void pthreadpool_parallelize_6d_tile_1d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l | range_m) <= 1 && range_n <= tile_n)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down Expand Up @@ -2368,7 +2368,7 @@ void pthreadpool_parallelize_6d_tile_2d(
uint32_t flags)
{
size_t threads_count;
const size_t num_threads_to_use = pthreadpool_get_max_num_threads();
const size_t num_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
threads_count = min(threadpool->threads_count.value, num_threads_to_use);
if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m && range_n <= tile_n)) {
/* No thread pool used: execute task sequentially on the calling thread */
Expand Down
17 changes: 7 additions & 10 deletions src/pthreads.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
#include "threadpool-object.h"
#include "threadpool-utils.h"

thread_local size_t max_num_threads = UINT_MAX;

#if PTHREADPOOL_USE_FUTEX
#if defined(__linux__)
static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
Expand Down Expand Up @@ -297,12 +295,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return threadpool;
}

void pthreadpool_set_max_num_threads(size_t num_threads) {
max_num_threads = num_threads;
}

size_t pthreadpool_get_max_num_threads() {
return max_num_threads;
void pthreadpool_set_max_num_threads(struct pthreadpool* threadpool, size_t num_threads) {
pthread_mutex_lock(&threadpool->execution_mutex);
const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const size_t num_threads_to_use_ = min(threads_count.value, num_threads);
pthreadpool_store_release_size_t(&threadpool->num_threads_to_use, num_threads_to_use_);
pthread_mutex_unlock(&threadpool->execution_mutex);
}

PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
Expand All @@ -324,7 +322,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
pthread_mutex_lock(&threadpool->execution_mutex);

const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
size_t max_threads_to_use = pthreadpool_get_max_num_threads();
size_t max_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const struct fxdiv_divisor_size_t num_threads_to_use = fxdiv_init_size_t(min(threads_count.value, max_threads_to_use));
#if !PTHREADPOOL_USE_FUTEX
/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
Expand All @@ -341,7 +339,6 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(

/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, num_threads_to_use.value - 1 /* caller thread */);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use.value);
#if PTHREADPOOL_USE_FUTEX
pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
#endif
Expand Down
16 changes: 0 additions & 16 deletions src/threadpool-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,3 @@
#define PTHREADPOOL_INTERNAL
#endif
#endif

// ported from: https://stackoverflow.com/questions/18298280/how-to-declare-a-variable-as-thread-local-portably
/* gcc doesn't know _Thread_local from C11 yet */
#ifdef __GNUC__
# define thread_local __thread
/*
// c11 standard already has thread_local specified
// https://en.cppreference.com/w/c/thread/thread_local
#elif __STDC_VERSION__ >= 201112L
# define thread_local _Thread_local
*/
#elif defined(_MSC_VER)
# define thread_local __declspec(thread)
#else
# error Cannot define thread_local
#endif
15 changes: 7 additions & 8 deletions src/windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return threadpool;
}

void pthreadpool_set_max_num_threads(size_t num_threads) {
max_num_threads = num_threads;
}

size_t pthreadpool_get_max_num_threads() {
return max_num_threads;
void pthreadpool_set_max_num_threads(struct pthreadpool* threadpool, size_t num_threads) {
pthread_mutex_lock(&threadpool->execution_mutex);
const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const size_t num_threads_to_use_ = min(threads_count.value, num_threads);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use_);
pthread_mutex_unlock(&threadpool->execution_mutex);
}

PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
Expand All @@ -223,7 +223,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
assert(wait_status == WAIT_OBJECT_0);

const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
size_t max_threads_to_use = pthreadpool_get_max_num_threads();
size_t max_threads_to_use = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const struct fxdiv_divisor_size_t num_threads_to_use = fxdiv_init_size_t(min(threads_count.value, max_threads_to_use));

/* Setup global arguments */
Expand All @@ -233,7 +233,6 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

pthreadpool_store_relaxed_size_t(&threadpool->active_threads, num_threads_to_use.value - 1 /* caller thread */);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use.value);

if (params_size != 0) {
CopyMemory(&threadpool->params, params, params_size);
Expand Down
Loading

0 comments on commit f5d213f

Please sign in to comment.