/
BuffersStream.ts
126 lines (115 loc) · 3.68 KB
/
BuffersStream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { Readable, ReadableOptions } from "stream";
/**
* Options to configure the BuffersStream.
*/
export interface BuffersStreamOptions extends ReadableOptions {}
/**
* This class generates a readable stream from the data in an array of buffers.
*
* @export
* @class BuffersStream
*/
export class BuffersStream extends Readable {
/**
* The offset of data to be read in the current buffer.
*
* @private
* @type {number}
* @memberof BuffersStream
*/
private byteOffsetInCurrentBuffer: number;
/**
* The index of buffer to be read in the array of buffers.
*
* @private
* @type {number}
* @memberof BuffersStream
*/
private bufferIndex: number;
/**
* The total length of data already read.
*
* @private
* @type {number}
* @memberof BuffersStream
*/
private pushedBytesLength: number;
/**
* Creates an instance of BuffersStream that will emit the data
* contained in the array of buffers.
*
* @param {Buffer[]} buffers Array of buffers containing the data
* @param {number} byteLength The total length of data contained in the buffers
* @memberof BuffersStream
*/
constructor(
private buffers: Buffer[],
private byteLength: number,
options?: BuffersStreamOptions
) {
super(options);
this.byteOffsetInCurrentBuffer = 0;
this.bufferIndex = 0;
this.pushedBytesLength = 0;
// check byteLength is no larger than buffers[] total length
let buffersLength = 0;
for (const buf of this.buffers) {
buffersLength += buf.byteLength;
}
if (buffersLength < this.byteLength) {
throw new Error("Data size shouldn't be larger than the total length of buffers.");
}
}
/**
* Internal _read() that will be called when the stream wants to pull more data in.
*
* @param {number} size Optional. The size of data to be read
* @memberof BuffersStream
*/
public _read(size?: number) {
if (this.pushedBytesLength >= this.byteLength) {
this.push(null);
}
if (!size) {
size = this.readableHighWaterMark;
}
const outBuffers: Buffer[] = [];
let i = 0;
while (i < size && this.pushedBytesLength < this.byteLength) {
// The last buffer may be longer than the data it contains.
const remainingDataInAllBuffers = this.byteLength - this.pushedBytesLength;
const remainingCapacityInThisBuffer =
this.buffers[this.bufferIndex].byteLength - this.byteOffsetInCurrentBuffer;
const remaining = Math.min(remainingCapacityInThisBuffer, remainingDataInAllBuffers);
if (remaining > size - i) {
// chunkSize = size - i
const end = this.byteOffsetInCurrentBuffer + size - i;
outBuffers.push(this.buffers[this.bufferIndex].slice(this.byteOffsetInCurrentBuffer, end));
this.pushedBytesLength += size - i;
this.byteOffsetInCurrentBuffer = end;
i = size;
break;
} else {
// chunkSize = remaining
const end = this.byteOffsetInCurrentBuffer + remaining;
outBuffers.push(this.buffers[this.bufferIndex].slice(this.byteOffsetInCurrentBuffer, end));
if (remaining === remainingCapacityInThisBuffer) {
// this.buffers[this.bufferIndex] used up, shift to next one
this.byteOffsetInCurrentBuffer = 0;
this.bufferIndex++;
} else {
this.byteOffsetInCurrentBuffer = end;
}
this.pushedBytesLength += remaining;
i += remaining;
}
}
if (outBuffers.length > 1) {
this.push(Buffer.concat(outBuffers));
} else if (outBuffers.length === 1) {
this.push(outBuffers[0]);
}
}
}