Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"@types/fs-extra": "^9.0.13",
"@types/jest": "^27.0.2",
"@types/mock-fs": "^4.13.1",
"@types/node": "^16.11.11",
"@types/node": "^12.20.39",
"@types/proper-lockfile": "^4.1.2",
"@typescript-eslint/eslint-plugin": "^4.33.0",
"@typescript-eslint/parser": "^4.33.0",
Expand Down Expand Up @@ -82,7 +82,8 @@
"lint": "npm run lint:ts",
"commit": "git-cz",
"release": "release-script",
"prepare": "husky install"
"prepare": "husky install",
"perf": "ts-node test/perf.ts"
},
"config": {
"commitizen": {
Expand Down
5 changes: 3 additions & 2 deletions src/lib/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,11 @@ describe("lib/db", () => {

it("does not do anything while the DB is being closed", async () => {
db.set("key3", 3);
await db.compress(); // this writes the DB
db.delete("key2");
db.set("key3", 3.5);
const closePromise = db.close();
await db.compress();
const closePromise = db.close(); // this only appends the extra key3 line
await db.compress(); // this does not compress
await closePromise;

await expect(fs.readFile(testFilenameFull, "utf8")).resolves.toBe(
Expand Down
135 changes: 110 additions & 25 deletions src/lib/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,29 @@ export interface FsWriteOptions {
EOL?: string;
}

// The kind of pending mutation a lazily-serialized journal entry represents.
// The numeric values are internal tags only and are never written to disk.
enum Operation {
// Truncate the journal — serialized as an empty line by makeLazyClear
Clear = 0,
// Upsert a key/value pair
Write = 1,
// Remove a key
Delete = 2,
}

// A pending DB mutation whose JSONL text form is computed on demand.
// Entries are queued in the write/compress/dump backlogs; serialize() is
// memoized by the makeLazy* factory methods, so entries that are superseded
// before a flush never pay the stringification cost.
type LazyEntry<V extends unknown = unknown> = (
| {
op: Operation.Clear;
}
| {
op: Operation.Delete;
key: string;
}
| {
op: Operation.Write;
key: string;
value: V;
}
) & {
// Returns the single JSONL line for this entry ("" for Clear)
serialize(): string;
};

/**
* fsync on a directory ensures there are no rename operations etc. which haven't been persisted to disk.
*/
Expand Down Expand Up @@ -443,7 +466,7 @@ export class JsonlDB<V extends unknown = unknown> {
throw new Error("The database is not open!");
}
this._db.clear();
this.write("");
this.write(this.makeLazyClear());
}
public delete(key: string): boolean {
if (!this._isOpen) {
Expand All @@ -452,7 +475,7 @@ export class JsonlDB<V extends unknown = unknown> {
const ret = this._db.delete(key);
if (ret) {
// Something was deleted
this.write(this.entryToLine(key));
this.write(this.makeLazyDelete(key));
}
return ret;
}
Expand All @@ -461,7 +484,7 @@ export class JsonlDB<V extends unknown = unknown> {
throw new Error("The database is not open!");
}
this._db.set(key, value);
this.write(this.entryToLine(key, value));
this.write(this.makeLazyWrite(key, value));
return this;
}

Expand All @@ -488,7 +511,7 @@ export class JsonlDB<V extends unknown = unknown> {

for (const [key, value] of Object.entries(jsonOrFile)) {
this._db.set(key, value);
this.write(this.entryToLine(key, value), true);
this.write(this.makeLazyWrite(key, value), true);
}
}

Expand All @@ -502,8 +525,8 @@ export class JsonlDB<V extends unknown = unknown> {
return fs.writeJSON(filename, composeObject([...this._db]), options);
}

private updateStatistics(command: string): void {
if (command === "") {
private updateStatistics(entry: LazyEntry<V>): void {
if (entry.op === Operation.Clear) {
this._uncompressedSize = 0;
} else {
this._uncompressedSize++;
Expand Down Expand Up @@ -570,20 +593,20 @@ export class JsonlDB<V extends unknown = unknown> {
* Writes a line into the correct backlog
* @param noAutoCompress Whether auto-compression should be disabled
*/
private write(line: string, noAutoCompress: boolean = false): void {
private write(lazy: LazyEntry<V>, noAutoCompress: boolean = false): void {
/* istanbul ignore else */
if (this._compressBacklog && !this._compressBacklog.destroyed) {
// The compress backlog handling also handles the file statistics
this._compressBacklog.write(line);
this._compressBacklog.write(lazy);
} else if (this._writeBacklog && !this._writeBacklog.destroyed) {
// Update line statistics
this.updateStatistics(line);
this.updateStatistics(lazy);

// Either compress or write to the main file, never both
if (!noAutoCompress && this.needToCompress()) {
this.compress();
} else {
this._writeBacklog.write(line);
this._writeBacklog.write(lazy);
// If this is a throttled stream, uncork it as soon as the write
// buffer is larger than configured
if (
Expand All @@ -601,7 +624,7 @@ export class JsonlDB<V extends unknown = unknown> {
}
// If necessary, write to the dump backlog, so the dump doesn't miss any data
if (this._dumpBacklog && !this._dumpBacklog.destroyed) {
this._dumpBacklog.write(line);
this._dumpBacklog.write(lazy);
}
}

Expand All @@ -616,6 +639,48 @@ export class JsonlDB<V extends unknown = unknown> {
}
}

private makeLazyClear(): LazyEntry & { op: Operation.Clear } {
	// A Clear entry serializes to an empty line; there is nothing to memoize.
	const serialize =
		/* istanbul ignore next - this is impossible to test since it requires exact timing */ (): string =>
			"";
	return { op: Operation.Clear, serialize };
}

private makeLazyDelete(key: string): LazyEntry & { op: Operation.Delete } {
	// Memoize the JSONL line so repeated serialize() calls don't re-stringify
	let cached: string | undefined;
	const serialize = (): string => {
		if (cached === undefined) cached = this.entryToLine(key);
		return cached;
	};
	return { op: Operation.Delete, key, serialize };
}

private makeLazyWrite(
	key: string,
	value: V,
): LazyEntry<V> & { op: Operation.Write } {
	// Memoize the JSONL line so repeated serialize() calls don't re-stringify
	let cached: string | undefined;
	const serialize = (): string => {
		if (cached === undefined) cached = this.entryToLine(key, value);
		return cached;
	};
	return { op: Operation.Write, key, value, serialize };
}

/**
* Saves a compressed copy of the DB into the given path.
* @param targetFilename Where the compressed copy should be written. Default: `<filename>.dump`
Expand All @@ -635,13 +700,14 @@ export class JsonlDB<V extends unknown = unknown> {
for (const [key, value] of entries) {
await fs.appendFile(
this._dumpFd,
// No need to serialize lazily here
this.entryToLine(key, value) + "\n",
);
}
// In case there is any data in the backlog stream, persist that too
let line: string;
while (null !== (line = this._dumpBacklog.read())) {
await fs.appendFile(this._dumpFd, line + "\n");
let lazy: LazyEntry<V>;
while (null !== (lazy = this._dumpBacklog.read())) {
await fs.appendFile(this._dumpFd, lazy.serialize() + "\n");
}
this._dumpBacklog.destroy();
this._dumpBacklog = undefined;
Expand All @@ -665,16 +731,35 @@ export class JsonlDB<V extends unknown = unknown> {
// Open the file for appending and reading
this._fd = await fs.open(this.filename, "a+");
this._openPromise?.resolve();
// The chunk map is used to buffer all entries that are currently waiting in line
// so we avoid serializing redundant entries. When the write backlog is throttled,
// the chunk map will only be used for a short time.
const chunk = new Map<string, LazyEntry>();
for await (const action of this
._writeBacklog as AsyncIterable<string>) {
if (action === "") {
// Since we opened the file in append mode, we cannot truncate
// therefore close and open in write mode again
await fs.close(this._fd);
this._fd = await fs.open(this.filename, "w+");
._writeBacklog as AsyncIterable<LazyEntry>) {
if (action.op === Operation.Clear) {
chunk.clear();
chunk.set("", action);
} else {
await fs.appendFile(this._fd, action + "\n");
// Only remember the last entry for each key
chunk.set(action.key, action);
}

// When the backlog has been drained, perform the necessary write actions
if (this._writeBacklog.readableLength === 0) {
for (const entry of chunk.values()) {
if (entry.op === Operation.Clear) {
// Since we opened the file in append mode, we cannot truncate
// therefore close and open in write mode again
await fs.close(this._fd);
this._fd = await fs.open(this.filename, "w+");
} else {
await fs.appendFile(this._fd, entry.serialize() + "\n");
}
}
chunk.clear();
}

// When this is a throttled stream, auto-cork it when it was drained
if (this._writeBacklog.readableLength === 0 && this._isOpen) {
this.autoCork();
Expand Down Expand Up @@ -737,10 +822,10 @@ export class JsonlDB<V extends unknown = unknown> {
}

// In case there is any data in the backlog stream, persist that too
let line: string;
while (null !== (line = this._compressBacklog.read())) {
this.updateStatistics(line);
this._writeBacklog!.write(line);
let lazy: LazyEntry<V>;
while (null !== (lazy = this._compressBacklog.read())) {
this.updateStatistics(lazy);
this._writeBacklog!.write(lazy);
}
this._compressBacklog.destroy();
this._compressBacklog = undefined;
Expand Down
84 changes: 57 additions & 27 deletions test/perf.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,72 @@
import { padStart } from "alcalzone-shared/strings";
import fs from "fs-extra";
import { JsonlDB } from "../src";

process.on("unhandledRejection", (r) => {
debugger;
});

const testDB: JsonlDB<any> = new JsonlDB("test.jsonl", {
autoCompress: { onClose: false },
throttleFS: {
intervalMs: 10000,
intervalMs: 1000,
},
});

(async () => {
// await testDB.open();
// // add a shitton of values
// console.time("create values");
// const MAX_NODES = 100;
// for (let pass = 1; pass <= 10; pass++) {
// for (let nodeId = 1; nodeId <= MAX_NODES; nodeId++) {
// for (let ccId = 1; ccId <= 100; ccId++) {
// for (let endpoint = 0; endpoint <= 10; endpoint++) {
// for (const property of ["a", "b", "c", "d", "e"]) {
// const key = `${nodeId}-${ccId}-${endpoint}-${property}`;
// if (Math.random() < 0.15) {
// testDB.delete(key);
// } else {
// testDB.set(key, Math.random() * 100);
// }
// }
// }
// }
// }
// }
// add a shitton of values
const NUM_PASSES = 10;
const NUM_KEYS = 1000;
const NUM_CHANGES = 100000;
let total: number = 0;

// console.time("open");
// console.timeEnd("open");

for (let pass = 1; pass <= NUM_PASSES; pass++) {
await fs.remove("test.jsonl");

await testDB.open();

console.log(`start ${pass}`);

const start = Date.now();
for (let i = 0; i < NUM_CHANGES; i++) {
const key = `k${padStart(
Math.round(Math.random() * NUM_KEYS).toString(),
5,
"0",
)}`;
if (Math.random() < 0.15) {
testDB.delete(key);
} else {
testDB.set(key, Math.random() * 100);
}
}
console.log("close");
await testDB.close();

const time = Date.now() - start;
total += time;

console.log(`end ${pass}`);
}
// console.time("close");
// await testDB.close();
// console.timeEnd("create values");
// console.timeEnd("close");

console.log(`${NUM_PASSES}x, ${NUM_KEYS} keys, ${NUM_CHANGES} changes`);
console.log(` ${(total / NUM_PASSES).toFixed(2)} ms / attempt`);
console.log(` ${((NUM_CHANGES / total) * 1000).toFixed(2)} changes/s`);

console.time("open values");
await testDB.open();
console.log(testDB.size);
console.timeEnd("open values");
process.exit(0);

await testDB.close();
// console.time("open values");
// await testDB.open();
// console.log(testDB.size);
// console.timeEnd("open values");

// await testDB.close();

// await fs.remove("test.jsonl");
})().catch(() => {
Expand Down