This package could provide utilities for a bit of "eager" processing of async iterators.
A "buffer" of a specified size, processing items of an iterator in parallel before resolved items are yielded, enables:
Look-ahead, so processing starts before the consumer actively pulls the next item off the stream.
Parallel execution of slow operations, like API calls within a map.
If result order is not important, the fastest items to resolve can be yielded early and thus processed earlier by the next step in the pipe.
New functions
race(), where the first items to resolve are yielded first.
eager(), for guaranteed item order.
Both suggested functions could take one argument describing the desired "parallelism" of the operation.
The argument could be a simple integer or, even better, a function that continuously supplies an adjusted "buffer" size to process in parallel before resolved items are yielded. That is good for backoff when an API is rate-limited: you probably want to drop to a buffer size of 1 and then increase again slowly.
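To make the function-valued argument concrete, here is a minimal sketch of a rate-limit-aware size supplier. All names here (backoffSize, onRateLimit, onSuccess) are assumptions for illustration, not part of this package's API:

```typescript
// The parallelism argument could accept either form:
type Parallelism = number | (() => number);

// Hypothetical backoff helper: drops to 1 when rate-limited, ramps back up.
function backoffSize(max = 10) {
  let current = max;
  return {
    size: () => current,                                        // pass this function as the argument
    onRateLimit: () => { current = 1; },                        // e.g. on HTTP 429, back off hard
    onSuccess: () => { current = Math.min(max, current + 1); }, // then increase again slowly
  };
}
```

The operator would call the supplied function before each refill, so the buffer size can shrink or grow between items without rebuilding the pipe.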
Suggested implementation
If race(5) is in the pipe, immediately call next() five times on the iterator and put the resulting promises into a processing "buffer" (probably a Map). When any of them resolves, remove it from the buffer, adjust the buffer size, refill the buffer using next() (unless the iterator has reported being done), then yield the resolved item.
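The steps above could be sketched roughly like this (raceIter is an assumed name, not the package's actual implementation). Each buffered promise is tagged with a key so the winner of Promise.race() can be removed again:

```typescript
// Sketch of race(n): yields whichever buffered next() call resolves first.
async function* raceIter<T>(source: AsyncIterator<T>, size: number): AsyncGenerator<T> {
  // Buffer of in-flight next() calls, keyed so a resolved one can be deleted.
  const buffer = new Map<number, Promise<{ key: number; res: IteratorResult<T> }>>();
  let nextKey = 0;
  let done = false;

  const fill = () => {
    while (!done && buffer.size < size) {
      const key = nextKey++;
      buffer.set(key, Promise.resolve(source.next()).then((res) => ({ key, res })));
    }
  };

  fill();
  while (buffer.size > 0) {
    // Whichever buffered call settles first wins the race.
    const { key, res } = await Promise.race(buffer.values());
    buffer.delete(key);
    if (res.done) {
      done = true; // stop refilling, but drain what is already in flight
      continue;
    }
    fill();
    yield res.value;
  }
}
```

Note that a plain async generator as the source serializes its next() calls, so the real parallelism win comes from sources (or mapped operations) whose next() calls can actually be in flight concurrently.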
eager(5) would do the same, but hold on to resolved items until they can be yielded in the order they arrived.
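The ordered variant is even simpler to sketch, since a FIFO queue of in-flight next() calls preserves order by construction (eagerIter is again an assumed name):

```typescript
// Sketch of eager(n): same look-ahead, but results are awaited in the order
// they were requested, so item order is guaranteed.
async function* eagerIter<T>(source: AsyncIterator<T>, size: number): AsyncGenerator<T> {
  // FIFO queue of in-flight next() calls.
  const queue: Promise<IteratorResult<T>>[] = [];
  for (let i = 0; i < size; i++) queue.push(Promise.resolve(source.next()));
  while (queue.length > 0) {
    const res = await queue.shift()!;
    if (res.done) break;
    queue.push(Promise.resolve(source.next())); // keep the look-ahead full
    yield res.value;
  }
}
```

One caveat of this naive version: it over-calls next() a few times past the end of the iterator, which is harmless for generators but might matter for hand-rolled iterators.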
Background
This is an idea I've had since I did a similar thing in a project for my former employer last year.
Had this package existed at that time it would have been a very elegant base for what I did!
I basically implemented map, flatten and tap for async iterators in ways similar to this package (I think those were the only basic operations I needed), then added the race-style parallelism on top.
The use case was reading a lot of paginated data from the Google Classroom APIs, in multiple steps. We wanted a list of a student's assignments, with course and teacher data for each assignment, something like this:
Get all courses for a student, page by page
Get all of a student's assignments for each course, page by page
Get the teacher info for every course
For a typical student that could entail up to 100 API calls, and done sequentially it took a loooong time...
By reading data from an async iterator in each step and "racing" 10-20 items from each of them, the next page of data was available just ahead of time, requests from all steps could be processed in parallel, and it all read as nice chained collection operations. OK, it would've been even more readable with the nice pipe-style API of this package 😄
Of course this wouldn't have been an issue at all if the Google Classroom API were GraphQL 🤷 and the API limits were a big thing, as we really hammered it with short bursts!
hey, I just had an idea... perhaps the simplest version of this could be built by chunking and then flattening? Would that eagerly load the items in each chunk?
As it's been over a year since I worked on this myself (and I no longer have access to the old code I wrote) I'll need to dig in and wrap my head around async iterators again...
I'll be sure to have a look at your PR and get back to you!
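For what it's worth, the chunk-and-flatten idea from the comment above could be sketched like this (chunked and mapChunksFlat are assumed names). It gives chunk-level eagerness: all items in a chunk start in parallel, but the next chunk waits for the slowest item of the current one:

```typescript
// Group an async iterable into arrays of up to `size` items.
async function* chunked<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let chunk: T[] = [];
  for await (const item of source) {
    chunk.push(item);
    if (chunk.length === size) { yield chunk; chunk = []; }
  }
  if (chunk.length > 0) yield chunk;
}

// Map each chunk's items in parallel with Promise.all, then flatten.
async function* mapChunksFlat<T, R>(
  source: AsyncIterable<T>,
  size: number,
  fn: (item: T) => Promise<R>,
): AsyncGenerator<R> {
  for await (const chunk of chunked(source, size)) {
    // All items in this chunk run concurrently...
    const results = await Promise.all(chunk.map(fn));
    // ...then are yielded (flattened) in order.
    yield* results;
  }
}
```

So it eagerly loads within each chunk, but unlike race()/eager() it cannot start the next item the moment one slot frees up.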