diff --git a/ggml/src/ggml.c b/ggml/src/ggml.c
index f0c0f4fb0a29a..03c4189b11e5a 100644
--- a/ggml/src/ggml.c
+++ b/ggml/src/ggml.c
@@ -1959,21 +1959,21 @@ struct ggml_compute_threadpool {
     struct ggml_cplan * cplan;
 
     // synchronization primitives
+    atomic_int n_graph;       // incremented when there is work to be done (i.e each graph)
     atomic_int n_barrier;
    atomic_int n_barrier_passed;
     atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
 
-    volatile bool stop;       // Used for stopping the threadpool altogether
-    volatile bool pause;      // Used for pausing the threadpool or individual threads
-    volatile bool new_work;   // Set when there is work to be done, unset after it's done
+    // these are atomic as an annotation for thread-sanitizer
+    atomic_bool stop;         // Used for stopping the threadpool altogether
+    atomic_bool pause;        // Used for pausing the threadpool or individual threads
 
     struct ggml_compute_state * workers;   // per thread state
     int32_t  n_threads_max; // number of threads in the pool
     int32_t  n_threads_cur; // number of threads used in the current graph
 
-    int32_t  prio;        // Scheduling priority
-    bool     disposable;  // Doesn't initialize a conv-var
-    uint32_t poll;        // Polling level (0 - no polling)
+    int32_t  prio;       // Scheduling priority
+    uint32_t poll;       // Polling level (0 - no polling)
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void * abort_callback_data;
@@ -1987,6 +1987,8 @@ struct ggml_compute_state {
     ggml_thread_t thrd;
     bool cpumask[GGML_MAX_N_THREADS];
     bool mask_specified;
+    int  last_graph;
+    bool pending;
 #endif
     struct ggml_compute_threadpool * threadpool;
     int ith;
@@ -18937,15 +18939,13 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
     struct ggml_compute_state* workers = threadpool->workers;
     const int32_t n_threads = threadpool->n_threads_max;
 
-    if (!threadpool->disposable) {
-        ggml_mutex_lock(&threadpool->mutex);
-    }
+    ggml_mutex_lock(&threadpool->mutex);
+
     threadpool->stop = true;
     threadpool->pause = false;
-    if (!threadpool->disposable) {
-        ggml_cond_broadcast(&threadpool->cond);
-        ggml_mutex_unlock(&threadpool->mutex);
-    }
+
+    ggml_cond_broadcast(&threadpool->cond);
+    ggml_mutex_unlock(&threadpool->mutex);
 
     for (int32_t j = 1; j < n_threads; j++) {
         int32_t rc = ggml_thread_join(workers[j].thrd, NULL);
@@ -18955,10 +18955,8 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
 
     GGML_ALIGNED_FREE(workers);
 
-    if (!threadpool->disposable) {
-        ggml_mutex_destroy(&threadpool->mutex);
-        ggml_cond_destroy(&threadpool->cond);
-    }
+    ggml_mutex_destroy(&threadpool->mutex);
+    ggml_cond_destroy(&threadpool->cond);
 #endif // GGML_USE_OPENMP
 
     GGML_ALIGNED_FREE(threadpool);
@@ -18981,7 +18979,6 @@ static void __ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool
 
 void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
 #ifndef GGML_USE_OPENMP
-    GGML_ASSERT(!threadpool->disposable);
     ggml_mutex_lock(&threadpool->mutex);
     if (!threadpool->pause) {
         __ggml_pause_threadpool(threadpool);
@@ -18994,7 +18991,6 @@ void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
 
 void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
 #ifndef GGML_USE_OPENMP
-    GGML_ASSERT(!threadpool->disposable);
     ggml_mutex_lock(&threadpool->mutex);
     if (threadpool->pause) {
         __ggml_resume_threadpool(threadpool);
@@ -19011,7 +19007,7 @@ struct ggml_cplan ggml_graph_plan(
           struct ggml_compute_threadpool * threadpool) {
 
     if (threadpool == NULL) {
-        GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool\n");
+        GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
     }
     if (n_threads <= 0) {
         n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
@@ -19197,55 +19193,40 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
         /*.threadpool=*/ state->threadpool,
     };
 
-    struct ggml_tensor * node = cgraph->nodes[0];
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
+        struct ggml_tensor * node = cgraph->nodes[node_n];
 
-    ggml_compute_forward(&params, node);
-    if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-        state->threadpool->ec = GGML_STATUS_ABORTED;
-    }
-
-    for (int node_n = 1; node_n < cgraph->n_nodes; node_n++) {
-        ggml_barrier(state->threadpool);
-
-        if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
-            break;
-        }
-
-        node = cgraph->nodes[node_n];
         ggml_compute_forward(&params, node);
         if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
             state->threadpool->ec = GGML_STATUS_ABORTED;
         }
-    }
 
-    if (cgraph->n_nodes == 1) {
-        // We need a barrier before disabling new_work in case we have a trivial graph
         ggml_barrier(state->threadpool);
-    }
 
-    if (!state->threadpool->disposable && state->ith == 0) {
-        // Don't need a lock, because there is a barrier after this, and only after that
-        // do the secondary threads go into standby
-        state->threadpool->new_work = false;
+        if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
+            break;
+        }
     }
 
-    ggml_barrier(state->threadpool);
-
     return 0;
 }
 
 #ifndef GGML_USE_OPENMP
 
-static inline bool ggml_graph_compute_got_work(struct ggml_compute_state *state) {
-    struct ggml_compute_threadpool * threadpool = state->threadpool;
-    return (threadpool->new_work && state->ith < threadpool->n_threads_cur);
-}
-
 static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
     struct ggml_compute_threadpool * threadpool = state->threadpool;
-    if (threadpool->stop || threadpool->pause) return true;
-    return ggml_graph_compute_got_work(state);
+
+    if (state->pending || threadpool->stop || threadpool->pause) { return true; }
+
+    // check for new graph/work
+    int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
+    if (new_graph != state->last_graph) {
+        state->pending    = (state->ith < threadpool->n_threads_cur);
+        state->last_graph = new_graph;
+    }
+
+    return state->pending;
 }
 
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
@@ -19260,14 +19241,14 @@ static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state *
         __cpu_relax();
     }
 
-    return ggml_graph_compute_got_work(state);
+    return state->pending;
 }
 
-static bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) {
+static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) {
     struct ggml_compute_threadpool * threadpool = state->threadpool;
 
     if (ggml_graph_compute_poll_for_work(state)) {
-        return ggml_graph_compute_got_work(state);
+        return state->pending;
     }
 
     ggml_mutex_lock_shared(&threadpool->mutex);
@@ -19278,15 +19259,13 @@ static bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state)
     }
     ggml_mutex_unlock_shared(&threadpool->mutex);
 
-    return ggml_graph_compute_got_work(state);
+    return state->pending;
 }
 
 static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
     struct ggml_compute_threadpool * threadpool = state->threadpool;
 
-    GGML_ASSERT(!threadpool->disposable);
-
     __thread_priority(threadpool->prio);
     if (state->mask_specified)
         __thread_affinity(state->cpumask);
@@ -19302,14 +19281,17 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
             GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith);
             ggml_mutex_unlock_shared(&threadpool->mutex);
         }
+
         // This needs to be checked for after the cond_wait
         if (threadpool->stop) break;
 
         // Check if there is new work
        // The main thread is the only one that can dispatch new work
-        bool new_work = ggml_graph_compute_check_for_work(state);
-        if (new_work) {
+        ggml_graph_compute_check_for_work(state);
+        if (state->pending) {
+            state->pending = false;
+
             int64_t ret = (int64_t) ggml_graph_compute_thread(state);
             if (ret == GGML_EXIT_ABORTED)
                 return (thread_ret_t) ret;
 
@@ -19324,6 +19306,25 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
     return (thread_ret_t) 0;
 }
 
+// Start processing new graph
+static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpool)
+{
+    // always take the mutex here because the worker threads are doing hybrid poll/wait
+
+    ggml_mutex_lock(&threadpool->mutex);
+
+    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
+
+    if (threadpool->pause) {
+        // resume does cond broadcast
+        __ggml_resume_threadpool(threadpool);
+    } else {
+        ggml_cond_broadcast(&threadpool->cond);
+    }
+
+    ggml_mutex_unlock(&threadpool->mutex);
+}
+
 #endif // GGML_USE_OPENMP
 
 bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, const struct ggml_threadpool_params * p1) {
@@ -19341,7 +19342,6 @@ bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, cons
 
 static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     struct ggml_threadpool_params * tpp,
-    bool disposable,
     struct ggml_cgraph * cgraph,
     struct ggml_cplan * cplan) {
 
@@ -19350,16 +19350,15 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     {
        threadpool->cgraph           = cgraph;
         threadpool->cplan            = cplan;
+        threadpool->n_graph          = 0;
         threadpool->n_barrier        = 0;
         threadpool->n_barrier_passed = 0;
         threadpool->current_chunk    = 0;
         threadpool->stop             = false;
-        threadpool->pause            = disposable ? false : tpp->paused;
-        threadpool->new_work         = false;
+        threadpool->pause            = tpp->paused;
         threadpool->workers          = NULL;
         threadpool->n_threads_max    = tpp->n_threads;
-        threadpool->n_threads_cur    = disposable ? tpp->n_threads : 0;
-        threadpool->disposable       = disposable;
+        threadpool->n_threads_cur    = tpp->n_threads;
         threadpool->poll             = tpp->poll;
         threadpool->prio             = tpp->prio;
 
@@ -19369,10 +19368,8 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     }
 
 #ifndef GGML_USE_OPENMP
-    if (!disposable) {
-        ggml_mutex_init(&threadpool->mutex);
-        ggml_cond_init(&threadpool->cond);
-    }
+    ggml_mutex_init(&threadpool->mutex);
+    ggml_cond_init(&threadpool->cond);
 #endif // GGML_USE_OPENMP
 
     struct ggml_compute_state * workers =
@@ -19398,21 +19395,21 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
             .thrd           = 0,
             .mask_specified = tpp->mask_specified,
             .threadpool     = threadpool,
-            .ith            = j
+            .ith            = j,
+            .last_graph     = 0,
+            .pending        = false
         };
 
         if (tpp->mask_specified) {
            __cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter);
         }
 
-        // Disposable threadpools need to have a valid cplan and cgraph immediately.
-        thread_ret_t (*thread_entrypoint)(void*) = disposable ? ggml_graph_compute_thread : ggml_graph_compute_secondary_thread;
-
         // Spin threads for all secondary workers
         if (j > 0) {
             int32_t rc = ggml_thread_create(
                 &workers[j].thrd,
                 NULL,
-                thread_entrypoint,
+                ggml_graph_compute_secondary_thread,
                 &workers[j]
             );
             GGML_ASSERT(rc == 0);
@@ -19424,7 +19421,7 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
 }
 
 struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
-    return ggml_create_threadpool_impl(tpp, false, NULL, NULL);
+    return ggml_create_threadpool_impl(tpp, NULL, NULL);
 }
 
 enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
@@ -19438,35 +19435,35 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
     bool disposable_threadpool = false;
 
     if (threadpool == NULL) {
-        GGML_PRINT_DEBUG("NOTE: No threadpool was specified in this cplan. Will create a disposable threadpool\n");
+        GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
         disposable_threadpool = true;
 
        struct ggml_threadpool_params ttp = {
            .mask_specified = false,
            .n_threads      = n_threads,
            .prio           = 0,
-           .poll           = false,
+           .poll           = 1,
            .strict_cpu     = false,
            .paused         = false
        };
 
-        threadpool = ggml_create_threadpool_impl(&ttp, true, cgraph, cplan);
+        threadpool = ggml_create_threadpool_impl(&ttp, cgraph, cplan);
     } else {
-        if (n_threads > threadpool->n_threads_max) {
-            GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
-        }
-        // Not a disposable threadpool:
-        // Reset some of the paramters that need resetting
+        // Reset some of the parameters that need resetting
         // No worker threads should be accessing the parameters below at this stage
-        threadpool->cgraph = cgraph;
-        threadpool->cplan  = cplan;
-        threadpool->n_threads_cur = n_threads;
+        threadpool->cgraph           = cgraph;
+        threadpool->cplan            = cplan;
+        threadpool->n_threads_cur    = n_threads;
         threadpool->n_barrier        = 0;
         threadpool->n_barrier_passed = 0;
         threadpool->current_chunk    = 0;
         threadpool->ec               = GGML_STATUS_SUCCESS;
     }
 
+    if (n_threads > threadpool->n_threads_max) {
+        GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
+    }
+
 #ifdef GGML_USE_OPENMP
     if (n_threads > 1) {
 #pragma omp parallel num_threads(n_threads)
@@ -19492,26 +19489,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             ggml_graph_compute_thread(&worker);
         }
 #else
-    if (!disposable_threadpool) {
-        // Update main thread affinity to match the current threadpool
-        if (threadpool->workers[0].mask_specified) {
-            __thread_affinity(threadpool->workers[0].cpumask);
-        }
-
-        // always take the mutex here because the worker threads are doing hybrid poll/wait
-
-        ggml_mutex_lock(&threadpool->mutex);
-        threadpool->new_work = true;
-        if (!threadpool->pause) {
-            ggml_cond_broadcast(&threadpool->cond);
-        } else {
-            // resume does cond broadcast
-            __ggml_resume_threadpool(threadpool);
-        }
-        ggml_mutex_unlock(&threadpool->mutex);
-
+    // Update main thread affinity to match the current threadpool
+    if (threadpool->workers[0].mask_specified) {
+        __thread_affinity(threadpool->workers[0].cpumask);
     }
-    // this is a work thread too
+
+    // Kick all threads to start the new graph
+    ggml_graph_compute_kickoff(threadpool);
+
+    // This is a work thread too
    ggml_graph_compute_thread(&threadpool->workers[0]);
 #endif
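
The dispatch scheme introduced by this patch is easy to miss in the noise: the old `new_work` flag is replaced by a monotonically increasing `n_graph` counter that the main thread bumps under the pool mutex (`ggml_graph_compute_kickoff`), while each worker compares the counter against its private `last_graph` and raises its `pending` flag when it has work (`ggml_graph_compute_ready`). Below is a minimal standalone sketch of that pattern using plain pthreads and C11 atomics. It is not ggml code; names such as `worker_state`, `ready`, and `kickoff` are illustrative, and unlike the real patch there are no per-node barriers and the dispatcher does not participate as worker 0, so back-to-back kickoffs may be coalesced here.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define N_WORKERS  4
#define POLL_ITERS 10000

static atomic_int  n_graph;   // incremented once per dispatched graph
static atomic_bool stop_flag; // pool shutdown
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;

struct worker_state {
    int  ith;        // worker index
    int  last_graph; // last n_graph value this worker has observed
    bool pending;    // a new graph is waiting to be processed
};

// analogous to ggml_graph_compute_ready(): cheap readiness check
static bool ready(struct worker_state * w) {
    if (w->pending || atomic_load(&stop_flag)) {
        return true;
    }
    int g = atomic_load_explicit(&n_graph, memory_order_relaxed);
    if (g != w->last_graph) {
        w->pending    = true;
        w->last_graph = g;
    }
    return w->pending;
}

static void * worker(void * arg) {
    struct worker_state * w = arg;
    while (!atomic_load(&stop_flag)) {
        // hybrid wait: spin for a while, then block on the condvar
        bool got = false;
        for (int i = 0; i < POLL_ITERS && !(got = ready(w)); i++) { /* spin */ }
        if (!got) {
            pthread_mutex_lock(&mutex);
            while (!ready(w)) {
                pthread_cond_wait(&cond, &mutex);
            }
            pthread_mutex_unlock(&mutex);
        }
        if (atomic_load(&stop_flag)) {
            break;
        }
        if (w->pending) {
            w->pending = false;
            printf("worker %d: processing graph %d\n", w->ith, w->last_graph);
        }
    }
    return NULL;
}

// analogous to ggml_graph_compute_kickoff(): bump the counter under the
// mutex so a worker about to block cannot miss it, then wake everyone
static void kickoff(void) {
    pthread_mutex_lock(&mutex);
    atomic_fetch_add_explicit(&n_graph, 1, memory_order_relaxed);
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);
}

int main(void) {
    pthread_t           threads[N_WORKERS];
    struct worker_state states[N_WORKERS];

    for (int i = 0; i < N_WORKERS; i++) {
        states[i] = (struct worker_state) { .ith = i, .last_graph = 0, .pending = false };
        pthread_create(&threads[i], NULL, worker, &states[i]);
    }

    for (int g = 0; g < 3; g++) {
        kickoff(); // dispatch three "graphs"
    }

    // shutdown: set stop under the mutex and wake all sleepers
    pthread_mutex_lock(&mutex);
    atomic_store(&stop_flag, true);
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < N_WORKERS; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}
```

Taking the mutex around an atomic increment looks redundant, but it closes the race with a worker that has just finished polling and is about to block: that worker re-checks `ready()` while holding the mutex, so it either observes the new counter value or is guaranteed to receive the broadcast. This is the same reasoning as the "always take the mutex here because the worker threads are doing hybrid poll/wait" comment in the patch.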