From 7897e85c11bcdece1efd749e42e772e5cf0974cf Mon Sep 17 00:00:00 2001 From: Daniel Lytkin Date: Fri, 23 Jul 2021 03:50:52 +0700 Subject: [PATCH] feat(from): support `AbortSignal` in `from(observable)` (#333) --- spec/asynciterable/from-spec.ts | 58 +++++++++++++++++++++++++++++++++ src/asynciterable/from.ts | 18 +++++++++- 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/spec/asynciterable/from-spec.ts b/spec/asynciterable/from-spec.ts index ee32fb64..870b52a5 100644 --- a/spec/asynciterable/from-spec.ts +++ b/spec/asynciterable/from-spec.ts @@ -2,6 +2,8 @@ import { hasNext, noNext, toObserver } from '../asynciterablehelpers'; import { setInterval, clearInterval } from 'timers'; import { PartialObserver } from '../../src/observer'; import { from } from 'ix/asynciterable'; +import { AbortError } from 'ix/Ix'; +import { withAbort } from 'ix/asynciterable/operators'; test('AsyncIterable#from from promise list', async () => { const xs: Iterable> = [ @@ -246,3 +248,59 @@ test('AsyncIterable#fromObservable with error', async () => { const it = ys[Symbol.asyncIterator](); await expect(it.next()).rejects.toThrow(err); }); + +test('AsyncIterable#fromObservable with abort while waiting', async () => { + let unsubscribed = false; + + const xs = new TestObservable((obs) => { + obs.next(0); + + return { + unsubscribe() { + unsubscribed = true; + }, + }; + }); + + const abortController = new AbortController(); + + const ys = from(xs).pipe(withAbort(abortController.signal)); + const it = ys[Symbol.asyncIterator](); + + await hasNext(it, 0); + + setTimeout(() => { + abortController.abort(); + }, 100); + + await expect(it.next()).rejects.toBeInstanceOf(AbortError); + expect(unsubscribed).toBe(true); +}); + +test('AsyncIterable#fromObservable with abort while queueing', async () => { + let unsubscribed = false; + + const xs = new TestObservable((obs) => { + obs.next(0); + obs.next(1); + obs.next(2); + + return { + unsubscribe() { + unsubscribed = true; + }, + }; + }); + + const abortController = new AbortController(); + + const ys = from(xs).pipe(withAbort(abortController.signal)); + const it = ys[Symbol.asyncIterator](); + + await hasNext(it, 0); + + abortController.abort(); + + await expect(it.next()).rejects.toBeInstanceOf(AbortError); + expect(unsubscribed).toBe(true); +}); diff --git a/src/asynciterable/from.ts b/src/asynciterable/from.ts index 4ccb79fa..9db1b245 100644 --- a/src/asynciterable/from.ts +++ b/src/asynciterable/from.ts @@ -12,6 +12,7 @@ import { import { Observable } from '../observer'; import { toLength } from '../util/tolength'; import { AsyncSink } from './asyncsink'; +import { AbortError, throwIfAborted } from '../aborterror'; export let from: ( source: AsyncIterableInput, @@ -149,7 +150,9 @@ export function _initialize(Ctor: typeof AsyncIterableX) { this._selector = selector; } - async *[Symbol.asyncIterator]() { + async *[Symbol.asyncIterator](signal?: AbortSignal) { + throwIfAborted(signal); + const sink: AsyncSink = new AsyncSink(); const subscription = this._observable.subscribe({ next(value: TSource) { @@ -163,12 +166,25 @@ export function _initialize(Ctor: typeof AsyncIterableX) { } }); + function onAbort() { + sink.error(new AbortError()); + } + + if (signal) { + signal.addEventListener('abort', onAbort); + } + let i = 0; try { for (let next; !(next = await sink.next()).done; ) { + throwIfAborted(signal); yield await this._selector(next.value!, i++); } } finally { + if (signal) { + signal.removeEventListener('abort', onAbort); + } + subscription.unsubscribe(); } }