From c7bb6ee54b2b77966df3193a27eb2b547bb3f3ed Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Fri, 5 Apr 2024 02:18:27 +0000 Subject: [PATCH 1/4] feat: reject if closed state is incorrect Some methods, like `addCore()`, now throw if called after calling `close()`. `unlink()` is the opposite, and should only be called after the indexer is closed. This is arguably a breaking change, but I feel that changing undefined behavior is not breaking. --- README.md | 15 ++++- index.js | 111 ++++++++++++++++++++++++++------ lib/types.ts | 2 +- lib/utils.js | 10 +++ test/multi-core-indexer.test.js | 85 +++++++++++++++++++++++- test/unit-tests/utils.test.js | 19 ++++++ 6 files changed, 218 insertions(+), 24 deletions(-) create mode 100644 test/unit-tests/utils.test.js diff --git a/README.md b/README.md index 498f9b0..d1568dd 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ be the byte length of all the blocks in the batch. If the value encoding is ### indexer.state -Type: `IndexState: { current: 'idle' | 'indexing', remaining: number, entriesPerSecond: number }` +Type: `IndexState: { current: 'idle' | 'indexing' | 'closing' | 'closed', remaining: number, entriesPerSecond: number }` A getter that returns the current `IndexState`, the same as the value emitted by the `index-state` event. This getter is useful for checking the state of the indexer before it has emitted any events. @@ -139,6 +139,15 @@ Type: `Hypercore` Add a hypercore to the indexer. Must have the same value encoding as other hypercores already in the indexer. +Rejects if called after the indexer is closed. + +### indexer.idle() + +Resolves when indexing state is `'idle'`. + +Resolves if the indexer is closed before this resolves. Rejects if called +after the indexer is closed. + ### indexer.close() Stop the indexer and flush index state to storage. This will not close the @@ -146,7 +155,9 @@ underlying storage - it is up to the consumer to do that. ### indexer.unlink() -Unlink all index files. This should only be called after `close()` has resolved. +Unlink all index files. + +This should only be called after `close()` has resolved, and rejects if not. ### indexer.on('index-state', onState) diff --git a/index.js b/index.js index bb669a0..5e501bd 100644 --- a/index.js +++ b/index.js @@ -5,7 +5,7 @@ const { once } = require('events') const raf = require('random-access-file') const { CoreIndexStream } = require('./lib/core-index-stream') const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream') -const { pDefer } = require('./lib/utils.js') +const { pDefer, ExhaustivenessError } = require('./lib/utils.js') const DEFAULT_BATCH_SIZE = 100 // The indexing rate (in entries per second) is calculated as an exponential @@ -86,18 +86,27 @@ class MultiCoreIndexer extends TypedEmitter { } /** - * Add a core to be indexed + * Add a hypercore to the indexer. Must have the same value encoding as other + * hypercores already in the indexer. + * + * Rejects if called after the indexer is closed. + * * @param {import('hypercore')} core */ addCore(core) { + this.#assertOpen('Cannot add core after closing') const coreIndexStream = new CoreIndexStream(core, this.#createStorage) this.#indexStream.addStream(coreIndexStream) } /** - * Resolves when indexing state is 'idle' + * Resolves when indexing state is 'idle'. + * + * Resolves if the indexer is closed before this resolves. Rejects if called + * after the indexer is closed. */ async idle() { + this.#assertOpen('Cannot await idle after closing') if (this.#getState().current === 'idle') return if (!this.#pendingIdle) { this.#pendingIdle = pDefer() @@ -105,7 +114,15 @@ class MultiCoreIndexer extends TypedEmitter { return this.#pendingIdle.promise } + /** + * Stop the indexer and flush index state to storage. This will not close the + * underlying storage - it is up to the consumer to do that. + * + * Rejects if called more than once. + */ async close() { + this.#assertOpen('Cannot double-close') + this.#state = 'closing' this.#indexStream.off('indexing', this.#emitStateBound) this.#indexStream.off('drained', this.#emitStateBound) this.#writeStream.destroy() @@ -114,13 +131,42 @@ class MultiCoreIndexer extends TypedEmitter { once(this.#indexStream, 'close'), once(this.#writeStream, 'close'), ]) + this.#pendingIdle?.resolve() + this.#state = 'closed' } /** - * Unlink all index files. This should only be called after `close()` has resolved. + * Unlink all index files. + * + * This should only be called after `close()` has resolved, and rejects if not. */ async unlink() { - await this.#indexStream.unlink() + switch (this.#state) { + case 'idle': + case 'indexing': + case 'closing': + throw new Error('Cannot unlink until fully closed') + case 'closed': + return this.#indexStream.unlink() + /* c8 ignore next 2 */ + default: + throw new ExhaustivenessError(this.#state) + } + } + + /** @param {string} message */ + #assertOpen(message) { + switch (this.#state) { + case 'idle': + case 'indexing': + return + case 'closing': + case 'closed': + throw new Error(message) + /* c8 ignore next 2 */ + default: + throw new ExhaustivenessError(this.#state) + } } /** @param {Entry[]} entries */ @@ -146,28 +192,55 @@ class MultiCoreIndexer extends TypedEmitter { #emitState() { const state = this.#getState() - if (state.current !== this.#prevEmittedState?.current) { - this.emit(state.current) - } - // Only emit if remaining has changed (which infers that state.current has changed) - if (state.remaining !== this.#prevEmittedState?.remaining) { - this.emit('index-state', state) + switch (state.current) { + case 'idle': + case 'indexing': + if (state.current !== this.#prevEmittedState?.current) { + this.emit(state.current) + } + // Only emit if remaining has changed (which infers that state.current has changed) + if (state.remaining !== this.#prevEmittedState?.remaining) { + this.emit('index-state', state) + } + this.#prevEmittedState = state + break + /* c8 ignore next 3 */ + case 'closing': + case 'closed': + break + /* c8 ignore next 2 */ + default: + throw new ExhaustivenessError(state.current) } - this.#prevEmittedState = state } + /** @returns {IndexState} */ #getState() { const remaining = this.#indexStream.remaining const drained = this.#indexStream.drained const prevState = this.#state - this.#state = remaining === 0 && drained ? 'idle' : 'indexing' - if (this.#state === 'idle' && this.#pendingIdle) { - this.#pendingIdle.resolve() - this.#pendingIdle = undefined - } - if (this.#state === 'indexing' && prevState === 'idle') { - this.#rateMeasurementStart = Date.now() + + switch (this.#state) { + case 'idle': + case 'indexing': { + this.#state = remaining === 0 && drained ? 'idle' : 'indexing' + if (this.#state === 'idle' && this.#pendingIdle) { + this.#pendingIdle.resolve() + this.#pendingIdle = undefined + } + if (this.#state === 'indexing' && prevState === 'idle') { + this.#rateMeasurementStart = Date.now() + } + break + } + case 'closing': + case 'closed': + break + /* c8 ignore next 2 */ + default: + throw new ExhaustivenessError(this.#state) } + return { current: this.#state, remaining, diff --git a/lib/types.ts b/lib/types.ts index a6d7e18..672017a 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -1,6 +1,6 @@ import { ReadableEvents } from 'streamx' -export type IndexStateCurrent = 'idle' | 'indexing' +export type IndexStateCurrent = 'idle' | 'indexing' | 'closing' | 'closed' export interface IndexState { current: IndexStateCurrent diff --git a/lib/utils.js b/lib/utils.js index 88b8d90..e89ed61 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -12,3 +12,13 @@ function pDefer() { return deferred } exports.pDefer = pDefer + +class ExhaustivenessError extends Error { + /** @param {never} value */ + constructor(value) { + /* c8 ignore next 3 */ + super(`Exhaustiveness check failed. ${value} should be impossible`) + this.name = 'ExhaustivenessError' + } +} +exports.ExhaustivenessError = ExhaustivenessError diff --git a/test/multi-core-indexer.test.js b/test/multi-core-indexer.test.js index d3f9769..6d19638 100644 --- a/test/multi-core-indexer.test.js +++ b/test/multi-core-indexer.test.js @@ -95,7 +95,7 @@ test('Indexes items appended after initial index', async () => { await indexer.close() }) -test('No cores, starts idle, indexing after core added', async () => { +test('State transitions', async () => { const indexer = new MultiCoreIndexer([], { batch: async () => {}, storage: () => new ram(), @@ -106,7 +106,19 @@ test('No cores, starts idle, indexing after core added', async () => { indexer.addCore(core) assert.equal(indexer.state.current, 'indexing', 'indexing after core added') await indexer.idle() - await indexer.close() + assert.equal(indexer.state.current, 'idle', 'returns to an idle state') + const closePromise = indexer.close() + assert.equal( + indexer.state.current, + 'closing', + 'moves to a "closing" state immediately after calling close' + ) + await closePromise + assert.equal( + indexer.state.current, + 'closed', + 'moves to a "closed" state after closing' + ) }) test('Calling idle() when already idle still resolves', async () => { @@ -541,6 +553,75 @@ test('Closing before batch complete should resume on next start', async () => { await indexer2.close() }) +test('closing causes many methods to fail', async (t) => { + { + const indexer = new MultiCoreIndexer([], { + batch: async () => {}, + storage: () => new ram(), + }) + const closePromise = indexer.close() + t.after(() => closePromise) + const core = await create() + assert.throws(() => indexer.addCore(core)) + } + + { + const indexer = new MultiCoreIndexer([], { + batch: async () => {}, + storage: () => new ram(), + }) + const closePromise = indexer.close() + t.after(() => closePromise) + await assert.rejects(() => indexer.idle()) + } + + { + const indexer = new MultiCoreIndexer([], { + batch: async () => {}, + storage: () => new ram(), + }) + const closePromise = indexer.close() + t.after(() => closePromise) + await assert.rejects(() => indexer.close()) + } +}) + +test('closing resolves existing idle promises', async () => { + const indexer = new MultiCoreIndexer([], { + batch: async () => {}, + storage: () => new ram(), + }) + + const core = await create() + indexer.addCore(core) + + const idlePromises = [indexer.idle(), indexer.idle(), indexer.idle()] + + await indexer.close() + + await assert.doesNotReject(() => Promise.all(idlePromises)) +}) + +test('unlinking requires the indexer to be closed', async () => { + const indexer = new MultiCoreIndexer([], { + batch: async () => {}, + storage: () => new ram(), + }) + + await indexer.idle() + await assert.rejects(() => indexer.unlink(), 'rejects when idle') + + const core = await create() + indexer.addCore(core) + await assert.rejects(() => indexer.unlink(), 'rejects when indexing') + + const closePromise = indexer.close() + await assert.rejects(() => indexer.unlink(), 'rejects when closing') + + await closePromise + await assert.doesNotReject(() => indexer.unlink()) +}) + // This checks that storage names do not change between versions, which would be a breaking change test('Consistent storage folders', async () => { const storageNames = [] diff --git a/test/unit-tests/utils.test.js b/test/unit-tests/utils.test.js new file mode 100644 index 0000000..2e961b6 --- /dev/null +++ b/test/unit-tests/utils.test.js @@ -0,0 +1,19 @@ +// @ts-check +const test = require('node:test') +const assert = require('node:assert/strict') +const { ExhaustivenessError } = require('../../lib/utils.js') + +test('ExhaustivenessError', () => { + const bools = [true, false] + assert.doesNotThrow(() => { + bools.forEach((bool) => { + switch (bool) { + case true: + case false: + break + default: + throw new ExhaustivenessError(bool) + } + }) + }) +}) From 56564f62b92980cc07142da3f4f40dcaebf16fb7 Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Mon, 15 Apr 2024 21:32:59 +0000 Subject: [PATCH 2/4] Double-closing is a no-op --- index.js | 15 ++++++++++----- test/multi-core-indexer.test.js | 21 +++++++++++---------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/index.js b/index.js index 5e501bd..bc0cc43 100644 --- a/index.js +++ b/index.js @@ -121,7 +121,7 @@ class MultiCoreIndexer extends TypedEmitter { * Rejects if called more than once. */ async close() { - this.#assertOpen('Cannot double-close') + if (!this.#isOpen()) return this.#state = 'closing' this.#indexStream.off('indexing', this.#emitStateBound) this.#indexStream.off('drained', this.#emitStateBound) @@ -154,21 +154,26 @@ class MultiCoreIndexer extends TypedEmitter { } } - /** @param {string} message */ - #assertOpen(message) { + /** @returns {boolean} */ + #isOpen() { switch (this.#state) { case 'idle': case 'indexing': - return + return true case 'closing': case 'closed': - throw new Error(message) + return false /* c8 ignore next 2 */ default: throw new ExhaustivenessError(this.#state) } } + /** @param {string} message */ + #assertOpen(message) { + if (!this.#isOpen()) throw new Error(message) + } + /** @param {Entry[]} entries */ async #handleEntries(entries) { this.#emitState() diff --git a/test/multi-core-indexer.test.js b/test/multi-core-indexer.test.js index 6d19638..728083d 100644 --- a/test/multi-core-indexer.test.js +++ b/test/multi-core-indexer.test.js @@ -553,6 +553,17 @@ test('Closing before batch complete should resume on next start', async () => { await indexer2.close() }) +test('double-closing is a no-op', async (t) => { + const indexer = new MultiCoreIndexer([], { + batch: async () => {}, + storage: () => new ram(), + }) + const closePromise = indexer.close() + t.after(() => closePromise) + + await assert.doesNotReject(() => indexer.close()) +}) + test('closing causes many methods to fail', async (t) => { { const indexer = new MultiCoreIndexer([], { @@ -574,16 +585,6 @@ test('closing causes many methods to fail', async (t) => { t.after(() => closePromise) await assert.rejects(() => indexer.idle()) } - - { - const indexer = new MultiCoreIndexer([], { - batch: async () => {}, - storage: () => new ram(), - }) - const closePromise = indexer.close() - t.after(() => closePromise) - await assert.rejects(() => indexer.close()) - } }) test('closing resolves existing idle promises', async () => { From bc566d8fb69bbf73f840b9f7144ec7eeeed4b368 Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Mon, 15 Apr 2024 21:33:19 +0000 Subject: [PATCH 3/4] Fix code coverage for ExhaustivenessError --- lib/utils.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/utils.js b/lib/utils.js index e89ed61..85297cb 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -14,9 +14,9 @@ function pDefer() { exports.pDefer = pDefer class ExhaustivenessError extends Error { + /* c8 ignore next 5 */ /** @param {never} value */ constructor(value) { - /* c8 ignore next 3 */ super(`Exhaustiveness check failed. ${value} should be impossible`) this.name = 'ExhaustivenessError' } From d32e778d04e524ef18805cdede7dbfb78d4ac966 Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Mon, 15 Apr 2024 21:34:36 +0000 Subject: [PATCH 4/4] Update docs for .close() --- README.md | 2 ++ index.js | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d1568dd..a7acfb4 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,8 @@ after the indexer is closed. Stop the indexer and flush index state to storage. This will not close the underlying storage - it is up to the consumer to do that. +No-op if called more than once. + ### indexer.unlink() Unlink all index files. diff --git a/index.js b/index.js index bc0cc43..3eb0ed3 100644 --- a/index.js +++ b/index.js @@ -118,7 +118,7 @@ class MultiCoreIndexer extends TypedEmitter { * Stop the indexer and flush index state to storage. This will not close the * underlying storage - it is up to the consumer to do that. * - * Rejects if called more than once. + * No-op if called more than once. */ async close() { if (!this.#isOpen()) return