Skip to content

Commit

Permalink
feat: Allow adding non-ready cores (#24)
Browse files Browse the repository at this point in the history
* feat: Allow adding non-ready cores

Fixes #23

* run CI on stacked PRs

* fix types
  • Loading branch information
gmaclennan committed Nov 28, 2023
1 parent 846d895 commit 6e665e4
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 63 deletions.
1 change: 0 additions & 1 deletion .github/workflows/node.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
build:
Expand Down
29 changes: 4 additions & 25 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ const { Writable } = require('streamx')
const { TypedEmitter } = require('tiny-typed-emitter')
const { once } = require('events')
const raf = require('random-access-file')
const { discoveryKey } = require('hypercore-crypto')
// const log = require('debug')('multi-core-indexer')
const { CoreIndexStream } = require('./lib/core-index-stream')
const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream')
const { promisify } = require('util')
const { pDefer } = require('./lib/utils.js')

const DEFAULT_BATCH_SIZE = 100
Expand Down Expand Up @@ -43,15 +41,13 @@ class MultiCoreIndexer extends TypedEmitter {
#createStorage
/** @type {IndexState | undefined} */
#prevEmittedState
/** @type {Set<import('random-access-storage')>} */
#storages = new Set()
#emitStateBound
/** @type {import('./lib/utils.js').DeferredPromise | undefined} */
#pendingIdle

/**
*
* @param {import('hypercore')<T, Buffer | string>[]} cores
* @param {import('hypercore')<T, any>[]} cores
* @param {object} opts
* @param {(entries: Entry<T>[]) => Promise<void>} opts.batch
* @param {StorageParam} opts.storage
Expand All @@ -61,9 +57,7 @@ class MultiCoreIndexer extends TypedEmitter {
super()
this.#createStorage = MultiCoreIndexer.defaultStorage(storage)
const coreIndexStreams = cores.map((core) => {
const storage = this.#createStorage(getStorageName(core))
this.#storages.add(storage)
return new CoreIndexStream(core, storage)
return new CoreIndexStream(core, this.#createStorage)
})
this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, {
highWaterMark: maxBatch,
Expand Down Expand Up @@ -99,12 +93,10 @@ class MultiCoreIndexer extends TypedEmitter {

/**
* Add a core to be indexed
* @param {import('hypercore')<T, Buffer | string>} core
* @param {import('hypercore')<T, any>} core
*/
addCore(core) {
const storage = this.#createStorage(getStorageName(core))
this.#storages.add(storage)
const coreIndexStream = new CoreIndexStream(core, storage)
const coreIndexStream = new CoreIndexStream(core, this.#createStorage)
this.#indexStream.addStream(coreIndexStream)
}

Expand All @@ -128,13 +120,6 @@ class MultiCoreIndexer extends TypedEmitter {
once(this.#indexStream, 'close'),
once(this.#writeStream, 'close'),
])
const storageClosePromises = []
for (const storage of this.#storages) {
const promisifiedClose = promisify(storage.close.bind(storage))
storageClosePromises.push(promisifiedClose())
}
this.#storages.clear()
await Promise.all(storageClosePromises)
}

/** @param {Entry<T>[]} entries */
Expand Down Expand Up @@ -204,9 +189,3 @@ class MultiCoreIndexer extends TypedEmitter {
}

module.exports = MultiCoreIndexer

/** @param {{ key: Buffer }} core */
function getStorageName(core) {
const id = discoveryKey(core.key).toString('hex')
return [id.slice(0, 2), id.slice(2, 4), id].join('/')
}
37 changes: 29 additions & 8 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
const { Readable } = require('streamx')
const Bitfield = require('./bitfield')
const { pDefer } = require('./utils')
const { promisify } = require('node:util')

const kReadPromise = Symbol('readPromise')
const kOpenPromise = Symbol('openPromise')
Expand Down Expand Up @@ -35,7 +36,9 @@ class CoreIndexStream extends Readable {
#inProgressBitfield
#inProgress = 0
#core
/** @type {import('random-access-storage') | undefined} */
#storage
#createStorage
#index = 0
/** @type {Set<number>} */
#downloaded = new Set()
Expand All @@ -45,10 +48,10 @@ class CoreIndexStream extends Readable {
#drained = false

/**
* @param {import('hypercore')<T, Buffer | string>} core
* @param {import('random-access-storage')} storage
* @param {import('hypercore')<T, any>} core
* @param {(name: string) => import('random-access-storage')} createStorage
*/
constructor(core, storage) {
constructor(core, createStorage) {
super({
// Treat as object stream, count each object as size `1` so that the
// `remaining` property can use the stream buffer to calculate how many
Expand All @@ -57,8 +60,7 @@ class CoreIndexStream extends Readable {
byteLength: () => 1,
})
this.#core = core
this.id = core.key.toString('hex')
this.#storage = storage
this.#createStorage = createStorage
this[kHandleAppend] = this[kHandleAppend].bind(this)
this[kHandleDownload] = this[kHandleDownload].bind(this)
}
Expand All @@ -73,8 +75,8 @@ class CoreIndexStream extends Readable {
return this.#drained
}

get key() {
return this.#core.key
get core() {
return this.#core
}

/** @param {any} cb */
Expand Down Expand Up @@ -112,12 +114,18 @@ class CoreIndexStream extends Readable {
this.#core.removeListener('append', this[kHandleAppend])
this.#core.removeListener('download', this[kHandleDownload])
await this.#indexedBitfield?.flush()
if (this.#storage) await closeStorage(this.#storage)
}

async [kOpenPromise]() {
await this.#core.ready()
await this.#core.update({ wait: true })
const { discoveryKey } = this.#core
/* istanbul ignore next: just to keep TS happy - after core.ready() this is set */
if (!discoveryKey) throw new Error('Missing discovery key')
this.#storage = this.#createStorage(getStorageName(discoveryKey))
this.#indexedBitfield = await Bitfield.open(this.#storage)
this.#inProgressBitfield = await new Bitfield()
await this.#core.update({ wait: true })
this.#core.on('append', this[kHandleAppend])
this.#core.on('download', this[kHandleDownload])
}
Expand Down Expand Up @@ -177,6 +185,8 @@ class CoreIndexStream extends Readable {
if (block === null) return false
this.#inProgressBitfield?.set(index, true)
this.#inProgress++
/* istanbul ignore next: this should always be set at this point */
if (!this.#core.key) throw new Error('Missing core key')
const entry = { key: this.#core.key, block, index }
this.#readBufferAvailable = this.push(entry)
return true
Expand All @@ -196,3 +206,14 @@ class CoreIndexStream extends Readable {
}

exports.CoreIndexStream = CoreIndexStream

/** @param {Buffer} discoveryKey */
function getStorageName(discoveryKey) {
const id = discoveryKey.toString('hex')
return [id.slice(0, 2), id.slice(2, 4), id].join('/')
}

/** @param {import('random-access-storage')} storage*/
function closeStorage(storage) {
return promisify(storage.close.bind(storage))()
}
13 changes: 12 additions & 1 deletion lib/multi-core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ class MultiCoreIndexStream extends Readable {
// Do this so that we can remove this listener when we destroy the stream
const handleReadableFn = this[kHandleReadable].bind(this, stream)
this.#streams.set(stream, handleReadableFn)
this.#streamsById.set(stream.id, stream)
stream.core
.ready()
.then(() => {
const coreKey = stream.core.key
/* istanbul ignore next: this is set after ready */
if (!coreKey) return
this.#streamsById.set(coreKey.toString('hex'), stream)
})
.catch(noop)
this.#readable.add(stream)
stream.on('readable', handleReadableFn)
stream.on('indexing', this[kHandleIndexing])
Expand Down Expand Up @@ -180,3 +188,6 @@ class MultiCoreIndexStream extends Readable {
}

exports.MultiCoreIndexStream = MultiCoreIndexStream

/* istanbul ignore next: TODO add test for adding broken cores */
function noop() {}
Loading

0 comments on commit 6e665e4

Please sign in to comment.