Skip to content
This repository has been archived by the owner on Jul 27, 2021. It is now read-only.

Commit

Permalink
Added reader class.
Browse files Browse the repository at this point in the history
  • Loading branch information
tinchoz49 committed Jan 22, 2020
1 parent 6a6b090 commit c7764f4
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 84 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Find an opened feed using a filter callback.

- `descriptor: FeedDescriptor`

#### `feedStore.createReadStream([options], [callback]) -> ReadableStream`
#### `feedStore.createReadStream([callback|options]) -> ReadableStream`

Creates a ReadableStream from the loaded feeds.

Expand Down
77 changes: 9 additions & 68 deletions src/feed-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@

import { EventEmitter } from 'events';
import assert from 'assert';
import multi from 'multi-read-stream';
import eos from 'end-of-stream';
import hypertrie from 'hypertrie';
import jsonBuffer from 'buffer-json-encoding';
import through from 'through2';
import pump from 'pump';

import FeedDescriptor from './feed-descriptor';
import IndexDB from './index-db';
import Locker from './locker';
import Reader from './reader';

const STORE_NAMESPACE = '@feedstore';

Expand Down Expand Up @@ -112,15 +109,13 @@ export class FeedStore extends EventEmitter {

this.on('feed', (_, descriptor) => {
this._readers.forEach(reader => {
reader.addFeed(descriptor);
reader.addFeedStream(descriptor);
});
});

this.on('closed', () => {
this._readers.forEach(reader => {
if (!reader.stream.destroyed) {
reader.stream.destroy(new Error('FeedStore closed'));
}
reader.destroy(new Error('FeedStore closed'));
});
this._readers.clear();
});
Expand Down Expand Up @@ -390,52 +385,25 @@ export class FeedStore extends EventEmitter {
/**
* Creates a ReadableStream from the loaded feeds.
*
* @param {Object} [options] Default options for each feed.createReadStream(options).
* @param {StreamCallback} [callback] Filter function to return options for each feed.createReadStream(). Returns `undefined` will ignore the feed.
* @param {StreamCallback|Object} [callback] Filter function to return options for each feed.createReadStream (returns `false` will ignore the feed) or default object options for each feed.createReadStream(options)
* @returns {ReadableStream}
*/
createReadStream (options, callback = () => ({})) {
if (typeof options === 'function') {
callback = options;
options = {};
} else if (options === undefined) {
options = {};
}

const reader = {
stream: multi.obj(),
addFeed: descriptor => {
const keyStr = descriptor.key.toString('hex');

if (reader.feeds.includes(keyStr)) {
return;
}

let streamOptions = callback(descriptor);
if (streamOptions) {
streamOptions = Object.assign({}, options, typeof streamOptions === 'object' ? streamOptions : {});
reader.stream.add(this._createFeedStream(descriptor, streamOptions));
reader.feeds.push(keyStr);
}
},
feeds: []
};
createReadStream (callback = () => true) {
const reader = new Reader(callback);

this._readers.add(reader);

eos(reader.stream, () => {
reader.onEnd(() => {
this._readers.delete(reader);
});

this._isOpen().then(() => {
this
.getDescriptors()
.filter(descriptor => descriptor.opened)
.forEach(reader.addFeed);
.forEach(descriptor => reader.addFeedStream(descriptor));
}).catch(err => {
if (!reader.stream.destroyed) {
reader.stream.destroy(err);
}
reader.destroy(err);
});

return reader.stream;
Expand Down Expand Up @@ -527,33 +495,6 @@ export class FeedStore extends EventEmitter {
}
}

/**
* Creates a feed stream and stream the block data, seq, key and metadata.
*
* @param {FeedDescriptor} descriptor
* @param {Object} options
* @returns {ReadableStream}
*/
_createFeedStream (descriptor, options) {
const { feed, path, key, metadata } = descriptor;

const { feedStoreInfo = false, ...feedStreamOptions } = options;

const stream = feed.createReadStream(feedStreamOptions);

if (!feedStoreInfo) {
return stream;
}

let seq = feedStreamOptions.start === undefined ? 0 : feedStreamOptions.start;

const addFeedStoreInfo = through.obj((chunk, _, next) => {
next(null, { data: chunk, seq: seq++, path, key, metadata });
});

return pump(stream, addFeedStoreInfo);
}

async _isOpen () {
if (this._state === CLOSED || this._state === CLOSING) {
throw new Error('FeedStore closed');
Expand Down
26 changes: 11 additions & 15 deletions src/feed-store.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,15 @@ describe('FeedStore', () => {
});

const liveStream1 = testLiveStream({ live: true });
const liveStream2 = testLiveStream({ live: false }, () => ({ live: true }));
const liveStream3 = testLiveStream({ live: true }, () => true);
const liveStream2 = testLiveStream(() => ({ live: true }));

await eos(stream);
expect(messages.sort()).toEqual(['bar1', 'foo1']);

const quz = await feedStore.openFeed('/quz');
await pify(quz.append.bind(quz))('quz1');

await Promise.all([liveStream1, liveStream2, liveStream3]);
await Promise.all([liveStream1, liveStream2]);

async function testLiveStream (...args) {
const liveMessages = [];
Expand All @@ -320,17 +319,13 @@ describe('FeedStore', () => {
pify(bar.append.bind(bar))('bar1')
]);

const stream = feedStore.createReadStream(({ metadata = {}, feed }) => {
const stream = feedStore.createReadStream(({ metadata = {} }) => {
if (metadata.topic === 'topic1') {
return feed.createReadStream();
return true;
}
});
const liveStream1 = testLiveStream(({ metadata = {} }) => {
if (metadata.topic === 'topic1') {
return { live: true };
}
});
const liveStream2 = testLiveStream({ live: false }, ({ metadata = {} }) => {

const liveStream = testLiveStream(({ metadata = {} }) => {
if (metadata.topic === 'topic1') {
return { live: true };
}
Expand All @@ -350,7 +345,7 @@ describe('FeedStore', () => {
const quz = await feedStore.openFeed('/quz', { metadata: { topic: 'topic1' } });
await pify(quz.append.bind(quz))('quz1');

await Promise.all([liveStream1, liveStream2]);
await liveStream;

async function testLiveStream (...args) {
const liveMessages = [];
Expand Down Expand Up @@ -385,12 +380,13 @@ describe('FeedStore', () => {
pify(bar.append.bind(bar))('bar2')
]);

const stream = feedStore.createReadStream({ feedStoreInfo: true }, descriptor => {
const stream = feedStore.createReadStream(descriptor => {
const options = { feedStoreInfo: true };
if (descriptor.path === '/foo') {
return { start: 1 };
options.start = 1;
}

return {};
return options;
});

const messages = [];
Expand Down
108 changes: 108 additions & 0 deletions src/reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//
// Copyright 2019 DxOS.
//

import assert from 'assert';
import multi from 'multi-read-stream';
import pump from 'pump';
import through from 'through2';
import eos from 'end-of-stream';

const all = () => true;

/**
* Creates a multi ReadableStream for feed streams.
*/
export default class Reader {
/**
* constructor
*
* @param {StreamCallback|Object} [callback] Filter function to return options for each feed.createReadStream (returns `false` will ignore the feed) or default object options for each feed.createReadStream(options)
*/
constructor (filter) {
assert(typeof filter === 'function' || typeof filter === 'object');

if (typeof filter === 'function') {
this._filter = filter;
this._options = {};
} else {
this._filter = all;
this._options = filter;
}

this._stream = multi.obj({ autoDestroy: false });
this._feeds = new Set();
}

/**
* @type {ReadableStream}
*/
get stream () {
return this._stream;
}

/**
* Destroy stream.
*
* @param {Error} [err] Optional error object.
*/
destroy (err) {
if (!this._stream.destroyed) {
this._stream.destroy(err);
}
}

/**
* Adds a feed stream and stream the block data, seq, key and metadata.
*
* @param {FeedDescriptor} descriptor
*/
addFeedStream (descriptor) {
const { feed, path, key, metadata } = descriptor;

if (!feed || this._feeds.has(feed)) {
return;
}

let streamOptions = this._filter(descriptor);
if (!streamOptions) {
return;
}

streamOptions = Object.assign({}, this._options, typeof streamOptions === 'object' ? streamOptions : {});

const { feedStoreInfo = false, ...feedStreamOptions } = streamOptions;

const stream = feed.createReadStream(feedStreamOptions);

eos(stream, () => {
this._feeds.delete(feed);
});

if (!feedStoreInfo) {
this._stream.add(stream);
this._feeds.add(feed);
return;
}

let seq = feedStreamOptions.start === undefined ? 0 : feedStreamOptions.start;

const addFeedStoreInfo = through.obj((chunk, _, next) => {
next(null, { data: chunk, seq: seq++, path, key, metadata });
});

this._stream.add(pump(stream, addFeedStoreInfo));
this._feeds.add(feed);
}

/**
* Execute a callback on end of the stream.
*
* @param {function} [callback]
*/
onEnd (callback) {
eos(this._stream, (err) => {
callback(err);
});
}
}

0 comments on commit c7764f4

Please sign in to comment.