Skip to content
This repository has been archived by the owner on Jul 27, 2021. It is now read-only.

Commit

Permalink
Merge 43b4936 into 87b4cc1
Browse files Browse the repository at this point in the history
  • Loading branch information
tinchoz49 committed Jun 3, 2020
2 parents 87b4cc1 + 43b4936 commit b6a66eb
Show file tree
Hide file tree
Showing 10 changed files with 2,609 additions and 344 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,5 @@ dist

# TernJS port file
.tern-port

.benchmark
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ stream.on('data', data => {
})
```

#### `feedStore.createBatchStream([callback|options]) -> ReadableStream`

Almost equal to `createReadStream` but the batch messages will be returned in a single array of messages.

### Events

#### `feedStore.on('ready', () => {})`
Expand Down
113 changes: 113 additions & 0 deletions benchmark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//
// Copyright 2019 DxOS.
//

const { createStorage } = require('@dxos/random-access-multi-storage');
const { Suite } = require('@dxos/benchmark-suite');

const { FeedStore } = require('.');

const range = n => [...Array(n).keys()];

(async () => {
const maxFeeds = 5;
const maxMessages = 1000;
const expectedMessages = maxFeeds * maxMessages;

const check = count => {
if (count !== expectedMessages) {
throw new Error('messages amount expected incorrect');
}
};

const fs = await FeedStore.create(createStorage('.benchmark'), { feedOptions: { valueEncoding: 'utf8' } });
const suite = new Suite(fs, { maxFeeds, maxMessages });

suite.beforeAll(() => {
return Promise.all(range(maxFeeds).map(async i => {
const name = `feed/${i}`;
const feed = await fs.openFeed(name);

for (let i = 0; i < maxMessages; i++) {
await new Promise((resolve, reject) => {
feed.append(`${name}/${i}`, (err) => {
if (err) return reject(err);
resolve();
});
});
}

return feed;
}));
});

suite.test('getBatch', async () => {
let count = 0;

await Promise.all(fs.getOpenFeeds().map(feed => {
return new Promise((resolve, reject) => {
feed.getBatch(0, maxMessages, (err, result) => {
count += result.length;
if (err) return reject(err);
resolve();
});
});
}));

check(count);
});

suite.test('createReadStream [batch=1]', async () => {
const stream = fs.createReadStream({ batch: 1 });
let count = 0;

await new Promise((resolve, reject) => {
stream.on('data', (data) => {
count++;
if (count === expectedMessages) resolve();
});
});

stream.destroy();

check(count);
});

suite.test('createReadStream [batch=100]', async () => {
const stream = fs.createReadStream({ batch: 100 });
let count = 0;

await new Promise((resolve, reject) => {
stream.on('data', (data) => {
count++;
if (count === expectedMessages) resolve();
});
});

stream.destroy();

check(count);
});

suite.test('createBatchStream [batch=100]', async () => {
const stream = fs.createBatchStream({ batch: 100 });
let count = 0;

await new Promise((resolve, reject) => {
stream.on('data', (data) => {
count += data.length;
if (count === expectedMessages) resolve();
});
});

stream.destroy();

check(count);
});

const results = await suite.run();

suite.print(results);

process.exit(0);
})();
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
"scripts": {
"build": "npm run clean && npm run build:babel",
"build:babel": "babel ./src --out-dir ./dist --ignore \"**/*.test.js\" --source-maps",
"benchmark": "npm run build && npm run benchmark:node && npm run benchmark:browser",
"benchmark:node": "del-cli .benchmark && node benchmark.js",
"benchmark:browser": "browser-runner benchmark.js --timeout 0",
"clean": "del-cli dist",
"coverage": "npm test -- --coverage",
"coveralls": "npm run coverage && cat ./coverage/lcov.info | coveralls",
Expand Down Expand Up @@ -58,6 +61,9 @@
"@babel/core": "^7.4.5",
"@babel/plugin-proposal-export-default-from": "^7.5.2",
"@babel/preset-env": "^7.4.5",
"@dxos/benchmark-suite": "^1.0.0-beta.1",
"@dxos/browser-runner": "^1.0.0-beta.8",
"@dxos/random-access-multi-storage": "^1.1.0-beta.3",
"babel-eslint": "^10.0.2",
"babel-jest": "^24.8.0",
"babel-plugin-add-module-exports": "^1.0.2",
Expand Down
121 changes: 121 additions & 0 deletions src/create-batch-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//
// Copyright 2019 DxOS.
//

import assert from 'assert';
import streamFrom from 'from2';

export default function createBatchStream (feed, opts = {}) {
assert(!opts.batch || opts.batch > 0, 'batch must be major or equal to 1');

let start = opts.start || 0;
let end = typeof opts.end === 'number' ? opts.end : -1;
const live = !!opts.live;
const snapshot = opts.snapshot !== false;
const batch = opts.batch || 100;
const metadata = opts.metadata || {};
let batchEnd = 0;
let batchLimit = 0;
let seq = start;
let first = true;
let firstSyncEnd = end;

let range = feed.download({ start: start, end: end, linear: true });

const stream = streamFrom.obj(read).on('end', cleanup).on('close', cleanup);

return stream;

function read (size, cb) {
if (!feed.opened) return open(size, cb);
if (!feed.readable) return cb(new Error('Feed is closed'));

if (first) {
if (end === -1) {
if (live) end = Infinity;
else if (snapshot) end = feed.length;
if (start > end) return cb(null, null);
}
if (opts.tail) start = feed.length;
firstSyncEnd = end === Infinity ? feed.length : end;
first = false;
}

if (start === end || (end === -1 && start === feed.length)) {
return cb(null, null);
}

if (batch === 1) {
seq = setStart(start + 1);
feed.get(seq, opts, (err, data) => {
if (err) return cb(err);
cb(null, [buildMessage(data)]);
});
return;
}

batchEnd = start + batch;
batchLimit = end === Infinity ? feed.length : end;
if (batchEnd > batchLimit) {
batchEnd = batchLimit;
}

if (!feed.downloaded(start, batchEnd)) {
seq = setStart(start + 1);
feed.get(seq, opts, (err, data) => {
if (err) return cb(err);
cb(null, [buildMessage(data)]);
});
return;
}

seq = setStart(batchEnd);
feed.getBatch(seq, batchEnd, opts, (err, messages) => {
if (err || messages.length === 0) {
cb(err);
return;
}

cb(null, messages.map(buildMessage));
});
}

function buildMessage (data) {
const message = {
key: feed.key,
seq: seq++,
data,
sync: (firstSyncEnd === false && feed.length === seq) || firstSyncEnd === 0 || firstSyncEnd === seq,
...metadata
};

if (message.sync && firstSyncEnd !== false) {
firstSyncEnd = false;
}

return message;
}

function cleanup () {
if (!range) return;
feed.undownload(range);
range = null;
}

function open (size, cb) {
feed.ready(function (err) {
if (err) return cb(err);
read(size, cb);
});
}

function setStart (value) {
const prevStart = start;
start = value;
range.start = start;
if (range.iterator) {
range.iterator.start = start;
}
return prevStart;
}
}
89 changes: 0 additions & 89 deletions src/create-read-stream.js

This file was deleted.

0 comments on commit b6a66eb

Please sign in to comment.