Skip to content
This repository has been archived by the owner on Jul 27, 2021. It is now read-only.

Commit

Permalink
Merge ccc501d into 0a3b45d
Browse files Browse the repository at this point in the history
  • Loading branch information
tinchoz49 committed Dec 6, 2019
2 parents 0a3b45d + ccc501d commit 68f6fd0
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 40 deletions.
26 changes: 18 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,32 @@ Find an opened feed using a filter callback.

- `descriptor: FeedDescriptor`

#### `feedStore.createReadStream(descriptor => (ReadableStream|Boolean)) -> ReadableStream`
#### `feedStore.createReadStream([options], [callback]) -> ReadableStream`

Creates a ReadableStream from the loaded feeds.

It uses an optional callback function to return the stream for each feed.

NOTE: If the callback returns `false` it will ignore the feed.

- `options: Object`: Default options for each feed.createReadStream(options). Optional.
- `callback: descriptor => (Object|undefined)`: Filter function to return options for each feed.createReadStream(). Returns `undefined` will ignore the feed. Optional.
- `descriptor: FeedDescriptor`

Usage:

```javascript
const stream = feedStore.createReadStream(descriptor => {
if (descriptor.metadata.tag === 'foo') {
return descriptor.feed.createReadStream()

// Live streaming from all the opened feeds.
const stream = feedStore.createReadStream({ live: true })

// Live streaming, from feeds filter by tag === 'foo'
const stream = feedStore.createReadStream({ live: true }, ({ metadata }) => {
if (metadata.tag === 'foo') {
return { start: 10 } // Start reading from index 10.
}
})

// Live streaming, from feeds tag === 'foo'
const stream = feedStore.createReadStream({ metadata }) => {
if (metadata.tag === 'foo') {
return { live: true, start: 10 } // Start reading from index 10.
}
})
```
Expand Down
31 changes: 19 additions & 12 deletions src/feed-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const STORE_NAMESPACE = '@feedstore';
*
* @callback StreamCallback
* @param {FeedDescriptor} descriptor
* @returns {(ReadableStream|boolean)}
* @returns {(Object|undefined)}
*/

/**
Expand Down Expand Up @@ -306,29 +306,36 @@ export class FeedStore extends EventEmitter {
/**
* Creates a ReadableStream from the loaded feeds.
*
* Uses a callback function to return the stream for each feed.
* NOTE: If the callback returns `false` it will ignore the feed.
*
* @param {StreamCallback} [callback] Function to call the feed.createReadStream() for each feed.
* @param {Object} [options] Default options for each feed.createReadStream(options).
* @param {StreamCallback} [callback] Filter function to return options for each feed.createReadStream(). Returns `undefined` will ignore the feed.
* @returns {ReadableStream}
*/
createReadStream (callback = ({ feed }) => feed.createReadStream()) {
createReadStream (options, callback = () => ({})) {
if (typeof options === 'function') {
callback = options;
options = {};
} else if (options === undefined) {
options = {};
}

const multiReader = multi.obj();

this
.getDescriptors()
.filter(descriptor => descriptor.opened)
.forEach(descriptor => {
const stream = callback(descriptor);
if (stream) {
multiReader.add(stream);
const { feed } = descriptor;
const streamOptions = callback(descriptor);
if (streamOptions) {
multiReader.add(feed.createReadStream(Object.assign({}, options, streamOptions)));
}
});

const onFeed = (_, descriptor) => {
const stream = callback(descriptor);
if (stream) {
multiReader.add(stream);
const { feed } = descriptor;
const streamOptions = callback(descriptor);
if (streamOptions) {
multiReader.add(feed.createReadStream(Object.assign({}, options, streamOptions)));
}
};

Expand Down
56 changes: 36 additions & 20 deletions src/feed-store.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,28 +262,34 @@ describe('FeedStore', () => {
]);

const stream = feedStore.createReadStream();
const liveStream = feedStore.createReadStream(({ feed }) => feed.createReadStream({ live: true }));

const messages = [];
stream.on('data', (chunk) => {
messages.push(chunk.toString('utf8'));
});

const liveMessages = [];
liveStream.on('data', (chunk) => {
liveMessages.push(chunk.toString('utf8'));
});
const liveStream1 = testLiveStream({ live: true });
const liveStream2 = testLiveStream({ live: false }, () => ({ live: true }));

await eos(stream);
expect(messages.sort()).toEqual(['bar1', 'foo1']);

const quz = await feedStore.openFeed('/quz');
await pify(quz.append.bind(quz))('quz1');
await wait(() => {
expect(liveMessages.sort()).toEqual(['bar1', 'foo1', 'quz1']);
});

liveStream.destroy();
await Promise.all([liveStream1, liveStream2]);

async function testLiveStream (...args) {
const liveMessages = [];
const liveStream = feedStore.createReadStream(...args);
liveStream.on('data', (chunk) => {
liveMessages.push(chunk.toString('utf8'));
});
await wait(() => {
expect(liveMessages.sort()).toEqual(['bar1', 'foo1', 'quz1']);
});
liveStream.destroy();
}
});

test('createReadStreamByFilter', async () => {
Expand All @@ -302,9 +308,14 @@ describe('FeedStore', () => {
return feed.createReadStream();
}
});
const liveStream = feedStore.createReadStream(({ metadata = {}, feed }) => {
const liveStream1 = testLiveStream(({ metadata = {} }) => {
if (metadata.topic === 'topic1') {
return { live: true };
}
});
const liveStream2 = testLiveStream({ live: false }, ({ metadata = {} }) => {
if (metadata.topic === 'topic1') {
return feed.createReadStream({ live: true });
return { live: true };
}
});

Expand All @@ -313,11 +324,6 @@ describe('FeedStore', () => {
messages.push(chunk.toString('utf8'));
});

const liveMessages = [];
liveStream.on('data', (chunk) => {
liveMessages.push(chunk.toString('utf8'));
});

await eos(stream);
expect(messages.sort()).toEqual(['foo1']);

Expand All @@ -327,10 +333,20 @@ describe('FeedStore', () => {
const quz = await feedStore.openFeed('/quz', { metadata: { topic: 'topic1' } });
await pify(quz.append.bind(quz))('quz1');

await wait(() => {
expect(liveMessages.sort()).toEqual(['foo1', 'quz1']);
});
await Promise.all([liveStream1, liveStream2]);

liveStream.destroy();
async function testLiveStream (...args) {
const liveMessages = [];
const liveStream = feedStore.createReadStream(...args);

liveStream.on('data', (chunk) => {
liveMessages.push(chunk.toString('utf8'));
});

await wait(() => {
expect(liveMessages.sort()).toEqual(['foo1', 'quz1']);
});
liveStream.destroy();
}
});
});

0 comments on commit 68f6fd0

Please sign in to comment.