Commit

Improved batcher accuracy
Simon Grondin committed Oct 13, 2018
1 parent 09b4b82 commit 4fbb933
Showing 6 changed files with 131 additions and 59 deletions.
28 changes: 15 additions & 13 deletions README.md
@@ -29,7 +29,7 @@ Not using npm? Import the `bottleneck.min.js` file.

Most APIs have a rate limit. For example, to execute 3 requests per second:
```js
-import Bottleneck from "bottleneck"
+import Bottleneck from "bottleneck";

const limiter = new Bottleneck({
  minTime: 333
@@ -126,7 +126,7 @@ Instead of throttling maybe [you want to batch up requests](#batching) into fewer…

##### Gotchas

-* Make sure you're catching `error` events emitted by your limiters!
+* Make sure you're catching `"error"` events emitted by your limiters!

* Consider setting a `maxConcurrent` value instead of leaving it `null`. This can help your application's performance, especially if you think the limiter's queue might become very long.

@@ -345,7 +345,7 @@ Checks if a new job would be executed immediately if it was submitted now. Returns…

### Events

-Event names: `error`, `empty`, `idle`, `dropped`, `depleted` and `debug`.
+Event names: `"error"`, `"empty"`, `"idle"`, `"dropped"`, `"depleted"` and `"debug"`.

__error__
```js
@@ -354,9 +354,9 @@ limiter.on("error", function (error) {
});
```

-By far the most common source of errors is uncaught exceptions in your application code. If the jobs you add to Bottleneck don't catch their own exceptions, the limiter will emit an `error` event.
+By far the most common source of errors is uncaught exceptions in your application code. If the jobs you add to Bottleneck don't catch their own exceptions, the limiter will emit an `"error"` event.

-If using Clustering, errors thrown by the Redis client will emit an `error` event.
+If using Clustering, errors thrown by the Redis client will emit an `"error"` event.

__empty__
```js
@@ -496,14 +496,14 @@ __on("created")__
group.on("created", (limiter, key) => {
  console.log("A new limiter was created for key: " + key)

-  // Prepare the limiter, for example we'll want to listen to its 'error' events!
+  // Prepare the limiter, for example we'll want to listen to its "error" events!
  limiter.on("error", (err) => {
    // Handle errors here
  })
});
```

-Listening for the `created` event is the recommended way to set up a new limiter. Your event handler is executed before `key()` returns the newly created limiter.
+Listening for the `"created"` event is the recommended way to set up a new limiter. Your event handler is executed before `key()` returns the newly created limiter.

__updateSettings()__

@@ -552,6 +552,8 @@ batcher.add("some-data");
batcher.add("some-other-data");
```

+`batcher.add()` returns a Promise that resolves once the request has been flushed to a `"batch"` event.

| Option | Default | Description |
|--------|---------|-------------|
| `maxTime` | `null` (unlimited) | Maximum acceptable time (in milliseconds) a request can have to wait before being flushed to the `"batch"` event. |
@@ -615,8 +617,8 @@ Due to the above, functionality relying on the queue length happens purely locally…
- Priorities are local. A higher priority job will run before a lower priority job **on the same limiter**. Another limiter on the cluster might run a lower priority job before our higher priority one.
- Assuming constant priority levels, Bottleneck guarantees that jobs will be run in the order they were received **on the same limiter**. Another limiter on the cluster might run a job received later before ours runs.
- `highWater` and load shedding ([strategies](#strategies)) are per limiter. However, one limiter entering Blocked mode will put the entire cluster in Blocked mode until `penalty` milliseconds have passed. See [Strategies](#strategies).
-- The `empty` event is triggered when the (local) queue is empty.
-- The `idle` event is triggered when the (local) queue is empty *and* no jobs are currently running anywhere in the cluster.
+- The `"empty"` event is triggered when the (local) queue is empty.
+- The `"idle"` event is triggered when the (local) queue is empty *and* no jobs are currently running anywhere in the cluster.

You must work around these limitations in your application code if they are an issue to you. The `publish()` method could be useful here.

@@ -628,7 +630,7 @@ It is **strongly recommended** that you set an `expiration` (See [Job Options](#job-options))…

Network latency between Node.js and Redis is not taken into account when calculating timings (such as `minTime`). To minimize the impact of latency, Bottleneck performs the absolute minimum number of state accesses. Keeping the Redis server close to your limiters will help you get a more consistent experience. Keeping the clients' OS time consistent will also help.

-It is **strongly recommended** to [set up an `error` listener](#events) on all your limiters and on your Groups.
+It is **strongly recommended** to [set up an `"error"` listener](#events) on all your limiters and on your Groups.

Bottleneck does not guarantee that the concurrency will be spread evenly across limiters. With `{ maxConcurrent: 5 }`, it's absolutely possible for a single limiter to end up running 5 jobs simultaneously while the other limiters in the cluster sit idle. To spread the load, use the `.chain()` method:
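
The code sample that follows this paragraph is collapsed in this view; here is a sketch of the chaining pattern it describes (not part of the commit; the limits are hypothetical and `doWork` is a placeholder for your actual job):

```js
const Bottleneck = require("bottleneck");

// Hypothetical job; stands in for your actual API call.
const doWork = () => Promise.resolve("done");

// Shared, cluster-wide limit (Redis connection options elided).
const cluster = new Bottleneck({ maxConcurrent: 5, datastore: "redis" });

// Local limiter: this process runs at most 1 job at a time.
const local = new Bottleneck({ maxConcurrent: 1 });

// Chained jobs must clear both limiters before running,
// so no single process can claim all 5 cluster slots.
local.chain(cluster);

local.schedule(doWork).then(console.log);
```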

@@ -656,7 +658,7 @@ The `ready()`, `publish()` and `clients()` methods also exist when using the `local` datastore…

This method returns a promise that resolves once the limiter is connected to Redis.

-As of v2.9.0, it's no longer necessary to wait for `.ready()` to resolve before issuing commands to a limiter. The commands will be queued until the limiter successfully connects. Make sure to listen to the `error` event to handle connection errors.
+As of v2.9.0, it's no longer necessary to wait for `.ready()` to resolve before issuing commands to a limiter. The commands will be queued until the limiter successfully connects. Make sure to listen to the `"error"` event to handle connection errors.

```js
const limiter = new Bottleneck({/* options */});
@@ -758,11 +760,11 @@ If you created the Connection object manually, you need to call `connection.disconnect()`…

Debugging complex scheduling logic can be difficult, especially when priorities, weights, and network latency all interact with one another.

-If your application is not behaving as expected, start by making sure you're catching `error` [events emitted](#events) by your limiters and your Groups. Those errors are most likely uncaught exceptions from your application code.
+If your application is not behaving as expected, start by making sure you're catching `"error"` [events emitted](#events) by your limiters and your Groups. Those errors are most likely uncaught exceptions from your application code.

Make sure you've read the ['Gotchas'](#gotchas) section.

-To see exactly what a limiter is doing in real time, listen to the `debug` event. It contains detailed information about how the limiter is executing your code. Adding [job IDs](#job-options) to all your jobs makes the debug output more readable.
+To see exactly what a limiter is doing in real time, listen to the `"debug"` event. It contains detailed information about how the limiter is executing your code. Adding [job IDs](#job-options) to all your jobs makes the debug output more readable.

When Bottleneck has to fail one of your jobs, it does so by using `BottleneckError` objects. This lets you tell those errors apart from your own code's errors:
```js
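The Batching section above documents that `batcher.add()` now resolves on flush. A minimal usage sketch, based only on the options and `"batch"` event shown in this commit (not part of the diff):

```js
const Bottleneck = require("bottleneck");

// Flush when 10 items are queued, or 1000ms after the first item
// arrives, whichever comes first.
const batcher = new Bottleneck.Batcher({
  maxTime: 1000,
  maxSize: 10
});

// The handler receives the array of items flushed together.
batcher.on("batch", (batch) => {
  console.log("Flushing", batch);
});

// add() resolves once the item has been flushed to a "batch" event.
batcher.add("some-data").then(() => console.log("flushed"));
```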
24 changes: 7 additions & 17 deletions bottleneck.js
@@ -11,36 +11,22 @@
Batcher = function () {
  class Batcher {
    constructor(options = {}) {
-      var base;
      this.options = options;
      parser.load(this.options, this.defaults, this);
      this.Events = new Events(this);
      this._arr = [];
      this._resetPromise();
      this._lastFlush = Date.now();
-      if (this.maxTime != null) {
-        if (typeof (base = this.interval = setInterval(() => {
-          if (Date.now() >= this._lastFlush + this.maxTime && this._arr.length > 0) {
-            return this._flush();
-          }
-        }, Math.max(Math.floor(this.maxTime / 5), 25))).unref === "function") {
-          base.unref();
-        }
-      }
    }

    _resetPromise() {
-      var _promise$_resolve;
-
-      var _promise, _resolve;
-      _resolve = null;
-      _promise = new this.Promise(function (res, rej) {
-        return _resolve = res;
+      return this._promise = new this.Promise((res, rej) => {
+        return this._resolve = res;
      });
-      return _promise$_resolve = { _promise, _resolve }, this._promise = _promise$_resolve._promise, this._resolve = _promise$_resolve._resolve, _promise$_resolve;
    }

    _flush() {
+      clearTimeout(this._timeout);
      this._lastFlush = Date.now();
      this._resolve();
      this.Events.trigger("batch", [this._arr]);
@@ -54,6 +40,10 @@
      ret = this._promise;
      if (this._arr.length === this.maxSize) {
        this._flush();
+      } else if (this.maxTime != null && this._arr.length === 1) {
+        this._timeout = setTimeout(() => {
+          return this._flush();
+        }, this.maxTime);
      }
      return ret;
    }
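
In short: the old implementation polled on a `setInterval` firing every `maxTime / 5` ms (minimum 25ms), so a time-based flush could land up to one polling period late; the new implementation arms a one-shot `setTimeout` when the first item enters an empty batch, and `_flush()` clears it. A distilled sketch of the new scheduling (not the library source; promise and event plumbing omitted, with `onBatch` standing in for the `"batch"` event):

```js
class MiniBatcher {
  constructor({ maxTime = null, maxSize = null } = {}, onBatch) {
    this.maxTime = maxTime;
    this.maxSize = maxSize;
    this.onBatch = onBatch;
    this._arr = [];
  }

  add(item) {
    this._arr.push(item);
    if (this._arr.length === this.maxSize) {
      this._flush(); // batch is full: flush immediately
    } else if (this.maxTime != null && this._arr.length === 1) {
      // first item of a new batch: arm exactly one timer
      this._timeout = setTimeout(() => this._flush(), this.maxTime);
    }
  }

  _flush() {
    clearTimeout(this._timeout); // cancel the timer on a size-triggered flush
    this.onBatch(this._arr);
    this._arr = [];
  }
}
```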
2 changes: 1 addition & 1 deletion bottleneck.min.js

Large diffs are not rendered by default.

24 changes: 7 additions & 17 deletions lib/Batcher.js
@@ -10,36 +10,22 @@
Batcher = function () {
  class Batcher {
    constructor(options = {}) {
-      var base;
      this.options = options;
      parser.load(this.options, this.defaults, this);
      this.Events = new Events(this);
      this._arr = [];
      this._resetPromise();
      this._lastFlush = Date.now();
-      if (this.maxTime != null) {
-        if (typeof (base = this.interval = setInterval(() => {
-          if (Date.now() >= this._lastFlush + this.maxTime && this._arr.length > 0) {
-            return this._flush();
-          }
-        }, Math.max(Math.floor(this.maxTime / 5), 25))).unref === "function") {
-          base.unref();
-        }
-      }
    }

    _resetPromise() {
-      var _promise$_resolve;
-
-      var _promise, _resolve;
-      _resolve = null;
-      _promise = new this.Promise(function (res, rej) {
-        return _resolve = res;
+      return this._promise = new this.Promise((res, rej) => {
+        return this._resolve = res;
      });
-      return _promise$_resolve = { _promise, _resolve }, this._promise = _promise$_resolve._promise, this._resolve = _promise$_resolve._resolve, _promise$_resolve;
    }

    _flush() {
+      clearTimeout(this._timeout);
      this._lastFlush = Date.now();
      this._resolve();
      this.Events.trigger("batch", [this._arr]);
@@ -53,6 +39,10 @@
      ret = this._promise;
      if (this._arr.length === this.maxSize) {
        this._flush();
+      } else if (this.maxTime != null && this._arr.length === 1) {
+        this._timeout = setTimeout(() => {
+          return this._flush();
+        }, this.maxTime);
      }
      return ret;
    }
14 changes: 6 additions & 8 deletions src/Batcher.coffee
@@ -13,18 +13,12 @@ class Batcher
    @_arr = []
    @_resetPromise()
    @_lastFlush = Date.now()
-    if @maxTime?
-      (@interval = setInterval =>
-        if Date.now() >= @_lastFlush + @maxTime && @_arr.length > 0
-          @_flush()
-      , Math.max(Math.floor(@maxTime / 5), 25)).unref?()

  _resetPromise: ->
-    _resolve = null
-    _promise = new @Promise (res, rej) -> _resolve = res
-    { @_promise, @_resolve } = { _promise, _resolve }
+    @_promise = new @Promise (res, rej) => @_resolve = res

  _flush: ->
+    clearTimeout @_timeout
    @_lastFlush = Date.now()
    @_resolve()
    @Events.trigger "batch", [@_arr]
@@ -36,6 +30,10 @@
    ret = @_promise
    if @_arr.length == @maxSize
      @_flush()
+    else if @maxTime? and @_arr.length == 1
+      @_timeout = setTimeout =>
+        @_flush()
+      , @maxTime
    ret

module.exports = Batcher
98 changes: 95 additions & 3 deletions test/batcher.js
@@ -40,7 +40,6 @@ describe('Batcher', function () {
    .then(function (results) {
      c.checkDuration(50, 20)
      c.mustEqual(batches, [[1, 2, 3], [4, 5]])
-      assert(batcher.interval != null)
    })
  })

@@ -82,7 +81,6 @@
    .then(function (results) {
      c.checkDuration(100)
      c.mustEqual(batches, [[1, 2], [3, 4]])
-      assert(batcher.interval != null)
    })
  })

@@ -111,7 +109,101 @@
    .then(function (results) {
      c.checkDuration(0)
      c.mustEqual(batches, [[1, 2], [3, 4]])
-      assert(batcher.interval == null)
    })
  })

+  it('Should stagger flushes', function () {
+    c = makeTest()
+    var batcher = new Bottleneck.Batcher({
+      maxTime: 50,
+      maxSize: 3
+    })
+    var t0 = Date.now()
+    var batches = []
+
+    batcher.on('batch', function (batcher) {
+      batches.push(batcher)
+    })
+
+    return Promise.all([
+      batcher.add(1).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 1)),
+      batcher.add(2).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 2))
+    ])
+    .then(function (data) {
+      c.mustEqual(
+        data.map(([t, x]) => [Math.floor(t / 50), x]),
+        [[1, 1], [1, 2]]
+      )
+
+      var promises = []
+      promises.push(batcher.add(3).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 3)))
+
+      return c.wait(10)
+      .then(function () {
+        promises.push(batcher.add(4).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 4)))
+
+        return Promise.all(promises)
+      })
+    })
+    .then(function (data) {
+      c.mustEqual(
+        data.map(([t, x]) => [Math.floor(t / 50), x]),
+        [[2, 3], [2, 4]]
+      )

+      return c.last()
+    })
+    .then(function (results) {
+      c.checkDuration(120, 20)
+      c.mustEqual(batches, [[1, 2], [3, 4]])
+    })
+  })
+
+  it('Should force then stagger flushes', function () {
+    c = makeTest()
+    var batcher = new Bottleneck.Batcher({
+      maxTime: 50,
+      maxSize: 3
+    })
+    var t0 = Date.now()
+    var batches = []
+
+    batcher.on('batch', function (batcher) {
+      batches.push(batcher)
+    })
+
+    var promises = []
+    promises.push(batcher.add(1).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 1)))
+    promises.push(batcher.add(2).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 2)))
+
+    return c.wait(10)
+    .then(function () {
+      promises.push(batcher.add(3).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 3)))
+
+      return Promise.all(promises)
+    })
+    .then(function (data) {
+      c.mustEqual(
+        data.map(([t, x]) => [Math.floor(t / 50), x]),
+        [[0, 1], [0, 2], [0, 3]]
+      )
+
+      return Promise.all([
+        batcher.add(4).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 4)),
+        batcher.add(5).then((x) => c.limiter.schedule(c.promise, null, Date.now() - t0, 5)),
+      ])
+    })
+    .then(function (data) {
+      c.mustEqual(
+        data.map(([t, x]) => [Math.floor(t / 50), x]),
+        [[1, 4], [1, 5]]
+      )
+
+      return c.last()
+    })
+    .then(function (results) {
+      c.checkDuration(85, 20)
+      c.mustEqual(batches, [[1, 2, 3], [4, 5]])
+    })
+  })
})
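
Taken together, the new tests pin down the staggered timing of the one-shot flush timer. A runnable sketch of that behavior (not part of the commit; timestamps are approximate and assume the Batcher from this commit):

```js
const Bottleneck = require("bottleneck");

const t0 = Date.now();
const batcher = new Bottleneck.Batcher({ maxTime: 50, maxSize: 3 });

batcher.on("batch", (items) => {
  console.log(`t≈${Date.now() - t0}ms`, items);
});

batcher.add(1);                        // arms the 50ms timer
batcher.add(2);
setTimeout(() => batcher.add(3), 60);  // first item of the next batch: new timer
setTimeout(() => batcher.add(4), 70);

// Expected output (approximately):
//   t≈50ms  [ 1, 2 ]
//   t≈110ms [ 3, 4 ]
```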
