Skip to content

Commit f183ccd

Browse files
committed
feat(Monitor): 当consumer下线时,自动将其未完成的消息转移回主队列
1 parent 803f9c1 commit f183ccd

File tree

1 file changed

+17
-7
lines changed

1 file changed

+17
-7
lines changed

lib/monitor.js

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ class Monitor extends EventEmitter {
8989
this._previousConsumers = [];
9090
this._autoCheckTid = setInterval(() => this.check(), this._interval * 1000);
9191
this._emitCounter = 0;
92-
93-
this.on('consumerDown', name => {
94-
this.withdrawnProcessingQueue(name);
92+
this.on('consumerDown', info => {
93+
this.withdrawnProcessingQueue(info);
9594
});
95+
this.check();
9696

9797
this._debug = utils.debug('monitor:' + this.name);
9898
this._debug('created: interval=%s, redis=%j', this._interval, options.redis);
@@ -131,7 +131,7 @@ class Monitor extends EventEmitter {
131131
}
132132

133133
const emit = (event, list) => {
134-
//if (this._emitCounter < 1) return this._debug('fist time emit, skip');
134+
if (this._emitCounter < 1) return this._debug('fist time emit, skip');
135135
for (const item of list) {
136136
this.emit(event, item);
137137
}
@@ -255,11 +255,21 @@ class Monitor extends EventEmitter {
255255

256256
}
257257

258-
withdrawnProcessingQueue(queue) {
258+
withdrawnProcessingQueue(info, callback) {
259+
260+
const processingKey = utils.getProcessingQueueKey(this._redisPrefix, info.queue, info.name);
261+
const waitingKey = utils.getQueueKey(this._redisPrefix, info.queue);
262+
this._debug('withdrawnProcessingQueue: %s -> %s', processingKey, waitingKey);
259263

260-
const key = utils.getProcessingQueueKey(this._redisPrefix, queue);
264+
callback = callback || () => {};
265+
const next = (err, ret) => {
266+
if (err) return callback(err);
267+
if (!ret) return callback(null);
268+
if (ret !== true) this._debug('withdrawnProcessingQueue: %s -> %s %s', processingKey, waitingKey, ret);
269+
this._redis.rpoplpush(processingKey, waitingKey, next);
270+
};
271+
next(null, true);
261272

262-
this._debug('withdrawnProcessingQueue');
263273
}
264274

265275
/**

0 commit comments

Comments
 (0)