Skip to content

Conversation

@yaacovCR
Copy link
Contributor

@yaacovCR yaacovCR commented Oct 26, 2025

A Queue is a batching async-generator meant to be consumed in the following pattern:

for await (const item of queue.subscribe(mapBatch)) {
   .... // use item
}

where mapBatch is a function that takes a batch of type Generator<T> and returns a single item of type U | undefined where undefined means no item is emitted.

Items are produced as managed by an executor function passed to the Queue constructor, a la repeaters, see https://repeater.js.org/)

A push() method is provided as an argument to the executor so that push() can be private to the code that constructs the Queue (although it can be saved to be passed to other code).

A stop() method is also provided as a the second argument to the executor for convenience, but it is also available on the queue object itself so that it can be called within the executor or by the consumer to end early. So this works:

const queue = new Queue(
  async (push, stop) => {
    push(1);
    push(2)
    await Promise.resolve();
    push(3);
    stop();
  },
);

const sub = queue.subscribe(batch => Array.from(batch));
const batch1 = await sub.next(); // batch1 = [1, 2]
const batch2 = await sub.next(); // batch2 = [3]

as does this:

let push;
const queue = new Queue(
  (_push) => {
    push = _push;
  },
);

push(1);
push(2);

const sub = queue.subscribe(batch => Array.from(batch));
const batch1 = await sub.next(); // batch1 = [1, 2]

const batch2Promise = sub.next();

await Promise.resolve();
push(3);
queue.stop();  // using the stop() method on queue avoid the need to set stop = _stop

const batch2 = await batch2Promise; // batch2 = [3]

Note: concurrent calls to subscribe will reference the same queue and are not encouraged.

Using queues, we are able to remove all logic for handling the implicit queue from IncrementalPublisher, retaining only the _handleCompletedBatch(), while adding only the required push() and stop() calls within the IncrementalGraph.

Tests do not change, except that we have some extra ticks from the use of our new generators (including the use of withCleanup() to wrap, and so we have to adjust a few tick-sensitive tests.

@yaacovCR yaacovCR requested a review from a team as a code owner October 26, 2025 15:23
@yaacovCR yaacovCR added the PR: polish 💅 PR doesn't change public API or any observed behaviour label Oct 26, 2025
@yaacovCR
Copy link
Contributor Author

API simplified, above edited.

@yaacovCR
Copy link
Contributor Author

yaacovCR commented Oct 27, 2025

How does a Queue differ from a Repeater?

  1. Queue adds two extra microtasks to the queue because (A) it uses a "real" async generator internally which adds an extra task for the [AsyncGeneratorResumeNext] job rather than a custom object following the raw async generator protocol which can avoid this and (B) it allows for concurrent .return() using the withCleanup() wrapper, which adds an await.
  2. Queue does not provide a mechanism for handling backpressure, i.e. all pushes are synchronous. If we would like to add backpressure handling within the Incremental Publisher/Graph, we will do it at the source within the executor code that handles "early execution."
  3. Queue allows for creation of multiple async-generators via the .subscribe() method, which could potentially conflict with each other; Repeaters ARE async generators, and cannot conflict. (This is tied to 1A, if a Queue was a generator, it would have to be a "raw" async generator, and we are using a "real" async generator purposefully for code simplification.)
  4. ...other differences? @brainkim

@brainkim
Copy link

@yaacovCR Glad to see you're still working on GraphQL. The Queue is likely smaller page weight wise! 121 lines is tough to beat. I say keep going.

Queue is a batching async-aware iterator-like protocol meant to be consumed in the following (and only the following) pattern:

>  let batch;
>  if ((batch = queue.currentBatch()) !== undefined) {
>    doSomethingWithBatch(batch);
>  }
>  while ((batch = await queue.nextBatch) !== undefined) {
>    doSomethingWithBatch(batch);
>  }

A `push()` methods is provided in the Queue constructor (a la repeaters, see https://repeater.js.org/) so that `push()` can be private to the code that constructs the Queue (although it can be saved to be passed to other code).

A `stop()` method is also provided as a the second argument to the constructor for convienence, but it is also available on the queue object itself so that it can be called by the executor or by the consumer. So this works:

>  const queue = new Queue(
>    async (push, stop) => {
>      push(1);
>      push(2)
>      await Promise.resolve();
>      push(3);
>      stop();
>    },
>  );
>
>  const batch1 = Array.from(queue.nextBatch()); // batch1 = [1, 2]
>  const batch2 = Array.from(await queue.nextBatchAsync()); // batch2 = [3]

as does this:

>  let push;
>  const queue = new Queue(
>    (_push) => {
>      push = _push;
>    },
>  );
>
>  push(1);
>  push(2);
>
>  const batch1 = Array.from(queue.nextBatch()); // batch1 = [1, 2]
>
>  const batch2Promise = queue.nextBatchAsync();
>
>  await Promise.resolve();
>  push(3);
>  queue.stop();
>
>  const batch2 = await batch2Promise; // batch2 = [3]

Note: concurrent calls to `currentBatch()` and `nextBatch` will return the same batch and are not encouraged.

A `toAsyncIterable(mapFn)` method transforms coalesces each batch of items into a single value (or undefined if not value is to be emitted for the batch).

Using queues, we are able to remove all logic for handling the implicit queue from IncrementalPublisher, retaining only the `_handleCompletedBatch()`, while adding only the required `push()` and `stop()` calls within the IncrementalGraph.

Tests do not change, except that `.return()` and `.throw()` (but not `next()` have an extra tick secondary to the additional layers of `withCleanup()`, so that the tests required slight adjustment.
for our purposes, we can just halt the executor
@yaacovCR yaacovCR merged commit fc37884 into graphql:next Oct 27, 2025
15 of 16 checks passed
@yaacovCR yaacovCR deleted the introduce-queue branch October 27, 2025 20:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

PR: polish 💅 PR doesn't change public API or any observed behaviour

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants