Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump level db to 8.0.0 #4514

Merged
merged 4 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 2 additions & 6 deletions packages/db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@
"@lodestar/utils": "^1.0.0",
"@types/levelup": "^4.3.3",
"it-all": "^1.0.2",
"level": "^7.0.0",
"levelup": "^5.0.1"
"level": "^8.0.0"
},
"devDependencies": {
"@types/level": "^6.0.0",
"@types/leveldown": "^4.0.3"
}
"devDependencies": {}
}
164 changes: 70 additions & 94 deletions packages/db/src/controller/level.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {LevelUp} from "levelup";
import level from "level";
import all from "it-all";
import {Level} from "level";
// eslint-disable-next-line import/no-extraneous-dependencies
import type {ClassicLevel} from "classic-level";
import {ILogger} from "@lodestar/utils";
import {DbReqOpts, IDatabaseController, IDatabaseOptions, IFilterOptions, IKeyValue} from "./interface.js";
import {ILevelDbControllerMetrics} from "./metrics.js";
Expand All @@ -10,8 +10,10 @@ enum Status {
stopped = "stopped",
}

type LevelNodeJS = ClassicLevel<Uint8Array, Uint8Array>;

export interface ILevelDBOptions extends IDatabaseOptions {
db?: LevelUp;
db?: Level<Uint8Array, Uint8Array>;
}

export type LevelDbControllerModules = {
Expand All @@ -26,7 +28,7 @@ const BUCKET_ID_UNKNOWN = "unknown";
*/
export class LevelDbController implements IDatabaseController<Uint8Array, Uint8Array> {
private status = Status.stopped;
private db: LevelUp;
private db: Level<Uint8Array, Uint8Array>;

private readonly opts: ILevelDBOptions;
private readonly logger: ILogger;
Expand All @@ -36,8 +38,7 @@ export class LevelDbController implements IDatabaseController<Uint8Array, Uint8A
this.opts = opts;
this.logger = logger;
this.metrics = metrics ?? null;
// eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-assignment
this.db = opts.db || level(opts.name || "beaconchain", {keyEncoding: "binary", valueEncoding: "binary"});
this.db = opts.db || new Level(opts.name || "beaconchain", {keyEncoding: "binary", valueEncoding: "binary"});
}

async start(): Promise<void> {
Expand Down Expand Up @@ -74,138 +75,113 @@ export class LevelDbController implements IDatabaseController<Uint8Array, Uint8A
this.metrics?.dbReadItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);
return (await this.db.get(key)) as Uint8Array | null;
} catch (e) {
if ((e as NotFoundError).notFound) {
if ((e as LevelDbError).code === "LEVEL_NOT_FOUND") {
return null;
}
throw e;
}
}

async put(key: Uint8Array, value: Uint8Array, opts?: DbReqOpts): Promise<void> {
put(key: Uint8Array, value: Uint8Array, opts?: DbReqOpts): Promise<void> {
this.metrics?.dbWriteReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);
this.metrics?.dbWriteItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);

await this.db.put(key, value);
return this.db.put(key, value);
}

async delete(key: Uint8Array, opts?: DbReqOpts): Promise<void> {
delete(key: Uint8Array, opts?: DbReqOpts): Promise<void> {
this.metrics?.dbWriteReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);
this.metrics?.dbWriteItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);

await this.db.del(key);
return this.db.del(key);
}

async batchPut(items: IKeyValue<Uint8Array, Uint8Array>[], opts?: DbReqOpts): Promise<void> {
batchPut(items: IKeyValue<Uint8Array, Uint8Array>[], opts?: DbReqOpts): Promise<void> {
this.metrics?.dbWriteReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);
this.metrics?.dbWriteItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, items.length);

const batch = this.db.batch();
for (const item of items) batch.put(item.key, item.value);
await batch.write();
return this.db.batch(items.map((item) => ({type: "put", key: item.key, value: item.value})));
}

async batchDelete(keys: Uint8Array[], opts?: DbReqOpts): Promise<void> {
batchDelete(keys: Uint8Array[], opts?: DbReqOpts): Promise<void> {
this.metrics?.dbWriteReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);
this.metrics?.dbWriteItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, keys.length);

const batch = this.db.batch();
for (const key of keys) batch.del(key);
await batch.write();
return this.db.batch(keys.map((key) => ({type: "del", key: key})));
}

keysStream(opts?: IFilterOptions<Uint8Array>): AsyncGenerator<Uint8Array> {
return this.iterator({keys: true, values: false}, (key) => key, opts);
keysStream(opts: IFilterOptions<Uint8Array> = {}): AsyncIterable<Uint8Array> {
return this.metricsIterator(this.db.keys(opts), (key) => key, opts.bucketId ?? BUCKET_ID_UNKNOWN);
}

valuesStream(opts?: IFilterOptions<Uint8Array>): AsyncGenerator<Uint8Array> {
return this.iterator({keys: false, values: true}, (_key, value) => value, opts);
valuesStream(opts: IFilterOptions<Uint8Array> = {}): AsyncIterable<Uint8Array> {
return this.metricsIterator(this.db.values(opts), (value) => value, opts.bucketId ?? BUCKET_ID_UNKNOWN);
}

entriesStream(opts?: IFilterOptions<Uint8Array>): AsyncGenerator<IKeyValue<Uint8Array, Uint8Array>> {
return this.iterator({keys: true, values: true}, (key, value) => ({key, value}), opts);
entriesStream(opts: IFilterOptions<Uint8Array> = {}): AsyncIterable<IKeyValue<Uint8Array, Uint8Array>> {
return this.metricsIterator(
this.db.iterator(opts),
(entry) => ({key: entry[0], value: entry[1]}),
opts.bucketId ?? BUCKET_ID_UNKNOWN
);
}

async keys(opts?: IFilterOptions<Uint8Array>): Promise<Uint8Array[]> {
return all(this.keysStream(opts));
keys(opts: IFilterOptions<Uint8Array> = {}): Promise<Uint8Array[]> {
return this.metricsAll(this.db.keys(opts).all(), opts.bucketId ?? BUCKET_ID_UNKNOWN);
}

async values(opts?: IFilterOptions<Uint8Array>): Promise<Uint8Array[]> {
return all(this.valuesStream(opts));
values(opts: IFilterOptions<Uint8Array> = {}): Promise<Uint8Array[]> {
return this.metricsAll(this.db.values(opts).all(), opts.bucketId ?? BUCKET_ID_UNKNOWN);
}

async entries(opts?: IFilterOptions<Uint8Array>): Promise<IKeyValue<Uint8Array, Uint8Array>[]> {
return all(this.entriesStream(opts));
async entries(opts: IFilterOptions<Uint8Array> = {}): Promise<IKeyValue<Uint8Array, Uint8Array>[]> {
const entries = await this.metricsAll(this.db.iterator(opts).all(), opts.bucketId ?? BUCKET_ID_UNKNOWN);
return entries.map((entry) => ({key: entry[0], value: entry[1]}));
}

/**
* Turn an abstract-leveldown iterator into an AsyncGenerator.
* Replaces https://github.com/Level/iterator-stream
*
* How to use:
* - Entries = { keys: true, values: true }
* - Keys = { keys: true, values: false }
* - Values = { keys: false, values: true }
* Get the approximate number of bytes of file system space used by the range [start..end).
* The result might not include recently written data.
*/
private async *iterator<T>(
keysOpts: StreamKeysOpts,
getValue: (key: Uint8Array, value: Uint8Array) => T,
opts?: IFilterOptions<Uint8Array>
): AsyncGenerator<T> {
this.metrics?.dbWriteReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);
let itemsRead = 0;
approximateSize(start: Uint8Array, end: Uint8Array): Promise<number> {
return (this.db as LevelNodeJS).approximateSize(start, end);
}

// Entries = { keys: true, values: true }
// Keys = { keys: true, values: false }
// Values = { keys: false, values: true }
/**
* Manually trigger a database compaction in the range [start..end].
*/
compactRange(start: Uint8Array, end: Uint8Array): Promise<void> {
return (this.db as LevelNodeJS).compactRange(start, end);
}

const iterator = this.db.iterator({
...opts,
...keysOpts,
// TODO: Test if this is necessary. It's in https://github.com/Level/iterator-stream but may be stale
limit: opts?.limit ?? -1,
});
/** Capture metrics for db.iterator, db.keys, db.values .all() calls */
private async metricsAll<T>(promise: Promise<T[]>, bucket: string): Promise<T[]> {
this.metrics?.dbWriteReq.inc({bucket}, 1);
const items = await promise;
this.metrics?.dbWriteItems.inc({bucket}, items.length);
return items;
}

try {
while (true) {
const [key, value] = await new Promise<[Uint8Array, Uint8Array]>((resolve, reject) => {
iterator.next((err, key: Uint8Array, value: Uint8Array) => {
if (err) reject(err);
else resolve([key, value]);
});
});

// Source code justification of why this condition implies the stream is done
// https://github.com/Level/level-js/blob/e2253839a62fa969de50e9114279763228959d40/iterator.js#L123
if (key === undefined && value === undefined) {
return; // Done
}

// Count metrics after done condition
itemsRead++;

yield getValue(key, value);
}
} finally {
this.metrics?.dbWriteItems.inc(itemsRead);

// TODO: Should we await here?
await new Promise<void>((resolve, reject) => {
iterator.end((err) => {
if (err) reject(err);
else resolve();
});
});
/** Capture metrics for db.iterator, db.keys, db.values AsyncIterable calls */
private async *metricsIterator<T, K>(
iterator: AsyncIterable<T>,
getValue: (item: T) => K,
bucket: string
): AsyncIterable<K> {
this.metrics?.dbWriteReq.inc({bucket}, 1);

let itemsRead = 0;

for await (const item of iterator) {
// Count metrics after done condition
itemsRead++;

yield getValue(item);
}

this.metrics?.dbWriteItems.inc({bucket}, itemsRead);
}
}

type StreamKeysOpts = {
keys: boolean;
values: boolean;
};

/** From https://www.npmjs.com/package/level */
type NotFoundError = {
notFound: true;
type: "NotFoundError";
};
type LevelDbError = {code: "LEVEL_NOT_FOUND"};
36 changes: 36 additions & 0 deletions packages/db/test/unit/controller/level.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {execSync} from "node:child_process";
import {expect} from "chai";
import leveldown from "leveldown";
import all from "it-all";
Expand All @@ -22,6 +23,11 @@ describe("LevelDB controller", () => {
});
});

it("test get not found", async () => {
const key = Buffer.from("not-existing-key");
expect(await db.get(key)).to.equal(null);
});

it("test put/get/delete", async () => {
const key = Buffer.from("test");
const value = Buffer.from("some value");
Expand Down Expand Up @@ -66,6 +72,7 @@ describe("LevelDB controller", () => {
await db.batchDelete([k1, k2]);
expect((await db.entries()).length).to.equal(0);
});

it("test entries", async () => {
const k1 = Buffer.from("test1");
const k2 = Buffer.from("test2");
Expand Down Expand Up @@ -106,4 +113,33 @@ describe("LevelDB controller", () => {
const result = await all(resultStream);
expect(result.length).to.be.equal(2);
});

it("test compactRange + approximateSize", async () => {
const indexes = Array.from({length: 100}, (_, i) => i);
const keys = indexes.map((i) => Buffer.from([i]));
const values = indexes.map((i) => Buffer.alloc(1000, i));
const minKey = Buffer.from([0x00]);
const maxKey = Buffer.from([0xff]);

await db.batchPut(keys.map((key, i) => ({key, value: values[i]})));
await db.batchDelete(keys);

const sizeBeforeCompact = getDbSize();
await db.compactRange(minKey, maxKey);
const sizeAfterCompact = getDbSize();

expect(sizeAfterCompact).lt(sizeBeforeCompact, "Expected sizeAfterCompact < sizeBeforeCompact");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how much disk space was saved with this test? I think we should expect some threshold here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not that familiar with level internals so I think it's dangeours to do assumptions here. LevelDB behaviour is not trivial


// approximateSize is not exact, just test a number is positive
const approxSize = await db.approximateSize(minKey, maxKey);
expect(approxSize).gt(0, "approximateSize return not > 0");
});

function getDbSize(): number {
// 116 ./.__testdb
const res = execSync(`du -bs ${dbLocation}`, {encoding: "utf8"});
const match = res.match(/^(\d+)/);
if (!match) throw Error(`Unknown du response \n${res}`);
return parseInt(match[1]);
}
});