Skip to content

Commit

Permalink
feat: allow concur iterables as input to asAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
TomerAberbach committed Apr 5, 2024
1 parent 77f8535 commit 0ef1fc1
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
8 changes: 7 additions & 1 deletion src/operations/as.d.ts
Expand Up @@ -19,6 +19,12 @@ import type { MaybePromiseLike } from '../internal/types.js'
/**
* Returns an async iterable wrapper around `iterable`.
*
* Note that when passing a concur iterable the returned async iterable may have
* to buffer the values produced by the concur iterable because values may not
* be read from the async iterable as quickly as they are produced by the concur
* iterable. This is a fundamental problem because concur iterables are "push"
* based while async iterables are "pull" based, which creates backpressure.
*
* @example
* ```js
* const asyncIterable = asAsync([`sloth`, `more sloth`, `even more sloth`])
Expand All @@ -35,7 +41,7 @@ import type { MaybePromiseLike } from '../internal/types.js'
* ```
*/
export const asAsync: <Value>(
iterable: Iterable<Value> | AsyncIterable<Value>,
iterable: Iterable<Value> | AsyncIterable<Value> | ConcurIterable<Value>,
) => AsyncIterable<Value>

/**
Expand Down
37 changes: 35 additions & 2 deletions src/operations/as.js
Expand Up @@ -14,15 +14,48 @@
* limitations under the License.
*/

import { createAsyncIterable } from '../internal/helpers.js'
import { createAsyncIterable, deferred } from '../internal/helpers.js'
import { map } from './transform.js'

export const asAsync = iterable => {
if (iterable[Symbol.asyncIterator]) {
return iterable
}

return createAsyncIterable(() => iterable[Symbol.iterator]())
return createAsyncIterable(
iterable[Symbol.iterator]
? () => iterable[Symbol.iterator]()
: async function* () {
let buffer = []
let done = false
let nonEmptyBufferDeferred = deferred()

iterable(value => {
buffer.push(value)
if (nonEmptyBufferDeferred) {
const currentDeferred = nonEmptyBufferDeferred
nonEmptyBufferDeferred = null
currentDeferred._resolve()
}
}).then(() => {
done = true
nonEmptyBufferDeferred?._resolve()
})

// eslint-disable-next-line no-unmodified-loop-condition
while (!done) {
if (!buffer.length) {
await nonEmptyBufferDeferred._promise
continue
}

const currentBuffer = buffer
buffer = []
nonEmptyBufferDeferred = deferred()
yield* currentBuffer
}
},
)
}

export const asConcur = iterable => {
Expand Down
19 changes: 17 additions & 2 deletions test/operations/as.ts
Expand Up @@ -40,10 +40,13 @@ test.skip(`asAsync types are correct`, () => {
expectTypeOf(asAsync(asAsync([`a`, `b`, `c`]))).toMatchTypeOf<
AsyncIterable<string>
>()
expectTypeOf(asAsync(asConcur([`a`, `b`, `c`]))).toMatchTypeOf<
AsyncIterable<string>
>()
})

testProp(
`asAsync returns a pure async iterable`,
`asAsync returns a pure async iterable for non-concur iterables`,
[fc.oneof(iterableArb, asyncIterableArb)],
async ({ iterable }) => {
const asyncIterable = asAsync(iterable)
Expand All @@ -53,7 +56,7 @@ testProp(
)

testProp(
`asAsync returns an async iterable containing the same values in the same order as the given iterable`,
`asAsync returns an async iterable containing the same values in the same order as the given iterable or async iterable`,
[fc.oneof(iterableArb, asyncIterableArb)],
async ({ iterable, values }) => {
const asyncIterable = asAsync(iterable)
Expand All @@ -62,6 +65,18 @@ testProp(
},
)

testProp(
`asAsync returns an async iterable containing the same values as the given concur iterable`,
[concurIterableArb],
async ({ iterable, values }) => {
const asyncIterable = asAsync(iterable)

expect(await reduceAsync(toArray(), asyncIterable)).toIncludeSameMembers(
values,
)
},
)

test.skip(`asConcur types are correct`, () => {
expectTypeOf(asConcur([1, 2, 3])).toMatchTypeOf<ConcurIterable<number>>()
expectTypeOf(asConcur([1, 2, 3] as Iterable<number>)).toMatchTypeOf<
Expand Down

0 comments on commit 0ef1fc1

Please sign in to comment.