Skip to content

Commit 3dc31f4

Browse files
committed
fix(Consumer): 使用自己编写的Timer代替setTimeout
1 parent c74cab6 commit 3dc31f4

File tree

2 files changed

+63
-2
lines changed

2 files changed

+63
-2
lines changed

lib/consumer.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
const EventEmitter = require('events').EventEmitter;
1010
const utils = require('./utils');
11+
const Timer = require('./timer');
1112

1213
class Consumer extends EventEmitter {
1314

@@ -64,6 +65,8 @@ class Consumer extends EventEmitter {
6465
};
6566
this._heartbeatTid = setInterval(heartbeat, this._heartbeat * 1000);
6667

68+
this._timer = new Timer(1000);
69+
6770
this._debug = utils.debug('consumer:' + this.name);
6871
this._debug('created: queue=%s, capacity=%s, redis=%j', this.queue, this.capacity, options.redis);
6972

@@ -148,6 +151,8 @@ class Consumer extends EventEmitter {
148151

149152
};
150153

154+
const timer = this._timer;
155+
151156
class Message {
152157

153158
constructor(producerName, msgId, expire, data, originData) {
@@ -177,10 +182,10 @@ class Consumer extends EventEmitter {
177182

178183
checkTimeout(callback) {
179184
if (this._isDone) return;
180-
setTimeout(() => {
185+
timer.add(this.expire, () => {
181186
this.reject(new utils.MessageProcessingTimeoutError());
182187
callback && callback(null);
183-
}, Date.now() - this.expire * 1000);
188+
});
184189
}
185190

186191
}
@@ -199,6 +204,7 @@ class Consumer extends EventEmitter {
199204
clearInterval(this._heartbeatTid);
200205
this._redisPull.end();
201206
this._redis.end();
207+
this._timer.destroy();
202208
callback && callback(null);
203209
}
204210

lib/timer.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
'use strict';
2+
3+
/**
4+
* super-queue Timer
5+
*
6+
* @authro Zongmin Lei <leizongmin@gmail.com>
7+
*/
8+
9+
const EventEmitter = require('events').EventEmitter;
10+
const utils = require('./utils');
11+
12+
let counter = 0;
13+
14+
class Timer {
15+
16+
constructor(interval) {
17+
18+
this.interval = interval || 1000;
19+
this.list = [];
20+
this._tid = setInterval(() => {
21+
this.check();
22+
}, this.interval);
23+
24+
this.id = counter++;
25+
this._debug = utils.debug('timer:#' + this.id);
26+
this._debug('created');
27+
28+
}
29+
30+
add(expire, callback) {
31+
this.list.push({expire, callback});
32+
}
33+
34+
check() {
35+
const now = utils.secondTimestamp();
36+
this.list.sort((a, b) => a.expire - b.expire);
37+
let i = 0;
38+
for (; i < this.list.length; i++) {
39+
const item = this.list[i];
40+
if (item.expire > now) break;
41+
}
42+
const expires = this.list.slice(0, i);
43+
this.list = this.list.slice(i);
44+
for (const item of expires) {
45+
item.callback(item.expire);
46+
}
47+
}
48+
49+
destroy() {
50+
clearInterval(this._tid);
51+
}
52+
53+
}
54+
55+
module.exports = Timer;

0 commit comments

Comments
 (0)