/
RateLimiterQueue.js
93 lines (81 loc) · 2.57 KB
/
RateLimiterQueue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
const RateLimiterQueueError = require('./component/RateLimiterQueueError')
const MAX_QUEUE_SIZE = 4294967295;
module.exports = class RateLimiterQueue {
constructor(limiterFlexible, opts = {
maxQueueSize: MAX_QUEUE_SIZE,
}) {
this._key = 'limiter';
this._waitTimeout = null;
this._queue = [];
this._limiterFlexible = limiterFlexible;
this._maxQueueSize = opts.maxQueueSize
}
getTokensRemaining() {
return this._limiterFlexible.get(this._key)
.then((rlRes) => {
return rlRes.remainingPoints
})
}
removeTokens(tokens) {
const _this = this;
return new Promise((resolve, reject) => {
if (tokens > _this._limiterFlexible.points) {
reject(new RateLimiterQueueError(`Requested tokens ${tokens} exceeds maximum ${_this._limiterFlexible.points} tokens per interval`));
return
}
if (_this._queue.length > 0) {
_this._queueRequest.call(_this, resolve, reject, tokens);
} else {
_this._limiterFlexible.consume(_this._key, tokens)
.then((res) => {
resolve(res.remainingPoints);
})
.catch((rej) => {
if (rej instanceof Error) {
reject(rej);
} else {
_this._queueRequest.call(_this, resolve, reject, tokens);
if (_this._waitTimeout === null) {
_this._waitTimeout = setTimeout(_this._processFIFO.bind(_this), rej.msBeforeNext);
}
}
});
}
})
}
_queueRequest(resolve, reject, tokens) {
const _this = this;
if (_this._queue.length < _this._maxQueueSize) {
_this._queue.push({resolve, reject, tokens});
} else {
reject(new RateLimiterQueueError(`Number of requests reached it's maximum ${_this._maxQueueSize}`))
}
}
_processFIFO() {
const _this = this;
if (_this._waitTimeout !== null) {
clearTimeout(_this._waitTimeout);
_this._waitTimeout = null;
}
if (_this._queue.length === 0) {
return;
}
const item = _this._queue.shift();
_this._limiterFlexible.consume(_this._key, item.tokens)
.then((res) => {
item.resolve(res.remainingPoints);
_this._processFIFO.call(_this);
})
.catch((rej) => {
if (rej instanceof Error) {
item.reject(rej);
_this._processFIFO.call(_this);
} else {
_this._queue.unshift(item);
if (_this._waitTimeout === null) {
_this._waitTimeout = setTimeout(_this._processFIFO.bind(_this), rej.msBeforeNext);
}
}
});
}
};