Skip to content

Commit

Permalink
fix: Improve compatibility of AsyncIterable helpers for polyfills (Ba…
Browse files Browse the repository at this point in the history
…bel/Hermes related) (#165)
  • Loading branch information
kitten committed Jul 20, 2023
1 parent 9199ec6 commit c160e1d
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 72 deletions.
5 changes: 5 additions & 0 deletions .changeset/yellow-hounds-heal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'wonka': patch
---

Improve compatibility of `fromAsyncIterable` and `toAsyncIterable`. The `toAsyncIterable` will now output an object that's both an `AsyncIterator` and an `AsyncIterable`. Both helpers will now use a polyfill for `Symbol.asyncIterator` to improve compatibility with the Hermes engine and Babel transpilation.
17 changes: 10 additions & 7 deletions src/__tests__/sinks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,19 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
const next$ = asyncIterator.next();

expect(pulls).toBe(1);
sink!(push(0));
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(pulls).toBe(2);
expect(await next$).toEqual({ value: 0, done: false });
expect(pulls).toBe(1);

sink!(push(1));
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(pulls).toBe(3);
expect(pulls).toBe(2);

sink!(SignalKind.End);
expect(await asyncIterator.next()).toEqual({ done: true });
expect(pulls).toBe(3);
expect(pulls).toBe(2);
});

it('buffers actively pushed values', async () => {
Expand All @@ -273,13 +273,14 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
const next$ = asyncIterator.next();

sink!(push(0));
sink!(push(1));
sink!(SignalKind.End);

expect(pulls).toBe(1);
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await next$).toEqual({ value: 0, done: false });
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(await asyncIterator.next()).toEqual({ done: true });
});
Expand All @@ -298,6 +299,7 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
asyncIterator.next();
expect(pulls).toBe(1);

let resolved = false;
Expand Down Expand Up @@ -330,9 +332,10 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
const next$ = asyncIterator.next();

sink!(push(0));
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await next$).toEqual({ value: 0, done: false });
expect(await asyncIterator.return!()).toEqual({ done: true });

sink!(push(1));
Expand Down
27 changes: 27 additions & 0 deletions src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';

declare global {
interface SymbolConstructor {
readonly observable: symbol;
}
}

/** Placeholder {@link TeardownFn | teardown functions} that's a no-op.
* @see {@link TeardownFn} for the definition and usage of teardowns.
* @internal
Expand Down Expand Up @@ -39,3 +45,24 @@ export function push<T>(value: T): Push<T> {
0: value,
} as Push<T>;
}

/** Returns the well-known symbol specifying the default AsyncIterator.
* @internal
*/
export const asyncIteratorSymbol = (): typeof Symbol.asyncIterator =>
(typeof Symbol === 'function' && Symbol.asyncIterator) || ('@@asyncIterator' as any);

/** Returns the well-known symbol specifying the default ES Observable.
* @privateRemarks
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
* that abides by the default Observable implementation must carry a method set to this well-known
* symbol that returns the Observable implementation. It's common for this object to be an
* Observable itself and return itself on this method.
*
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
* between Observable implementations.
*
* @internal
*/
export const observableSymbol = (): typeof Symbol.observable =>
(typeof Symbol === 'function' && Symbol.observable) || ('@@observable' as any);
23 changes: 1 addition & 22 deletions src/observable.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';

declare global {
interface SymbolConstructor {
readonly observable: symbol;
}
}
import { push, start, talkbackPlaceholder, observableSymbol } from './helpers';

/** A definition of the ES Observable Subscription type that is returned by
* {@link Observable.subscribe}
Expand Down Expand Up @@ -118,21 +112,6 @@ interface Observable<T> {
[Symbol.observable](): Observable<T>;
}

/** Returns the well-known symbol specifying the default ES Observable.
* @privateRemarks
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
* that abides by the default Observable implementation must carry a method set to this well-known
* symbol that returns the Observable implementation. It's common for this object to be an
* Observable itself and return itself on this method.
*
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
* between Observable implementations.
*
* @internal
*/
const observableSymbol = (): typeof Symbol.observable =>
Symbol.observable || ('@@observable' as any);

/** Converts an ES Observable to a {@link Source}.
* @param input - The {@link ObservableLike} object that will be converted.
* @returns A {@link Source} wrapping the passed Observable.
Expand Down
91 changes: 51 additions & 40 deletions src/sinks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Source, Subscription, TalkbackKind, SignalKind } from './types';
import { talkbackPlaceholder } from './helpers';
import { Source, Subscription, TalkbackKind, SignalKind, SourceIterable } from './types';
import { talkbackPlaceholder, asyncIteratorSymbol } from './helpers';

/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
* @param subscriber - A callback function called for each issued value.
Expand Down Expand Up @@ -124,49 +124,60 @@ const doneResult = { done: true } as IteratorReturnResult<void>;
* }
* ```
*/
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
[Symbol.asyncIterator](): AsyncIterator<T> {
const buffer: T[] = [];
export const toAsyncIterable = <T>(source: Source<T>): SourceIterable<T> => {
const buffer: T[] = [];

let ended = false;
let talkback = talkbackPlaceholder;
let next: ((value: IteratorResult<T>) => void) | void;
let ended = false;
let started = false;
let pulled = false;
let talkback = talkbackPlaceholder;
let next: ((value: IteratorResult<T>) => void) | void;

source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
if (next) next = next(doneResult);
ended = true;
} else if (signal.tag === SignalKind.Start) {
(talkback = signal[0])(TalkbackKind.Pull);
} else if (next) {
next = next({ value: signal[0], done: false });
} else {
buffer.push(signal[0]);
return {
async next(): Promise<IteratorResult<T>> {
if (!started) {
started = true;
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
if (next) next = next(doneResult);
ended = true;
} else if (signal.tag === SignalKind.Start) {
pulled = true;
(talkback = signal[0])(TalkbackKind.Pull);
} else {
pulled = false;
if (next) {
next = next({ value: signal[0], done: false });
} else {
buffer.push(signal[0]);
}
}
});
}
});

return {
async next(): Promise<IteratorResult<T>> {
if (ended && !buffer.length) {
return doneResult;
} else if (!ended && buffer.length <= 1) {
talkback(TalkbackKind.Pull);
}

return buffer.length
? { value: buffer.shift()!, done: false }
: new Promise(resolve => (next = resolve));
},
async return(): Promise<IteratorReturnResult<void>> {
if (!ended) next = talkback(TalkbackKind.Close);
ended = true;
if (ended && !buffer.length) {
return doneResult;
},
};
},
});
} else if (!ended && !pulled && buffer.length <= 1) {
pulled = true;
talkback(TalkbackKind.Pull);
}

return buffer.length
? { value: buffer.shift()!, done: false }
: new Promise(resolve => (next = resolve));
},
async return(): Promise<IteratorReturnResult<void>> {
if (!ended) next = talkback(TalkbackKind.Close);
ended = true;
return doneResult;
},
[asyncIteratorSymbol()](): SourceIterable<T> {
return this;
},
};
};

/** Subscribes to a given source and collects all synchronous values into an array.
* @param source - A {@link Source}.
Expand Down
14 changes: 11 additions & 3 deletions src/sources.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import {
push,
start,
talkbackPlaceholder,
teardownPlaceholder,
asyncIteratorSymbol,
} from './helpers';
import { share } from './operators';

/** Helper creating a Source from a factory function when it's subscribed to.
Expand Down Expand Up @@ -45,9 +51,11 @@ export function lazy<T>(produce: () => Source<T>): Source<T> {
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
* for the JS Iterable protocol.
*/
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
export function fromAsyncIterable<T>(iterable: AsyncIterable<T> | AsyncIterator<T>): Source<T> {
return sink => {
const iterator = iterable[Symbol.asyncIterator]();
const iterator: AsyncIterator<T> =
(iterable[asyncIteratorSymbol()] && iterable[asyncIteratorSymbol()]()) || iterable;

let ended = false;
let looping = false;
let pulled = false;
Expand Down
5 changes: 5 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,8 @@ export interface Subject<T> extends Observer<T> {
/** The {@link Source} that issues the signals as the {@link Observer} methods are called. */
source: Source<T>;
}

/** Async Iterable/Iterator after having converted a {@link Source}.
* @see {@link toAsyncIterable} for a helper that creates this structure.
*/
export interface SourceIterable<T> extends AsyncIterator<T>, AsyncIterable<T> {}

0 comments on commit c160e1d

Please sign in to comment.