Skip to content
This repository has been archived by the owner on Jul 27, 2021. It is now read-only.

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tinchoz49 committed Jun 3, 2020
1 parent 532d1b2 commit 43b4936
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
23 changes: 12 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,15 @@ Find an opened feed using a filter callback.
Creates a ReadableStream from the loaded feeds.

- `options: Object`: Default options for each feed.createReadStream(options). Optional.
- `batch: Number`: Defines the batch number of blocks to read in each iteration. Default: 100.
- `live: Boolean`: Defines the stream as a live stream. Will wait for new incoming data. Default: false.
- `feedStoreInfo: Boolean`: Enables streaming objects with additional feed information:
- `data: Buffer`: The original chunk of the block data.
- `seq: Number`: Sequence number of the read block.
- `key: Buffer`: Key of the read feed.
- `path: String`: FeedStore path of the read feed.
- `metadata: Object`: FeedStore metadata of the read feed.
- `callback: descriptor => Promise<(Object|undefined)>`: Filter function to return options for each feed.createReadStream(). Returns `undefined` will ignore the feed. Optional.
- `descriptor: FeedDescriptor`

The data returned will be an object with:

- `data: Buffer`: The original chunk of the block data.
- `seq: Number`: Sequence number of the read block.
- `key: Buffer`: Key of the read feed.
- `path: String`: FeedStore path of the read feed.
- `metadata: Object`: FeedStore metadata of the read feed.
- `sync: Boolean`: It reports if the stream is in sync with the data.

Usage:

```javascript
Expand All @@ -183,6 +178,12 @@ const stream = feedStore.createReadStream(({ metadata }) => {
return { live: true, start: 10 } // Start reading from index 10.
}
})

// With additional information.
const stream = feedStore.createReadStream({ feedStoreInfo: true })
stream.on('data', data => {
console.log(data) // { data, seq, key, path, metadata }
})
```

#### `feedStore.createBatchStream([callback|options]) -> ReadableStream`
Expand Down
8 changes: 4 additions & 4 deletions src/feed-store.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ describe('FeedStore', () => {

const onSync = jest.fn();
const messages = [];
const stream = feedStore.createReadStream();
const stream = feedStore.createReadStream({ feedStoreInfo: true });
stream.on('data', (msg) => {
messages.push(msg);
});
Expand Down Expand Up @@ -393,7 +393,7 @@ describe('FeedStore', () => {

const onSync = jest.fn();
const messages = [];
const stream = feedStore.createReadStream(descriptor => !descriptor.key.equals(feed2.key));
const stream = feedStore.createReadStream(descriptor => (!descriptor.key.equals(feed2.key) && { feedStoreInfo: true }));
stream.on('data', (msg) => messages.push(msg));
stream.on('sync', onSync);
await new Promise(resolve => eos(stream, () => resolve()));
Expand All @@ -420,7 +420,7 @@ describe('FeedStore', () => {

const onSync = jest.fn();
const messages = [];
const stream = feedStore.createReadStream({ live: true });
const stream = feedStore.createReadStream({ live: true, feedStoreInfo: true });
stream.on('data', (msg) => messages.push(msg));
stream.on('sync', onSync);
for (let i = 0; i < 2000; i++) {
Expand Down Expand Up @@ -454,7 +454,7 @@ describe('FeedStore', () => {

const onSync = jest.fn();
const batches = [];
const stream = feedStore.createBatchStream({ batch: 50 });
const stream = feedStore.createBatchStream({ batch: 50, feedStoreInfo: true });
stream.on('data', (msg) => {
batches.push(msg);
});
Expand Down
20 changes: 16 additions & 4 deletions src/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,30 @@ export default class Reader {
metadata: { path, metadata }
}, this._options, typeof streamOptions === 'object' ? streamOptions : {});

const { feedStoreInfo = false } = streamOptions;

const stream = createBatchStream(feed, streamOptions);

eos(stream, () => {
this._feeds.delete(feed);
});

const transform = through.obj((messages, _, next) => {
if (this._inBatch) {
transform.push(messages);
if (feedStoreInfo) {
if (this._inBatch) {
transform.push(messages);
} else {
for (const message of messages) {
transform.push(message);
}
}
} else {
for (const message of messages) {
transform.push(message);
if (this._inBatch) {
transform.push(messages.map(m => m.data));
} else {
for (const message of messages) {
transform.push(message.data);
}
}
}

Expand Down

0 comments on commit 43b4936

Please sign in to comment.