A package implementing a concurrency primitive inspired by the blog post "Notes on structured concurrency, or: Go statement considered harmful".
npm install nursery
This package requires Node 10 and above, and has only one dependency: abort-controller, which is used as a polyfill for the standard AbortController class used to signal cancellation in the fetch API.
Why v10? Because its API heavily uses for await, which is available only since Node 10.
The package enables you to group a number of running async tasks into one, and to ensure that they all finish together. Let's see an example:
const Nursery = require('nursery')
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
;(async function() {
  await Nursery([
    delay(20).then(() => console.log('second')),
    delay(10).then(() => console.log('first')),
  ])
})()
// ==> first
// ==> second
How is this different from the following, which uses Promise.all?
;(async function() {
  await Promise.all([
    delay(20).then(() => console.log('second')),
    delay(10).then(() => console.log('first')),
  ])
})()
// ==> first
// ==> second
It isn't! Same output. But Promise.all is dumb: it doesn't provide the following guarantee:
A Nursery waits for all tasks to terminate, even if one of the tasks fails.
Let's look at this example:
;(async function() {
  try {
    await Promise.all([
      Promise.reject(new Error('failed!')),
      delay(10).then(() => console.log('first')),
    ])
  } catch (err) {
    console.log('after Promise.all', err.message)
  }
})()
// ==> after Promise.all failed!
// ==> first
In the above example, due to the failure of the first task, Promise.all rejects immediately, without waiting for the delay task to end. Thus, the catch runs first and outputs "after Promise.all failed!", and then, sometime in the future, without us having any control over it, the other task ends.
What happens if the other task fails? Can we do anything about it? Nope. It fails silently. We lost control over it.
And this is bad. Why it is bad makes intuitive sense, but the blog post "Notes on structured concurrency, or: Go statement considered harmful" makes a pretty good case for why it's bad.
Let's contrast this with the same implementation of the code, using nurseries:
;(async function() {
  try {
    await Nursery([
      Promise.reject(new Error('failed!')),
      delay(10).then(() => console.log('first')),
    ])
  } catch (err) {
    console.log('after Nursery', err.message)
  }
})()
// ==> first
// ==> after Nursery failed!
This code works as we "expect" it to. The nursery does not finish until all nursery-run promises finish, even if one of the tasks fails. We still get the error, but all the tasks finish their run.
Note: what happens if more than one task fails? This is handled; see the API section. TL;DR: the exception thrown includes a field that holds all the other errors in an array.
Let's look at another way of writing this with Nursery:
;(async function() {
  try {
    for await (const {nurse} of Nursery()) {
      nurse(Promise.reject(new Error('failed!')))
      nurse(delay(10).then(() => console.log('first')))
    }
  } catch (err) {
    console.log('after Nursery', err.message)
  }
})()
// ==> first
// ==> after Nursery failed!
The syntax is strange. There's a for await loop, and a body that runs two tasks using nurse(...).
Don't worry, the for await loop executes only once (we'll see later that it can execute more times if we want, for retries). The tasks run concurrently, but the for await loop (along with some Nursery magic) ensures that the loop does not finish until both tasks have finished.
Because Nursery(...) was called without any tasks, you can think of it as creating an iterator of Nurse objects, which let you execute tasks inside the nursery. Once all the tasks finish executing, the for await loop also finishes.
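If the "Nursery magic" feels opaque, here is a rough sketch of the idea. It is not the package's implementation: miniNursery is a hypothetical name, and the sketch ignores retries, abort signals, and loop bodies that throw. It only shows how an async generator paused at a yield can, once the for await body is done, wait for every task and then rethrow a failure. It uses Promise.allSettled, so it needs Node 12.9 or later.

```javascript
// Hypothetical, heavily simplified nursery-like async generator.
// NOT the nursery package's implementation. Limitation: if the loop body
// itself throws, for await calls return() and the tasks are not awaited.
async function* miniNursery() {
  const tasks = []

  // Register a task: either a promise or a function returning one.
  const nurse = task => {
    const promise = Promise.resolve().then(() => (typeof task === 'function' ? task() : task))
    // Mark the rejection as handled here; it is reported when the loop ends.
    promise.catch(() => {})
    tasks.push(promise)
    return promise
  }

  // The loop body runs once, while the generator is paused at this yield.
  yield {nurse}

  // Resumed after the body finishes: wait for every task, failed or not,
  // then rethrow the first failure (in task order, for simplicity).
  const results = await Promise.allSettled(tasks)
  const failure = results.find(result => result.status === 'rejected')
  if (failure) throw failure.reason
}
```

Used as for await (const {nurse} of miniNursery()) { ... }, it reproduces the "first, then after Nursery failed!" ordering of the example above.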
Note: tasks in a Nursery are either already-created promises, or functions returning promises, which the nursery calls to get the promise. For example, the nurse(...) example above can be rewritten to pass async functions instead of promises:
;(async function() {
  try {
    for await (const {nurse} of Nursery()) {
      nurse(() => Promise.reject(new Error('failed!')))
      nurse(() => delay(10).then(() => console.log('first')))
    }
  } catch (err) {
    console.log('after Nursery', err.message)
  }
})()
// ==> first
// ==> after Nursery failed!
But what if I want to cancel a task if another task fails? I still want to wait until that cancellation is done, but I want to cancel it. As an example, let's create a "real" task, which uses the Star Wars API to get the height of Luke Skywalker:
const fetch = require('node-fetch')
async function fetchSkywalkerHeight(fetchOptions) {
  const response = await fetch('https://swapi.dev/api/people/1/', fetchOptions)
  const skywalkerInfo = await response.json()
  return skywalkerInfo.height
}
;(async function() {
  console.log(await fetchSkywalkerHeight())
})()
// ==> 172
Note: we'll use the fetchOptions later, when cancelling this task.
Now let's use this task in a nursery with another failed task:
;(async function() {
  try {
    for await (const {nurse} of Nursery()) {
      nurse(Promise.reject(new Error('failed!')))
      nurse(fetchSkywalkerHeight().then(height => console.log(height)))
    }
  } catch (err) {
    console.log('after Nursery', err.message)
  }
})()
// ==> 172
// ==> after Nursery failed!
The nursery waits for the fetchSkywalkerHeight task to terminate, and thus it outputs 172 before failing. How can we abort that fetch? We send the fetch an abort signal (this is part of the Fetch API):
;(async function() {
  try {
    for await (const {nurse, signal} of Nursery()) {
      nurse(Promise.reject(new Error('failed!')))
      nurse(fetchSkywalkerHeight({signal}).then(height => console.log(height)))
    }
  } catch (err) {
    console.log('after Nursery', err.message)
  }
})()
// ==> after Nursery failed!
In this snippet, "172" is never output, because fetchSkywalkerHeight was cancelled. How? The nursery always creates an AbortController (with its accompanying AbortSignal) when initializing, and when one of the tasks fails, it calls AbortController.abort() to signal the other tasks to abort.
The AbortController and its accompanying AbortSignal are stored in the nursery as abortController and signal, respectively. We pass the signal to the Fetch API to tell it when to abort. This is how fetch knows to abort the task once one of the other tasks fails.
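To make that mechanism concrete, here is a sketch using a hypothetical helper, runWithAbortOnFailure, which is not part of the package. It creates one AbortController, passes its signal to every task function, calls abort() on the first failure, and still waits for all tasks before rejecting. It assumes a global AbortController (Node 15+) instead of the abort-controller polyfill.

```javascript
// Hypothetical helper, not part of the nursery package: a simplified picture of
// "abort the other tasks on the first failure, but still wait for all of them".
// Assumes a global AbortController (Node 15+).
async function runWithAbortOnFailure(taskFns) {
  const abortController = new AbortController()
  const {signal} = abortController
  const errors = []

  const settled = taskFns.map(fn =>
    // Calling fn inside a promise chain also catches synchronous throws.
    Promise.resolve()
      .then(() => fn({signal}))
      .catch(err => {
        errors.push(err)
        // First failure: ask every other task to stop.
        if (!signal.aborted) abortController.abort()
      }),
  )

  // Every promise in `settled` resolves, so this waits for all tasks.
  const results = await Promise.all(settled)
  if (errors.length > 0) throw errors[0]
  return results
}
```

A task cooperates by checking signal.aborted or listening for the 'abort' event, just as fetch does with the signal it receives.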
I chose AbortController as the cancellation API because there is currently no other standard API that enables task cancellation. If JavaScript standardizes on another mechanism in the future, I'll add that one too.
You can use the AbortSignal to implement your own cancellation mechanism. Let's see an example:
;(async function() {
  try {
    for await (const {nurse, signal} of Nursery()) {
      nurse(Promise.reject(new Error('failed!')))
      nurse(
        delay(10).then(_ =>
          !signal.aborted ? console.log('not aborted') : console.log('aborted'),
        ),
      )
    }
  } catch (err) {
    console.log('after Nursery', err.message)
  }
})()
// ==> aborted
// ==> after Nursery failed!
When an abort happens, signal.aborted becomes true, enabling us to check the flag and abort whenever we want to. We can also use signal.addEventListener('abort', ...) to register an abort handler if we want to.
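Here's a small sketch that uses both mechanisms: a hypothetical abortableDelay helper (not part of the package) that checks signal.aborted up front and registers an 'abort' listener to stop early. The choice to reject with a plain Error('aborted') is mine; fetch, for comparison, rejects with an AbortError.

```javascript
// Hypothetical helper (not part of the nursery package): a delay that ends
// early, with a rejection, when `signal` is aborted.
function abortableDelay(ms, signal) {
  return new Promise((resolve, reject) => {
    // The nursery may already have been aborted before this task started.
    if (signal.aborted) return reject(new Error('aborted'))

    const onAbort = () => {
      clearTimeout(timer)
      reject(new Error('aborted'))
    }
    const timer = setTimeout(() => {
      // Finished normally: stop listening for aborts.
      signal.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal.addEventListener('abort', onAbort, {once: true})
  })
}
```

Inside a nursery body, you would call it as nurse(abortableDelay(1000, signal)), with signal destructured from the nurse object.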
For more information on AbortController, see this. For more information on the Fetch API's use of AbortController, see this.
- Retrying: if you pass {retries: 3} to the Nursery call, the body of the for await loop (or the tasks in the task list) is retried up to 3 times, if needed. See the "retries" section below.
- Throttling: if you pass {execution: throat(3)} (using the wonderful throat package) to the Nursery call, the execution of the tasks is throttled to three at a time. See the "execution" section below.
- Sequential execution of async functions: pass {execution: throat(1)} to the Nursery call, and you get sequential execution of the async functions in the task list!
You can also run a task in "supervisor" mode. A task in this mode is not waited upon: once all the other tasks are done, the nursery closes. Thus a supervisor task can supervise and wait for all other tasks to be done.
(Note that a supervisor task is also waited upon before the nursery closes, as this is a prime directive of a nursery: all tasks end. But this is done in a second phase: after all regular tasks are done, an abort is signalled, which allows the supervisors to finish too.)
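The two-phase close can be sketched roughly as follows. This is a hypothetical runWithSupervisors helper, not the package's code: a failure anywhere aborts everything, phase one waits only for the regular tasks, and phase two signals abort and then waits for the supervisors. It assumes a global AbortController (Node 15+).

```javascript
// Hypothetical sketch of a two-phase close; not the nursery package's code.
// Assumes a global AbortController (Node 15+).
async function runWithSupervisors(taskFns, supervisorFns) {
  const abortController = new AbortController()
  const {signal} = abortController
  const errors = []

  // Start one function; a failure is recorded and aborts everything else.
  // The returned promise never rejects, so Promise.all below waits for all.
  const start = fn =>
    new Promise(resolve => resolve(fn({signal}))).catch(err => {
      errors.push(err)
      if (!signal.aborted) abortController.abort()
    })

  const supervisors = supervisorFns.map(start)
  const tasks = taskFns.map(start)

  // Phase 1: wait only for the regular tasks.
  await Promise.all(tasks)
  // Phase 2: signal abort so the supervisors can end, then wait for them too.
  if (!signal.aborted) abortController.abort()
  await Promise.all(supervisors)

  if (errors.length > 0) throw errors[0]
}
```

A supervisor that fails early (a timeout, say) aborts the regular tasks through the same signal, which is what makes supervisors useful for timeouts.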
The simplest task that should be run in supervisor mode is Nursery.timeoutTask. This task enables us to time out all tasks running in a nursery. For example, let's time out the fetchSkywalkerHeight task:
;(async function() {
  try {
    for await (const {nurse, supervisor, signal} of Nursery()) {
      supervisor(Nursery.timeoutTask(5))
      nurse(fetchSkywalkerHeight({signal}).then(height => console.log(height)))
    }
  } catch (err) {
    if (err instanceof Nursery.TimeoutError) {
      console.log('Timed out!')
    }
  }
})()
// ==> Timed out!
While Nursery.timeoutTask is an important supervisor task, you can write your own in a simple way. Look at the Nursery.timeoutTask source code to understand how to write other supervisor tasks.
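As a starting point, here is my own sketch of a timeout supervisor. It is not the package's source: timeoutSupervisor and SketchTimeoutError are hypothetical names, and it assumes, as the API section says, that a function task is called with the Nurse object, so it can destructure signal from its argument. The task rejects after ms milliseconds unless the abort signal fires first, which is how a supervisor learns that the regular tasks are done.

```javascript
// Hypothetical stand-in for Nursery.TimeoutError; the real class may differ.
class SketchTimeoutError extends Error {
  constructor(ms, taskName) {
    super(`Timeout of ${ms}ms occurred` + (taskName ? ` for task ${taskName}` : ''))
    this.ms = ms
    this.taskName = taskName
  }
}

// Hypothetical supervisor task in the spirit of Nursery.timeoutTask: it fails
// after `ms` milliseconds, unless the nursery's abort signal fires first.
function timeoutSupervisor(ms, {name} = {}) {
  return ({signal}) =>
    new Promise((resolve, reject) => {
      // Already aborted: nothing left to time out.
      if (signal.aborted) return resolve()
      const onAbort = () => {
        clearTimeout(timer)
        resolve()
      }
      const timer = setTimeout(() => {
        signal.removeEventListener('abort', onAbort)
        reject(new SketchTimeoutError(ms, name))
      }, ms)
      signal.addEventListener('abort', onAbort, {once: true})
    })
}
```

In a nursery body it would be used as supervisor(timeoutSupervisor(5)).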
const Nursery = require('nursery')
The only export is Nursery, a function that, if called with a list of "tasks" (or just one), returns a promise, thus:
await Nursery([...listOfTasks]) // or, with a single task: await Nursery(task)
A task is either a Promise (e.g. Promise.resolve(42) or funcReturningPromise()) or a function returning a Promise (e.g. () => Promise.resolve(42) or funcReturningPromise).
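Here is a rough sketch of how both kinds of task can be normalized into a single promise. runTask is a hypothetical helper, not the package's code. It also covers the case where a function task is synchronous: the value (or exception) is turned into a resolved (or rejected) promise.

```javascript
// Hypothetical helper: normalize a task into a promise.
// A non-function task is treated as a promise (or plain value) already running.
function runTask(task, nurseObject) {
  if (typeof task !== 'function') return Promise.resolve(task)
  // Calling the function inside the Promise executor turns a synchronous
  // return value into a resolved promise and a synchronous throw into a
  // rejected one; a returned promise is adopted as is.
  return new Promise(resolve => resolve(task(nurseObject)))
}
```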
Note: I sometimes refer to a function returning a Promise as an async function.
Instead of an array of tasks, you can pass just one task.
If no tasks are passed, calling Nursery returns a generator of Nurse objects, destined to be used in a for await loop, thus:
for await (const nurseObject of Nursery()) {
  // body using the nurseObject
}
Unless there are retries (see below on how to ask for retries), the body of the loop runs only once.
Three overloads:
Nursery(task: Promise|Function, [options: object])
Nursery(taskList: Array<Promise|Function>, [options: object])
Nursery([options: object])
This function call returns a Promise if called with a task or an array of tasks, or returns an async generator of nursery objects (of type Nursery) if not. This generator is commonly used in for await loops.
- taskList: optional array of tasks. A task is either a Promise or a function returning a Promise.
- options: [optional] object with the following properties:
  - retries: the number of times to retry the loop body in case of failures. Default: 0. See the "Retries" section.
  - onRetry: an async function that will be called (and waited upon) before every retry. It is passed the following parameters:
    - attempt: the number of the retry attempt. Before the first retry, attempt is 1, before the second retry, it is 2, and so on.
    - remaining: the remaining number of attempts left. Note that remaining + attempt === retries.
    Note that if this function returns a rejected promise, then the retries stop. This enables you to do error filtering and continue with the retries only on the errors you want. A nice set of retry functions is supplied as part of the Nursery object. See below.
  - execution: a function that receives a task that is an async function, and calls it. The nursery uses it to execute all tasks that are async functions. The default is to call the task as is, but you can use it, for example, to throttle execution. See the "execution" section.
Returns:
- If no taskList: an async generator commonly used in for await loops. Example (the definition of delay can be found above):
for await (const {nurse} of Nursery()) {
  nurse(delay(10).then(() => console.log('done')))
  nurse(delay(20).then(() => console.log('done')))
}
In this example, the for await loop will wait until the two delays are done.
- If taskList: a Promise that is exactly what Promise.all returns. Example:
console.log(await Nursery([
  delay(10).then(() => 4),
  delay(20).then(() => 2),
]))
// ==> [4, 2]
In this example, the Nursery call will wait until the two delays are done and will return an array of results.
- If task: a Promise that resolves to the return value of the task:
console.log(await Nursery(delay(10).then(() => 4)))
// ==> 4
In this example, the Nursery call will wait until the delay is done and will return its result.
In all the above cases, if a task is a function, a Nurse object is passed to it, to enable it to run other tasks. Note that the return value in this case is an array of the results of all the nurse-run tasks plus the return value of the task function itself:
console.log(
  await Nursery(({nurse}) => {
    nurse(delay(20).then(_ => 'run1'))
    nurse(delay(10).then(_ => 'run2'))
    return 'done'
  }),
)
The Nurse object is the object generated by the Nursery generator above. In the following example, nurseryObject is such an object:
for await (const nurseryObject of Nursery()) {
  nurseryObject.nurse(delay(10).then(() => console.log('done')))
  nurseryObject.nurse(delay(20).then(() => console.log('done')))
}
The Nurse object has the following properties:
- nurse(task: Promise | function): If the task is a Promise, it will wait for promise resolution or cancellation. If it is a function, it will call the function (passing it the Nurse object) and wait on the Promise returned by it. If the function is synchronous and does not return a promise, it will transform the sync value (or exception) into a promise automatically.
  The Nursery generator (or Nursery function call) ensures that all tasks that ran in the body of the for await loop (or were given to the Nursery function) terminate, by waiting for them.
  The function call returns the Promise (either the one given to it, or the one returned by the function). You can, but don't have to, await the task, because the Nursery generator will wait for it when it is closed by the for await loop.
- supervisor(task: Promise | function): runs the task as a supervisor. A task in this mode is not waited upon. Once all the other (non-supervisor) tasks are done, the nursery closes. Thus a supervisor task can supervise and wait for all other tasks to be done. Note that a supervisor task is also waited upon before the nursery closes, as this is a prime directive of a nursery: all tasks end. But this is done in a second phase: after all regular tasks are done, an abort is signalled, which allows supervisors to listen for the abort and end.
- signal: an AbortSignal (see here) that enables the tasks running in the nursery to detect when the nursery is aborted. In the example below, the second task detects that the nursery was aborted using AbortSignal.aborted, in signal.aborted ? ... : ...:
try {
  for await (const {nurse, signal} of Nursery()) {
    nurse(Promise.reject(new Error('failed!')))
    nurse(
      delay(10).then(_ =>
        !signal.aborted ? console.log('not aborted') : console.log('aborted'),
      ),
    )
  }
} catch (err) {
  console.log('after Nursery', err.message)
}
// ==> aborted
// ==> after Nursery failed!
- abortController: the AbortController (see here) that can be used to abort the nursery, using abortController.abort(). This will abort the nursery without the need to fail a task. Example:
for await (const {nurse, signal, abortController} of Nursery()) {
  nurse(delay(10).then(() => abortController.abort()))
  nurse(
    delay(10).then(_ => (!signal.aborted ? console.log('not aborted') : console.log('aborted'))),
  )
}
// ==> aborted
The nursery closes itself after generating at least one Nurse object (see "retries" below for when it generates more).
Closing a generator will do the following:
- Wait for all Promises to be either resolved or rejected.
- If all promises are resolved, then all is good.
- If only one promise is rejected, it will throw/reject with that promise's error (after waiting as described above).
- If more than one promise is rejected, it will throw/reject with the error of the first promise that was rejected, but will attach all other errors to that error as a property, in the field designated by the Symbol key Nursery.moreErrors. Example:
try {
  for await (const {nurse} of Nursery()) {
    nurse(Promise.reject(new Error('first error')))
    nurse(delay(10).then(_ => Promise.reject(new Error('second error'))))
  }
} catch (err) {
  console.log(err.message)
  console.log(err[Nursery.moreErrors][0].message)
}
// ==> first error
// ==> second error
Note that if the first error is not an object, then no field can be added to it, but it is still rejected.
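The closing rules above can be approximated in a few lines. This sketch is not the package's code: waitForAllTasks is a hypothetical helper, and moreErrors is a local symbol standing in for Nursery.moreErrors. Errors are collected in the order the rejections happen, so "first" means first in time, not first in the list.

```javascript
// Hypothetical symbol standing in for Nursery.moreErrors.
const moreErrors = Symbol('moreErrors')

// Hypothetical helper: wait for every promise, then either return all results
// or throw the first rejection, with the other errors attached to it.
async function waitForAllTasks(promises) {
  const errors = [] // in the order the rejections happened
  const results = await Promise.all(
    promises.map(p =>
      Promise.resolve(p).catch(err => {
        errors.push(err)
        return undefined
      }),
    ),
  )
  if (errors.length === 0) return results
  const [first, ...rest] = errors
  // Only objects can carry an extra property; primitives are thrown as is.
  if (rest.length > 0 && first !== null && typeof first === 'object') {
    first[moreErrors] = rest
  }
  throw first
}
```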
Example with retries:
let rejectionCount = 0
for await (const {nurse} of Nursery({retries: 1})) {
  nurse(() => (rejectionCount++ === 0 ? Promise.reject(new Error()) : Promise.resolve(1)))
  nurse(delay(20).then(() => console.log('done')))
}
In the above example, done will be output twice, because in the first run the first task fails, and thus the whole body is retried.
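Here's a rough sketch of a retry loop that follows the onRetry contract described in the API section (attempt starts at 1, remaining + attempt === retries, and a rejection from onRetry stops retrying). withRetries is a hypothetical helper, not the package's code. Passing the arguments as one object, and the extra error field used for filtering, are my assumptions.

```javascript
// Hypothetical sketch of a retry loop with an onRetry hook; not the package's code.
// Assumption: onRetry receives {attempt, remaining, error}; `error` is my addition.
async function withRetries(body, {retries = 0, onRetry = async () => {}} = {}) {
  for (let failures = 0; ; failures++) {
    try {
      return await body()
    } catch (error) {
      // Out of retries: surface the last error.
      if (failures >= retries) throw error
      const attempt = failures + 1
      // If onRetry rejects, that rejection propagates and the retries stop.
      await onRetry({attempt, remaining: retries - attempt, error})
    }
  }
}
```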
The execution option enables you to control the execution of tasks that are functions. Note that if a task is a Promise, it will not be passed through the execution option, as it is already executing.
The nursery passes all task executions through this function. In the example below, we log each task execution:
function log(f) {
  console.log('executing task')
  return f()
}
for await (const {nurse} of Nursery({execution: log})) {
  nurse(() => delay(10).then(_ => console.log(1)))
  nurse(() => delay(20).then(_ => console.log(2)))
}
// ==> executing task
// ==> executing task
// ==> 1
// ==> 2
Another, more practical example, using the wonderful throat package:
const throat = require('throat')
// `throat(1)` returns a function that will execute functions passed to it, as is,
// but with a concurrency level of 1, i.e. sequentially
for await (const {nurse} of Nursery({execution: throat(1)})) {
  nurse(() => delay(20).then(_ => console.log(1)))
  nurse(() => delay(10).then(_ => console.log(2)))
  nurse(() => delay(5).then(_ => console.log(3)))
  nurse(() => delay(30).then(_ => console.log(4)))
}
// ==> 1
// ==> 2
// ==> 3
// ==> 4
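To see what an execution function like throat(1) has to do, here is a minimal stand-in. limitConcurrency is a hypothetical name and this is not throat's implementation: it returns a function that takes an async function and returns a promise for its result, starting at most limit of them at a time, in submission order.

```javascript
// Hypothetical stand-in for throat(limit); not throat's actual implementation.
function limitConcurrency(limit) {
  let active = 0
  const queue = []

  // Start queued functions while there is spare capacity.
  const next = () => {
    if (active >= limit || queue.length === 0) return
    active++
    const {fn, resolve, reject} = queue.shift()
    Promise.resolve()
      .then(fn) // also turns a synchronous throw into a rejection
      .then(resolve, reject)
      .finally(() => {
        active--
        next()
      })
  }

  // The execution function: queue `fn` and return a promise for its result.
  return fn =>
    new Promise((resolve, reject) => {
      queue.push({fn, resolve, reject})
      next()
    })
}
```

It has the same shape as the log function above, so it could be passed as Nursery({execution: limitConcurrency(1)}).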
Nursery.timeoutTask is a function that returns a task that is supposed to run in supervisor mode, and throws a Nursery.TimeoutError if the timeout is reached.
Nursery.timeoutTask(ms, options): returns a task that times out after ms milliseconds, or aborts with no error if the other, non-supervisor tasks have finished.
- ms: the number of milliseconds to wait before throwing the Nursery.TimeoutError. The exception will be thrown only if the other non-supervisor tasks have not finished.
- options: an object with the following properties:
  - name: the name of the task. Default: undefined. It is passed to the Nursery.TimeoutError, which shows it in the error message and also exposes it as a property of the exception.
Example:
try {
  const [, skyWalkerHeight] = await Nursery(({nurse, supervisor, signal}) => {
    supervisor(Nursery.timeoutTask(5, {name: 'fetchSkywalkerHeight'}))
    nurse(fetchSkywalkerHeight({signal}))
  })
  console.log(skyWalkerHeight)
} catch (err) {
  if (err.code === 'ERR_NURSERY_TIMEOUT_ERR') {
    console.log(err.message)
  }
}
// ==> Timeout of 5ms occured for task fetchSkywalkerHeight
Note in the example how we get back the Skywalker height: it is the second value returned, because the timeoutTask also returns a value (undefined).
Nursery.TimeoutError is a subclass of Error. It is thrown by Nursery.timeoutTask when a timeout occurs, and includes the following properties:
- message: the regular Error message, with a specific message about the timeout, including the name and ms of the timeout.
- code: set to ERR_NURSERY_TIMEOUT_ERR, to be able to identify this error.
- ms: the number of milliseconds of the timeout (same as the ms value of Nursery.timeoutTask).
- name: the name of the task (same as the name option of Nursery.timeoutTask).
See Nursery.timeoutTask for an example of catching a Nursery.TimeoutError.
- Nursery.constantTimeRetry: a function that returns a timeout function that can be supplied to Nursery's onRetry. It creates a constant delay before each retry. It accepts the following parameter:
  - delta: the delay, in milliseconds, before each retry.
  Example:
for await (const {nurse} of Nursery({
  retries: 3,
  onRetry: Nursery.constantTimeRetry({delta: 200}),
})) {
  // ...
}
- Nursery.linearTimeRetry: a function that returns a timeout function that can be supplied to Nursery's onRetry. It creates a linear delay before each retry. It accepts the following parameters:
  - start: the initial delay, before the first retry.
  - delta: how much to grow the delay before each subsequent retry.
  - max: the maximum possible delay. Default: Infinity.
  Note that the delay before each attempt is calculated as Math.min(start + delta * (attempt - 1), max). Example:
for await (const {nurse} of Nursery({
  retries: 3,
  onRetry: Nursery.linearTimeRetry({start: 100, delta: 50}),
})) {
  // ...
}
- Nursery.exponentialTimeRetry: a function that returns a timeout function that can be supplied to Nursery's onRetry. It creates an exponential (backoff) delay before each retry. It accepts the following parameters:
  - start: the initial delay, before the first retry.
  - factor: how much to multiply the delay by before each subsequent retry.
  - max: the maximum possible delay. Default: Infinity.
  Note that the delay before each attempt is calculated as Math.min(start * factor ** (attempt - 1), max). Example:
for await (const {nurse} of Nursery({
  retries: 3,
  onRetry: Nursery.exponentialTimeRetry({start: 100, factor: 2}),
})) {
  // ...
}
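The delay formulas above are easy to check by hand. Here they are as plain functions, mapping an attempt number (starting at 1) to a delay. constantDelay, linearDelay, exponentialDelay and sleepBeforeRetry are hypothetical names, not the package's exports, and the {attempt} argument shape of sleepBeforeRetry is an assumption.

```javascript
// Delay before retry number `attempt` (1-based), per the formulas above.
const constantDelay = ({delta}) => attempt => delta
const linearDelay = ({start, delta, max = Infinity}) => attempt =>
  Math.min(start + delta * (attempt - 1), max)
const exponentialDelay = ({start, factor, max = Infinity}) => attempt =>
  Math.min(start * factor ** (attempt - 1), max)

// Hypothetical: turn a delay schedule into an async onRetry-style function
// that sleeps for the scheduled time. The {attempt} argument is an assumption.
const sleepBeforeRetry = schedule => ({attempt}) =>
  new Promise(resolve => setTimeout(resolve, schedule(attempt)))
```

For example, linearDelay({start: 100, delta: 50}) gives 100, 150, 200 for attempts 1, 2 and 3, and exponentialDelay({start: 100, factor: 2, max: 300}) gives 100, 200, 300 (the third value is capped from 400).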
Nursery.CancelTask is an exception which, if thrown from inside a task, cancels the task without aborting the nursery, and makes the task return a value. Example:
const [res1, res2] = await Nursery([
  () => {
    // ...
    throw new Nursery.CancelTask(42)
  },
  () => Promise.resolve(43),
])
console.log(res1)
console.log(res2)
// ==> 42
// ==> 43
Note that if you throw this exception, the task will not fail. It's a nice and simple way to abort a task without the whole Nursery being rejected.
Nursery.CancelTask extends JavaScript's Error with the following:
- constructor(value[, message]):
  - value: the value returned by the task. Sets the property value.
  - message: the this.message. The default is Nursery task cancelled.
- code: used to identify that it is a CancelTask. Has the value ERR_NURSERY_TASK_CANCELLED.
- value: the value to be returned by the task.
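To illustrate the idea, here is a hypothetical class and runner. They are not the package's code: SketchCancelTask only mirrors the documented constructor, code and value, and runCancellable shows how a runner could turn that exception into a successful result while letting every other error fail the task.

```javascript
// Hypothetical stand-in for Nursery.CancelTask, mirroring the documented
// constructor(value[, message]), code and value.
class SketchCancelTask extends Error {
  constructor(value, message = 'Nursery task cancelled') {
    super(message)
    this.code = 'ERR_NURSERY_TASK_CANCELLED'
    this.value = value
  }
}

// Hypothetical runner: a task that throws SketchCancelTask resolves with the
// exception's value instead of failing; any other error still fails the task.
function runCancellable(taskFn) {
  return new Promise(resolve => resolve(taskFn())).catch(err => {
    if (err instanceof SketchCancelTask) return err.value
    throw err
  })
}
```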
- Contributions are welcome! PRs are welcome!
- To build the code, use npm run build (currently runs nothing).
- To test the code, use npm test, which will both run the tests under the test folder and run eslint.
- This code uses prettier, and npm test verifies that.