Skip to content
This repository has been archived by the owner on Jul 27, 2021. It is now read-only.

Commit

Permalink
Feed store selective reader (#25)
Browse files Browse the repository at this point in the history
* Using create-batch-stream

* minor fix

* Revert modifications for testing purpose

* Try to use node streams

* Fix test running twice

* First "working" version

* Make tests pass

* Make ordered reader inherit Readable

* Increase message count

* Use stream.on('data')

* Update jest

* Add set timeout so read doesnt owerwhelm the event loop

* Lint

* Rename to SelectiveReader

* Add a test for a feed added later

* Fix a test for a feed added later

* Revert to while(true) iml

* Clean up code

* Remove consolel og

* Remove commented code

Co-authored-by: Martin Acosta <tinchoz49@gmail.com>
  • Loading branch information
dmaretskyi and tinchoz49 committed Aug 3, 2020
1 parent ce2d9ad commit 9ddb5e3
Show file tree
Hide file tree
Showing 6 changed files with 1,666 additions and 641 deletions.
3 changes: 2 additions & 1 deletion babel.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module.exports = {
}
],
'add-module-exports',
'@babel/plugin-proposal-export-default-from'
'@babel/plugin-proposal-export-default-from',
'@babel/plugin-proposal-class-properties'
]
};
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"devDependencies": {
"@babel/cli": "^7.7.0",
"@babel/core": "^7.4.5",
"@babel/plugin-proposal-class-properties": "^7.10.4",
"@babel/plugin-proposal-export-default-from": "^7.5.2",
"@babel/preset-env": "^7.4.5",
"@dxos/benchmark-suite": "^1.0.0-beta.1",
Expand All @@ -71,11 +72,11 @@
"coveralls": "^3.0.7",
"del-cli": "^3.0.0",
"end-of-stream-promise": "^1.0.0",
"jest": "^24.5.0",
"jest": "^26.2.2",
"random-access-memory": "^3.1.1",
"semistandard": "^14.2.0",
"tempy": "^0.5.0",
"wait-for-expect": "^3.0.1"
"wait-for-expect": "^3.0.2"
},
"publishConfig": {
"access": "public"
Expand Down
24 changes: 24 additions & 0 deletions src/feed-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import pEvent from 'p-event';
import FeedDescriptor from './feed-descriptor';
import IndexDB from './index-db';
import Reader from './reader';
import SelectiveReader from './selective-reader';

// TODO(burdon): Change to "dxos.feedstore"?
const STORE_NAMESPACE = '@feedstore';
Expand Down Expand Up @@ -367,6 +368,29 @@ export class FeedStore extends EventEmitter {
return this._createReadStream(callback);
}

/**
*
* @param {(feedDescriptor: FeedDescriptor, message: object) => Promise<boolean>} evaluator
*/
createSelectiveStream (evaluator) {
const reader = new SelectiveReader(evaluator);

this._readers.add(reader);

this
._isOpen()
.then(() => {
return reader.addInitialFeedStreams(this
.getDescriptors()
.filter(descriptor => descriptor.opened));
})
.catch(err => {
reader.destroy(err);
});

return reader;
}

/**
* Creates a ReadableStream from the loaded feeds and returns the messages in batch.
*
Expand Down
101 changes: 101 additions & 0 deletions src/selective-reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//
// Copyright 2019 DXOS.org
//

import { Readable } from 'stream';
import createBatchStream from './create-batch-stream';

/**
* Creates a multi ReadableStream for feed streams.
*/
export default class SelectiveReader extends Readable {
/** @type {(feedDescriptor, message) => Promise<boolean>} */
_evaluator;

/** @type {Set<{ descriptor: FeedDescriptor, stream: any, buffer: any[] }>} */
_feeds = new Set();

/** @type {() => void} */
_wakeUpReader;

/** @type {Promise} */
_hasData;

_reading = false;

constructor (evaluator) {
super({ objectMode: true });

this._evaluator = evaluator;
this._resetDataLock();
}

_resetDataLock () {
this._hasData = new Promise(resolve => { this._wakeUpReader = resolve; });
}

async _read () {
if (this._reading) {
this._needsData = true;
return;
}
this._reading = true;
this._needsData = false;

while (true) {
this._resetDataLock();

for (const feed of this._feeds.values()) {
if (feed.buffer.length === 0) {
const messages = feed.stream.read();
if (!messages) continue;
feed.buffer.push(...messages);
}

let message;
while ((message = feed.buffer.shift())) {
if (await this._evaluator(feed.descriptor, message)) {
process.nextTick(() => this._wakeUpReader());
this._needsData = false;
if (!this.push(message)) {
this._reading = false;
return;
}
} else {
feed.buffer.unshift(message);
break;
}
}
}

await new Promise(resolve => setTimeout(resolve, 0)); // yield so that other tasks can be processed

if (this._needsData && Array.from(this._feeds.values()).some(x => x.buffer.length > 0)) {
continue;
}
await this._hasData;
}
}

async addInitialFeedStreams (descriptors) {
for (const descriptor of descriptors) {
this.addFeedStream(descriptor);
}
}

/**
* Adds a feed stream and stream the block data, seq, key and metadata.
*
* @param {FeedDescriptor} descriptor
*/
async addFeedStream (descriptor) {
const stream = createBatchStream(descriptor.feed, { live: true });

stream.on('readable', () => {
this._wakeUpReader();
this._read();
});

this._feeds.add({ descriptor, stream, buffer: [] });
}
}
100 changes: 100 additions & 0 deletions src/selective-reader.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//
// Copyright 2019 DXOS.org
//

import pify from 'pify';
import ram from 'random-access-memory';
import waitForExpect from 'wait-for-expect';
import { FeedStore } from './feed-store';

function append (feed, message) {
return pify(feed.append.bind(feed))(message);
}

async function generateStreamData (feedStore, maxMessages = 200) {
const [feed1, feed2] = await Promise.all([
feedStore.openFeed('/feed1'),
feedStore.openFeed('/feed2')
]);

const messages = [];
for (let i = 0; i < maxMessages; i++) {
messages.push(append(feed1, `feed1/message${i}`));
messages.push(append(feed2, `feed2/message${i}`));
}

await Promise.all(messages);

return [feed1, feed2];
}

describe('SelectiveReader', () => {
test('two feeds', async () => {
const feedStore = await FeedStore.create(ram, { feedOptions: { valueEncoding: 'utf-8' } });

const MESSAGE_COUNT = 10;

const [feed1] = await generateStreamData(feedStore, MESSAGE_COUNT);

const messages = [];

const allowedFeeds = new Set(['/feed1']);
const stream = feedStore.createSelectiveStream(
async (feedDescriptor, message) => allowedFeeds.has(feedDescriptor.path)
);

stream.on('data', message => {
messages.push(message);
if (message.data.startsWith('allow-')) {
allowedFeeds.add(message.data.slice(6));
}
});

// only feed1 messages should be here at this point
await waitForExpect(async () => {
expect(messages.length === MESSAGE_COUNT);
expect(messages.every(msg => msg.data.startsWith('feed1')));
});

await append(feed1, 'allow-/feed2');

await waitForExpect(() => expect(messages.length).toBe(MESSAGE_COUNT * 2 + 1));

// TODO(marik-d): Test for sync events
});

test('feed is added later', async () => {
const feedStore = await FeedStore.create(ram, { feedOptions: { valueEncoding: 'utf-8' } });

const MESSAGE_COUNT = 10;

await generateStreamData(feedStore, MESSAGE_COUNT);

const messages = [];

const allowedFeeds = new Set(['/feed1', '/feed3']);
const stream = feedStore.createSelectiveStream(
async (feedDescriptor, message) => allowedFeeds.has(feedDescriptor.path)
);

stream.on('data', message => {
messages.push(message);
if (message.data.startsWith('allow-')) {
allowedFeeds.add(message.data.slice(6));
}
});

// only feed1 messages should be here at this point
await waitForExpect(async () => {
expect(messages.length === MESSAGE_COUNT);
expect(messages.every(msg => msg.data.startsWith('feed1')));
});

const feed = await feedStore.openFeed('/feed3');
await append(feed, 'allow-/feed2');

await waitForExpect(() => expect(messages.length).toBe(MESSAGE_COUNT * 2 + 1));

// TODO(marik-d): Test for sync events
});
});

0 comments on commit 9ddb5e3

Please sign in to comment.