diff --git a/.clang-format b/.clang-format
index 15538430b..5ad2cd768 100644
--- a/.clang-format
+++ b/.clang-format
@@ -2,7 +2,8 @@ BasedOnStyle: Chromium
 BreakBeforeBraces: Custom
 BraceWrapping:
   AfterFunction: true
-  AfterStruct: true
+  AfterStruct: false
+Cpp11BracedListStyle: false
 IndentWidth: 8
 UseTab: ForContinuationAndIndentation
 PointerAlignment: Right
diff --git a/Makefile.am b/Makefile.am
index 091cd15bb..53b5abd46 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -44,6 +44,7 @@ basic_dqlite_sources = \
 	src/lib/buffer.c \
 	src/lib/fs.c \
 	src/lib/sm.c \
+	src/lib/threadpool.c \
 	src/lib/transport.c \
 	src/logger.c \
 	src/message.c \
@@ -149,6 +150,7 @@ unit_test_SOURCES += \
 	test/test_error.c \
 	test/test_integration.c \
 	test/unit/ext/test_uv.c \
+	test/unit/ext/test_uv_pool.c \
 	test/unit/lib/test_addr.c \
 	test/unit/lib/test_buffer.c \
 	test/unit/lib/test_byte.c \
diff --git a/src/lib/queue.h b/src/lib/queue.h
index 7b903dbdf..bbaffe802 100644
--- a/src/lib/queue.h
+++ b/src/lib/queue.h
@@ -37,6 +37,19 @@ typedef void *queue[2];
 		QUEUE__PREV(q) = (e);                 \
 	}
 
+#define QUEUE__INSERT_TAIL(q, e) QUEUE__PUSH(q, e)
+
+/**
+ * Insert an element at the front of a queue.
+ */
+#define QUEUE__INSERT_HEAD(h, q)                      \
+	{                                             \
+		QUEUE__NEXT(q) = QUEUE__NEXT(h);      \
+		QUEUE__PREV(q) = (h);                 \
+		QUEUE__NEXT_PREV(q) = (q);            \
+		QUEUE__NEXT(h) = (q);                 \
+	}
+
 /**
  * Remove the given element from the queue. Any element can be removed at any
  * time.
@@ -47,6 +60,25 @@ typedef void *queue[2];
 		QUEUE__NEXT_PREV(e) = QUEUE__PREV(e); \
 	}
 
+/**
+ * Move all elements from queue @h to queue @n, leaving @h empty.
+ * Note: QUEUE__SPLIT() was removed and merged into QUEUE__MOVE().
+ */
+#define QUEUE__MOVE(h, n)                             \
+	{                                             \
+		if (QUEUE__IS_EMPTY(h)) {             \
+			QUEUE__INIT(n);               \
+		} else {                              \
+			queue *__q = QUEUE__HEAD(h);          \
+			QUEUE__PREV(n) = QUEUE__PREV(h);      \
+			QUEUE__PREV_NEXT(n) = (n);            \
+			QUEUE__NEXT(n) = (__q);               \
+			QUEUE__PREV(h) = QUEUE__PREV(__q);    \
+			QUEUE__PREV_NEXT(h) = (h);            \
+			QUEUE__PREV(__q) = (n);               \
+		}                                     \
+	}
+
 /**
  * Return the element at the front of the queue.
  */
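QUEUE__MOVE() enables a grab-and-drain pattern: a consumer takes ownership of every queued element in O(1) while holding a lock, then processes the elements with the lock dropped. work_done() in threadpool.c below relies on exactly this shape. A minimal sketch of the pattern (struct item, drain() and the mutex are illustrative, not part of this change):

#include <uv.h>
#include "queue.h"

struct item {
	queue link; /* Links the item into a queue */
};

static uv_mutex_t mutex; /* Protects @shared */

static void drain(queue *shared)
{
	queue local;

	/* O(1) hand-off: after this, *shared is empty and can keep
	 * accepting new elements under the lock. */
	uv_mutex_lock(&mutex);
	QUEUE__MOVE(shared, &local);
	uv_mutex_unlock(&mutex);

	while (!QUEUE__IS_EMPTY(&local)) {
		queue *q = QUEUE__HEAD(&local);
		QUEUE__REMOVE(q);
		/* ... process QUEUE__DATA(q, struct item, link) ... */
	}
}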
diff --git a/src/lib/threadpool.c b/src/lib/threadpool.c
new file mode 100644
index 000000000..9498a6985
--- /dev/null
+++ b/src/lib/threadpool.c
@@ -0,0 +1,554 @@
+#include "threadpool.h"
+#include <assert.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <uv.h>
+#include "../../src/lib/queue.h"
+#include "../../src/lib/sm.h"
+#include "../../src/utils.h"
+
+/**
+ * Planner thread state machine.
+ *
+ *                                  signal() &&
+ *    empty(o) &&                   empty(u) &&       signal() && exiting
+ *    empty(u) &&          +-----> NOTHING ----------------> EXITED
+ *    !exiting             +-------  ^ |
+ *                                   | |
+ *           empty(o) &&            | | signal()
+ *           empty(u)               | | !empty(o) || !empty(u)
+ *                                   | |
+ *                                   | |
+ *                                   | V
+ *    !empty(o) &&         +-----> DRAINING
+ *    !empty(u) &&         +-------  ^ |
+ *    type(head(o)) != BAR           | |
+ *                                   | | type(head(o)) == BAR
+ *          ord_in_flight == 0       | |
+ *                                   | V
+ *                BARRIER --------+  signal()
+ *                   ^ | <-------+  ord_in_flight == 0
+ *                   | |
+ *         empty(u)  | | !empty(u)
+ *                   | V
+ *              DRAINING_UNORD
+ */
+
+enum planner_states {
+	PS_NOTHING,
+	PS_DRAINING,
+	PS_BARRIER,
+	PS_DRAINING_UNORD,
+	PS_EXITED,
+	PS_NR,
+};
+
+static const struct sm_conf planner_states[PS_NR] = {
+	[PS_NOTHING] = {
+		.flags = SM_INITIAL,
+		.name = "nothing",
+		.allowed = BITS(PS_DRAINING) | BITS(PS_EXITED),
+	},
+	[PS_DRAINING] = {
+		.name = "draining",
+		.allowed = BITS(PS_DRAINING)
+			 | BITS(PS_NOTHING)
+			 | BITS(PS_BARRIER),
+	},
+	[PS_BARRIER] = {
+		.name = "barrier",
+		.allowed = BITS(PS_DRAINING_UNORD)
+			 | BITS(PS_DRAINING)
+			 | BITS(PS_BARRIER),
+	},
+	[PS_DRAINING_UNORD] = {
+		.name = "draining-unord",
+		.allowed = BITS(PS_BARRIER),
+	},
+	[PS_EXITED] = {
+		.flags = SM_FINAL,
+		.name = "exited",
+		.allowed = 0,
+	},
+};
+
+enum {
+	THREADPOOL_SIZE_MAX = 1024,
+};
+
+typedef struct pool_thread pool_thread_t;
+typedef struct pool_impl pool_impl_t;
+
+struct targs {
+	pool_impl_t *pi;
+	uv_sem_t *sem;
+	uint32_t idx; /* Thread's index */
+};
+
+/* Worker thread of the pool */
+struct pool_thread {
+	queue inq;          /* Thread's input queue */
+	uv_cond_t cond;     /* Signalled when a work item appears in @inq */
+	uv_thread_t thread; /* Pool's worker thread */
+	struct targs arg;
+};
+
+struct pool_impl {
+	uv_mutex_t mutex;       /* Protects the input queues, planner_sm,
+	                           and the worker and planner threads */
+	uint32_t threads_nr;
+	pool_thread_t *threads;
+
+	queue outq;             /* Output queue used by the libuv part */
+	uv_mutex_t outq_mutex;  /* Output queue lock */
+	uv_async_t outq_async;  /* Signalled when the output queue is not
+	                           empty and the libuv loop has to process
+	                           items from it */
+	uint64_t active_ws;     /* Number of all work items in flight,
+	                           accessed from the main thread only */
+
+	queue ordered;          /* Queue of WT_ORD{N} items */
+	queue unordered;        /* Queue of WT_UNORD items */
+	struct sm planner_sm;   /* State machine of the scheduler */
+	uv_cond_t planner_cond;
+	uv_thread_t planner_thread; /* Scheduler's thread */
+
+	uint32_t ord_in_flight; /* Number of WT_ORD{N} items in flight */
+	bool exiting;           /* True when the pool is being stopped */
+	enum pool_work_type     /* Type of the previous work item, used */
+	    ord_prev;           /* in WT_ORD{N} invariants */
+	uint32_t qos;           /* QoS token */
+	uint32_t qos_prio;      /* QoS priority */
+};
+
+static inline bool has_active_ws(pool_t *pool)
+{
+	return pool->pi->active_ws > 0;
+}
+
+static inline void w_register(pool_t *pool, pool_work_t *w)
+{
+	if (w->type != WT_BAR) {
+		pool->pi->active_ws++;
+	}
+}
+
+static inline void w_unregister(pool_t *pool, pool_work_t *w)
+{
+	(void)w;
+	PRE(has_active_ws(pool));
+	pool->pi->active_ws--;
+}
+
+static bool empty(const queue *q)
+{
+	return QUEUE__IS_EMPTY(q);
+}
+
+static queue *head(const queue *q)
+{
+	return QUEUE__HEAD(q);
+}
+
+static void push(queue *to, queue *what)
+{
+	QUEUE__INSERT_TAIL(to, what);
+}
+
+static queue *pop(queue *from)
+{
+	queue *q = QUEUE__HEAD(from);
+	PRE(q != NULL);
+	QUEUE__REMOVE(q);
+	QUEUE__INIT(q);
+	return q;
+}
+
+static queue *qos_pop(pool_impl_t *pi, queue *first, queue *second)
+{
+	PRE(!empty(first) || !empty(second));
+
+	if (empty(first)) {
+		return pop(second);
+	} else if (empty(second)) {
+		return pop(first);
+	}
+
+	/* Both queues are non-empty: pick @second once every @qos_prio
+	 * pops and @first otherwise, i.e. strict alternation for
+	 * qos_prio == 2 (POOL_QOS_PRIO_FAIR). */
+	return pop(pi->qos++ % pi->qos_prio ? first : second);
+}
+
+static pool_work_t *q_to_w(const queue *q)
+{
+	return QUEUE__DATA(q, pool_work_t, link);
+}
+
+static enum pool_work_type q_type(const queue *q)
+{
+	return q_to_w(q)->type;
+}
+
+static uint32_t q_tid(const queue *q)
+{
+	return q_to_w(q)->thread_id;
+}
+
+static bool planner_invariant(const struct sm *m, int prev_state)
+{
+	pool_impl_t *pi = CONTAINER_OF(m, pool_impl_t, planner_sm);
+	queue *o = &pi->ordered;
+	queue *u = &pi->unordered;
+
+	return ERGO(sm_state(m) == PS_NOTHING, empty(o) && empty(u)) &&
+	       ERGO(sm_state(m) == PS_DRAINING,
+		    ERGO(prev_state == PS_BARRIER,
+			 pi->ord_in_flight == 0 && empty(u)) &&
+		    ERGO(prev_state == PS_NOTHING,
+			 !empty(u) || !empty(o))) &&
+	       ERGO(sm_state(m) == PS_EXITED,
+		    pi->exiting && empty(o) && empty(u)) &&
+	       ERGO(sm_state(m) == PS_BARRIER,
+		    ERGO(prev_state == PS_DRAINING,
+			 q_type(head(o)) == WT_BAR) &&
+		    ERGO(prev_state == PS_DRAINING_UNORD, empty(u))) &&
+	       ERGO(sm_state(m) == PS_DRAINING_UNORD, !empty(u));
+}
+
+static void planner(void *arg)
+{
+	struct targs *ta = arg;
+	uv_sem_t *sem = ta->sem;
+	pool_impl_t *pi = ta->pi;
+	uv_mutex_t *mutex = &pi->mutex;
+	pool_thread_t *ts = pi->threads;
+	struct sm *planner_sm = &pi->planner_sm;
+	queue *o = &pi->ordered;
+	queue *u = &pi->unordered;
+	queue *q;
+
+	sm_init(planner_sm, planner_invariant, NULL, planner_states,
+		PS_NOTHING);
+	uv_sem_post(sem);
+	uv_mutex_lock(mutex);
+	for (;;) {
+		switch (sm_state(planner_sm)) {
+		case PS_NOTHING:
+			while (empty(o) && empty(u) && !pi->exiting) {
+				uv_cond_wait(&pi->planner_cond, mutex);
+			}
+			sm_move(planner_sm,
+				pi->exiting ? PS_EXITED : PS_DRAINING);
+			break;
+		case PS_DRAINING:
+			while (!(empty(o) && empty(u))) {
+				sm_move(planner_sm, PS_DRAINING);
+				if (!empty(o) &&
+				    q_type(head(o)) == WT_BAR) {
+					sm_move(planner_sm, PS_BARRIER);
+					goto ps_barrier;
+				}
+				q = qos_pop(pi, o, u);
+				push(&ts[q_tid(q)].inq, q);
+				uv_cond_signal(&ts[q_tid(q)].cond);
+				if (q_type(q) >= WT_ORD1) {
+					pi->ord_in_flight++;
+				}
+			}
+			sm_move(planner_sm, PS_NOTHING);
+		ps_barrier:
+			break;
+		case PS_BARRIER:
+			if (!empty(u)) {
+				sm_move(planner_sm, PS_DRAINING_UNORD);
+				break;
+			}
+			if (pi->ord_in_flight == 0) {
+				q = pop(o);
+				PRE(q_to_w(q)->type == WT_BAR);
+				free(q_to_w(q));
+				sm_move(planner_sm, PS_DRAINING);
+				break;
+			}
+			uv_cond_wait(&pi->planner_cond, mutex);
+			sm_move(planner_sm, PS_BARRIER);
+			break;
+		case PS_DRAINING_UNORD:
+			while (!empty(u)) {
+				q = pop(u);
+				push(&ts[q_tid(q)].inq, q);
+				uv_cond_signal(&ts[q_tid(q)].cond);
+			}
+			sm_move(planner_sm, PS_BARRIER);
+			break;
+		case PS_EXITED:
+			sm_fini(planner_sm);
+			uv_mutex_unlock(mutex);
+			return;
+		default:
+			POST(false && "Impossible!");
+		}
+	}
+}
+
+static void queue_work(pool_work_t *w)
+{
+	w->work_cb(w);
+}
+
+static void queue_done(pool_work_t *w)
+{
+	w_unregister(w->pool, w);
+	if (w->after_work_cb != NULL) {
+		w->after_work_cb(w);
+	}
+}
+
+static void worker(void *arg)
+{
+	struct targs *ta = arg;
+	pool_impl_t *pi = ta->pi;
+	uv_mutex_t *mutex = &pi->mutex;
+	pool_thread_t *ts = pi->threads;
+	enum pool_work_type wtype;
+	pool_work_t *w;
+	queue *q;
+
+	uv_sem_post(ta->sem);
+	uv_mutex_lock(mutex);
+	for (;;) {
+		while (empty(&ts[ta->idx].inq)) {
+			if (pi->exiting) {
+				uv_mutex_unlock(mutex);
+				return;
+			}
+			uv_cond_wait(&ts[ta->idx].cond, mutex);
+		}
+
+		q = pop(&ts[ta->idx].inq);
+		uv_mutex_unlock(mutex);
+
+		w = q_to_w(q);
+		/* Save the type now: once the item is pushed to the
+		 * output queue and the loop is signalled, the loop
+		 * thread may free @w at any moment. */
+		wtype = w->type;
+		queue_work(w);
+
+		uv_mutex_lock(&pi->outq_mutex);
+		push(&pi->outq, &w->link);
+		uv_async_send(&pi->outq_async);
+		uv_mutex_unlock(&pi->outq_mutex);
+
+		uv_mutex_lock(mutex);
+		if (wtype > WT_BAR) {
+			assert(pi->ord_in_flight > 0);
+			if (--pi->ord_in_flight == 0) {
+				uv_cond_signal(&pi->planner_cond);
+			}
+		}
+	}
+}
+
+static void pool_cleanup(pool_t *pool)
+{
+	pool_impl_t *pi = pool->pi;
+	pool_thread_t *ts = pi->threads;
+	uint32_t i;
+
+	if (pi->threads_nr == 0) {
+		return;
+	}
+
+	pi->exiting = true;
+	uv_cond_signal(&pi->planner_cond);
+
+	if (uv_thread_join(&pi->planner_thread)) {
+		abort();
+	}
+	uv_cond_destroy(&pi->planner_cond);
+	POST(empty(&pi->ordered) && empty(&pi->unordered));
+
+	for (i = 0; i < pi->threads_nr; i++) {
+		uv_cond_signal(&ts[i].cond);
+		if (uv_thread_join(&ts[i].thread)) {
+			abort();
+		}
+		POST(empty(&ts[i].inq));
+		uv_cond_destroy(&ts[i].cond);
+	}
+
+	free(pi->threads);
+	uv_mutex_destroy(&pi->mutex);
+	pi->threads_nr = 0;
+}
+
+static void pool_threads_init(pool_t *pool)
+{
+	uint32_t i;
+	uv_sem_t sem;
+	pool_impl_t *pi = pool->pi;
+	pool_thread_t *ts;
+	struct targs pa = {
+		.sem = &sem,
+		.pi = pi,
+	};
+	uv_thread_options_t config = {
+		.flags = UV_THREAD_HAS_STACK_SIZE,
+		.stack_size = 8u << 20,
+	};
+
+	if (uv_mutex_init(&pi->mutex)) {
+		abort();
+	}
+	if (uv_sem_init(&sem, 0)) {
+		abort();
+	}
+
+	pi->threads = calloc(pi->threads_nr, sizeof(pi->threads[0]));
+	if (pi->threads == NULL) {
+		abort();
+	}
+
+	for (i = 0, ts = pi->threads; i < pi->threads_nr; i++) {
+		ts[i].arg = (struct targs){
+			.pi = pi,
+			.sem = &sem,
+			.idx = i,
+		};
+
+		QUEUE__INIT(&ts[i].inq);
+		if (uv_cond_init(&ts[i].cond)) {
+			abort();
+		}
+		if (uv_thread_create_ex(&ts[i].thread, &config, worker,
+					&ts[i].arg)) {
+			abort();
+		}
+	}
+
+	if (uv_cond_init(&pi->planner_cond)) {
+		abort();
+	}
+	if (uv_thread_create_ex(&pi->planner_thread, &config, planner, &pa)) {
+		abort();
+	}
+
+	/* Wait until all of the workers and the planner are up. */
+	for (i = 0; i < pi->threads_nr + 1 /* +planner */; i++) {
+		uv_sem_wait(&sem);
+	}
+
+	uv_sem_destroy(&sem);
+}
+
+static void pool_work_submit(pool_t *pool, pool_work_t *w)
+{
+	pool_impl_t *pi = pool->pi;
+	queue *o = &pi->ordered;
+	queue *u = &pi->unordered;
+
+	if (w->type > WT_UNORD) {
+		/* Ensure that elements of the ordered queue are
+		 * submitted in order. */
+		PRE(ERGO(pi->ord_prev != WT_BAR && w->type != WT_BAR,
+			 pi->ord_prev == w->type));
+		pi->ord_prev = w->type;
+	}
+
+	uv_mutex_lock(&pi->mutex);
+	push(w->type == WT_UNORD ? u : o, &w->link);
+	uv_cond_signal(&pi->planner_cond);
+	uv_mutex_unlock(&pi->mutex);
+}
+
+void work_done(uv_async_t *handle)
+{
+	queue q = {};
+	pool_impl_t *pi = CONTAINER_OF(handle, pool_impl_t, outq_async);
+
+	uv_mutex_lock(&pi->outq_mutex);
+	QUEUE__MOVE(&pi->outq, &q);
+	uv_mutex_unlock(&pi->outq_mutex);
+
+	while (!empty(&q)) {
+		queue_done(q_to_w(pop(&q)));
+	}
+}
+
+void pool_queue_work(pool_t *pool,
+		     pool_work_t *w,
+		     uint32_t cookie,
+		     enum pool_work_type type,
+		     void (*work_cb)(pool_work_t *w),
+		     void (*after_work_cb)(pool_work_t *w))
+{
+	PRE(work_cb != NULL && type < WT_NR);
+
+	*w = (pool_work_t){
+		.pool = pool,
+		.type = type,
+		.thread_id = cookie % pool->pi->threads_nr,
+		.work_cb = work_cb,
+		.after_work_cb = after_work_cb,
+	};
+	w_register(pool, w);
+	pool_work_submit(pool, w);
+}
+
+int pool_init(pool_t *pool,
+	      uv_loop_t *loop,
+	      uint32_t threads_nr,
+	      uint32_t qos_prio)
+{
+	int rc;
+	pool_impl_t *pi;
+
+	PRE(threads_nr <= THREADPOOL_SIZE_MAX);
+
+	pi = pool->pi = calloc(1, sizeof(*pool->pi));
+	if (pi == NULL) {
+		return UV_ENOMEM;
+	}
+
+	*pi = (pool_impl_t){
+		.qos = 0,
+		.qos_prio = qos_prio,
+		.exiting = false,
+		.ord_prev = WT_BAR,
+		.threads_nr = threads_nr,
+		.ord_in_flight = 0,
+	};
+	QUEUE__INIT(&pi->outq);
+	QUEUE__INIT(&pi->ordered);
+	QUEUE__INIT(&pi->unordered);
+
+	rc = uv_mutex_init(&pi->outq_mutex);
+	if (rc != 0) {
+		free(pi);
+		return rc;
+	}
+
+	rc = uv_async_init(loop, &pi->outq_async, work_done);
+	if (rc != 0) {
+		uv_mutex_destroy(&pi->outq_mutex);
+		free(pi);
+		return rc;
+	}
+
+	pool_threads_init(pool);
+	return 0;
+}
+
+void pool_fini(pool_t *pool)
+{
+	pool_impl_t *pi = pool->pi;
+
+	pool_cleanup(pool);
+
+	uv_mutex_lock(&pi->outq_mutex);
+	POST(empty(&pi->outq) && !has_active_ws(pool));
+	uv_mutex_unlock(&pi->outq_mutex);
+
+	uv_mutex_destroy(&pi->outq_mutex);
+	free(pi);
+}
+
+void pool_close(pool_t *pool)
+{
+	uv_close((uv_handle_t *)&pool->pi->outq_async, NULL);
+}
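One ownership asymmetry of the implementation above is worth calling out: a WT_BAR item is consumed by the planner itself, which free()s it without ever running its callbacks (see the PS_BARRIER branch of planner()), while every other item remains owned by the caller. Barriers must therefore be heap-allocated and forgotten after submission; the unit test below does exactly this. A sketch (submit_barrier() is a hypothetical helper; work_cb is a placeholder that is never run for barriers):

#include <stdlib.h>
#include "threadpool.h"

static int submit_barrier(pool_t *pool, void (*work_cb)(pool_work_t *))
{
	pool_work_t *bar = malloc(sizeof(*bar));

	if (bar == NULL) {
		return UV_ENOMEM;
	}
	/* The planner free()s the barrier without running its
	 * callbacks; never touch it after this call. */
	pool_queue_work(pool, bar, 0, WT_BAR, work_cb, NULL);
	return 0;
}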
diff --git a/src/lib/threadpool.h b/src/lib/threadpool.h
new file mode 100644
index 000000000..96e8c1e10
--- /dev/null
+++ b/src/lib/threadpool.h
@@ -0,0 +1,92 @@
+#ifndef __THREAD_POOL__
+#define __THREAD_POOL__
+
+#include <uv.h>
+#include "queue.h"
+
+/**
+   Thread pool
+
+   - Use-cases:
+
+     - Move sqlite3- and IO-related blocking operations from the libuv
+       loop's thread to the pool's threads, in order to keep serving
+       incoming dqlite requests while sqlite3 does IO. Multiple
+       sqlite3_step()s can be in flight and executed concurrently,
+       while the loop thread is not blocked on IO.
+
+     - Work items have thread affinity: the sqlite3-related items of
+       each database are served by a "dedicated" thread, which avoids
+       making any assumptions about the sqlite3 threading model.
+       @see https://www.sqlite.org/threadsafe.html
+
+   - The pool supports servicing the following types of work items:
+
+     - WT_UNORD - items which can be processed by the pool in any
+       order. The concurrency assumptions of this type of work are
+       guaranteed by other layers of the application. Read and write
+       transactions executed by sqlite3_step() are good examples of
+       this work item type.
+
+     - WT_ORD_N - items which can NOT be processed by the pool in an
+       arbitrary order. The pool guarantees that servicing all
+       WT_ORD_{N}s happens before any WT_ORD_{N + 1}. WT_ORD_{N} and
+       WT_ORD_{N + 1} operations can't be put into the pool
+       interleaved. sqlite3 checkpointing is an example of a
+       WT_ORD_{N}, and InstallSnapshot(CP(), MV()) is an example of a
+       WT_ORD_{N + 1}.
+
+     - WT_BAR - special purpose item, a barrier. Delimits the
+       WT_ORD_{N}s from the WT_ORD_{N + 1}s.
+
+   - The pool supports servicing work items with given quality of
+     service (QoS) considerations. For example, the priority of
+     serving read/write sqlite3 transactions (WT_UNORD) can be set
+     higher than that of snapshot installation (WT_ORD{N}).
+ */
+
+struct pool_impl;
+typedef struct pool_s pool_t;
+typedef struct pool_work_s pool_work_t;
+
+enum pool_work_type {
+	WT_UNORD,
+	WT_BAR,
+	WT_ORD1,
+	WT_ORD2,
+	WT_NR,
+};
+
+struct pool_work_s
+{
+	queue link;         /* Link into ordered, unordered and outq */
+	uint32_t thread_id; /* Identifier of the thread the item is
+	                       affined to */
+	pool_t *pool;       /* The pool the item is associated with */
+	enum pool_work_type type;
+
+	void (*work_cb)(pool_work_t *w);
+	void (*after_work_cb)(pool_work_t *w);
+};
+
+struct pool_s
+{
+	struct pool_impl *pi;
+};
+
+enum {
+	POOL_QOS_PRIO_FAIR = 2,
+};
+
+int pool_init(pool_t *pool,
+	      uv_loop_t *loop,
+	      uint32_t threads_nr,
+	      uint32_t qos_prio);
+void pool_fini(pool_t *pool);
+void pool_close(pool_t *pool);
+void pool_queue_work(pool_t *pool,
+		     pool_work_t *w,
+		     uint32_t cookie,
+		     enum pool_work_type type,
+		     void (*work_cb)(pool_work_t *w),
+		     void (*after_work_cb)(pool_work_t *w));
+
+#endif /* __THREAD_POOL__ */
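The WT_ORD_{N} contract above implies a submission discipline that pool_work_submit() asserts through its ord_prev precondition: between two barriers, every ordered item must carry the same WT_ORD{N} type, while WT_UNORD items may be interleaved anywhere. A sketch of a legal sequence (submit_example() and its parameters are placeholders, not part of this change):

#include "threadpool.h"

static void submit_example(pool_t *pool, pool_work_t *w1, pool_work_t *w2,
			   pool_work_t *w3, pool_work_t *bar,
			   void (*cb)(pool_work_t *),
			   void (*after_cb)(pool_work_t *))
{
	/* Legal: one ordered "generation" per barrier interval.
	 * WT_UNORD items could be interleaved anywhere, since the
	 * ord_prev check only applies to ordered work. */
	pool_queue_work(pool, w1, 0, WT_ORD1, cb, after_cb);
	pool_queue_work(pool, w2, 1, WT_ORD1, cb, after_cb);
	pool_queue_work(pool, bar, 0, WT_BAR, cb, NULL); /* heap-allocated */
	pool_queue_work(pool, w3, 0, WT_ORD2, cb, after_cb);

	/* Illegal: going back to WT_ORD1 now, without another WT_BAR
	 * in between, would trip the PRE() in pool_work_submit():
	 *
	 *     pool_queue_work(pool, w4, 0, WT_ORD1, cb, after_cb);
	 */
}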
diff --git a/test/unit/ext/test_uv_pool.c b/test/unit/ext/test_uv_pool.c
new file mode 100644
index 000000000..5f2f82a75
--- /dev/null
+++ b/test/unit/ext/test_uv_pool.c
@@ -0,0 +1,113 @@
+#include "../../../src/lib/threadpool.h"
+#include "../../../src/utils.h"
+#include "../../lib/runner.h"
+#include "../../lib/uv.h"
+
+TEST_MODULE(ext_uv_pool);
+
+/******************************************************************************
+ *
+ * threadpool
+ *
+ ******************************************************************************/
+
+enum { WORK_ITEMS_NR = 50000 };
+
+struct fixture {
+	pool_work_t w;
+	uv_loop_t loop;
+	pool_t pool;
+};
+
+static void loop_setup(struct fixture *f)
+{
+	int rc;
+
+	rc = uv_loop_init(&f->loop);
+	munit_assert_int(rc, ==, 0);
+
+	rc = pool_init(&f->pool, &f->loop, 4, POOL_QOS_PRIO_FAIR);
+	munit_assert_int(rc, ==, 0);
+}
+
+static void bottom_work_cb(pool_work_t *w)
+{
+	(void)w;
+}
+
+static void bottom_after_work_cb(pool_work_t *w)
+{
+	static int count = 0;
+
+	/* after_work_cb() queues WORK_ITEMS_NR + 2 items, one of which
+	 * is the barrier. The barrier is consumed by the pool itself,
+	 * so this callback runs WORK_ITEMS_NR + 1 times and closes the
+	 * pool on the last run. */
+	if (count == WORK_ITEMS_NR)
+		pool_close(w->pool);
+
+	count++;
+	assert(w->type != WT_BAR);
+	free(w);
+}
+
+static void after_work_cb(pool_work_t *w)
+{
+	enum pool_work_type pwt;
+	pool_work_t *work;
+	unsigned int wt;
+	unsigned int i;
+
+	for (i = 0; i <= WORK_ITEMS_NR + 1 /* +WT_BAR */; i++) {
+		work = malloc(sizeof(*work));
+
+		if (i < WORK_ITEMS_NR / 2)
+			wt = WT_ORD1;
+		else if (i == WORK_ITEMS_NR / 2)
+			wt = WT_BAR;
+		else
+			wt = WT_ORD2;
+
+		pwt = i % 2 == 0 ? wt : WT_UNORD;
+		pool_queue_work(w->pool, work, i, pwt, bottom_work_cb,
+				bottom_after_work_cb);
+	}
+}
+
+static void work_cb(pool_work_t *w)
+{
+	(void)w;
+}
+
+static void threadpool_tear_down(void *data)
+{
+	int rc;
+	struct fixture *f = data;
+
+	pool_fini(&f->pool);
+	rc = uv_loop_close(&f->loop);
+	munit_assert_int(rc, ==, 0);
+	free(f);
+}
+
+static void *threadpool_setup(const MunitParameter params[], void *user_data)
+{
+	(void)params;
+	(void)user_data;
+	struct fixture *f = calloc(1, sizeof *f);
+	loop_setup(f);
+	return f;
+}
+
+TEST_SUITE(threadpool);
+TEST_SETUP(threadpool, threadpool_setup);
+TEST_TEAR_DOWN(threadpool, threadpool_tear_down);
+TEST_CASE(threadpool, sync, NULL)
+{
+	(void)params;
+	struct fixture *f = data;
+	int rc;
+
+	pool_queue_work(&f->pool, &f->w, 0, WT_UNORD, work_cb, after_work_cb);
+
+	rc = uv_run(&f->loop, UV_RUN_DEFAULT);
+	munit_assert_int(rc, ==, 0);
+
+	return MUNIT_OK;
+}
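For reference, a minimal end-to-end use of the API, condensed from the test above into a single function (example() and the thread count of 4 are illustrative):

#include <uv.h>
#include "threadpool.h"

static void work_cb(pool_work_t *w)
{
	(void)w; /* Runs on a pool thread. */
}

static void after_work_cb(pool_work_t *w)
{
	pool_close(w->pool); /* Runs on the loop thread. */
}

static int example(void)
{
	uv_loop_t loop;
	pool_t pool;
	pool_work_t w;

	if (uv_loop_init(&loop) != 0) {
		return -1;
	}
	if (pool_init(&pool, &loop, 4, POOL_QOS_PRIO_FAIR) != 0) {
		return -1;
	}

	pool_queue_work(&pool, &w, 0, WT_UNORD, work_cb, after_work_cb);
	uv_run(&loop, UV_RUN_DEFAULT); /* Returns once pool_close() ran. */

	pool_fini(&pool);
	return uv_loop_close(&loop);
}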