/
index.mjs
129 lines (105 loc) · 3.46 KB
/
index.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//CREDIT: https://github.com/signicode/rw-stream
import { strictEqual } from "assert";
import { promises as fsp } from "fs";
import { Readable, Writable } from "stream";
const { open } = fsp;
/**
 * Opens `file` once in "r+" mode and exposes a readable and a writable stream
 * that share the same file handle, so the file can be rewritten in place while
 * it is being read (e.g. `readableStream.pipe(transform).pipe(writableStream)`).
 *
 * The writer is never allowed to overtake the reader: bytes are only written
 * into the region that has already been read. If the writer catches up with
 * the reader it waits until the next read completes. When the writable stream
 * finishes, the file is truncated to the number of bytes written and closed.
 *
 * @param {string|Buffer|URL} file - Path of an existing file to open.
 * @param {object} [options]
 * @param {number} [options.readStart=0] - Byte offset at which reading starts.
 *   Values that convert to NaN fall back to 0.
 * @param {number} [options.writeStart=0] - Byte offset at which writing
 *   starts. Must not be greater than `readStart`. Values that convert to NaN
 *   fall back to 0.
 * @returns {Promise<{fd: import("fs").promises.FileHandle, readableStream: Readable, writableStream: Writable}>}
 * @throws {RangeError} If an offset is not a safe integer, is negative, or if
 *   the read offset lies before the write offset.
 */
export default async function rwStream(file, { readStart, writeStart } = {}) {
    let readIndex = Number(readStart) || 0; // NaN -> 0
    let writeIndex = Number(writeStart) || 0; // NaN -> 0

    // Validate the normalized offsets, not the raw options: an omitted or
    // unparsable option compares as `false` against everything and would
    // otherwise slip through (e.g. `{ writeStart: 5 }` with no readStart).
    if (!Number.isSafeInteger(readIndex) || !Number.isSafeInteger(writeIndex))
        throw new RangeError("Read index and write index MUST be safe integers.");
    if (readIndex < writeIndex)
        throw new RangeError("Read index MUST come before write index.");
    if (readIndex < 0 || writeIndex < 0)
        throw new RangeError("Negative value is passed as a file operation start index.");

    const fd = await open(file, "r+");

    // Best-effort cleanup used on stream errors. Called through a wrapper so
    // the method keeps its receiver and is not handed the error as an
    // argument. A failure to close is deliberately ignored here because the
    // stream error that triggered the cleanup is the one being reported.
    const closeFd = () => fd.close().catch(() => void 0);

    // Resolved every time a read completes. Created eagerly so that a write
    // issued before the first read has a real promise to wait on instead of
    // spinning on `await null`.
    let notifyReadDone;
    let nextReadDone = new Promise((resolve) => {
        notifyReadDone = resolve;
    });
    // Set when a read fails, so that a writer waiting for data is woken up
    // and fails instead of waiting forever.
    let readFailure = null;

    function advanceReadPosition(bytesRead) {
        const notify = notifyReadDone;
        if (bytesRead > 0) {
            readIndex += bytesRead;
            // Arm a fresh promise for the following read before waking the
            // writer that waits on the current one.
            nextReadDone = new Promise((resolve) => {
                notifyReadDone = resolve;
            });
        } else {
            // EOF: nothing is left to protect, the writer may go anywhere.
            readIndex = Infinity;
        }
        return notify(bytesRead);
    }

    const readableStream = new Readable({
        async read(size) {
            try {
                const buffer = Buffer.alloc(size);
                const { bytesRead } = await fd.read(buffer, 0, size, readIndex);
                advanceReadPosition(bytesRead);
                // The end-of-file is reached when the number of bytes read is zero.
                if (bytesRead === 0) {
                    this.push(null);
                    return;
                }
                this.push(buffer.subarray(0, bytesRead));
            } catch (err) {
                readFailure = err;
                notifyReadDone(0); // wake a writer that is waiting for this read
                this.destroy(err);
            }
        }
    }).once("error", closeFd);

    const writableStream = new Writable({
        async write(chunk, encoding, callback) {
            try {
                // Chunks are buffers unless the stream was switched to a mode
                // that keeps strings; convert defensively.
                const toWrite = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
                if (toWrite.length <= 0)
                    return callback();

                let totalBytesWritten = 0;
                while (totalBytesWritten < toWrite.length) {
                    const position = writeIndex + totalBytesWritten;
                    const writeLength = Math.min(
                        // Space already consumed by the reader (Infinity after EOF).
                        readIndex - position,
                        // Bytes of this chunk that are still pending.
                        toWrite.length - totalBytesWritten
                    );

                    // The writer has caught up with the reader: wait for the
                    // next read to free more space, then re-evaluate.
                    if (writeLength === 0) {
                        // Invariant guaranteed by the loop condition.
                        strictEqual(toWrite.length !== totalBytesWritten, true);
                        if (readFailure !== null)
                            throw readFailure;
                        await nextReadDone;
                        continue;
                    }

                    const { bytesWritten } = await fd.write(toWrite, totalBytesWritten, writeLength, position);
                    totalBytesWritten += bytesWritten;
                }
                writeIndex += toWrite.length;
                return callback();
            } catch (err) {
                return callback(err);
            }
        },
        final(callback) {
            // Drop whatever remains of the old content past the last written
            // byte, then release the handle.
            fd.truncate(writeIndex)
                .then(() => fd.close())
                .then(() => callback(), callback);
        }
    }).once("error", closeFd);

    return {
        fd,
        readableStream,
        writableStream
    };
}