Skip to content

Commit 595d0ce

Browse files
committed
feat: default to calculating speed over a range of time
BREAKING CHANGES: - `new StreamSpeed(timeUnit)` - `new StreamSpeed({ timeUnit })` - Speed is now calculated based on total data read over a range of time, defaults to 1sec. Previously there were 2 speeds, current speed and average speed. Current speed was calculated based on amount of data received in latest read over the time difference between latest read and last read. - `speed` event now only emits one speed.
1 parent f63fd24 commit 595d0ce

7 files changed

Lines changed: 164 additions & 180 deletions

File tree

README.md

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ let ss = new StreamSpeed();
1515
ss.add(rs);
1616

1717
// Listen for events emitted by streamspeed on the given stream.
18-
ss.on('speed', (speed, avgSpeed) => {
18+
ss.on('speed', (speed) => {
1919
console.log('Reading at', speed, 'bytes per second');
2020
});
2121
```
@@ -28,16 +28,18 @@ group.add(stream1);
2828
group.add(stream2);
2929
group.add(stream3);
3030

31-
group.on('speed', (speed, avg) => {
31+
group.on('speed', (speed) => {
3232
console.log('now reading at', speed, 'bps');
3333
});
3434
```
3535

3636
![example img](http://i.imgur.com/y47Sc.png)
3737

3838
# API
39-
### new StreamSpeed([timeUnit])
40-
A group that can be used to watch several streams. Will emit `speed` events. `timeUnit` defaults to `1000` for speed per second.
39+
### new StreamSpeed([options])
40+
A group that can be used to watch several streams. Will emit `speed` events. `options` can have the following properties,
41+
- `timeUnit` - Defaults to `1000` for speed per second. If you want another unit such as per hour, use `1000 * 60 * 60`.
42+
- `range` - The time in ms to use to calculate the speed over. Defaults to 1000. The longer this is, the longer it'll speed will ramp up and down until it stabilizes for a big readstream. The shorter it is, the more responsive it is to sudden speed changes.
4143

4244
### StreamSpeed#add(stream)
4345
Adds stream to group.
@@ -51,9 +53,6 @@ Returns a list of all streams in the group.
5153
### StreamSpeed#speed
5254
Curent speed.
5355

54-
### StreamSpeed#avg
55-
Current average speed.
56-
5756
### StreamSpeed.toHuman(bytes, options)
5857
Convenient method to convert `bytes` to a human readable string.
5958

@@ -66,9 +65,8 @@ StreamSpeed.toHuman(1024 * 1024 * 20.5, { precision: 3 }); // 20.50MB
6665

6766
### Event: 'speed'
6867
* `number` - Speed at which streams in the group are being read.
69-
* `number` - Average speed.
7068

71-
Will be emitted after the second time a stream is read and only if there is a change in speed.
69+
Will be emitted every time a stream is read and only if there is a change in speed.
7270

7371

7472
# Install

lib/index.js

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,18 @@ module.exports = class StreamSpeed extends EventEmitter {
99
*
1010
* @constructor
1111
* @extends {EventEmitter}
12-
* @param {number?} per The time unit speed will be measured in.
12+
* @param {Object?} options
13+
* @param {number?} options.timeUnit The time unit speed will be measured in.
14+
* @param {number?} options.range Time in ms to calculate speed over.
1315
*/
14-
constructor(per) {
16+
constructor(options = {}) {
1517
super();
16-
this.per = per || 1000;
18+
this.options = Object.assign({
19+
timeUnit: 1000,
20+
range: 1000,
21+
}, options);
1722
this._streams = [];
1823
this.speed = 0;
19-
this.avg = 0;
2024
}
2125

2226

@@ -25,11 +29,9 @@ module.exports = class StreamSpeed extends EventEmitter {
2529
*
2630
* @param {Object} meta
2731
* @param {number} speed
28-
* @param {number} avg
2932
*/
30-
_update(meta, speed, avg) {
33+
_update(meta, speed) {
3134
meta.speed = speed;
32-
meta.avg = avg;
3335

3436
this._streams.forEach((m) => {
3537
// Skip own stream, streams that haven't started,
@@ -40,12 +42,10 @@ module.exports = class StreamSpeed extends EventEmitter {
4042

4143
// Add other streams' speeds to total.
4244
speed += m.speed;
43-
avg += m.avg;
4445
});
4546

4647
this.speed = speed;
47-
this.avg = avg;
48-
this.emit('speed', speed, avg);
48+
this.emit('speed', speed);
4949
}
5050

5151

@@ -87,11 +87,10 @@ module.exports = class StreamSpeed extends EventEmitter {
8787
const meta = {
8888
stream : origstream,
8989
speed : 0,
90-
avg : 0,
9190
cleanup,
9291
};
9392
this._streams.push(meta);
94-
const reader = new Speedometer(this.per);
93+
const reader = new Speedometer(this.options);
9594
const stream = origstream.pipe(new PassThrough());
9695
const onUpdate = this._update.bind(this, meta);
9796

lib/speedometer.js

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ module.exports = class Speedometer {
33
* Helps count the number of bytes per data event in streams.
44
*
55
* @constructor
6-
* @param {number} per
6+
* @param {Object?} options
7+
* @param {number?} options.timeUnit
8+
* @param {number?} options.range
79
*/
8-
constructor(per) {
9-
this.per = per;
10-
this.starttime = null;
10+
constructor(options) {
11+
this.options = options;
1112
this.history = [];
1213
this.speed = 0;
13-
this.avg = 0;
1414
}
1515

1616

@@ -23,49 +23,32 @@ module.exports = class Speedometer {
2323
update(data, callback) {
2424
const now = Date.now();
2525

26-
// Check if this is the first data event.
27-
if (!this.starttime) {
28-
this.starttime = now;
29-
return;
30-
}
31-
32-
let lastpoint = this.history[this.history.length - 1];
33-
let currpoint;
34-
35-
if (now === this.starttime) {
36-
// Do nothing on the rare occassion we get all the data at the start.
37-
// We need two time points to measure speed.
38-
return;
39-
40-
} else if (lastpoint && now === lastpoint.time) {
41-
// If more data is read on the same ms, aggregate it.
42-
let lastlastpoint = this.history[this.history.length - 2];
43-
if (!lastlastpoint) { return; }
44-
currpoint = lastpoint;
45-
currpoint.speed += data.length / (now - lastlastpoint.time);
46-
47-
} else {
48-
// Compare now to last time.
49-
let lasttime = lastpoint ? lastpoint.time : this.starttime;
50-
this.history.push(currpoint = {
51-
speed: data.length / (now - lasttime),
52-
time: now,
26+
if (this.history.length) {
27+
// Remove old data events.
28+
let index = this.history.findIndex(item => {
29+
return item.time > now - this.options.range;
5330
});
31+
this.history = index > -1 ? this.history.slice(index) : [];
5432
}
5533

56-
let speed = Math.round(currpoint.speed * this.per);
34+
this.history.push({
35+
speed: data.length,
36+
time: now,
37+
});
38+
39+
// Get total data emitted in `range` time period.
40+
let totaldata = this.history.reduce((sum, point) => point.speed + sum, 0);
5741

58-
// Get average speed.
59-
let total = this.history.reduce((sum, point) => point.speed + sum, 0);
60-
const avg = Math.round((total * this.per) / this.history.length);
42+
let speed = Math.round(
43+
totaldata / this.options.range * this.options.timeUnit
44+
);
6145

62-
const change = this.speed !== speed || this.avg !== avg;
46+
const change = this.speed !== speed;
6347
this.speed = speed;
64-
this.avg = avg;
6548

66-
// Only emit event if there is a change in speed or avg.
49+
// Only emit event if there is a change in speed.
6750
if (change) {
68-
callback(speed, avg);
51+
callback(speed);
6952
}
7053
}
7154
};

test/group-test.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@ describe('Create a group and write to it', () => {
4141
group.on('speed', spy);
4242

4343
// Write at 500 B/s on 2 streams, 1000 B/s in total.
44-
s1.interval(100, 6, 200, { end: true, skipTick: true });
45-
s2.interval(100, 6, 200, { end: true });
44+
s1.interval(500, 6, 1000, { end: true, skipTick: true });
45+
s2.interval(500, 6, 1000, { end: true });
46+
4647
s2.on('finish', () => {
47-
assert.deepEqual(spy.firstCall.args, [500, 500]);
48-
assert.deepEqual(spy.secondCall.args, [1000, 1000]);
48+
assert.equal(spy.callCount, 2);
49+
assert.deepEqual(spy.getCall(0).args, [500]);
50+
assert.deepEqual(spy.getCall(1).args, [1000]);
4951
done();
5052
});
5153
});

test/mockstream.js

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ const PassThrough = require('stream').PassThrough;
22
const sinon = require('sinon');
33

44
let clock;
5-
before(() => { clock = sinon.useFakeTimers(); });
6-
after(() => { clock.restore(); });
5+
beforeEach(() => { clock = sinon.useFakeTimers(); });
6+
afterEach(() => { clock.restore(); });
77

88

99
/**
@@ -26,29 +26,47 @@ module.exports = class Mock extends PassThrough {
2626
options.amountPerInterval = options.amountPerInterval || 1;
2727
let i = 0;
2828

29-
const iid = setInterval(() => {
30-
let amountSoFar = 0;
31-
const write = () => {
32-
if (++amountSoFar < options.amountPerInterval) {
33-
this.write(Buffer.alloc(length), write);
34-
} else {
35-
this.write(Buffer.alloc(length));
36-
if (++i === amount) {
37-
clearInterval(iid);
38-
if (options.end) {
39-
process.nextTick(this.end.bind(this));
40-
}
41-
resolve();
42-
} else if (!options.skipTick) {
43-
process.nextTick(clock.tick.bind(clock, interval));
44-
}
29+
const iid = setInterval(async () => {
30+
for (let soFar = 0; soFar < options.amountPerInterval; soFar++) {
31+
await this.writeSize(length);
32+
}
33+
if (++i === amount) {
34+
clearInterval(iid);
35+
if (options.end) {
36+
process.nextTick(this.end.bind(this));
4537
}
46-
};
47-
write();
38+
resolve();
39+
} else if (!options.skipTick) {
40+
process.nextTick(clock.tick.bind(clock, interval));
41+
}
4842
}, interval);
4943
if (!options.skipTick) {
50-
process.nextTick(clock.tick.bind(clock, interval));
44+
if (interval) {
45+
process.nextTick(clock.tick.bind(clock, interval));
46+
} else {
47+
process.nextTick(clock.next);
48+
}
5149
}
5250
});
5351
}
52+
53+
/**
54+
* An async function that writes arbitrary data to the stream.
55+
*
56+
* @param {number} length
57+
*/
58+
writeSize(length) {
59+
return new Promise((resolve) => {
60+
this.write(Buffer.alloc(length), resolve);
61+
});
62+
}
63+
64+
/**
65+
* Async version of `setTimeout`.
66+
*
67+
* @param {number} ms
68+
*/
69+
static timeout(ms) {
70+
clock.tick(ms);
71+
}
5472
};

test/remove-test.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,26 @@ describe('Immediately remove a stream', () => {
3232

3333
describe('Unwatch after several writes', () => {
3434
it('Emits no events after calling remove', async () => {
35-
const ss = new StreamSpeed(1);
35+
const ss = new StreamSpeed();
3636
const s = new MockStream();
3737
ss.add(s);
3838
const spy = sinon.spy();
3939
ss.on('speed', spy);
4040

4141
// Write at 1 bps for 0.5 seconds.
4242
await s.interval(100, 2, 200, { amountPerInterval: 2 });
43-
assert.equal(spy.callCount, 1);
43+
assert.ok(spy.called);
44+
let callCount = spy.callCount;
45+
4446
ss.remove(s);
4547
await s.interval(100, 1, 200, { amountPerInterval: 2 });
46-
assert.equal(spy.callCount, 1);
48+
assert.equal(spy.callCount, callCount);
4749
});
4850

4951
describe('Try removing stream again', () => {
5052
it('Throws error', () => {
5153
assert.throws(() => {
52-
const ss = new StreamSpeed(1);
54+
const ss = new StreamSpeed({ timeUnit: 1 });
5355
const s = new MockStream();
5456
ss.add(s);
5557
ss.remove(s);

0 commit comments

Comments
 (0)