Skip to content

Commit

Permalink
feat(fetch): add selector
Browse files Browse the repository at this point in the history
Closes #4744
  • Loading branch information
cartant committed Feb 9, 2020
1 parent f7c433b commit bb60eea
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 14 deletions.
49 changes: 49 additions & 0 deletions spec/observables/dom/fetch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,53 @@ describe('fromFetch', () => {
// The subscription will not be closed until the error fires when the promise resolves.
expect(subscription.closed).to.be.false;
});

it('should support a selector', done => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', undefined, response => response.text());
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);

fetch$.subscribe({
next: text => {
expect(text).to.equal('bar');
},
error: done,
complete: () => {
// Wait until the complete and the subsequent unsubscribe are finished
// before testing these expectations:
setTimeout(() => {
expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;
done();
}, 0);
}
});
});

it('should abort when unsubscribed and a selector is specified', () => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', undefined, response => response.text());
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);
const subscription = fetch$.subscribe();

expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;

subscription.unsubscribe();
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.true;
});
});
67 changes: 53 additions & 14 deletions src/internal/observable/dom/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
import { Observable } from '../../Observable';
import { Subscription } from '../../Subscription';
import { from } from '../../observable/from';
import { ObservableInput } from '../../types';

export function fromFetch(
input: string | Request,
init?: RequestInit
): Observable<Response>;

export function fromFetch<T>(
input: string | Request,
init: RequestInit | undefined,
selector: (response: Response) => ObservableInput<T>
): Observable<T>;

/**
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
Expand Down Expand Up @@ -50,27 +64,40 @@ import { Observable } from '../../Observable';
* @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
* function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
*/
export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response> {
return new Observable<Response>(subscriber => {
export function fromFetch<T>(
input: string | Request,
init?: RequestInit,
selector?: (response: Response) => ObservableInput<T>
): Observable<Response | T> {
return new Observable<Response | T>(subscriber => {
const controller = new AbortController();
const signal = controller.signal;
let outerSignalHandler: () => void;
let abortable = true;
let unsubscribed = false;

const subscription = new Subscription();
subscription.add(() => {
unsubscribed = true;
if (abortable) {
controller.abort();
}
});

let perSubscriberInit: RequestInit;
if (init) {
// If a signal is provided, just have it teardown. It's a cancellation token, basically.
if (init.signal) {
if (init.signal.aborted) {
controller.abort();
} else {
outerSignalHandler = () => {
const outerSignal = init.signal;
const outerSignalHandler = () => {
if (!signal.aborted) {
controller.abort();
}
};
init.signal.addEventListener('abort', outerSignalHandler);
outerSignal.addEventListener('abort', outerSignalHandler);
subscription.add(() => outerSignal.removeEventListener('abort', outerSignalHandler));
}
}
// init cannot be mutated or reassigned as it's closed over by the
Expand All @@ -81,9 +108,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
}

fetch(input, perSubscriberInit).then(response => {
abortable = false;
subscriber.next(response);
subscriber.complete();
if (selector) {
subscription.add(from(selector(response)).subscribe(
value => subscriber.next(value),
err => {
abortable = false;
if (!unsubscribed) {
// Only forward the error if it wasn't an abort.
subscriber.error(err);
}
},
() => {
abortable = false;
subscriber.complete();
}
));
} else {
abortable = false;
subscriber.next(response);
subscriber.complete();
}
}).catch(err => {
abortable = false;
if (!unsubscribed) {
Expand All @@ -92,11 +136,6 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
}
});

return () => {
unsubscribed = true;
if (abortable) {
controller.abort();
}
};
return subscription;
});
}

0 comments on commit bb60eea

Please sign in to comment.