Skip to content

Commit

Permalink
Merge c7bb6ee into c87f8fb
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanHahn committed Apr 5, 2024
2 parents c87f8fb + c7bb6ee commit 2875624
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 24 deletions.
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ be the byte length of all the blocks in the batch. If the value encoding is

### indexer.state

Type: `IndexState: { current: 'idle' | 'indexing', remaining: number, entriesPerSecond: number }`
Type: `IndexState: { current: 'idle' | 'indexing' | 'closing' | 'closed', remaining: number, entriesPerSecond: number }`

A getter that returns the current `IndexState`, the same as the value emitted by the `index-state` event. This getter is useful for checking the state of the indexer before it has emitted any events.

Expand All @@ -139,14 +139,25 @@ Type: `Hypercore`
Add a hypercore to the indexer. Must have the same value encoding as other
hypercores already in the indexer.

Rejects if called after the indexer is closed.

### indexer.idle()

Resolves when indexing state is `'idle'`.

Resolves if the indexer is closed before this resolves. Rejects if called
after the indexer is closed.

### indexer.close()

Stop the indexer and flush index state to storage. This will not close the
underlying storage - it is up to the consumer to do that.

### indexer.unlink()

Unlink all index files. This should only be called after `close()` has resolved.
Unlink all index files.

This should only be called after `close()` has resolved, and rejects if not.

### indexer.on('index-state', onState)

Expand Down
111 changes: 92 additions & 19 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { once } = require('events')
const raf = require('random-access-file')
const { CoreIndexStream } = require('./lib/core-index-stream')
const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream')
const { pDefer } = require('./lib/utils.js')
const { pDefer, ExhaustivenessError } = require('./lib/utils.js')

const DEFAULT_BATCH_SIZE = 100
// The indexing rate (in entries per second) is calculated as an exponential
Expand Down Expand Up @@ -86,26 +86,43 @@ class MultiCoreIndexer extends TypedEmitter {
}

/**
* Add a core to be indexed
* Add a hypercore to the indexer. Must have the same value encoding as other
* hypercores already in the indexer.
*
* Rejects if called after the indexer is closed.
*
* @param {import('hypercore')<T, any>} core
*/
addCore(core) {
this.#assertOpen('Cannot add core after closing')
const coreIndexStream = new CoreIndexStream(core, this.#createStorage)
this.#indexStream.addStream(coreIndexStream)
}

/**
* Resolves when indexing state is 'idle'
* Resolves when indexing state is 'idle'.
*
* Resolves if the indexer is closed before this resolves. Rejects if called
* after the indexer is closed.
*/
async idle() {
this.#assertOpen('Cannot await idle after closing')
if (this.#getState().current === 'idle') return
if (!this.#pendingIdle) {
this.#pendingIdle = pDefer()
}
return this.#pendingIdle.promise
}

/**
* Stop the indexer and flush index state to storage. This will not close the
* underlying storage - it is up to the consumer to do that.
*
* Rejects if called more than once.
*/
async close() {
this.#assertOpen('Cannot double-close')
this.#state = 'closing'
this.#indexStream.off('indexing', this.#emitStateBound)
this.#indexStream.off('drained', this.#emitStateBound)
this.#writeStream.destroy()
Expand All @@ -114,13 +131,42 @@ class MultiCoreIndexer extends TypedEmitter {
once(this.#indexStream, 'close'),
once(this.#writeStream, 'close'),
])
this.#pendingIdle?.resolve()
this.#state = 'closed'
}

/**
* Unlink all index files. This should only be called after `close()` has resolved.
* Unlink all index files.
*
* This should only be called after `close()` has resolved, and rejects if not.
*/
async unlink() {
await this.#indexStream.unlink()
switch (this.#state) {
case 'idle':
case 'indexing':
case 'closing':
throw new Error('Cannot unlink until fully closed')
case 'closed':
return this.#indexStream.unlink()
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(this.#state)
}
}

/** @param {string} message */
#assertOpen(message) {
switch (this.#state) {
case 'idle':
case 'indexing':
return
case 'closing':
case 'closed':
throw new Error(message)
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(this.#state)
}
}

/** @param {Entry<T>[]} entries */
Expand All @@ -146,28 +192,55 @@ class MultiCoreIndexer extends TypedEmitter {

#emitState() {
const state = this.#getState()
if (state.current !== this.#prevEmittedState?.current) {
this.emit(state.current)
}
// Only emit if remaining has changed (which infers that state.current has changed)
if (state.remaining !== this.#prevEmittedState?.remaining) {
this.emit('index-state', state)
switch (state.current) {
case 'idle':
case 'indexing':
if (state.current !== this.#prevEmittedState?.current) {
this.emit(state.current)
}
// Only emit if remaining has changed (which infers that state.current has changed)
if (state.remaining !== this.#prevEmittedState?.remaining) {
this.emit('index-state', state)
}
this.#prevEmittedState = state
break
/* c8 ignore next 3 */
case 'closing':
case 'closed':
break
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(state.current)
}
this.#prevEmittedState = state
}

/** @returns {IndexState} */
#getState() {
const remaining = this.#indexStream.remaining
const drained = this.#indexStream.drained
const prevState = this.#state
this.#state = remaining === 0 && drained ? 'idle' : 'indexing'
if (this.#state === 'idle' && this.#pendingIdle) {
this.#pendingIdle.resolve()
this.#pendingIdle = undefined
}
if (this.#state === 'indexing' && prevState === 'idle') {
this.#rateMeasurementStart = Date.now()

switch (this.#state) {
case 'idle':
case 'indexing': {
this.#state = remaining === 0 && drained ? 'idle' : 'indexing'
if (this.#state === 'idle' && this.#pendingIdle) {
this.#pendingIdle.resolve()
this.#pendingIdle = undefined
}
if (this.#state === 'indexing' && prevState === 'idle') {
this.#rateMeasurementStart = Date.now()
}
break
}
case 'closing':
case 'closed':
break
/* c8 ignore next 2 */
default:
throw new ExhaustivenessError(this.#state)
}

return {
current: this.#state,
remaining,
Expand Down
2 changes: 1 addition & 1 deletion lib/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ReadableEvents } from 'streamx'

export type IndexStateCurrent = 'idle' | 'indexing'
export type IndexStateCurrent = 'idle' | 'indexing' | 'closing' | 'closed'

export interface IndexState {
current: IndexStateCurrent
Expand Down
10 changes: 10 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,13 @@ function pDefer() {
return deferred
}
exports.pDefer = pDefer

class ExhaustivenessError extends Error {
/** @param {never} value */
constructor(value) {
/* c8 ignore next 3 */
super(`Exhaustiveness check failed. ${value} should be impossible`)
this.name = 'ExhaustivenessError'
}
}
exports.ExhaustivenessError = ExhaustivenessError
85 changes: 83 additions & 2 deletions test/multi-core-indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ test('Indexes items appended after initial index', async () => {
await indexer.close()
})

test('No cores, starts idle, indexing after core added', async () => {
test('State transitions', async () => {
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
Expand All @@ -106,7 +106,19 @@ test('No cores, starts idle, indexing after core added', async () => {
indexer.addCore(core)
assert.equal(indexer.state.current, 'indexing', 'indexing after core added')
await indexer.idle()
await indexer.close()
assert.equal(indexer.state.current, 'idle', 'returns to an idle state')
const closePromise = indexer.close()
assert.equal(
indexer.state.current,
'closing',
'moves to a "closing" state immediately after calling close'
)
await closePromise
assert.equal(
indexer.state.current,
'closed',
'moves to a "closed" state after closing'
)
})

test('Calling idle() when already idle still resolves', async () => {
Expand Down Expand Up @@ -541,6 +553,75 @@ test('Closing before batch complete should resume on next start', async () => {
await indexer2.close()
})

test('closing causes many methods to fail', async (t) => {
{
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})
const closePromise = indexer.close()
t.after(() => closePromise)
const core = await create()
assert.throws(() => indexer.addCore(core))
}

{
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})
const closePromise = indexer.close()
t.after(() => closePromise)
await assert.rejects(() => indexer.idle())
}

{
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})
const closePromise = indexer.close()
t.after(() => closePromise)
await assert.rejects(() => indexer.close())
}
})

test('closing resolves existing idle promises', async () => {
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})

const core = await create()
indexer.addCore(core)

const idlePromises = [indexer.idle(), indexer.idle(), indexer.idle()]

await indexer.close()

await assert.doesNotReject(() => Promise.all(idlePromises))
})

test('unlinking requires the indexer to be closed', async () => {
const indexer = new MultiCoreIndexer([], {
batch: async () => {},
storage: () => new ram(),
})

await indexer.idle()
await assert.rejects(() => indexer.unlink(), 'rejects when idle')

const core = await create()
indexer.addCore(core)
await assert.rejects(() => indexer.unlink(), 'rejects when indexing')

const closePromise = indexer.close()
await assert.rejects(() => indexer.unlink(), 'rejects when closing')

await closePromise
await assert.doesNotReject(() => indexer.unlink())
})

// This checks that storage names do not change between versions, which would be a breaking change
test('Consistent storage folders', async () => {
const storageNames = []
Expand Down
19 changes: 19 additions & 0 deletions test/unit-tests/utils.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// @ts-check
const test = require('node:test')
const assert = require('node:assert/strict')
const { ExhaustivenessError } = require('../../lib/utils.js')

test('ExhaustivenessError', () => {
const bools = [true, false]
assert.doesNotThrow(() => {
bools.forEach((bool) => {
switch (bool) {
case true:
case false:
break
default:
throw new ExhaustivenessError(bool)
}
})
})
})

0 comments on commit 2875624

Please sign in to comment.