Skip to content

Commit c74cab6

Browse files
committed
feat(Consumer): add method msg.checkTimeout(callback)
1 parent 27fd756 commit c74cab6

File tree

4 files changed

+28
-2
lines changed

4 files changed

+28
-2
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ c.listen(msg => {
9696
// msg.expire = 消息过期秒时间戳
9797
// msg.reject(err) 消息处理出错
9898
// msg.resolve(result) 消息处理成功
99+
// msg.checkTimeout(callback) 检查执行是否超时,如果在expire之后的时间还没有响应,则自动响应一个MessageProcessingTimeoutError,并执行回调函数
99100
});
100101

101102
// 退出

lib/consumer.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,17 +156,33 @@ class Consumer extends EventEmitter {
156156
this.expire = expire;
157157
this.data = data;
158158
this.originData = originData;
159+
this._checkTimeoutTid = null;
160+
this._isDone = false;
159161
}
160162

161163
reject(err) {
162-
reply(this.producerName, this.msgId, 'e', (err && err.message) || err.toString(), this.originData);
164+
if (this._isDone) return;
165+
clearTimeout(this._checkTimeoutTid);
166+
this._isDone = true;
167+
reply(this.producerName, this.msgId, 'e', ((err && err.message) || err).toString(), this.originData);
163168
}
164169

165170
resolve(data) {
171+
if (this._isDone) return;
172+
clearTimeout(this._checkTimeoutTid);
173+
this._isDone = true;
166174
if (typeof data !== 'string') throw new Error('`data` must be string');
167175
reply(this.producerName, this.msgId, 's', data, this.originData);
168176
}
169177

178+
checkTimeout(callback) {
179+
if (this._isDone) return;
180+
setTimeout(() => {
181+
this.reject(new utils.MessageProcessingTimeoutError());
182+
callback && callback(null);
183+
}, Date.now() - this.expire * 1000);
184+
}
185+
170186
}
171187

172188
pull();

lib/utils.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,15 @@ function MessageExpiredError(message) {
109109
MessageExpiredError.prototype = Error.prototype;
110110
utils.MessageExpiredError = MessageExpiredError;
111111

112+
function MessageProcessingTimeoutError(message) {
113+
Error.captureStackTrace(this, MessageProcessingTimeoutError);
114+
this.name = 'MessageProcessingTimeoutError';
115+
this.message = (message || '');
116+
this.code = 'msg_timeout';
117+
}
118+
MessageProcessingTimeoutError.prototype = Error.prototype;
119+
utils.MessageProcessingTimeoutError = MessageProcessingTimeoutError;
120+
112121
utils.classMethodWrapPromise = function (obj, methods) {
113122
for (const method of methods) {
114123
obj.prototype['_' + method + 'Callback'] = obj.prototype[method];

test/test_normal.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ function generateOptions(options) {
1717
options.queue = utils.date('Ymd_His_') + utils.randomString(10);
1818
options.redis = {
1919
host: '127.0.0.1',
20-
port: 6378,
20+
port: 6379,
2121
db: 15,
2222
prefix: 'TEST:',
2323
};

0 commit comments

Comments
 (0)