diff --git a/README.md b/README.md
index 82882af..df41639 100644
--- a/README.md
+++ b/README.md
@@ -86,7 +86,7 @@ Creates a new FeedStore `without wait for their initialization.`
 
 > The initialization happens by running: `await feedStore.initialize()`
 
-#### `await feedStore.openFeed(path, [options]) -> Hypercore`
+#### `feedStore.openFeed(path, [options]) -> Promise`
 
 Creates a new hypercore feed identified by a string path.
 
@@ -97,21 +97,21 @@ Creates a new hypercore feed identified by a string path.
 - `metadata: Object`: Serializable object with custom data about the feed.
 - `[...hypercoreOptions]`: Hypercore options.
 
-#### `await feedStore.closeFeed(path)`
+#### `feedStore.closeFeed(path) -> Promise`
 
 Close a feed by the path.
 
-#### `await feedStore.deleteDescriptor(path)`
+#### `feedStore.deleteDescriptor(path) -> Promise`
 
 Remove a descriptor from the database by the path.
 
 > This operation would not close the feed.
 
-#### `await feedStore.close()`
+#### `feedStore.close() -> Promise`
 
 Close the hypertrie database and their feeds.
 
-#### `await feedStore.loadFeeds((descriptor) => Boolean) -> Hypercore[]`
+#### `feedStore.loadFeeds((descriptor) => Boolean) -> Promise`
 
 Loads feeds using a function to filter what feeds you want to load from the database.
 
@@ -119,7 +119,7 @@ Loads feeds using a function to filter what feeds you want to load from the data
 const feeds = await feedStore.loadFeeds(descriptor => descriptor.metadata.tag === 'important')
 ```
 
-#### `await feedStore.ready()`
+#### `feedStore.ready() -> Promise`
 
 Wait for feedStore to be ready.
 
@@ -174,6 +174,30 @@ Filter the opened feeds using a callback function.
 
 - `descriptor: FeedDescriptor`
 
+#### `feedStore.createReadStream([options]) -> ReadableStream`
+
+Creates a ReadableStream from the feeds stored in FeedStore.
+
+- `options: Options for the hypercore.createReadStream`
+
+#### `feedStore.createReadStreamByFilter(descriptor => Boolean, [options]) -> ReadableStream`
+
+Creates a ReadableStream from the feeds selected by a filter function.
+
+- `callback: descriptor => Boolean`: Filter function applied to each opened feed descriptor.
+- `options: Options for the hypercore.createReadStream`
+
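+Example (the `topic` metadata field is only an illustration; any serializable `metadata` passed to `openFeed` works):
+
+```
+// Read everything currently stored in the opened feeds.
+const all = feedStore.createReadStream()
+
+// Follow only the feeds whose descriptor matches the filter, including feeds opened later.
+const filtered = feedStore.createReadStreamByFilter(descriptor => descriptor.metadata.topic === 'topic1', { live: true })
+filtered.on('data', data => console.log(data.toString()))
+```
+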
 ### Events
 
 #### `feedStore.on('ready', () => {})`
diff --git a/package-lock.json b/package-lock.json
index e5a7468..97c1ec1 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -2425,11 +2425,19 @@
       "version": "1.4.4",
       "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz",
       "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==",
-      "dev": true,
       "requires": {
         "once": "^1.4.0"
       }
     },
+    "end-of-stream-promise": {
+      "version": "1.0.0",
+      "resolved": "https://registry.npmjs.org/end-of-stream-promise/-/end-of-stream-promise-1.0.0.tgz",
+      "integrity": "sha512-1u3Geul15xPtCiZFZibKWulA6pMecvPO+cvejugP36fGviKdcKydXzHMLqjZgt+N+DO+ifcKVUYcmg7IxlgpBg==",
+      "dev": true,
+      "requires": {
+        "end-of-stream": "^1.4.4"
+      }
+    },
     "error-ex": {
       "version": "1.3.2",
       "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz",
@@ -5744,6 +5752,14 @@
       "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
       "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
     },
+    "multi-read-stream": {
+      "version": "2.0.0",
+      "resolved": "https://registry.npmjs.org/multi-read-stream/-/multi-read-stream-2.0.0.tgz",
+      "integrity": "sha1-2e6GFHQwiUEaTZrH0h04qmjDbkA=",
+      "requires": {
+        "readable-stream": "^2.0.1"
+      }
+    },
     "mute-stream": {
       "version": "0.0.7",
       "resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-0.0.7.tgz",
@@ -6040,7 +6056,6 @@
       "version": "1.4.0",
       "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
       "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
-      "dev": true,
       "requires": {
         "wrappy": "1"
       }
     },
@@ -7839,6 +7854,12 @@
         "browser-process-hrtime": "^0.1.2"
       }
     },
+    "wait-for-expect": {
+      "version": "3.0.1",
+      "resolved": "https://registry.npmjs.org/wait-for-expect/-/wait-for-expect-3.0.1.tgz",
+      "integrity": "sha512-3Ha7lu+zshEG/CeHdcpmQsZnnZpPj/UsG3DuKO8FskjuDbkx3jE3845H+CuwZjA2YWYDfKMU2KhnCaXMLd3wVw==",
+      "dev": true
+    },
     "walker": {
       "version": "1.0.7",
       "resolved": "https://registry.npmjs.org/walker/-/walker-1.0.7.tgz",
@@ -7921,8 +7942,7 @@
     "wrappy": {
       "version": "1.0.2",
       "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
-      "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=",
-      "dev": true
+      "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
     },
     "write": {
       "version": "1.0.3",
diff --git a/package.json b/package.json
index 1117149..2b31db8 100644
--- a/package.json
+++ b/package.json
@@ -25,20 +25,25 @@
     "coverage": "npm test -- --coverage",
     "coveralls": "npm run coverage && cat ./coverage/lcov.info | coveralls",
     "lint": "semistandard 'src/**/*.js'",
-    "prepublishOnly": "npm run test && npm run build",
-    "test": "npm run build:protobuf && jest --verbose --passWithNoTests src",
+    "prepublishOnly": "npm run build && npm run test",
+    "test": "jest --verbose --passWithNoTests src",
     "posttest": "npm run lint"
   },
   "browserslist": [
     "> 5%"
   ],
+  "jest": {
+    "testEnvironment": "node"
+  },
   "dependencies": {
     "@dxos/codec-protobuf": "^1.0.0",
     "buffer-json-encoding": "^1.0.2",
     "debug": "^4.1.1",
+    "end-of-stream": "^1.4.4",
     "hypercore": "^7.7.1",
     "hypercore-crypto": "^1.0.0",
     "hypertrie": "^3.8.0",
+    "multi-read-stream": "^2.0.0",
     "mutexify": "^1.2.0",
     "p-timeout": "^3.2.0",
     "pify": "^4.0.1",
@@ -57,10 +62,12 @@
     "babel-plugin-inline-import": "^3.0.0",
     "coveralls": "^3.0.7",
     "del-cli": "^3.0.0",
+    "end-of-stream-promise": "^1.0.0",
     "jest": "^24.5.0",
     "random-access-memory": "^3.1.1",
     "semistandard": "^14.2.0",
-    "tempy": "^0.3.0"
+    "tempy": "^0.3.0",
+    "wait-for-expect": "^3.0.1"
   },
   "publishConfig": {
     "access": "public"
diff --git a/src/feed-store.js b/src/feed-store.js
index 21bff76..70361bb 100644
--- a/src/feed-store.js
+++ b/src/feed-store.js
@@ -4,6 +4,8 @@
 
 import { EventEmitter } from 'events';
 import assert from 'assert';
+import multi from 'multi-read-stream';
+import eos from 'end-of-stream';
 
 import Codec from '@dxos/codec-protobuf';
 
@@ -174,7 +176,7 @@ class FeedStore extends EventEmitter {
   /**
    * Get a descriptor by a path.
-   *
+*
    * @param {string} path
    * @returns {FeedDescriptor}
    */
@@ -195,12 +197,12 @@ class FeedStore extends EventEmitter {
   /**
    * Find a feed using a filter callback.
    *
-   * @param {descriptorCallback} callback
+   * @param {descriptorCallback} callback Filter function for the opened descriptors.
    * @returns {Hypercore}
    */
-  findFeed (cb) {
+  findFeed (callback) {
     const descriptor = this.getOpenedDescriptors()
-      .find(descriptor => cb(descriptor));
+      .find(descriptor => callback(descriptor));
 
     if (descriptor) {
       return descriptor.feed;
@@ -210,12 +212,12 @@ class FeedStore extends EventEmitter {
   /**
    * Filter feeds using a filter callback.
    *
-   * @param {descriptorCallback} callback
+   * @param {descriptorCallback} callback Filter function for the opened descriptors.
    * @returns {Hypercore[]}
    */
-  filterFeeds (cb) {
+  filterFeeds (callback) {
     const descriptors = this.getOpenedDescriptors()
-      .filter(descriptor => cb(descriptor));
+      .filter(descriptor => callback(descriptor));
 
     return descriptors.map(descriptor => descriptor.feed);
   }
@@ -223,14 +225,14 @@ class FeedStore extends EventEmitter {
   /**
    * Load feeds using a filter callback.
    *
-   * @param {descriptorCallback} callback
+   * @param {descriptorCallback} callback Filter function for the opened descriptors.
    * @returns {Promise}
    */
-  async loadFeeds (cb) {
+  async loadFeeds (callback) {
     await this.ready();
 
     const descriptors = this.getDescriptors()
-      .filter(descriptor => cb(descriptor));
+      .filter(descriptor => callback(descriptor));
 
     return Promise.all(descriptors.map(descriptor => this._openFeed(descriptor)));
   }
@@ -344,6 +346,46 @@ class FeedStore extends EventEmitter {
     }
   }
 
+  /**
+   * Creates a ReadableStream from the feeds stored in FeedStore.
+   *
+   * @param {Object} [options] Options for the hypercore.createReadStream.
+   * @returns {ReadableStream}
+   */
+  createReadStream (options) {
+    return this.createReadStreamByFilter(() => true, options);
+  }
+
+  /**
+   * Creates a ReadableStream from multiple feeds using a filter function.
+   *
+   * @param {descriptorCallback} [callback] Filter function to select from which feeds to read.
+   * @param {Object} [options] Options for the hypercore.createReadStream.
+   * @returns {ReadableStream}
+   */
+  createReadStreamByFilter (callback, options) {
+    const streams = [];
+
+    this.filterFeeds(callback).forEach(feed => {
+      streams.push(feed.createReadStream(options));
+    });
+
+    const multiReader = multi.obj(streams);
+
+    // Feeds opened later are added to the reader too (useful with `live: true` streams).
+    const onFeed = (feed, descriptor) => {
+      if (callback(descriptor)) {
+        multiReader.add(feed.createReadStream(options));
+      }
+    };
+
+    this.on('feed', onFeed);
+    // Stop listening for new feeds once the merged stream ends or is destroyed.
+    eos(multiReader, () => this.removeListener('feed', onFeed));
+
+    return multiReader;
+  }
+
   /**
    * Factory to create a new FeedDescriptor.
    *
diff --git a/src/feed-store.test.js b/src/feed-store.test.js
index 88bf571..5621891 100644
--- a/src/feed-store.test.js
+++ b/src/feed-store.test.js
@@ -7,6 +7,8 @@ import tempy from 'tempy';
 import ram from 'random-access-memory';
 import hypercore from 'hypercore';
 import pify from 'pify';
+import wait from 'wait-for-expect';
+import eos from 'end-of-stream-promise';
 
 import FeedStore from './feed-store';
 
@@ -66,6 +68,7 @@ describe('FeedStore', () => {
   test('Feeds', async () => {
     expect(feedStore.getFeeds().map(f => f.key)).toEqual([booksFeed.key, usersFeed.key]);
     expect(feedStore.findFeed(fd => fd.key.equals(booksFeed.key))).toBe(booksFeed);
+    expect(feedStore.findFeed(() => false)).toBeUndefined();
     expect(feedStore.filterFeeds(fd => fd.path === '/books')).toEqual([booksFeed]);
   });
 
@@ -122,6 +125,16 @@ describe('FeedStore', () => {
         decode (val) {
           return JSON.parse(val);
         }
+      },
+      codecB: {
+        name: 'codecB',
+        encode (val) {
+          val.encodedBy = 'codecB';
+          return Buffer.from(JSON.stringify(val));
+        },
+        decode (val) {
+          return JSON.parse(val);
+        }
       }
     }
   };
@@ -206,4 +219,85 @@
     expect(release).toBeDefined();
     await release();
   });
+
+  test('createReadStream', async () => {
+    const feedStore = await FeedStore.create(
+      hypertrie(ram),
+      ram
+    );
+
+    const foo = await feedStore.openFeed('/foo');
+    const bar = await feedStore.openFeed('/bar');
+    await Promise.all([
+      pify(foo.append.bind(foo))('foo1'),
+      pify(bar.append.bind(bar))('bar1')
+    ]);
+
+    const stream = feedStore.createReadStream();
+    const liveStream = feedStore.createReadStream({ live: true });
+
+    const messages = [];
+    stream.on('data', (chunk) => {
+      messages.push(chunk.toString('utf8'));
+    });
+
+    const liveMessages = [];
+    liveStream.on('data', (chunk) => {
+      liveMessages.push(chunk.toString('utf8'));
+    });
+
+    await eos(stream);
+    expect(messages.sort()).toEqual(['bar1', 'foo1']);
+
+    const quz = await feedStore.openFeed('/quz');
+    await pify(quz.append.bind(quz))('quz1');
+    await wait(() => {
+      expect(liveMessages.sort()).toEqual(['bar1', 'foo1', 'quz1']);
+    });
+
+    liveStream.destroy();
+  });
+
+  test('createReadStreamByFilter', async () => {
+    const feedStore = await FeedStore.create(
+      hypertrie(ram),
+      ram
+    );
+
+    const foo = await feedStore.openFeed('/foo', { metadata: { topic: 'topic1' } });
+    const bar = await feedStore.openFeed('/bar');
+
+    await Promise.all([
+      pify(foo.append.bind(foo))('foo1'),
+      pify(bar.append.bind(bar))('bar1')
+    ]);
+
+    const stream = feedStore.createReadStreamByFilter(descriptor => descriptor.metadata.topic === 'topic1');
+    const liveStream = feedStore.createReadStreamByFilter(descriptor => descriptor.metadata.topic === 'topic1', { live: true });
+
+    const messages = [];
+    stream.on('data', (chunk) => {
+      messages.push(chunk.toString('utf8'));
+    });
+
+    const liveMessages = [];
+    liveStream.on('data', (chunk) => {
+      liveMessages.push(chunk.toString('utf8'));
+    });
+
+    await eos(stream);
+    expect(messages.sort()).toEqual(['foo1']);
+
+    const baz = await feedStore.openFeed('/baz');
+    await pify(baz.append.bind(baz))('baz1');
+
+    const quz = await feedStore.openFeed('/quz', { metadata: { topic: 'topic1' } });
+    await pify(quz.append.bind(quz))('quz1');
+
+    await wait(() => {
+      expect(liveMessages.sort()).toEqual(['foo1', 'quz1']);
+    });
+
+    liveStream.destroy();
+  });
 });