From 90891b4232e91dbd7a2e2077e4d23d16a374b41d Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Fri, 25 May 2018 22:40:58 +0200 Subject: [PATCH] unix,win: limit concurrent DNS calls to nthreads/2 If `nthreads / 2` (rounded up) DNS calls are outstanding, queue more work of that kind instead of letting it take over more positions in the thread pool, blocking other work such as the (usually much faster) file system I/O or user-scheduled work. Fixes: https://github.com/nodejs/node/issues/8436 PR-URL: https://github.com/libuv/libuv/pull/1845 Reviewed-By: Ben Noordhuis Reviewed-By: Santiago Gimeno --- src/threadpool.c | 86 +++++++++++++++++++++++++++++++++++------- src/unix/fs.c | 6 ++- src/unix/getaddrinfo.c | 1 + src/unix/getnameinfo.c | 1 + src/uv-common.h | 7 ++++ src/win/fs.c | 6 ++- src/win/getaddrinfo.c | 1 + src/win/getnameinfo.c | 1 + 8 files changed, 94 insertions(+), 15 deletions(-) diff --git a/src/threadpool.c b/src/threadpool.c index 413d1c204c2..4875f27beb9 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -33,12 +33,18 @@ static uv_once_t once = UV_ONCE_INIT; static uv_cond_t cond; static uv_mutex_t mutex; static unsigned int idle_threads; +static unsigned int slow_io_work_running; static unsigned int nthreads; static uv_thread_t* threads; static uv_thread_t default_threads[4]; static QUEUE exit_message; static QUEUE wq; +static QUEUE run_slow_work_message; +static QUEUE slow_io_pending_wq; +static unsigned int slow_work_thread_threshold(void) { + return (nthreads + 1) / 2; +} static void uv__cancelled(struct uv__work* w) { abort(); @@ -51,6 +57,7 @@ static void uv__cancelled(struct uv__work* w) { static void worker(void* arg) { struct uv__work* w; QUEUE* q; + int is_slow_work; uv_sem_post((uv_sem_t*) arg); arg = NULL; @@ -58,31 +65,65 @@ static void worker(void* arg) { for (;;) { uv_mutex_lock(&mutex); - while (QUEUE_EMPTY(&wq)) { + wait_for_work: + /* Keep waiting while either no work is present or only slow I/O + and we're at the threshold for that. */ + while (QUEUE_EMPTY(&wq) || + (QUEUE_HEAD(&wq) == &run_slow_work_message && + QUEUE_NEXT(&run_slow_work_message) == &wq && + slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; } q = QUEUE_HEAD(&wq); - - if (q == &exit_message) + if (q == &exit_message) { uv_cond_signal(&cond); - else { + uv_mutex_unlock(&mutex); + break; + } + + QUEUE_REMOVE(q); + QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */ + + is_slow_work = 0; + if (q == &run_slow_work_message) { + /* If we're at the slow I/O threshold, re-schedule until after all + other work in the queue is done. */ + if (slow_io_work_running >= slow_work_thread_threshold()) { + QUEUE_INSERT_TAIL(&wq, q); + goto wait_for_work; + } + + /* If we encountered a request to run slow I/O work but there is none + to run, that means it's cancelled => Start over. */ + if (QUEUE_EMPTY(&slow_io_pending_wq)) + goto wait_for_work; + + is_slow_work = 1; + slow_io_work_running++; + + q = QUEUE_HEAD(&slow_io_pending_wq); QUEUE_REMOVE(q); - QUEUE_INIT(q); /* Signal uv_cancel() that the work req is - executing. */ + QUEUE_INIT(q); + + /* If there is more slow I/O work, schedule it to be run as well. */ + if (!QUEUE_EMPTY(&slow_io_pending_wq)) { + QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); + if (idle_threads > 0) + uv_cond_signal(&cond); + } } uv_mutex_unlock(&mutex); - if (q == &exit_message) - break; - w = QUEUE_DATA(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); + if (is_slow_work) + slow_io_work_running--; w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); @@ -92,8 +133,20 @@ static void worker(void* arg) { } -static void post(QUEUE* q) { +static void post(QUEUE* q, enum uv__work_kind kind) { uv_mutex_lock(&mutex); + if (kind == UV__WORK_SLOW_IO) { + /* Insert into a separate queue. */ + QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); + if (!QUEUE_EMPTY(&run_slow_work_message)) { + /* Running slow I/O tasks is already scheduled => Nothing to do here. + The worker that runs said other task will schedule this one as well. */ + uv_mutex_unlock(&mutex); + return; + } + q = &run_slow_work_message; + } + QUEUE_INSERT_TAIL(&wq, q); if (idle_threads > 0) uv_cond_signal(&cond); @@ -108,7 +161,7 @@ UV_DESTRUCTOR(static void cleanup(void)) { if (nthreads == 0) return; - post(&exit_message); + post(&exit_message, UV__WORK_CPU); for (i = 0; i < nthreads; i++) if (uv_thread_join(threads + i)) @@ -156,6 +209,8 @@ static void init_threads(void) { abort(); QUEUE_INIT(&wq); + QUEUE_INIT(&slow_io_pending_wq); + QUEUE_INIT(&run_slow_work_message); if (uv_sem_init(&sem, 0)) abort(); @@ -194,13 +249,14 @@ static void init_once(void) { void uv__work_submit(uv_loop_t* loop, struct uv__work* w, + enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; - post(&w->wq); + post(&w->wq, kind); } @@ -284,7 +340,11 @@ int uv_queue_work(uv_loop_t* loop, req->loop = loop; req->work_cb = work_cb; req->after_work_cb = after_work_cb; - uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done); + uv__work_submit(loop, + &req->work_req, + UV__WORK_CPU, + uv__queue_work, + uv__queue_done); return 0; } diff --git a/src/unix/fs.c b/src/unix/fs.c index 652cdfd734a..e66b1e4c839 100644 --- a/src/unix/fs.c +++ b/src/unix/fs.c @@ -120,7 +120,11 @@ do { \ if (cb != NULL) { \ uv__req_register(loop, req); \ - uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \ + uv__work_submit(loop, \ + &req->work_req, \ + UV__WORK_FAST_IO, \ + uv__fs_work, \ + uv__fs_done); \ return 0; \ } \ else { \ diff --git a/src/unix/getaddrinfo.c b/src/unix/getaddrinfo.c index 10e8afd75e8..25827c1feeb 100644 --- a/src/unix/getaddrinfo.c +++ b/src/unix/getaddrinfo.c @@ -186,6 +186,7 @@ int uv_getaddrinfo(uv_loop_t* loop, if (cb) { uv__work_submit(loop, &req->work_req, + UV__WORK_SLOW_IO, uv__getaddrinfo_work, uv__getaddrinfo_done); return 0; diff --git a/src/unix/getnameinfo.c b/src/unix/getnameinfo.c index 9a4367224c7..991002a67d7 100644 --- a/src/unix/getnameinfo.c +++ b/src/unix/getnameinfo.c @@ -109,6 +109,7 @@ int uv_getnameinfo(uv_loop_t* loop, if (getnameinfo_cb) { uv__work_submit(loop, &req->work_req, + UV__WORK_SLOW_IO, uv__getnameinfo_work, uv__getnameinfo_done); return 0; diff --git a/src/uv-common.h b/src/uv-common.h index 3289950d009..5555f83aee4 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -164,8 +164,15 @@ void uv__fs_poll_close(uv_fs_poll_t* handle); int uv__getaddrinfo_translate_error(int sys_err); /* EAI_* error. */ +enum uv__work_kind { + UV__WORK_CPU, + UV__WORK_FAST_IO, + UV__WORK_SLOW_IO +}; + void uv__work_submit(uv_loop_t* loop, struct uv__work *w, + enum uv__work_kind kind, void (*work)(struct uv__work *w), void (*done)(struct uv__work *w, int status)); diff --git a/src/win/fs.c b/src/win/fs.c index 71b6a81a0d5..0886d799826 100644 --- a/src/win/fs.c +++ b/src/win/fs.c @@ -55,7 +55,11 @@ do { \ if (cb != NULL) { \ uv__req_register(loop, req); \ - uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \ + uv__work_submit(loop, \ + &req->work_req, \ + UV__WORK_FAST_IO, \ + uv__fs_work, \ + uv__fs_done); \ return 0; \ } else { \ uv__fs_work(&req->work_req); \ diff --git a/src/win/getaddrinfo.c b/src/win/getaddrinfo.c index 063b4937cda..614ea8e376e 100644 --- a/src/win/getaddrinfo.c +++ b/src/win/getaddrinfo.c @@ -368,6 +368,7 @@ int uv_getaddrinfo(uv_loop_t* loop, if (getaddrinfo_cb) { uv__work_submit(loop, &req->work_req, + UV__WORK_SLOW_IO, uv__getaddrinfo_work, uv__getaddrinfo_done); return 0; diff --git a/src/win/getnameinfo.c b/src/win/getnameinfo.c index 71785a9fa65..b3773380c21 100644 --- a/src/win/getnameinfo.c +++ b/src/win/getnameinfo.c @@ -145,6 +145,7 @@ int uv_getnameinfo(uv_loop_t* loop, if (getnameinfo_cb) { uv__work_submit(loop, &req->work_req, + UV__WORK_SLOW_IO, uv__getnameinfo_work, uv__getnameinfo_done); return 0;