From 8ef95027eef89456b17531b89761fa375168bfd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?z=C5=8Dng=20y=C7=94?= Date: Fri, 12 Apr 2019 09:52:17 +0800 Subject: [PATCH] fix: drop message if reconsumeTimes > maxReconsumeTimes (#72) --- example/consumer.js | 4 ++- lib/consumer/mq_push_consumer.js | 11 ++++++-- package.json | 6 ++--- test/index.test.js | 45 ++++++++++++++++++++++++++++++++ 4 files changed, 60 insertions(+), 6 deletions(-) diff --git a/example/consumer.js b/example/consumer.js index 869735d..4056fdb 100644 --- a/example/consumer.js +++ b/example/consumer.js @@ -7,12 +7,14 @@ const config = require('./config'); const consumer = new Consumer(Object.assign(config, { httpclient, logger, + maxReconsumeTimes: 1, // isBroadcast: true, - consumeFromWhere: 'CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST', + // consumeFromWhere: 'CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST', })); consumer.subscribe(config.topic, '*', async function(msg) { console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`) + // throw new Error(123); }); consumer.on('error', err => console.log(err)); diff --git a/lib/consumer/mq_push_consumer.js b/lib/consumer/mq_push_consumer.js index beaeca5..2303a6e 100644 --- a/lib/consumer/mq_push_consumer.js +++ b/lib/consumer/mq_push_consumer.js @@ -266,12 +266,19 @@ class MQPushConsumer extends ClientConfig { async consumeSingleMsg(handler, msg, mq, pq) { // 集群消费模式下,如果消费失败,反复重试 while (!this._isClosed) { + if (msg.reconsumeTimes > this.options.maxReconsumeTimes) { + this.logger.warn('[MQPushConsumer] consume message failed, drop it for reconsumeTimes=%d and maxReconsumeTimes=%d, msgId: %s, originMsgId: %s', + msg.reconsumeTimes, this.options.maxReconsumeTimes, msg.msgId, msg.originMessageId); + return; + } + try { await handler(msg, mq, pq); return; } catch (err) { err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; this.emit('error', err); + if (this.messageModel === MessageModel.CLUSTERING) { // 发送重试消息 try { @@ -715,8 +722,8 @@ class MQPushConsumer extends ClientConfig { async sendMessageBack(msg, delayLevel, brokerName, consumerGroup) { - const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName) - : msg.storeHost; + const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName) : + msg.storeHost; const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup; try { await this._mqClient.consumerSendMessageBack( diff --git a/package.json b/package.json index 709c799..7e63841 100644 --- a/package.json +++ b/package.json @@ -53,11 +53,11 @@ "devDependencies": { "autod": "^3.1.0", "contributors": "^0.5.1", - "egg-bin": "^4.12.1", - "eslint": "^5.15.3", + "egg-bin": "^4.12.3", + "eslint": "^5.16.0", "eslint-config-egg": "^7.3.1", "mm": "^2.5.0", - "urllib": "^2.33.2" + "urllib": "^2.33.3" }, "engines": { "node": ">= 8.0.0" diff --git a/test/index.test.js b/test/index.test.js index 677ce37..b275964 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -360,13 +360,24 @@ describe('test/index.test.js', () => { describe('process exception', () => { let consumer; let producer; + const logger = { + info() {}, + warn() {}, + error(...args) { + console.error(...args); + }, + debug() {}, + }; beforeEach(async () => { consumer = new Consumer(Object.assign({ httpclient, + logger, rebalanceInterval: 2000, + maxReconsumeTimes: 1, }, config)); producer = new Producer(Object.assign({ httpclient, + logger, }, config)); await consumer.ready(); await producer.ready(); @@ -427,6 +438,40 @@ describe('test/index.test.js', () => { await consumer.await('*'); }); + it('should drop message if reconsumeTimes gt maxReconsumeTimes', async () => { + let msgId; + let reconsumeTimes = 0; + mm(consumer.logger, 'warn', (msg, reconsumeTimes, maxReconsumeTimes, id, originMessageId) => { + console.log(msg); + if (msg === '[MQPushConsumer] consume message failed, drop it for reconsumeTimes=%d and maxReconsumeTimes=%d, msgId: %s, originMsgId: %s' && + originMessageId === msgId) { + consumer.emit('*'); + } + }); + + consumer.subscribe(TOPIC, '*', async msg => { + console.warn('message receive ------------> ', msg.body.toString(), msg.reconsumeTimes); + if (msg.msgId === msgId || msg.originMessageId === msgId) { + assert(msg.body.toString() === 'Hello MetaQ !!! '); + reconsumeTimes = msg.reconsumeTimes; + throw new Error('mock error'); + } + }); + + await sleep(5000); + + const msg = new Message(TOPIC, // topic + 'TagA', // tag + 'Hello MetaQ !!! ' // body + ); + const sendResult = await producer.send(msg); + assert(sendResult && sendResult.msgId); + msgId = sendResult.msgId; + await consumer.await('*'); + + assert(reconsumeTimes === 1); + }); + it('should retry(retry message) if process failed', async () => { let msgId; mm(consumer._mqClient, 'consumerSendMessageBack', async () => {