Skip to content

Commit

Permalink
fixup! feat(http): Introduction of the fetch Backend for the `HttpC…
Browse files Browse the repository at this point in the history
…lient`
  • Loading branch information
AndrewKushnir committed Jun 8, 2023
1 parent d24306c commit a761c27
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 69 deletions.
2 changes: 1 addition & 1 deletion goldens/public-api/common/http/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { XhrFactory } from '@angular/common';
// @public
export class FetchBackend implements HttpBackend {
// (undocumented)
handle(req: HttpRequest<any>): Observable<HttpEvent<any>>;
handle(request: HttpRequest<any>): Observable<HttpEvent<any>>;
// (undocumented)
static ɵfac: i0.ɵɵFactoryDeclaration<FetchBackend, never>;
// (undocumented)
Expand Down
135 changes: 67 additions & 68 deletions packages/common/http/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
*/

import {inject, Injectable} from '@angular/core';
import {BehaviorSubject, defer, merge, Observable} from 'rxjs';
import {finalize} from 'rxjs/operators';
import {Observable, Observer} from 'rxjs';

import {HttpBackend} from './backend';
import {HttpHeaders} from './headers';
Expand Down Expand Up @@ -50,48 +49,42 @@ export class FetchBackend implements HttpBackend {
private readonly fetchImpl =
inject(FetchFactory, {optional: true})?.fetch ?? fetch.bind(globalThis);

handle(req: HttpRequest<any>): Observable<HttpEvent<any>> {
// Deferring to allow creating a new AbortController on retry()
return defer(() => {
const abortController = new AbortController();

// TODO: use `AsyncGenerators` instead of `BehaviorSubject` when we no longer support RXJS 6.x
// We can't use it now because from() doesn't accept generators on RxJS 6.
// Just replace `reportEvent` calls with `yield`.
const eventStream = new BehaviorSubject<HttpEvent<any>>({type: HttpEventType.Sent});

// Everything happens on Observable subscription.
return merge(
defer(() => {
return this
.doRequest(req, abortController.signal, (event) => eventStream.next(event))
.finally(() => eventStream.complete());
}),
eventStream,
)
.pipe(finalize(() => {
// Aborting the fetch call when the observable is unsubscribed
abortController.abort();
}));
handle(request: HttpRequest<any>): Observable<HttpEvent<any>> {
return new Observable(observer => {
const aborter = new AbortController();
this.doRequest(request, aborter.signal, observer)
.then(noop, error => observer.error(new HttpErrorResponse({error})));
return () => aborter.abort();
});
}

private async doRequest(
request: HttpRequest<any>, signal: AbortSignal,
reportEvent: (event: HttpEvent<any>) => void): Promise<HttpResponse<any>> {
observer: Observer<HttpEvent<any>>): Promise<void> {
const init = this.createRequestInit(request);
let response;

try {
response = await this.fetchImpl(request.url, {signal, ...init});
const fetchPromise = this.fetchImpl(request.url, {signal, ...init});

// Register a nonsense `.then` before sending the `Sent` event, in case the
// sent event causes the Promise to be synchronously rejected. zone.js treats a rejected
// promise that has not yet been awaited as an unhandled error.
fetchPromise.then(noop, noop);

// Send the `Sent` event before awaiting the response.
observer.next({type: HttpEventType.Sent});

response = await fetchPromise;
} catch (error: any) {
throw new HttpErrorResponse({
observer.error(new HttpErrorResponse({
error,
status: error.status ?? 0,
statusText: error.statusText,
url: request.url,
headers: error.headers,
});
}));
return;
}

const headers = new HttpHeaders(response.headers);
Expand All @@ -102,11 +95,11 @@ export class FetchBackend implements HttpBackend {
let body: string|ArrayBuffer|Blob|object|null = null;

if (request.reportProgress) {
reportEvent(new HttpHeaderResponse({headers, status, statusText, url}));
observer.next(new HttpHeaderResponse({headers, status, statusText, url}));
}

if (response.body) {
// Read Progess
// Read Progress
const contentLength = response.headers.get('content-length');
const chunks: Uint8Array[] = [];
const reader = response.body.getReader();
Expand All @@ -130,7 +123,7 @@ export class FetchBackend implements HttpBackend {
(partialText ?? '') + (decoder ??= new TextDecoder).decode(value, {stream: true}) :
undefined;

reportEvent({
observer.next({
type: HttpEventType.DownloadProgress,
total: contentLength ? +contentLength : undefined,
loaded: receivedLength,
Expand All @@ -141,8 +134,19 @@ export class FetchBackend implements HttpBackend {

// Combine all chunks.
const chunksAll = this.concatChunks(chunks, receivedLength);

body = this.parseBody(request, response, chunksAll);
try {
body = this.parseBody(request, chunksAll);
} catch (error) {
// Body loading or parsing failed
observer.error(new HttpErrorResponse({
error,
headers: new HttpHeaders(response.headers),
status: response.status,
statusText: response.statusText,
url: getResponseUrl(response) ?? request.url,
}));
return;
}
}

// Same behavior as the XhrBackend
Expand All @@ -157,48 +161,41 @@ export class FetchBackend implements HttpBackend {
const ok = status >= 200 && status < 300;

if (ok) {
return new HttpResponse({
observer.next(new HttpResponse({
body,
headers,
status,
statusText,
url,
});
}));

// The full body has been received and delivered, no further events
// are possible. This request is complete.
observer.complete();
} else {
observer.error(new HttpErrorResponse({
error: body,
headers,
status,
statusText,
url,
}));
}

throw new HttpErrorResponse({
error: body,
headers,
status,
statusText,
url,
});
}

private parseBody(request: HttpRequest<any>, response: Response, binContent: Uint8Array): string
|ArrayBuffer|Blob|object|null {
try {
switch (request.responseType) {
case 'json':
// stripping the XSSI when present
const text = new TextDecoder().decode(binContent).replace(XSSI_PREFIX, '');
return text === '' ? null : JSON.parse(text) as object;
case 'text':
return new TextDecoder().decode(binContent);
case 'blob':
return new Blob([binContent]);
case 'arraybuffer':
return binContent.buffer;
}
} catch (error) {
// body loading or parsing failed
throw new HttpErrorResponse({
error,
headers: new HttpHeaders(response.headers),
status: response.status,
statusText: response.statusText,
url: getResponseUrl(response) ?? request.url,
});
private parseBody(request: HttpRequest<any>, binContent: Uint8Array): string|ArrayBuffer|Blob
|object|null {
switch (request.responseType) {
case 'json':
// stripping the XSSI when present
const text = new TextDecoder().decode(binContent).replace(XSSI_PREFIX, '');
return text === '' ? null : JSON.parse(text) as object;
case 'text':
return new TextDecoder().decode(binContent);
case 'blob':
return new Blob([binContent]);
case 'arraybuffer':
return binContent.buffer;
}
}

Expand Down Expand Up @@ -249,3 +246,5 @@ export class FetchBackend implements HttpBackend {
export abstract class FetchFactory {
abstract fetch: typeof fetch;
}

function noop(): void {}

0 comments on commit a761c27

Please sign in to comment.