Skip to content

Commit

Permalink
feat(fetch): add selector
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Apr 3, 2020
1 parent 49304ff commit d4138e7
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 5 deletions.
49 changes: 49 additions & 0 deletions spec/observables/dom/fetch-spec.ts
Expand Up @@ -264,4 +264,53 @@ describe('fromFetch', () => {
}
});
});

it('should support a selector', done => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', undefined, response => response.text());
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);

fetch$.subscribe({
next: text => {
expect(text).to.equal('bar');
},
error: done,
complete: () => {
// Wait until the complete and the subsequent unsubscribe are finished
// before testing these expectations:
setTimeout(() => {
expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;
done();
}, 0);
}
});
});

it('should abort when unsubscribed and a selector is specified', () => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', undefined, response => response.text());
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);
const subscription = fetch$.subscribe();

expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;

subscription.unsubscribe();
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.true;
});
});
44 changes: 39 additions & 5 deletions src/internal/observable/dom/fetch.ts
@@ -1,5 +1,18 @@
import { Observable } from '../../Observable';
import { Subscription } from '../../Subscription';
import { from } from '../../observable/from';
import { ObservableInput } from '../../types';

export function fromFetch(
input: string | Request,
init?: RequestInit
): Observable<Response>;

export function fromFetch<T>(
input: string | Request,
init: RequestInit | undefined,
selector: (response: Response) => ObservableInput<T>
): Observable<T>;

/**
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
Expand Down Expand Up @@ -51,8 +64,12 @@ import { Subscription } from '../../Subscription';
* @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
* function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
*/
export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response> {
return new Observable<Response>(subscriber => {
export function fromFetch<T>(
input: string | Request,
init?: RequestInit,
selector?: (response: Response) => ObservableInput<T>
): Observable<Response | T> {
return new Observable<Response | T>(subscriber => {
const controller = new AbortController();
const signal = controller.signal;
let abortable = true;
Expand Down Expand Up @@ -91,9 +108,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
}

fetch(input, perSubscriberInit).then(response => {
abortable = false;
subscriber.next(response);
subscriber.complete();
if (selector) {
subscription.add(from(selector(response)).subscribe(
value => subscriber.next(value),
err => {
abortable = false;
if (!unsubscribed) {
// Only forward the error if it wasn't an abort.
subscriber.error(err);
}
},
() => {
abortable = false;
subscriber.complete();
}
));
} else {
abortable = false;
subscriber.next(response);
subscriber.complete();
}
}).catch(err => {
abortable = false;
if (!unsubscribed) {
Expand Down

0 comments on commit d4138e7

Please sign in to comment.