Skip to content

Commit

Permalink
fix: drop message if reconsumeTimes > maxReconsumeTimes (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer authored Apr 12, 2019
1 parent a44d4d0 commit 8ef9502
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 6 deletions.
4 changes: 3 additions & 1 deletion example/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ const config = require('./config');
const consumer = new Consumer(Object.assign(config, {
httpclient,
logger,
maxReconsumeTimes: 1,
// isBroadcast: true,
consumeFromWhere: 'CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST',
// consumeFromWhere: 'CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST',
}));

consumer.subscribe(config.topic, '*', async function(msg) {
console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`)
// throw new Error(123);
});

consumer.on('error', err => console.log(err));
11 changes: 9 additions & 2 deletions lib/consumer/mq_push_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,19 @@ class MQPushConsumer extends ClientConfig {
async consumeSingleMsg(handler, msg, mq, pq) {
// 集群消费模式下,如果消费失败,反复重试
while (!this._isClosed) {
if (msg.reconsumeTimes > this.options.maxReconsumeTimes) {
this.logger.warn('[MQPushConsumer] consume message failed, drop it for reconsumeTimes=%d and maxReconsumeTimes=%d, msgId: %s, originMsgId: %s',
msg.reconsumeTimes, this.options.maxReconsumeTimes, msg.msgId, msg.originMessageId);
return;
}

try {
await handler(msg, mq, pq);
return;
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
this.emit('error', err);

if (this.messageModel === MessageModel.CLUSTERING) {
// 发送重试消息
try {
Expand Down Expand Up @@ -715,8 +722,8 @@ class MQPushConsumer extends ClientConfig {


async sendMessageBack(msg, delayLevel, brokerName, consumerGroup) {
const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName)
: msg.storeHost;
const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName) :
msg.storeHost;
const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup;
try {
await this._mqClient.consumerSendMessageBack(
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@
"devDependencies": {
"autod": "^3.1.0",
"contributors": "^0.5.1",
"egg-bin": "^4.12.1",
"eslint": "^5.15.3",
"egg-bin": "^4.12.3",
"eslint": "^5.16.0",
"eslint-config-egg": "^7.3.1",
"mm": "^2.5.0",
"urllib": "^2.33.2"
"urllib": "^2.33.3"
},
"engines": {
"node": ">= 8.0.0"
Expand Down
45 changes: 45 additions & 0 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,24 @@ describe('test/index.test.js', () => {
describe('process exception', () => {
let consumer;
let producer;
const logger = {
info() {},
warn() {},
error(...args) {
console.error(...args);
},
debug() {},
};
beforeEach(async () => {
consumer = new Consumer(Object.assign({
httpclient,
logger,
rebalanceInterval: 2000,
maxReconsumeTimes: 1,
}, config));
producer = new Producer(Object.assign({
httpclient,
logger,
}, config));
await consumer.ready();
await producer.ready();
Expand Down Expand Up @@ -427,6 +438,40 @@ describe('test/index.test.js', () => {
await consumer.await('*');
});

it('should drop message if reconsumeTimes gt maxReconsumeTimes', async () => {
let msgId;
let reconsumeTimes = 0;
mm(consumer.logger, 'warn', (msg, reconsumeTimes, maxReconsumeTimes, id, originMessageId) => {
console.log(msg);
if (msg === '[MQPushConsumer] consume message failed, drop it for reconsumeTimes=%d and maxReconsumeTimes=%d, msgId: %s, originMsgId: %s' &&
originMessageId === msgId) {
consumer.emit('*');
}
});

consumer.subscribe(TOPIC, '*', async msg => {
console.warn('message receive ------------> ', msg.body.toString(), msg.reconsumeTimes);
if (msg.msgId === msgId || msg.originMessageId === msgId) {
assert(msg.body.toString() === 'Hello MetaQ !!! ');
reconsumeTimes = msg.reconsumeTimes;
throw new Error('mock error');
}
});

await sleep(5000);

const msg = new Message(TOPIC, // topic
'TagA', // tag
'Hello MetaQ !!! ' // body
);
const sendResult = await producer.send(msg);
assert(sendResult && sendResult.msgId);
msgId = sendResult.msgId;
await consumer.await('*');

assert(reconsumeTimes === 1);
});

it('should retry(retry message) if process failed', async () => {
let msgId;
mm(consumer._mqClient, 'consumerSendMessageBack', async () => {
Expand Down

0 comments on commit 8ef9502

Please sign in to comment.