// utils.node.ts (134 lines, 116 loc)
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import * as fs from "fs";
import * as util from "util";
import { isNode } from "@azure/core-http";
/**
 * Reads a readable stream into a buffer, filling it from `offset` (inclusive)
 * up to `end` (exclusive).
 *
 * @param stream - A Node.js Readable stream.
 * @param buffer - Buffer to be filled; its length must be >= `end`.
 * @param offset - Position in the buffer at which filling starts, inclusive.
 * @param end - Position in the buffer at which filling stops, exclusive.
 * @param encoding - Encoding to use when the stream emits string chunks.
 * @returns A promise that resolves once `end - offset` bytes have been copied,
 *   and rejects if the stream ends early or emits an error.
 */
export async function streamToBuffer(
  stream: NodeJS.ReadableStream,
  buffer: Buffer,
  offset: number,
  end: number,
  encoding?: string
): Promise<void> {
  let pos = 0; // Number of bytes copied from the stream so far
  const count = end - offset; // Total number of bytes needed from the stream
  return new Promise<void>((resolve, reject) => {
    stream.on("readable", () => {
      if (pos >= count) {
        resolve();
        return;
      }
      let chunk = stream.read();
      if (!chunk) {
        return;
      }
      if (typeof chunk === "string") {
        chunk = Buffer.from(chunk, encoding);
      }
      // Copy only as many bytes as are still needed; any extra bytes in the
      // final chunk are discarded.
      const chunkLength = pos + chunk.length > count ? count - pos : chunk.length;
      // Buffer.copy is the correct API for a one-shot copy. The previous
      // buffer.fill(chunk.slice(...)) only behaved like a copy because the
      // source and target ranges happened to have identical lengths, and it
      // allocated an intermediate slice on every chunk.
      (chunk as Buffer).copy(buffer, offset + pos, 0, chunkLength);
      pos += chunkLength;
    });
    stream.on("end", () => {
      if (pos < count) {
        reject(
          new Error(
            `Stream drains before getting enough data needed. Data read: ${pos}, data need: ${count}`
          )
        );
        // Bug fix: previously fell through and also called resolve() after
        // reject(). The promise was already settled so resolve() was a no-op,
        // but the control flow was misleading and fragile.
        return;
      }
      resolve();
    });
    stream.on("error", reject);
  });
}
/**
 * ONLY AVAILABLE IN NODE.JS RUNTIME.
 *
 * Pipes a readable stream into a local file and settles once the file handle
 * has been closed. If both streams raise errors, the first one raised becomes
 * the rejection reason.
 *
 * @param rs - The read stream.
 * @param file - Destination file path.
 * @returns A promise that resolves after the write stream closes cleanly,
 *   or rejects with the first error raised by either stream.
 */
export async function readStreamToLocalFile(rs: NodeJS.ReadableStream, file: string) {
  return new Promise<void>((resolve, reject) => {
    const fileStream = fs.createWriteStream(file);
    // Set STREAM_DEBUG env var to log stream events while running tests
    if (process.env.STREAM_DEBUG) {
      for (const event of ["close", "data", "end", "error"]) {
        rs.on(event, () => console.log(`rs.${event}`));
      }
      for (const event of ["close", "drain", "error", "finish", "pipe", "unpipe"]) {
        fileStream.on(event, () => console.log(`ws.${event}`));
      }
    }
    let firstError: Error | undefined;
    rs.on("error", (err: Error) => {
      // First error wins
      firstError = firstError ?? err;
      // A read-stream error suppresses the automatic "end" event, so raise it
      // manually to guarantee the write stream eventually emits "close" and
      // this promise settles.
      rs.emit("end");
    });
    fileStream.on("error", (err: Error) => {
      // First error wins
      firstError = firstError ?? err;
    });
    fileStream.on("close", () => {
      if (firstError) {
        reject(firstError);
      } else {
        resolve();
      }
    });
    rs.pipe(fileStream);
  });
}
/**
 * ONLY AVAILABLE IN NODE.JS RUNTIME.
 *
 * Promisified version of fs.stat().
 */
// NOTE(review): when not running in Node (per isNode from @azure/core-http),
// a no-op function is promisified instead of fs.stat — presumably so browser
// bundles that include this module don't break on the missing fs API; the
// resulting fsStat would resolve to undefined there. Confirm before changing.
export const fsStat = util.promisify(isNode ? fs.stat : function stat() {});