// highlevel.node.ts
import { generateUuid, TransferProgressEvent } from "@azure/ms-rest-js";
import * as fs from "fs";
import { Readable } from "stream";
import { Aborter } from "./Aborter";
import { BlobURL } from "./BlobURL";
import { BlockBlobURL } from "./BlockBlobURL";
import { BlobHTTPHeaders } from "./generated/src/models";
import { BlobUploadCommonResponse, IDownloadFromBlobOptions, IUploadToBlockBlobOptions } from "./highlevel.common";
import { IBlobAccessConditions } from "./models";
import { Batch } from "./utils/Batch";
import { BufferScheduler } from "./utils/BufferScheduler";
import {
BLOCK_BLOB_MAX_BLOCKS,
BLOCK_BLOB_MAX_STAGE_BLOCK_BYTES,
BLOCK_BLOB_MAX_UPLOAD_BLOB_BYTES,
DEFAULT_BLOB_DOWNLOAD_BLOCK_BYTES,
} from "./utils/constants";
import { generateBlockID } from "./utils/utils.common";
import { streamToBuffer } from "./utils/utils.node";
/**
* ONLY AVAILABLE IN NODE.JS RUNTIME.
*
* Uploads a local file in blocks to a block blob.
*
 * When the file size is <= 256MB, this method uses a single upload call to finish the upload.
 * Otherwise, this method calls stageBlock to upload blocks, and finally calls commitBlockList
 * to commit the block list.
*
* @export
 * @param {Aborter} aborter Create a new Aborter instance with Aborter.none or Aborter.timeout();
 * see the Aborter documentation for more examples of request cancellation
* @param {string} filePath Full path of local file
* @param {BlockBlobURL} blockBlobURL BlockBlobURL
* @param {IUploadToBlockBlobOptions} [options] IUploadToBlockBlobOptions
 * @returns {Promise<BlobUploadCommonResponse>} Response data for the blob upload operation
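 *
 * @example
 * // A minimal usage sketch, not the library's documented snippet: assumes an
 * // existing BlockBlobURL instance named `blockBlobURL`; the path and option
 * // values are hypothetical.
 * const response = await uploadFileToBlockBlob(
 *   Aborter.timeout(30 * 60 * 1000), // abort if the upload exceeds 30 minutes
 *   "/path/to/local/file",
 *   blockBlobURL,
 *   { blockSize: 4 * 1024 * 1024, parallelism: 20 }
 * );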
*/
export async function uploadFileToBlockBlob(
aborter: Aborter,
filePath: string,
blockBlobURL: BlockBlobURL,
options?: IUploadToBlockBlobOptions
): Promise<BlobUploadCommonResponse> {
const size = fs.statSync(filePath).size;
return uploadResetableStreamToBlockBlob(
aborter,
(offset, count) =>
fs.createReadStream(filePath, {
autoClose: true,
end: count ? offset + count - 1 : Infinity,
start: offset
}),
size,
blockBlobURL,
options
);
}
/**
* ONLY AVAILABLE IN NODE.JS RUNTIME.
*
 * Accepts a Node.js Readable stream factory and uploads in blocks to a block blob.
 * The factory must return a Node.js Readable stream starting from the given offset,
 * where the offset is a position in the data being uploaded to the block blob.
 *
 * When the data size is <= 256MB, this method uses a single upload call to finish the upload.
 * Otherwise, this method calls stageBlock to upload blocks, and finally calls commitBlockList
 * to commit the block list.
*
* @export
 * @param {Aborter} aborter Create a new Aborter instance with Aborter.none or Aborter.timeout();
 * see the Aborter documentation for more examples of request cancellation
 * @param {(offset: number, count?: number) => NodeJS.ReadableStream} streamFactory Returns a Node.js
 * Readable stream starting from the given offset, optionally limited to count bytes
* @param {number} size Size of the block blob
* @param {BlockBlobURL} blockBlobURL BlockBlobURL
* @param {IUploadToBlockBlobOptions} [options] IUploadToBlockBlobOptions
 * @returns {Promise<BlobUploadCommonResponse>} Response data for the blob upload operation
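 *
 * @example
 * // A sketch of a conforming stream factory over a local file (this mirrors
 * // the factory that uploadFileToBlockBlob builds); the path is hypothetical.
 * const factory = (offset: number, count?: number) =>
 *   fs.createReadStream("/path/to/local/file", {
 *     autoClose: true,
 *     start: offset,
 *     end: count ? offset + count - 1 : Infinity
 *   });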
*/
async function uploadResetableStreamToBlockBlob(
aborter: Aborter,
streamFactory: (offset: number, count?: number) => NodeJS.ReadableStream,
size: number,
blockBlobURL: BlockBlobURL,
options: IUploadToBlockBlobOptions = {}
): Promise<BlobUploadCommonResponse> {
if (!options.blockSize) {
options.blockSize = 0;
}
if (options.blockSize < 0 || options.blockSize > BLOCK_BLOB_MAX_STAGE_BLOCK_BYTES) {
throw new RangeError(
`blockSize option must be >= 0 and <= ${BLOCK_BLOB_MAX_STAGE_BLOCK_BYTES}`
);
}
if (options.maxSingleShotSize !== 0 && !options.maxSingleShotSize) {
options.maxSingleShotSize = BLOCK_BLOB_MAX_UPLOAD_BLOB_BYTES;
}
if (
options.maxSingleShotSize < 0 ||
options.maxSingleShotSize > BLOCK_BLOB_MAX_UPLOAD_BLOB_BYTES
) {
throw new RangeError(
`maxSingleShotSize option must be >= 0 and <= ${BLOCK_BLOB_MAX_UPLOAD_BLOB_BYTES}`
);
}
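  // If no blockSize was given, derive one: blobs at or below maxSingleShotSize
  // are sent in a single upload call below, while larger blobs get a block size
  // just big enough to fit within BLOCK_BLOB_MAX_BLOCKS, with a floor of
  // DEFAULT_BLOB_DOWNLOAD_BLOCK_BYTES.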
if (options.blockSize === 0) {
if (size > BLOCK_BLOB_MAX_BLOCKS * BLOCK_BLOB_MAX_STAGE_BLOCK_BYTES) {
      throw new RangeError(`${size} is too large to upload to a block blob.`);
}
if (size > options.maxSingleShotSize) {
options.blockSize = Math.ceil(size / BLOCK_BLOB_MAX_BLOCKS);
if (options.blockSize < DEFAULT_BLOB_DOWNLOAD_BLOCK_BYTES) {
options.blockSize = DEFAULT_BLOB_DOWNLOAD_BLOCK_BYTES;
}
}
}
if (!options.blobHTTPHeaders) {
options.blobHTTPHeaders = {};
}
if (!options.blobAccessConditions) {
options.blobAccessConditions = {};
}
if (size <= options.maxSingleShotSize) {
return blockBlobURL.upload(aborter, () => streamFactory(0), size, options);
}
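  // Ceiling division: the number of blocks needed to cover `size` bytes.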
const numBlocks: number = Math.floor((size - 1) / options.blockSize) + 1;
if (numBlocks > BLOCK_BLOB_MAX_BLOCKS) {
throw new RangeError(
      `The buffer's size is too big or the blockSize is too small; ` +
        `the number of blocks must be <= ${BLOCK_BLOB_MAX_BLOCKS}`
);
}
const blockList: string[] = [];
const blockIDPrefix = generateUuid();
let transferProgress: number = 0;
const batch = new Batch(options.parallelism);
for (let i = 0; i < numBlocks; i++) {
batch.addOperation(
async (): Promise<any> => {
const blockID = generateBlockID(blockIDPrefix, i);
const start = options.blockSize! * i;
const end = i === numBlocks - 1 ? size : start + options.blockSize!;
const contentLength = end - start;
blockList.push(blockID);
await blockBlobURL.stageBlock(
aborter,
blockID,
() => streamFactory(start, contentLength),
contentLength,
{
leaseAccessConditions: options.blobAccessConditions!.leaseAccessConditions
}
);
        // Update progress only after the block is successfully uploaded to the
        // server, so a retried block is not counted twice
transferProgress += contentLength;
if (options.progress) {
options.progress({ loadedBytes: transferProgress });
}
}
);
}
await batch.do();
return blockBlobURL.commitBlockList(aborter, blockList, options);
}
/**
* ONLY AVAILABLE IN NODE.JS RUNTIME.
*
* Downloads an Azure Blob in parallel to a buffer.
 * Offset and count are optional; pass 0 for both to download the entire blob.
*
* @export
 * @param {Aborter} aborter Create a new Aborter instance with Aborter.none or Aborter.timeout();
 * see the Aborter documentation for more examples of request cancellation
 * @param {Buffer} buffer Buffer to be filled; must have a length greater than or equal to count
* @param {BlobURL} blobURL A BlobURL object
* @param {number} offset From which position of the block blob to download, in bytes
 * @param {number} [count] How many bytes to download. Downloads to the end when undefined is passed
* @param {IDownloadFromBlobOptions} [options] IDownloadFromBlobOptions
* @returns {Promise<void>}
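 *
 * @example
 * // A minimal sketch, not the library's documented snippet: the blob's size is
 * // fetched first so the destination buffer can be allocated to fit it, and
 * // `blobURL` is assumed to be an existing BlobURL instance.
 * const props = await blobURL.getProperties(Aborter.none);
 * const buffer = Buffer.alloc(props.contentLength!);
 * await downloadBlobToBuffer(
 *   Aborter.timeout(30 * 60 * 1000), // abort if the download exceeds 30 minutes
 *   buffer,
 *   blobURL,
 *   0, // start at the beginning of the blob
 *   undefined, // download to the end
 *   { blockSize: 4 * 1024 * 1024, parallelism: 20 }
 * );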
*/
export async function downloadBlobToBuffer(
aborter: Aborter,
buffer: Buffer,
blobURL: BlobURL,
offset: number,
count?: number,
options: IDownloadFromBlobOptions = {}
): Promise<void> {
if (!options.blockSize) {
options.blockSize = 0;
}
if (options.blockSize < 0) {
throw new RangeError("blockSize option must be >= 0");
}
if (options.blockSize === 0) {
options.blockSize = DEFAULT_BLOB_DOWNLOAD_BLOCK_BYTES;
}
if (offset < 0) {
throw new RangeError("offset option must be >= 0");
}
if (count && count <= 0) {
throw new RangeError("count option must be > 0");
}
if (!options.blobAccessConditions) {
options.blobAccessConditions = {};
}
  // The caller didn't specify a count, so fetch the blob's length to compute it
if (!count) {
const response = await blobURL.getProperties(aborter, options);
count = response.contentLength! - offset;
if (count < 0) {
throw new RangeError(
`offset ${offset} shouldn't be larger than blob size ${response.contentLength!}`
);
}
}
if (buffer.length < count) {
throw new RangeError(
      `The buffer's size should be equal to or larger than the requested count of bytes: ${count}`
);
}
let transferProgress: number = 0;
const batch = new Batch(options.parallelism);
for (let off = offset; off < offset + count; off = off + options.blockSize) {
batch.addOperation(async () => {
// Exclusive chunk end position
let chunkEnd = offset + count!;
if (off + options.blockSize! < chunkEnd) {
chunkEnd = off + options.blockSize!;
}
const response = await blobURL.download(aborter, off, chunkEnd - off, {
blobAccessConditions: options.blobAccessConditions,
maxRetryRequests: options.maxRetryRequestsPerBlock
});
const stream = response.readableStreamBody!;
await streamToBuffer(stream, buffer, off - offset, chunkEnd - offset);
      // Update progress only after the block is downloaded, so a retried block is
      // not counted twice. Finer-grained progress updates inside the HTTP request
      // would be possible only if the convenience layer's download retry is enabled.
transferProgress += chunkEnd - off;
if (options.progress) {
options.progress({ loadedBytes: transferProgress });
}
});
}
await batch.do();
}
/**
* Option interface for uploadStreamToBlockBlob.
*
* @export
* @interface IUploadStreamToBlockBlobOptions
*/
export interface IUploadStreamToBlockBlobOptions {
/**
* Blob HTTP Headers.
*
* @type {BlobHTTPHeaders}
* @memberof IUploadStreamToBlockBlobOptions
*/
blobHTTPHeaders?: BlobHTTPHeaders;
/**
* Metadata of block blob.
*
* @type {{ [propertyName: string]: string }}
* @memberof IUploadStreamToBlockBlobOptions
*/
metadata?: { [propertyName: string]: string };
/**
* Access conditions headers.
*
* @type {IBlobAccessConditions}
* @memberof IUploadStreamToBlockBlobOptions
*/
accessConditions?: IBlobAccessConditions;
/**
* Progress updater.
*
* @memberof IUploadStreamToBlockBlobOptions
*/
progress?: (progress: TransferProgressEvent) => void;
}
/**
* ONLY AVAILABLE IN NODE.JS RUNTIME.
*
 * Uploads a Node.js Readable stream into a block blob.
*
 * PERFORMANCE IMPROVEMENT TIPS:
 * * Set the input stream's highWaterMark to the same value as the bufferSize
 *   parameter to avoid Buffer.concat() operations.
*
* @export
 * @param {Aborter} aborter Create a new Aborter instance with Aborter.none or Aborter.timeout();
 * see the Aborter documentation for more examples of request cancellation
* @param {Readable} stream Node.js Readable stream
* @param {BlockBlobURL} blockBlobURL A BlockBlobURL instance
 * @param {number} bufferSize Size of every buffer allocated, which is also the block size in the uploaded block blob
 * @param {number} maxBuffers Maximum number of buffers to allocate during uploading; positively correlated
 * with the maximum upload concurrency
* @param {IUploadStreamToBlockBlobOptions} [options]
* @returns {Promise<BlobUploadCommonResponse>}
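 *
 * @example
 * // A hedged sketch with a hypothetical local file: the stream's highWaterMark
 * // matches bufferSize (4 MB) per the performance tip above, and `blockBlobURL`
 * // is assumed to be an existing BlockBlobURL instance.
 * const stream = fs.createReadStream("/path/to/local/file", {
 *   highWaterMark: 4 * 1024 * 1024
 * });
 * await uploadStreamToBlockBlob(
 *   Aborter.none,
 *   stream,
 *   blockBlobURL,
 *   4 * 1024 * 1024, // bufferSize, also the block size
 *   20 // maxBuffers; concurrency scales with this
 * );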
*/
export async function uploadStreamToBlockBlob(
aborter: Aborter,
stream: Readable,
blockBlobURL: BlockBlobURL,
bufferSize: number,
maxBuffers: number,
options: IUploadStreamToBlockBlobOptions = {}
): Promise<BlobUploadCommonResponse> {
if (!options.blobHTTPHeaders) {
options.blobHTTPHeaders = {};
}
if (!options.accessConditions) {
options.accessConditions = {};
}
let blockNum = 0;
const blockIDPrefix = generateUuid();
let transferProgress: number = 0;
const blockList: string[] = [];
const scheduler = new BufferScheduler(
stream,
bufferSize,
maxBuffers,
async (buffer: Buffer) => {
const blockID = generateBlockID(blockIDPrefix, blockNum);
blockList.push(blockID);
blockNum++;
await blockBlobURL.stageBlock(aborter, blockID, buffer, buffer.length, {
leaseAccessConditions: options.accessConditions!.leaseAccessConditions
});
      // Update progress only after the block is successfully uploaded to the
      // server, so a retried block is not counted twice
transferProgress += buffer.length;
if (options.progress) {
options.progress({ loadedBytes: transferProgress });
}
},
    // Parallelism should be set smaller than maxBuffers, which reduces the chance
    // that all outgoing handlers end up blocked waiting for stream data, since the
    // outgoing queue shouldn't be left empty.
Math.ceil((maxBuffers / 4) * 3)
);
await scheduler.do();
return blockBlobURL.commitBlockList(aborter, blockList, options);
}