Skip to content

Commit

Permalink
feat: implement addCore method
Browse files Browse the repository at this point in the history
  • Loading branch information
sethvincent committed Nov 29, 2022
1 parent 365249d commit da2b25e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
15 changes: 13 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class MultiCoreIndexer extends TypedEmitter {
#lastRemaining = -1
#rateMeasurementStart = Date.now()
#rate = 0
#createStorage

/**
*
Expand All @@ -48,9 +49,9 @@ class MultiCoreIndexer extends TypedEmitter {
*/
constructor(cores, { batch, maxBatch = DEFAULT_BATCH_SIZE, storage }) {
super()
const createStorage = MultiCoreIndexer.defaultStorage(storage)
this.#createStorage = MultiCoreIndexer.defaultStorage(storage)
const coreIndexStreams = cores.map((core) => {
const storage = createStorage(core.key.toString('hex'))
const storage = this.#createStorage(core.key.toString('hex'))
return new CoreIndexStream(core, storage)
})
this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, {
Expand All @@ -74,6 +75,16 @@ class MultiCoreIndexer extends TypedEmitter {
this.#indexStream.on('indexing', this.#handleIndexingBound)
}

/**
* Add a core to be indexed
* @param {import('hypercore')<T>} core
*/
addCore(core) {
const storage = this.#createStorage(core.key.toString('hex'))
const coreIndexStream = new CoreIndexStream(core, storage)
this.#indexStream.addStream(coreIndexStream)
}

async close() {
this.#indexStream.off('indexing', this.#handleIndexingBound)
this.#writeStream.destroy()
Expand Down
27 changes: 25 additions & 2 deletions test/multi-core-indexer.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// @ts-check
const MultiCoreIndexer = require('../')
const { test, only } = require('tap')
const { test } = require('tap')
const { once } = require('events')
const ram = require('random-access-memory')
const {
Expand Down Expand Up @@ -50,6 +50,29 @@ test('Indexes items appended after initial index', async (t) => {
t.pass('Indexer closed')
})

test('Indexes items appended after initial index', async (t) => {
const cores = await createMultiple(5)
/** @type {Entry[]} */
const entries = []
const indexer = new MultiCoreIndexer(cores, {
batch: async (data) => {
entries.push(...data)
},
maxBatch: 50,
storage: () => ram(),
})

const newCores = await createMultiple(5)
for (const core of newCores) {
indexer.addCore(core)
}
const expected = await generateFixtures([...cores, ...newCores], 100)
await throttledIdle(indexer)
t.same(sortEntries(entries), sortEntries(expected))
await indexer.close()
t.pass('Indexer closed')
})

test('index sparse hypercores', async (t) => {
const coreCount = 5
const localCores = await createMultiple(coreCount)
Expand Down Expand Up @@ -245,7 +268,7 @@ test('Batches smaller than maxBatch when indexing is faster than hypercore reads
await indexer.close()
})

only('sync state / progress', async (t) => {
test('sync state / progress', async (t) => {
const expectedVariation = 0.2
const numberOfCores = 5
const entriesPerCore = 1000
Expand Down

0 comments on commit da2b25e

Please sign in to comment.