-
Notifications
You must be signed in to change notification settings - Fork 0
/
LogReader.ts
87 lines (87 loc) · 3.25 KB
/
LogReader.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
* Copyright (c) 2018-present, heineiuo.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
import { assert } from "./DBHelper.ts";
import { Buffer } from "./Buffer.ts";
import Slice from "./Slice.ts";
import { Record, kBlockSize, RecordType, kHeaderSize } from "./LogFormat.ts";
import { FileHandle } from "./Env.ts";
export default class LogReader {
constructor(file: FileHandle) {
this._file = file;
}
_file: FileHandle;
async close(): Promise<void> {
if (!!this._file) {
const file = this._file;
delete this._file;
try {
await file.close();
}
catch (e) { }
}
}
async *iterator(): AsyncIterableIterator<Slice> {
const buf: Buffer = Buffer.fromUnknown(new ArrayBuffer(kBlockSize));
let blockIndex = -1;
let latestOpBuf = Buffer.alloc(0);
let latestType = null;
let bufHandledPosition = 0;
while (true) {
// read file fragment to `buf`
if (blockIndex === -1 || bufHandledPosition >= kBlockSize - kHeaderSize) {
const position = ++blockIndex * kBlockSize;
const { bytesRead } = await this._file.read(buf, 0, kBlockSize, position);
if (bytesRead === 0) {
await this.close();
return;
}
bufHandledPosition = 0;
continue;
}
// buf may be re-fill, to avoid this, copy it
const record = this.readPhysicalRecord(Buffer.fromUnknown(buf.slice(bufHandledPosition)));
bufHandledPosition += record.length + kHeaderSize;
if (record.type === RecordType.kFullType) {
const opSlice = new Slice(record.data.buffer);
yield opSlice;
}
else if (record.type === RecordType.kLastType) {
assert(latestType !== RecordType.kLastType);
latestOpBuf = Buffer.concat([latestOpBuf, record.data.buffer]);
const opSlice = new Slice(latestOpBuf);
latestOpBuf = Buffer.alloc(0);
yield opSlice;
}
else if (record.type === RecordType.kFirstType) {
assert(latestType !== RecordType.kFirstType);
latestOpBuf = record.data.buffer;
}
else if (record.type === RecordType.kMiddleType) {
latestOpBuf = Buffer.concat([latestOpBuf, record.data.buffer]);
}
else if (record.type === RecordType.kZeroType) {
// skip this block
latestType = record.type;
bufHandledPosition = kBlockSize;
}
latestType = record.type;
}
}
private readPhysicalRecord(buf: Buffer): Record {
const head = buf.slice(0, kHeaderSize);
const recordType = head[6];
const head4 = head[4] & 0xff;
const head5 = head[5] & 0xff;
const length = head4 | (head5 << 8);
const data = new Slice(buf.slice(kHeaderSize, kHeaderSize + length));
return {
length,
data,
type: recordType,
};
}
}