Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reject if closed state is incorrect #42

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ be the byte length of all the blocks in the batch. If the value encoding is

### indexer.state

Type: `IndexState: { current: 'idle' | 'indexing', remaining: number, entriesPerSecond: number }`
Type: `IndexState: { current: 'idle' | 'indexing' | 'closing' | 'closed', remaining: number, entriesPerSecond: number }`

A getter that returns the current `IndexState`, the same as the value emitted by the `index-state` event. This getter is useful for checking the state of the indexer before it has emitted any events.

Expand All @@ -139,14 +139,27 @@ Type: `Hypercore`
Add a hypercore to the indexer. Must have the same value encoding as other
hypercores already in the indexer.

Rejects if called after the indexer is closed.

### indexer.idle()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was previously undocumented. Because it has a behavior change, I documented it here.


Resolves when indexing state is `'idle'`.

Resolves if the indexer is closed before this resolves. Rejects if called
after the indexer is closed.

### indexer.close()

Stop the indexer and flush index state to storage. This will not close the
underlying storage - it is up to the consumer to do that.

No-op if called more than once.

### indexer.unlink()

Unlink all index files. This should only be called after `close()` has resolved.
Unlink all index files.

This should only be called after `close()` has resolved, and rejects if not.

### indexer.on('index-state', onState)

Expand Down
116 changes: 97 additions & 19 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { once } = require('events')
const raf = require('random-access-file')
const { CoreIndexStream } = require('./lib/core-index-stream')
const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream')
const { pDefer } = require('./lib/utils.js')
const { pDefer, ExhaustivenessError } = require('./lib/utils.js')

const DEFAULT_BATCH_SIZE = 100
// The indexing rate (in entries per second) is calculated as an exponential
Expand Down Expand Up @@ -86,26 +86,43 @@ class MultiCoreIndexer extends TypedEmitter {
}

/**
* Add a core to be indexed
* Add a hypercore to the indexer. Must have the same value encoding as other
* hypercores already in the indexer.
*
* Rejects if called after the indexer is closed.
*
* @param {import('hypercore')<T, any>} core
*/
addCore(core) {
this.#assertOpen('Cannot add core after closing')
const coreIndexStream = new CoreIndexStream(core, this.#createStorage)
this.#indexStream.addStream(coreIndexStream)
}

/**
* Resolves when indexing state is 'idle'
* Resolves when indexing state is 'idle'.
*
* Resolves if the indexer is closed before this resolves. Rejects if called
* after the indexer is closed.
EvanHahn marked this conversation as resolved.
Show resolved Hide resolved
*/
async idle() {
this.#assertOpen('Cannot await idle after closing')
if (this.#getState().current === 'idle') return
if (!this.#pendingIdle) {
this.#pendingIdle = pDefer()
}
return this.#pendingIdle.promise
}

/**
* Stop the indexer and flush index state to storage. This will not close the
* underlying storage - it is up to the consumer to do that.
*
* No-op if called more than once.
*/
async close() {
if (!this.#isOpen()) return
this.#state = 'closing'
this.#indexStream.off('indexing', this.#emitStateBound)
this.#indexStream.off('drained', this.#emitStateBound)
this.#writeStream.destroy()
Expand All @@ -114,13 +131,47 @@ class MultiCoreIndexer extends TypedEmitter {
once(this.#indexStream, 'close'),
once(this.#writeStream, 'close'),
])
this.#pendingIdle?.resolve()
this.#state = 'closed'
}

/**
* Unlink all index files. This should only be called after `close()` has resolved.
* Unlink all index files.
*
* This should only be called after `close()` has resolved, and rejects if not.
*/
async unlink() {
await this.#indexStream.unlink()
switch (this.#state) {
case 'idle':
case 'indexing':
case 'closing':
throw new Error('Cannot unlink until fully closed')
case 'closed':
return this.#indexStream.unlink()
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(this.#state)
}
}

/** @returns {boolean} */
#isOpen() {
switch (this.#state) {
case 'idle':
case 'indexing':
return true
case 'closing':
case 'closed':
return false
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(this.#state)
}
}

/** @param {string} message */
#assertOpen(message) {
if (!this.#isOpen()) throw new Error(message)
}

/** @param {Entry<T>[]} entries */
Expand All @@ -146,28 +197,55 @@ class MultiCoreIndexer extends TypedEmitter {

#emitState() {
const state = this.#getState()
if (state.current !== this.#prevEmittedState?.current) {
this.emit(state.current)
switch (state.current) {
case 'idle':
case 'indexing':
if (state.current !== this.#prevEmittedState?.current) {
this.emit(state.current)
}
// Only emit if remaining has changed (which infers that state.current has changed)
if (state.remaining !== this.#prevEmittedState?.remaining) {
this.emit('index-state', state)
}
this.#prevEmittedState = state
break
/* c8 ignore next 3 */
case 'closing':
case 'closed':
break
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(state.current)
}
// Only emit if remaining has changed (which infers that state.current has changed)
if (state.remaining !== this.#prevEmittedState?.remaining) {
this.emit('index-state', state)
}
this.#prevEmittedState = state
}

/** @returns {IndexState} */
#getState() {
const remaining = this.#indexStream.remaining
const drained = this.#indexStream.drained
const prevState = this.#state
this.#state = remaining === 0 && drained ? 'idle' : 'indexing'
if (this.#state === 'idle' && this.#pendingIdle) {
this.#pendingIdle.resolve()
this.#pendingIdle = undefined
}
if (this.#state === 'indexing' && prevState === 'idle') {
this.#rateMeasurementStart = Date.now()

switch (this.#state) {
case 'idle':
case 'indexing': {
this.#state = remaining === 0 && drained ? 'idle' : 'indexing'
if (this.#state === 'idle' && this.#pendingIdle) {
this.#pendingIdle.resolve()
this.#pendingIdle = undefined
}
if (this.#state === 'indexing' && prevState === 'idle') {
this.#rateMeasurementStart = Date.now()
}
break
}
case 'closing':
case 'closed':
break
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(this.#state)
}

return {
current: this.#state,
remaining,
Expand Down
2 changes: 1 addition & 1 deletion lib/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ReadableEvents } from 'streamx'

export type IndexStateCurrent = 'idle' | 'indexing'
export type IndexStateCurrent = 'idle' | 'indexing' | 'closing' | 'closed'

export interface IndexState {
current: IndexStateCurrent
Expand Down
10 changes: 10 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,13 @@ function pDefer() {
return deferred
}
exports.pDefer = pDefer

class ExhaustivenessError extends Error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy-pasted from mapeo-core-next.

/* c8 ignore next 5 */
/** @param {never} value */
constructor(value) {
super(`Exhaustiveness check failed. ${value} should be impossible`)
this.name = 'ExhaustivenessError'
}
}
exports.ExhaustivenessError = ExhaustivenessError
86 changes: 84 additions & 2 deletions test/multi-core-indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ test('Indexes items appended after initial index', async () => {
await indexer.close()
})

test('No cores, starts idle, indexing after core added', async () => {
test('State transitions', async () => {
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
Expand All @@ -106,7 +106,19 @@ test('No cores, starts idle, indexing after core added', async () => {
indexer.addCore(core)
assert.equal(indexer.state.current, 'indexing', 'indexing after core added')
await indexer.idle()
await indexer.close()
assert.equal(indexer.state.current, 'idle', 'returns to an idle state')
const closePromise = indexer.close()
assert.equal(
indexer.state.current,
'closing',
'moves to a "closing" state immediately after calling close'
)
await closePromise
assert.equal(
indexer.state.current,
'closed',
'moves to a "closed" state after closing'
)
})

test('Calling idle() when already idle still resolves', async () => {
Expand Down Expand Up @@ -541,6 +553,76 @@ test('Closing before batch complete should resume on next start', async () => {
await indexer2.close()
})

test('double-closing is a no-op', async (t) => {
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})
const closePromise = indexer.close()
t.after(() => closePromise)

await assert.doesNotReject(() => indexer.close())
})

test('closing causes many methods to fail', async (t) => {
{
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})
const closePromise = indexer.close()
t.after(() => closePromise)
const core = await create()
assert.throws(() => indexer.addCore(core))
}

{
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})
const closePromise = indexer.close()
t.after(() => closePromise)
await assert.rejects(() => indexer.idle())
}
})

test('closing resolves existing idle promises', async () => {
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})

const core = await create()
indexer.addCore(core)

const idlePromises = [indexer.idle(), indexer.idle(), indexer.idle()]

await indexer.close()

await assert.doesNotReject(() => Promise.all(idlePromises))
})

test('unlinking requires the indexer to be closed', async () => {
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})

await indexer.idle()
await assert.rejects(() => indexer.unlink(), 'rejects when idle')

const core = await create()
indexer.addCore(core)
await assert.rejects(() => indexer.unlink(), 'rejects when indexing')

const closePromise = indexer.close()
await assert.rejects(() => indexer.unlink(), 'rejects when closing')

await closePromise
await assert.doesNotReject(() => indexer.unlink())
})

// This checks that storage names do not change between versions, which would be a breaking change
test('Consistent storage folders', async () => {
const storageNames = []
Expand Down
19 changes: 19 additions & 0 deletions test/unit-tests/utils.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// @ts-check
const test = require('node:test')
const assert = require('node:assert/strict')
const { ExhaustivenessError } = require('../../lib/utils.js')

test('ExhaustivenessError', () => {
const bools = [true, false]
assert.doesNotThrow(() => {
bools.forEach((bool) => {
switch (bool) {
case true:
case false:
break
default:
throw new ExhaustivenessError(bool)
}
})
})
})
Loading