From 160fc8de96503bcd5b32f900fe4d65a2296c3937 Mon Sep 17 00:00:00 2001 From: Max Krasnyansky Date: Mon, 12 Aug 2024 19:04:01 -0700 Subject: [PATCH 1/2] threadpool: reduce the number of barrier required New work is now indicated with an atomic counter that is incremented for each new graph that needs to be computed. This removes the need for extra barrier for clearing the "new_work" and removes the special case for trivial graphs. --- ggml/src/ggml.c | 78 +++++++++++++++++++++---------------------------- 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/ggml/src/ggml.c b/ggml/src/ggml.c index f0c0f4fb0a29a..3e3061279517a 100644 --- a/ggml/src/ggml.c +++ b/ggml/src/ggml.c @@ -1959,13 +1959,13 @@ struct ggml_compute_threadpool { struct ggml_cplan * cplan; // synchronization primitives + atomic_int n_graph; // incremented when there is work to be done (i.e each graph) atomic_int n_barrier; atomic_int n_barrier_passed; atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads. volatile bool stop; // Used for stopping the threadpool altogether volatile bool pause; // Used for pausing the threadpool or individual threads - volatile bool new_work; // Set when there is work to be done, unset after it's done struct ggml_compute_state * workers; // per thread state int32_t n_threads_max; // number of threads in the pool @@ -1987,6 +1987,8 @@ struct ggml_compute_state { ggml_thread_t thrd; bool cpumask[GGML_MAX_N_THREADS]; bool mask_specified; + int last_graph; + bool pending; #endif struct ggml_compute_threadpool * threadpool; int ith; @@ -19197,55 +19199,39 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { /*.threadpool=*/ state->threadpool, }; - struct ggml_tensor * node = cgraph->nodes[0]; + for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) { + struct ggml_tensor * node = cgraph->nodes[node_n]; - ggml_compute_forward(¶ms, node); - if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { - state->threadpool->ec = GGML_STATUS_ABORTED; - } - - for (int node_n = 1; node_n < cgraph->n_nodes; node_n++) { - ggml_barrier(state->threadpool); - - if (state->threadpool->ec != GGML_STATUS_SUCCESS) { - break; - } - - node = cgraph->nodes[node_n]; ggml_compute_forward(¶ms, node); if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { state->threadpool->ec = GGML_STATUS_ABORTED; } - } - if (cgraph->n_nodes == 1) { - // We need a barrier before disabling new_work in case we have a trivial graph ggml_barrier(state->threadpool); - } - if (!state->threadpool->disposable && state->ith == 0) { - // Don't need a lock, because there is a barrier after this, and only after that - // do the secondary threads go into standby - state->threadpool->new_work = false; + if (state->threadpool->ec != GGML_STATUS_SUCCESS) { + break; + } } - ggml_barrier(state->threadpool); - return 0; } #ifndef GGML_USE_OPENMP -static inline bool ggml_graph_compute_got_work(struct ggml_compute_state *state) { - struct ggml_compute_threadpool * threadpool = state->threadpool; - return (threadpool->new_work && state->ith < threadpool->n_threads_cur); -} - static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) { struct ggml_compute_threadpool * threadpool = state->threadpool; - if (threadpool->stop || threadpool->pause) return true; - return ggml_graph_compute_got_work(state); + if (threadpool->stop || threadpool->pause || state->pending) { return true; } + + // check for new graph/work + int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed); + if (new_graph != state->last_graph) { + state->pending = (state->ith < threadpool->n_threads_cur); + state->last_graph = new_graph; + } + + return state->pending; } static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) { @@ -19260,14 +19246,14 @@ static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * __cpu_relax(); } - return ggml_graph_compute_got_work(state); + return state->pending; } -static bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) { +static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) { struct ggml_compute_threadpool * threadpool = state->threadpool; if (ggml_graph_compute_poll_for_work(state)) { - return ggml_graph_compute_got_work(state); + return state->pending; } ggml_mutex_lock_shared(&threadpool->mutex); @@ -19278,7 +19264,7 @@ static bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) } ggml_mutex_unlock_shared(&threadpool->mutex); - return ggml_graph_compute_got_work(state); + return state->pending; } static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { @@ -19308,8 +19294,10 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { // Check if there is new work // The main thread is the only one that can dispatch new work - bool new_work = ggml_graph_compute_check_for_work(state); - if (new_work) { + ggml_graph_compute_check_for_work(state); + if (state->pending) { + state->pending = false; + int64_t ret = (int64_t) ggml_graph_compute_thread(state); if (ret == GGML_EXIT_ABORTED) return (thread_ret_t) ret; @@ -19350,12 +19338,12 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl( { threadpool->cgraph = cgraph; threadpool->cplan = cplan; + threadpool->n_graph = 0; threadpool->n_barrier = 0; threadpool->n_barrier_passed = 0; threadpool->current_chunk = 0; threadpool->stop = false; threadpool->pause = disposable ? false : tpp->paused; - threadpool->new_work = false; threadpool->workers = NULL; threadpool->n_threads_max = tpp->n_threads; threadpool->n_threads_cur = disposable ? tpp->n_threads : 0; @@ -19398,7 +19386,9 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl( .thrd = 0, .mask_specified = tpp->mask_specified, .threadpool = threadpool, - .ith = j + .ith = j, + .last_graph = 0, + .pending = false }; if (tpp->mask_specified) { @@ -19501,12 +19491,12 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl // always take the mutex here because the worker threads are doing hybrid poll/wait ggml_mutex_lock(&threadpool->mutex); - threadpool->new_work = true; - if (!threadpool->pause) { - ggml_cond_broadcast(&threadpool->cond); - } else { + atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed); + if (threadpool->pause) { // resume does cond broadcast __ggml_resume_threadpool(threadpool); + } else { + ggml_cond_broadcast(&threadpool->cond); } ggml_mutex_unlock(&threadpool->mutex); } From c1491e5263b0c0d010a4b6a99c87973c681cfa80 Mon Sep 17 00:00:00 2001 From: Max Krasnyansky Date: Mon, 12 Aug 2024 22:18:16 -0700 Subject: [PATCH 2/2] threadpool: remove special-casing for disposable threadpools With the efficient hybrid polling there is no need to make disposable pools any different. This simplifies the overall logic and reduces branching. Include n_threads in debug print for disposable threadpool. Declare pause and stop flags as atomic_bool This doesn't actually generate any memory barriers and simply informs the thread sanitizer that these flags can be written & read by different threads without locking. --- ggml/src/ggml.c | 122 +++++++++++++++++++++++------------------------- 1 file changed, 59 insertions(+), 63 deletions(-) diff --git a/ggml/src/ggml.c b/ggml/src/ggml.c index 3e3061279517a..03c4189b11e5a 100644 --- a/ggml/src/ggml.c +++ b/ggml/src/ggml.c @@ -1964,16 +1964,16 @@ struct ggml_compute_threadpool { atomic_int n_barrier_passed; atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads. - volatile bool stop; // Used for stopping the threadpool altogether - volatile bool pause; // Used for pausing the threadpool or individual threads + // these are atomic as an annotation for thread-sanitizer + atomic_bool stop; // Used for stopping the threadpool altogether + atomic_bool pause; // Used for pausing the threadpool or individual threads struct ggml_compute_state * workers; // per thread state int32_t n_threads_max; // number of threads in the pool int32_t n_threads_cur; // number of threads used in the current graph - int32_t prio; // Scheduling priority - bool disposable; // Doesn't initialize a conv-var - uint32_t poll; // Polling level (0 - no polling) + int32_t prio; // Scheduling priority + uint32_t poll; // Polling level (0 - no polling) ggml_abort_callback abort_callback; // abort ggml_graph_compute when true void * abort_callback_data; @@ -18939,15 +18939,13 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) { struct ggml_compute_state* workers = threadpool->workers; const int32_t n_threads = threadpool->n_threads_max; - if (!threadpool->disposable) { - ggml_mutex_lock(&threadpool->mutex); - } + ggml_mutex_lock(&threadpool->mutex); + threadpool->stop = true; threadpool->pause = false; - if (!threadpool->disposable) { - ggml_cond_broadcast(&threadpool->cond); - ggml_mutex_unlock(&threadpool->mutex); - } + + ggml_cond_broadcast(&threadpool->cond); + ggml_mutex_unlock(&threadpool->mutex); for (int32_t j = 1; j < n_threads; j++) { int32_t rc = ggml_thread_join(workers[j].thrd, NULL); @@ -18957,10 +18955,8 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) { GGML_ALIGNED_FREE(workers); - if (!threadpool->disposable) { - ggml_mutex_destroy(&threadpool->mutex); - ggml_cond_destroy(&threadpool->cond); - } + ggml_mutex_destroy(&threadpool->mutex); + ggml_cond_destroy(&threadpool->cond); #endif // GGML_USE_OPENMP GGML_ALIGNED_FREE(threadpool); @@ -18983,7 +18979,6 @@ static void __ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) { #ifndef GGML_USE_OPENMP - GGML_ASSERT(!threadpool->disposable); ggml_mutex_lock(&threadpool->mutex); if (!threadpool->pause) { __ggml_pause_threadpool(threadpool); @@ -18996,7 +18991,6 @@ void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) { void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) { #ifndef GGML_USE_OPENMP - GGML_ASSERT(!threadpool->disposable); ggml_mutex_lock(&threadpool->mutex); if (threadpool->pause) { __ggml_resume_threadpool(threadpool); @@ -19013,7 +19007,7 @@ struct ggml_cplan ggml_graph_plan( struct ggml_compute_threadpool * threadpool) { if (threadpool == NULL) { - GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool\n"); + GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads); } if (n_threads <= 0) { n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS; @@ -19222,7 +19216,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) { struct ggml_compute_threadpool * threadpool = state->threadpool; - if (threadpool->stop || threadpool->pause || state->pending) { return true; } + + if (state->pending || threadpool->stop || threadpool->pause) { return true; } // check for new graph/work int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed); @@ -19271,8 +19266,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { struct ggml_compute_state * state = (struct ggml_compute_state *) data; struct ggml_compute_threadpool * threadpool = state->threadpool; - GGML_ASSERT(!threadpool->disposable); - __thread_priority(threadpool->prio); if (state->mask_specified) __thread_affinity(state->cpumask); @@ -19288,6 +19281,7 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith); ggml_mutex_unlock_shared(&threadpool->mutex); } + // This needs to be checked for after the cond_wait if (threadpool->stop) break; @@ -19312,6 +19306,25 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { return (thread_ret_t) 0; } +// Start processing new graph +static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpool) +{ + // always take the mutex here because the worker threads are doing hybrid poll/wait + + ggml_mutex_lock(&threadpool->mutex); + + atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed); + + if (threadpool->pause) { + // resume does cond broadcast + __ggml_resume_threadpool(threadpool); + } else { + ggml_cond_broadcast(&threadpool->cond); + } + + ggml_mutex_unlock(&threadpool->mutex); +} + #endif // GGML_USE_OPENMP bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, const struct ggml_threadpool_params * p1) { @@ -19329,7 +19342,6 @@ bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, cons static struct ggml_compute_threadpool * ggml_create_threadpool_impl( struct ggml_threadpool_params * tpp, - bool disposable, struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) { @@ -19343,11 +19355,10 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl( threadpool->n_barrier_passed = 0; threadpool->current_chunk = 0; threadpool->stop = false; - threadpool->pause = disposable ? false : tpp->paused; + threadpool->pause = tpp->paused; threadpool->workers = NULL; threadpool->n_threads_max = tpp->n_threads; - threadpool->n_threads_cur = disposable ? tpp->n_threads : 0; - threadpool->disposable = disposable; + threadpool->n_threads_cur = tpp->n_threads; threadpool->poll = tpp->poll; threadpool->prio = tpp->prio; @@ -19357,10 +19368,8 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl( } #ifndef GGML_USE_OPENMP - if (!disposable) { - ggml_mutex_init(&threadpool->mutex); - ggml_cond_init(&threadpool->cond); - } + ggml_mutex_init(&threadpool->mutex); + ggml_cond_init(&threadpool->cond); #endif // GGML_USE_OPENMP struct ggml_compute_state * workers = @@ -19395,14 +19404,12 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl( __cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter); } - // Disposable threadpools need to have a valid cplan and cgraph immediately. - thread_ret_t (*thread_entrypoint)(void*) = disposable ? ggml_graph_compute_thread : ggml_graph_compute_secondary_thread; // Spin threads for all secondary workers if (j > 0) { int32_t rc = ggml_thread_create( &workers[j].thrd, NULL, - thread_entrypoint, + ggml_graph_compute_secondary_thread, &workers[j] ); GGML_ASSERT(rc == 0); @@ -19414,7 +19421,7 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl( } struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) { - return ggml_create_threadpool_impl(tpp, false, NULL, NULL); + return ggml_create_threadpool_impl(tpp, NULL, NULL); } enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) { @@ -19428,35 +19435,35 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl bool disposable_threadpool = false; if (threadpool == NULL) { - GGML_PRINT_DEBUG("NOTE: No threadpool was specified in this cplan. Will create a disposable threadpool\n"); + GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads); disposable_threadpool = true; struct ggml_threadpool_params ttp = { .mask_specified = false, .n_threads = n_threads, .prio = 0, - .poll = false, + .poll = 1, .strict_cpu = false, .paused = false }; - threadpool = ggml_create_threadpool_impl(&ttp, true, cgraph, cplan); + threadpool = ggml_create_threadpool_impl(&ttp, cgraph, cplan); } else { - if (n_threads > threadpool->n_threads_max) { - GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n"); - } - // Not a disposable threadpool: - // Reset some of the paramters that need resetting + // Reset some of the parameters that need resetting // No worker threads should be accessing the parameters below at this stage - threadpool->cgraph = cgraph; - threadpool->cplan = cplan; - threadpool->n_threads_cur = n_threads; + threadpool->cgraph = cgraph; + threadpool->cplan = cplan; + threadpool->n_threads_cur = n_threads; threadpool->n_barrier = 0; threadpool->n_barrier_passed = 0; threadpool->current_chunk = 0; threadpool->ec = GGML_STATUS_SUCCESS; } + if (n_threads > threadpool->n_threads_max) { + GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n"); + } + #ifdef GGML_USE_OPENMP if (n_threads > 1) { #pragma omp parallel num_threads(n_threads) @@ -19482,26 +19489,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl ggml_graph_compute_thread(&worker); } #else - if (!disposable_threadpool) { - // Update main thread affinity to match the current threadpool - if (threadpool->workers[0].mask_specified) { - __thread_affinity(threadpool->workers[0].cpumask); - } - - // always take the mutex here because the worker threads are doing hybrid poll/wait - - ggml_mutex_lock(&threadpool->mutex); - atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed); - if (threadpool->pause) { - // resume does cond broadcast - __ggml_resume_threadpool(threadpool); - } else { - ggml_cond_broadcast(&threadpool->cond); - } - ggml_mutex_unlock(&threadpool->mutex); + // Update main thread affinity to match the current threadpool + if (threadpool->workers[0].mask_specified) { + __thread_affinity(threadpool->workers[0].cpumask); } - // this is a work thread too + // Kick all threads to start the new graph + ggml_graph_compute_kickoff(threadpool); + + // This is a work thread too ggml_graph_compute_thread(&threadpool->workers[0]); #endif