diff --git a/.changeset/yellow-hounds-heal.md b/.changeset/yellow-hounds-heal.md new file mode 100644 index 0000000..942b6c8 --- /dev/null +++ b/.changeset/yellow-hounds-heal.md @@ -0,0 +1,5 @@ +--- +'wonka': patch +--- + +Improve compatibility of `fromAsyncIterable` and `toAsyncIterable`. The `toAsyncIterable` will now output an object that's both an `AsyncIterator` and an `AsyncIterable`. Both helpers will now use a polyfill for `Symbol.asyncIterator` to improve compatibility with the Hermes engine and Babel transpilation. diff --git a/src/__tests__/sinks.test.ts b/src/__tests__/sinks.test.ts index e6f3fc7..86a10ae 100644 --- a/src/__tests__/sinks.test.ts +++ b/src/__tests__/sinks.test.ts @@ -244,19 +244,19 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); + const next$ = asyncIterator.next(); - expect(pulls).toBe(1); sink!(push(0)); - expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); - expect(pulls).toBe(2); + expect(await next$).toEqual({ value: 0, done: false }); + expect(pulls).toBe(1); sink!(push(1)); expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); - expect(pulls).toBe(3); + expect(pulls).toBe(2); sink!(SignalKind.End); expect(await asyncIterator.next()).toEqual({ done: true }); - expect(pulls).toBe(3); + expect(pulls).toBe(2); }); it('buffers actively pushed values', async () => { @@ -273,13 +273,14 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); + const next$ = asyncIterator.next(); sink!(push(0)); sink!(push(1)); sink!(SignalKind.End); expect(pulls).toBe(1); - expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); + expect(await next$).toEqual({ value: 0, done: false }); expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); expect(await asyncIterator.next()).toEqual({ done: true }); }); @@ -298,6 +299,7 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); + asyncIterator.next(); expect(pulls).toBe(1); let resolved = false; @@ -330,9 +332,10 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); + const next$ = asyncIterator.next(); sink!(push(0)); - expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); + expect(await next$).toEqual({ value: 0, done: false }); expect(await asyncIterator.return!()).toEqual({ done: true }); sink!(push(1)); diff --git a/src/helpers.ts b/src/helpers.ts index eb4b4a1..c237eb7 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -1,5 +1,11 @@ import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types'; +declare global { + interface SymbolConstructor { + readonly observable: symbol; + } +} + /** Placeholder {@link TeardownFn | teardown functions} that's a no-op. * @see {@link TeardownFn} for the definition and usage of teardowns. * @internal @@ -39,3 +45,24 @@ export function push(value: T): Push { 0: value, } as Push; } + +/** Returns the well-known symbol specifying the default AsyncIterator. + * @internal + */ +export const asyncIteratorSymbol = (): typeof Symbol.asyncIterator => + (typeof Symbol === 'function' && Symbol.asyncIterator) || ('@@asyncIterator' as any); + +/** Returns the well-known symbol specifying the default ES Observable. + * @privateRemarks + * This symbol is used to mark an object as a default ES Observable. By the specification, an object + * that abides by the default Observable implementation must carry a method set to this well-known + * symbol that returns the Observable implementation. It's common for this object to be an + * Observable itself and return itself on this method. + * + * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility + * between Observable implementations. + * + * @internal + */ +export const observableSymbol = (): typeof Symbol.observable => + (typeof Symbol === 'function' && Symbol.observable) || ('@@observable' as any); diff --git a/src/observable.ts b/src/observable.ts index f226630..b80c079 100644 --- a/src/observable.ts +++ b/src/observable.ts @@ -1,11 +1,5 @@ import { Source, SignalKind, TalkbackKind } from './types'; -import { push, start, talkbackPlaceholder } from './helpers'; - -declare global { - interface SymbolConstructor { - readonly observable: symbol; - } -} +import { push, start, talkbackPlaceholder, observableSymbol } from './helpers'; /** A definition of the ES Observable Subscription type that is returned by * {@link Observable.subscribe} @@ -118,21 +112,6 @@ interface Observable { [Symbol.observable](): Observable; } -/** Returns the well-known symbol specifying the default ES Observable. - * @privateRemarks - * This symbol is used to mark an object as a default ES Observable. By the specification, an object - * that abides by the default Observable implementation must carry a method set to this well-known - * symbol that returns the Observable implementation. It's common for this object to be an - * Observable itself and return itself on this method. - * - * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility - * between Observable implementations. - * - * @internal - */ -const observableSymbol = (): typeof Symbol.observable => - Symbol.observable || ('@@observable' as any); - /** Converts an ES Observable to a {@link Source}. * @param input - The {@link ObservableLike} object that will be converted. * @returns A {@link Source} wrapping the passed Observable. diff --git a/src/sinks.ts b/src/sinks.ts index 739e346..69fccf4 100644 --- a/src/sinks.ts +++ b/src/sinks.ts @@ -1,5 +1,5 @@ -import { Source, Subscription, TalkbackKind, SignalKind } from './types'; -import { talkbackPlaceholder } from './helpers'; +import { Source, Subscription, TalkbackKind, SignalKind, SourceIterable } from './types'; +import { talkbackPlaceholder, asyncIteratorSymbol } from './helpers'; /** Creates a subscription to a given source and invokes a `subscriber` callback for each value. * @param subscriber - A callback function called for each issued value. @@ -124,49 +124,60 @@ const doneResult = { done: true } as IteratorReturnResult; * } * ``` */ -export const toAsyncIterable = (source: Source): AsyncIterable => ({ - [Symbol.asyncIterator](): AsyncIterator { - const buffer: T[] = []; +export const toAsyncIterable = (source: Source): SourceIterable => { + const buffer: T[] = []; - let ended = false; - let talkback = talkbackPlaceholder; - let next: ((value: IteratorResult) => void) | void; + let ended = false; + let started = false; + let pulled = false; + let talkback = talkbackPlaceholder; + let next: ((value: IteratorResult) => void) | void; - source(signal => { - if (ended) { - /*noop*/ - } else if (signal === SignalKind.End) { - if (next) next = next(doneResult); - ended = true; - } else if (signal.tag === SignalKind.Start) { - (talkback = signal[0])(TalkbackKind.Pull); - } else if (next) { - next = next({ value: signal[0], done: false }); - } else { - buffer.push(signal[0]); + return { + async next(): Promise> { + if (!started) { + started = true; + source(signal => { + if (ended) { + /*noop*/ + } else if (signal === SignalKind.End) { + if (next) next = next(doneResult); + ended = true; + } else if (signal.tag === SignalKind.Start) { + pulled = true; + (talkback = signal[0])(TalkbackKind.Pull); + } else { + pulled = false; + if (next) { + next = next({ value: signal[0], done: false }); + } else { + buffer.push(signal[0]); + } + } + }); } - }); - - return { - async next(): Promise> { - if (ended && !buffer.length) { - return doneResult; - } else if (!ended && buffer.length <= 1) { - talkback(TalkbackKind.Pull); - } - return buffer.length - ? { value: buffer.shift()!, done: false } - : new Promise(resolve => (next = resolve)); - }, - async return(): Promise> { - if (!ended) next = talkback(TalkbackKind.Close); - ended = true; + if (ended && !buffer.length) { return doneResult; - }, - }; - }, -}); + } else if (!ended && !pulled && buffer.length <= 1) { + pulled = true; + talkback(TalkbackKind.Pull); + } + + return buffer.length + ? { value: buffer.shift()!, done: false } + : new Promise(resolve => (next = resolve)); + }, + async return(): Promise> { + if (!ended) next = talkback(TalkbackKind.Close); + ended = true; + return doneResult; + }, + [asyncIteratorSymbol()](): SourceIterable { + return this; + }, + }; +}; /** Subscribes to a given source and collects all synchronous values into an array. * @param source - A {@link Source}. diff --git a/src/sources.ts b/src/sources.ts index abeb6c2..480f0de 100644 --- a/src/sources.ts +++ b/src/sources.ts @@ -1,5 +1,11 @@ import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types'; -import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers'; +import { + push, + start, + talkbackPlaceholder, + teardownPlaceholder, + asyncIteratorSymbol, +} from './helpers'; import { share } from './operators'; /** Helper creating a Source from a factory function when it's subscribed to. @@ -45,9 +51,11 @@ export function lazy(produce: () => Source): Source { * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols} * for the JS Iterable protocol. */ -export function fromAsyncIterable(iterable: AsyncIterable): Source { +export function fromAsyncIterable(iterable: AsyncIterable | AsyncIterator): Source { return sink => { - const iterator = iterable[Symbol.asyncIterator](); + const iterator: AsyncIterator = + (iterable[asyncIteratorSymbol()] && iterable[asyncIteratorSymbol()]()) || iterable; + let ended = false; let looping = false; let pulled = false; diff --git a/src/types.d.ts b/src/types.d.ts index 164cafa..faa5093 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -200,3 +200,8 @@ export interface Subject extends Observer { /** The {@link Source} that issues the signals as the {@link Observer} methods are called. */ source: Source; } + +/** Async Iterable/Iterator after having converted a {@link Source}. + * @see {@link toAsyncIterable} for a helper that creates this structure. + */ +export interface SourceIterable extends AsyncIterator, AsyncIterable {}