-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
fetchHttpClient.ts
322 lines (287 loc) · 9.47 KB
/
fetchHttpClient.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { AbortError } from "@azure/abort-controller";
import type {
HttpClient,
HttpHeaders as PipelineHeaders,
PipelineRequest,
PipelineResponse,
TransferProgressEvent,
} from "./interfaces.js";
import { RestError } from "./restError.js";
import { createHttpHeaders } from "./httpHeaders.js";
import { isNodeReadableStream, isWebReadableStream } from "./util/typeGuards.js";
/**
 * Determines whether a value is a Blob (File instances also qualify,
 * since File subclasses Blob).
 */
function isBlob(body: unknown): body is Blob {
  // File objects count as a type of Blob, so we want to use instanceof explicitly
  const blobConstructorAvailable = typeof Blob === "function" || typeof Blob === "object";
  return blobConstructorAvailable && body instanceof Blob;
}
/**
 * A HttpClient implementation that uses window.fetch to send HTTP requests.
 * @internal
 */
class FetchHttpClient implements HttpClient {
  /**
   * Makes a request over an underlying transport layer and returns the response.
   * @param request - The request to be made.
   */
  public async sendRequest(request: PipelineRequest): Promise<PipelineResponse> {
    const parsedUrl = new URL(request.url);
    // Refuse plain-HTTP targets unless the caller explicitly opted in.
    if (parsedUrl.protocol !== "https:" && !request.allowInsecureConnection) {
      throw new Error(`Cannot connect to ${request.url} while allowInsecureConnection is false.`);
    }
    // Browser fetch offers no hook for routing through an explicit proxy.
    if (request.proxySettings) {
      throw new Error("HTTP proxy is not supported in browser environment");
    }
    try {
      return await makeRequest(request);
    } catch (e: any) {
      // Normalize transport failures into the error shape callers expect.
      throw getError(e, request);
    }
  }
}
/**
* Sends a request
*/
async function makeRequest(request: PipelineRequest): Promise<PipelineResponse> {
const { abortController, abortControllerCleanup } = setupAbortSignal(request);
try {
const headers = buildFetchHeaders(request.headers);
const { streaming, body: requestBody } = buildRequestBody(request);
const requestInit: RequestInit = {
body: requestBody,
method: request.method,
headers: headers,
signal: abortController.signal,
// Cloudflare doesn't implement the full Fetch API spec
// because of some of it doesn't make sense in the edge.
// See https://github.com/cloudflare/workerd/issues/902
...("credentials" in Request.prototype
? { credentials: request.withCredentials ? "include" : "same-origin" }
: {}),
...("cache" in Request.prototype ? { cache: "no-store" } : {}),
};
// According to https://fetch.spec.whatwg.org/#fetch-method,
// init.duplex must be set when body is a ReadableStream object.
// currently "half" is the only valid value.
if (streaming) {
(requestInit as any).duplex = "half";
}
/**
* Developers of the future:
* Do not set redirect: "manual" as part
* of request options.
* It will not work as you expect.
*/
const response = await fetch(request.url, requestInit);
// If we're uploading a blob, we need to fire the progress event manually
if (isBlob(request.body) && request.onUploadProgress) {
request.onUploadProgress({ loadedBytes: request.body.size });
}
return buildPipelineResponse(response, request, abortControllerCleanup);
} catch (e) {
abortControllerCleanup?.();
throw e;
}
}
/**
 * Creates a pipeline response from a Fetch response;
 */
async function buildPipelineResponse(
  httpResponse: Response,
  request: PipelineRequest,
  abortControllerCleanup?: () => void,
): Promise<PipelineResponse> {
  const response: PipelineResponse = {
    request,
    headers: buildPipelineHeaders(httpResponse),
    status: httpResponse.status,
  };

  // Wrap the body so that download progress can be reported as it is consumed.
  const bodyStream = isWebReadableStream(httpResponse.body)
    ? buildBodyStream(httpResponse.body, {
        onProgress: request.onDownloadProgress,
        onEnd: abortControllerCleanup,
      })
    : httpResponse.body;

  // Value of POSITIVE_INFINITY in streamResponseStatusCodes is considered as any status code
  const shouldStream =
    request.streamResponseStatusCodes?.has(Number.POSITIVE_INFINITY) ||
    request.streamResponseStatusCodes?.has(response.status);

  if (shouldStream) {
    if (request.enableBrowserStreams) {
      response.browserStreamBody = bodyStream ?? undefined;
    } else {
      // blobBody is intentionally left as a pending Promise for the caller.
      response.blobBody = new Response(bodyStream).blob();
      abortControllerCleanup?.();
    }
  } else {
    response.bodyAsText = await new Response(bodyStream).text();
    abortControllerCleanup?.();
  }
  return response;
}
/**
 * Wires up an AbortController for the outgoing fetch call, aborting it when
 * either the caller's abortSignal fires or the request timeout elapses.
 *
 * @param request - The pipeline request whose abortSignal/timeout to honor.
 * @returns The controller plus an optional cleanup function that detaches the
 * abort listener and cancels any pending timeout timer.
 * @throws AbortError immediately if the request's signal is already aborted.
 */
function setupAbortSignal(request: PipelineRequest): {
  abortController: AbortController;
  abortControllerCleanup: (() => void) | undefined;
} {
  const abortController = new AbortController();

  /**
   * Attach an abort listener to the request
   */
  let abortListener: ((event: any) => void) | undefined;
  if (request.abortSignal) {
    if (request.abortSignal.aborted) {
      throw new AbortError("The operation was aborted.");
    }
    abortListener = (event: Event) => {
      if (event.type === "abort") {
        abortController.abort();
      }
    };
    request.abortSignal.addEventListener("abort", abortListener);
  }

  // If a timeout was passed, call the abort signal once the time elapses.
  // Track the timer id so cleanup can cancel it: previously the timer was
  // never cleared, so it could abort the controller long after the request
  // completed and keep a Node-like event loop alive.
  let timeoutId: ReturnType<typeof setTimeout> | undefined;
  if (request.timeout > 0) {
    timeoutId = setTimeout(() => {
      abortController.abort();
    }, request.timeout);
  }

  // Cleanup function: detaches the abort listener and cancels the timer.
  let abortControllerCleanup: (() => void) | undefined;
  if (abortListener || timeoutId !== undefined) {
    abortControllerCleanup = () => {
      if (abortListener) {
        request.abortSignal?.removeEventListener("abort", abortListener);
      }
      if (timeoutId !== undefined) {
        clearTimeout(timeoutId);
      }
    };
  }

  return { abortController, abortControllerCleanup };
}
/**
 * Maps a failure from fetch into the error surface callers expect:
 * abort errors pass through untouched; everything else is wrapped in a
 * RestError carrying the original code (or REQUEST_SEND_ERROR by default).
 */
// eslint-disable-next-line @azure/azure-sdk/ts-use-interface-parameters
function getError(e: RestError, request: PipelineRequest): RestError {
  // `e?.name` already guards against null/undefined; the former `e &&` was redundant.
  if (e?.name === "AbortError") {
    return e;
  }
  return new RestError(`Error sending request: ${e.message}`, {
    code: e?.code ?? RestError.REQUEST_SEND_ERROR,
    request,
  });
}
/**
 * Converts PipelineRequest headers to Fetch headers
 */
function buildFetchHeaders(pipelineHeaders: PipelineHeaders): Headers {
  const fetchHeaders = new Headers();
  // PipelineHeaders iterates as [name, value] pairs; copy each into Headers.
  for (const [headerName, headerValue] of pipelineHeaders) {
    fetchHeaders.append(headerName, headerValue);
  }
  return fetchHeaders;
}
// Copies each fetch response header into a fresh pipeline header collection.
function buildPipelineHeaders(httpResponse: Response): PipelineHeaders {
  const pipelineHeaders = createHttpHeaders();
  // Headers#forEach passes (value, name) in that order.
  httpResponse.headers.forEach((value, name) => {
    pipelineHeaders.set(name, value);
  });
  return pipelineHeaders;
}
interface BuildRequestBodyResponse {
  body:
    | string
    | Blob
    | ReadableStream<Uint8Array>
    | ArrayBuffer
    | ArrayBufferView
    | FormData
    | null
    | undefined;
  streaming: boolean;
}

/**
 * Resolves the request body (invoking it when it is a factory function) and,
 * for web streams, wraps it so upload progress can be reported.
 */
function buildRequestBody(request: PipelineRequest): BuildRequestBodyResponse {
  const body = typeof request.body === "function" ? request.body() : request.body;
  if (isNodeReadableStream(body)) {
    throw new Error("Node streams are not supported in browser environment.");
  }
  if (isWebReadableStream(body)) {
    return {
      streaming: true,
      body: buildBodyStream(body, { onProgress: request.onUploadProgress }),
    };
  }
  return { streaming: false, body };
}
/**
 * Reads the request/response original stream and stream it through a new
 * ReadableStream, this is done to be able to report progress in a way that
 * all modern browsers support. TransformStreams would be an alternative,
 * however they are not yet supported by all browsers i.e Firefox
 */
function buildBodyStream(
  readableStream: ReadableStream<Uint8Array>,
  options: { onProgress?: (progress: TransferProgressEvent) => void; onEnd?: () => void } = {},
): ReadableStream<Uint8Array> {
  const { onProgress, onEnd } = options;
  let loadedBytes = 0;

  // Preferred path: pipe through a TransformStream that counts bytes.
  if (isTransformStreamSupported(readableStream)) {
    const progressCounter = new TransformStream({
      transform(chunk, controller) {
        if (chunk === null) {
          controller.terminate();
          return;
        }
        controller.enqueue(chunk);
        loadedBytes += chunk.length;
        onProgress?.({ loadedBytes });
      },
      flush() {
        onEnd?.();
      },
    });
    return readableStream.pipeThrough(progressCounter);
  }

  // Fallback: wrap the original stream in a new ReadableStream and use pull
  // to enqueue each chunk while reporting progress.
  const reader = readableStream.getReader();
  return new ReadableStream({
    async pull(controller) {
      const { done, value } = await reader.read();
      if (done || !value) {
        // Source is exhausted: signal the end, close our stream, free the reader.
        onEnd?.();
        controller.close();
        reader.releaseLock();
        return;
      }
      loadedBytes += value.length;
      controller.enqueue(value);
      onProgress?.({ loadedBytes });
    },
    cancel(reason?: string) {
      onEnd?.();
      return reader.cancel(reason);
    },
  });
}
/**
 * Create a new HttpClient instance for the browser environment.
 * @internal
 */
export function createFetchHttpClient(): HttpClient {
  const client = new FetchHttpClient();
  return client;
}
// True only when the stream exposes pipeThrough AND the global scope provides
// TransformStream. The pipeThrough check runs first (short-circuit) so `self`
// is never touched when the stream lacks pipeThrough.
function isTransformStreamSupported(readableStream: ReadableStream): boolean {
  if (readableStream.pipeThrough === undefined) {
    return false;
  }
  return self.TransformStream !== undefined;
}