/
PooledBuffer.ts
133 lines (118 loc) · 4.06 KB
/
PooledBuffer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { BuffersStream } from "./BuffersStream";
import { Readable } from "stream";
/**
 * Maximum size, in bytes, of each individual buffer held by the pooled buffers.
 * Read from the `constants.MAX_LENGTH` value exported by the Node.js "buffer" module.
 */
// `require` is used instead of an `import` statement because TypeScript doesn't recognize "buffer".
const maxBufferLength = require("buffer").constants.MAX_LENGTH;
/**
 * This class provides a buffer container which conceptually has no hard size limit.
 * It accepts a capacity, an array of input buffers and the total length of input data.
 * It allocates internal buffers whose lengths add up to the capacity (each one at most
 * maxBufferLength bytes long) and copies the data of the input buffers into them
 * serially, bounded by the total length.
 * Then by calling PooledBuffer.getReadableStream(), you can get a readable stream
 * assembled from all the data in the internal buffers.
 */
export class PooledBuffer {
  /**
   * Internal buffers used to keep the data.
   * Each buffer has a length of maxBufferLength except possibly the last one.
   */
  private buffers: Buffer[] = [];

  /**
   * The total size of the internal buffers.
   */
  private readonly capacity: number;

  /**
   * The total size of the data contained in the internal buffers.
   */
  private _size: number;

  /**
   * The size of the data contained in the pooled buffers.
   */
  public get size(): number {
    return this._size;
  }

  /**
   * Creates an instance of PooledBuffer with given capacity.
   * Internal buffers are allocated but contain no data.
   * Users may call the {@link PooledBuffer.fill} method to fill this
   * pooled buffer with data.
   *
   * @param capacity - Total capacity of the internal buffers
   */
  constructor(capacity: number);

  /**
   * Creates an instance of PooledBuffer with given capacity.
   * Internal buffers are allocated and filled with data in the input buffers serially
   * with respect to the total length.
   *
   * @param capacity - Total capacity of the internal buffers
   * @param buffers - Input buffers containing the data to be filled in the pooled buffer
   * @param totalLength - Total length of the data to be filled in.
   */
  constructor(capacity: number, buffers: Buffer[], totalLength: number);
  constructor(capacity: number, buffers?: Buffer[], totalLength?: number) {
    this.capacity = capacity;
    this._size = 0;

    // Allocate the internal buffers: every buffer is maxBufferLength long, except the
    // last one which only holds the remainder (or a full buffer when capacity is an
    // exact multiple of maxBufferLength).
    const bufferNum = Math.ceil(capacity / maxBufferLength);
    for (let i = 0; i < bufferNum; i++) {
      const isLast = i === bufferNum - 1;
      const remainder = capacity % maxBufferLength;
      const len = isLast && remainder !== 0 ? remainder : maxBufferLength;
      // allocUnsafe is fine here: only the first `size` bytes are ever reported as data.
      this.buffers.push(Buffer.allocUnsafe(len));
    }

    // Both arguments are required by the overload that supplies data.
    if (buffers && totalLength !== undefined) {
      this.fill(buffers, totalLength);
    }
  }

  /**
   * Fill the internal buffers with data in the input buffers serially
   * with respect to the total length and the total capacity of the internal buffers.
   * Exactly `min(capacity, totalLength)` bytes are copied; the copied bytes are shifted
   * out of the input buffers and any bytes beyond that stay in the input array.
   *
   * @param buffers - Input buffers containing the data to be filled in the pooled buffer
   * @param totalLength - Total length of the data to be filled in.
   * @throws RangeError if the input buffers hold fewer bytes than need to be copied.
   *   Nothing is modified in that case.
   */
  public fill(buffers: Buffer[], totalLength: number): void {
    const wanted = Math.min(this.capacity, totalLength);

    // Validate before mutating anything so a short input cannot leave this object
    // (or the caller's array) half-updated.
    let available = 0;
    for (const chunk of buffers) {
      if (available >= wanted) {
        break;
      }
      available += chunk.length;
    }
    if (available < wanted) {
      throw new RangeError(
        `PooledBuffer.fill: input buffers hold ${available} bytes but ${wanted} bytes are required`
      );
    }

    this._size = wanted;

    let i = 0,
      j = 0,
      targetOffset = 0,
      sourceOffset = 0,
      totalCopiedNum = 0;
    while (totalCopiedNum < this._size) {
      const source = buffers[i];
      const target = this.buffers[j];

      // Never read past the bytes that are still owed; otherwise data beyond
      // totalLength would be consumed from the input and silently dropped.
      const sourceEnd = Math.min(source.length, sourceOffset + (this._size - totalCopiedNum));
      const copiedNum = source.copy(target, targetOffset, sourceOffset, sourceEnd);

      totalCopiedNum += copiedNum;
      sourceOffset += copiedNum;
      targetOffset += copiedNum;

      // Current input buffer exhausted: move on to the next one.
      if (sourceOffset === source.length) {
        i++;
        sourceOffset = 0;
      }
      // Current internal buffer full: move on to the next one.
      if (targetOffset === target.length) {
        j++;
        targetOffset = 0;
      }
    }

    // Drop the fully consumed input buffers, then trim the consumed prefix off the
    // first remaining one.
    buffers.splice(0, i);
    if (buffers.length > 0) {
      buffers[0] = buffers[0].subarray(sourceOffset);
    }
  }

  /**
   * Get the readable stream assembled from all the data in the internal buffers.
   *
   */
  public getReadableStream(): Readable {
    return new BuffersStream(this.buffers, this.size);
  }
}