Skip to content

Commit

Permalink
feat: support consume back
Browse files Browse the repository at this point in the history
  • Loading branch information
denghongcai committed Dec 10, 2018
1 parent f1258c2 commit 11039f6
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ node_modules

example/.config.js
.nyc_output/
.idea
174 changes: 128 additions & 46 deletions lib/consumer/mq_push_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const utils = require('../utils');
const logger = require('../logger');
const MixAll = require('../mix_all');
const MQClient = require('../mq_client');
const MQProducer = require('../producer/mq_producer');
const sleep = require('mz-modules/sleep');
const ClientConfig = require('../client_config');
const ProcessQueue = require('../process_queue');
Expand All @@ -19,6 +20,8 @@ const LocalFileOffsetStore = require('../store/local_file');
const LocalMemoryOffsetStore = require('../store/local_memory');
const RemoteBrokerOffsetStore = require('../store/remote_broker');
const AllocateMessageQueueAveragely = require('./rebalance/allocate_message_queue_averagely');
const Message = require('../message/message');
const MessageConst = require('../message/message_const');

const defaultOptions = {
logger,
Expand All @@ -41,14 +44,19 @@ const defaultOptions = {
pullInterval: 0, // 拉取消息的频率, 如果为了降低拉取速度,可以设置大于0的值
consumeMessageBatchMaxSize: 1, // 消费一批消息,最大数
pullBatchSize: 32, // 拉消息,一次拉多少条
parallelConsumeLimit: 1, // 并发消费消息限制
postSubscriptionWhenPull: true, // 是否每次拉消息时,都上传订阅关系
allocateMessageQueueStrategy: new AllocateMessageQueueAveragely(), // 队列分配算法,应用可重写
maxReconsumeTimes: 16, // 最大重试次数
};

class MQPushConsumer extends ClientConfig {
constructor(options) {
assert(options && options.consumerGroup, '[MQPushConsumer] options.consumerGroup is required');
super(Object.assign({ initMethod: 'init' }, defaultOptions, options));
const mergedOptions = Object.assign({ initMethod: 'init' }, defaultOptions, options);
assert(mergedOptions.parallelConsumeLimit <= mergedOptions.pullBatchSize,
'[MQPushConsumer] options.parallelConsumeLimit must lte pullBatchSize');
super(mergedOptions);

// @example:
// pullFromWhichNodeTable => {
Expand Down Expand Up @@ -85,6 +93,10 @@ class MQPushConsumer extends ClientConfig {
return this._processQueueTable;
}

get parallelConsumeLimit() {
return this.options.parallelConsumeLimit;
}

get consumerGroup() {
return this.options.consumerGroup;
}
Expand All @@ -107,6 +119,7 @@ class MQPushConsumer extends ClientConfig {

async init() {
this._mqClient.registerConsumer(this.consumerGroup, this);
await MQProducer.getDefaultProducer(this.options);
await this._mqClient.ready();
await this._offsetStore.load();
this.logger.info('[mq:consumer] consumer started');
Expand Down Expand Up @@ -148,79 +161,119 @@ class MQPushConsumer extends ClientConfig {
* @return {void}
*/
subscribe(topic, subExpression, handler) {
const _self = this;

if (arguments.length === 2) {
handler = subExpression;
subExpression = null;
}
assert(is.asyncFunction(handler), '[MQPushConsumer] handler should be a asyncFunction');
assert(!_self.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`);
assert(!this.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`);

const subscriptionData = _self.buildSubscriptionData(_self.consumerGroup, topic, subExpression);
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression);
const retryTopic = MixAll.getRetryTopic(this.consumerGroup);
const retrySubscriptionData = this.buildSubscriptionData(this.consumerGroup, retryTopic, subExpression);
this.subscriptions.set(retryTopic, {
handler,
subscriptionData: retrySubscriptionData,
});
const tagsSet = subscriptionData.tagsSet;
const needFilter = !!tagsSet.length;
_self.subscriptions.set(topic, {
this.subscriptions.set(topic, {
handler,
subscriptionData,
});

(async () => {
try {
await _self.ready();
await this.ready();
// 如果 topic 没有路由信息,先更新一下
if (!_self._topicSubscribeInfoTable.has(topic)) {
await _self._mqClient.updateAllTopicRouterInfo();
await _self._mqClient.sendHeartbeatToAllBroker();
await _self._mqClient.doRebalance();
if (!this._topicSubscribeInfoTable.has(topic)) {
await this._mqClient.updateAllTopicRouterInfo();
await this._mqClient.sendHeartbeatToAllBroker();
await this._mqClient.doRebalance();
}

while (!_self._isClosed && _self.subscriptions.has(topic)) {
const mqList = _self._topicSubscribeInfoTable.get(topic);
let hasMsg = false;
if (mqList && mqList.length) {
for (const mq of mqList) {
const item = _self._processQueueTable.get(mq.key);
if (item) {
const pq = item.processQueue;
while (pq.msgCount) {
hasMsg = true;
const msg = pq.msgList[0];
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
await Promise.all([ (async () => {
// 消息消费循环
while (!this._isClosed && this.subscriptions.has(topic)) {
await this._consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression);
}
})(), (async () => {
// 重试消息消费循环
while (!this._isClosed && this.subscriptions.has(retryTopic)) {
await this._consumeMessageLoop(retryTopic, needFilter, tagsSet, handler, subExpression);
}
})() ]);

} catch (err) {
this._handleError(err);
}
})();
}

async _consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression) {
const mqList = this._topicSubscribeInfoTable.get(topic);
let hasMsg = false;
if (mqList && mqList.length) {
for (const mq of mqList) {
const item = this._processQueueTable.get(mq.key);
if (item) {
const pq = item.processQueue;
while (pq.msgCount) {
hasMsg = true;
let msgs;
if (this.parallelConsumeLimit > pq.msgCount) {
msgs = pq.msgList.slice(0, pq.msgCount);
} else {
msgs = pq.msgList.slice(0, this.parallelConsumeLimit);
}
let localRetry = false;
for (const msg of msgs) {
utils.resetRetryTopic(msg, this.consumerGroup);
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
try {
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) {
await handler(msg, mq, pq);
}
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
this.emit('error', err);
if (this.messageModel === MessageModel.CLUSTERING) {
// 发送重试消息
try {
await handler(msg, mq, pq);
// delayLevel 为 0 代表由服务端控制重试间隔
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup);
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
_self.emit('error', err);
if (_self.messageModel === MessageModel.CLUSTERING) {
// TODO: support retry message
msg.reconsumeTimes++;
await _self._sleep(5000);
continue;
} else {
_self.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
this.emit('error', err);
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId);
// 重试消息发送失败,本地重试
msg.reconsumeTimes++;
await this._sleep(5000);
localRetry = true;
}
} else {
_self.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);
}
const offset = pq.remove();
if (offset >= 0) {
_self._offsetStore.updateOffset(mq, offset, true);
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
}
} else {
this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);
}
}
}

if (!hasMsg) {
await _self.await(`topic_${topic}_changed`);
if (localRetry) {
continue;
}
// 注意这里必须是批量确认
const offset = pq.remove(msgs.length);
if (offset >= 0) {
this._offsetStore.updateOffset(mq, offset, true);
}
}
}
} catch (err) {
_self._handleError(err);
}
})();
}

if (!hasMsg) {
await this.await(`topic_${topic}_changed`);
}
}

/**
Expand Down Expand Up @@ -639,6 +692,35 @@ class MQPushConsumer extends ClientConfig {
// todo: consume later ?
}


async sendMessageBack(msg, delayLevel, brokerName, consumerGroup) {
const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName)
: msg.storeHost;
const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup;
try {
await this._mqClient.consumerSendMessageBack(
brokerAddr,
msg,
thatConsumerGroup,
delayLevel,
3000,
this.options.maxReconsumeTimes);
} catch (err) {
err.mesasge = 'sendMessageBack() occurred an exception, ' + thatConsumerGroup + ', ' + err.mesasge;
this._handleError(err);

const newMsg = new Message(MixAll.getRetryTopic(thatConsumerGroup), '', msg.body);
newMsg.flag = msg.flag;
newMsg.properties = msg.properties;
newMsg.originMessageId = msg.originMessageId || msg.msgId;
newMsg.retryTopic = msg.topic;
newMsg.properties[MessageConst.PROPERTY_RECONSUME_TIME] = String(msg.reconsumeTimes + 1);
newMsg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES] = this.options.maxReconsumeTimes;
newMsg.delayTimeLevel = 1 + msg.reconsumeTimes;
await (await MQProducer.getDefaultProducer()).send(newMsg);
}
}

// * viewMessage(msgId) {
// const info = MessageDecoder.decodeMessageId(msgId);
// return yield this._mqClient.viewMessage(info.address, Number(info.offset.toString()), 3000);
Expand Down
24 changes: 24 additions & 0 deletions lib/message/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ class Message {
this.properties[MessageConst.PROPERTY_KEYS] = val.join(MessageConst.KEY_SEPARATOR).trim();
}

/**
* 原始消息 Id
* @property {String} Message#originMessageId
*/
get originMessageId() {
return this.properties && this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID];
}

set originMessageId(val) {
this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID] = val;
}

/**
* 重试 Topic
* @property {String} Message#retryTopic
*/
get retryTopic() {
return this.properties && this.properties[MessageConst.PROPERTY_RETRY_TOPIC];
}

set retryTopic(val) {
this.properties[MessageConst.PROPERTY_RETRY_TOPIC] = val;
}

/**
* 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义)
* @property {Number} Message#delayTimeLevel
Expand Down
1 change: 1 addition & 0 deletions lib/message/message_const.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ exports.PROPERTY_TRANSFER_FLAG = 'TRANSFER_FLAG';
exports.PROPERTY_CORRECTION_FLAG = 'CORRECTION_FLAG';
exports.PROPERTY_MQ2_FLAG = 'MQ2_FLAG';
exports.PROPERTY_RECONSUME_TIME = 'RECONSUME_TIME';
exports.PROPERTY_MAX_RECONSUME_TIMES = 'MAX_RECONSUME_TIMES';

exports.KEY_SEPARATOR = ' ';
6 changes: 6 additions & 0 deletions lib/mix_all.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
exports.DEFAULT_TOPIC = 'TBW102';
exports.DEFAULT_PRODUCER_GROUP = 'DEFAULT_PRODUCER';
exports.DEFAULT_CONSUMER_GROUP = 'DEFAULT_CONSUMER';
exports.CLIENT_INNER_PRODUCER_GROUP = 'CLIENT_INNER_PRODUCER';

exports.DEFAULT_CHARSET = 'UTF-8';
exports.MASTER_ID = 0;
Expand All @@ -11,6 +12,11 @@ exports.RETRY_GROUP_TOPIC_PREFIX = '%RETRY%';
// 为每个Consumer Group建立一个默认的Topic,前缀 + GroupName,用来保存重试多次都失败,接下来不再重试的消息
exports.DLQ_GROUP_TOPIC_PREFIX = '%DLQ%';

/**
* 获取 RETRY_TOPIC
* @param {string} consumerGroup consumerGroup
* @return {string} %RETRY%+consumerGroup
*/
exports.getRetryTopic = consumerGroup => {
return exports.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
};
2 changes: 1 addition & 1 deletion lib/mq_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class MQClient extends MQClientAPI {
/**
* regitser consumer
* @param {String} group - consumer group name
* @param {Comsumer} consumer - consumer instance
* @param {Consumer} consumer - consumer instance
* @return {void}
*/
registerConsumer(group, consumer) {
Expand Down
31 changes: 31 additions & 0 deletions lib/mq_client_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ class MQClientAPI extends RemotingClient {
i: requestHeader.properties,
j: requestHeader.reconsumeTimes,
k: requestHeader.unitMode,
l: requestHeader.maxReconsumeTimes,
};
const request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
request.body = msg.body;
Expand Down Expand Up @@ -500,6 +501,36 @@ class MQClientAPI extends RemotingClient {
};
}

/**
* consumer send message back
* @param {String} brokerAddr - broker address
* @param {Message} msg - message object
* @param {String} consumerGroup - consumer group
* @param {Number} delayLevel - delay level
* @param {Number} timeoutMillis - timeout in millis
* @param {Number} maxConsumeRetryTimes - max retry times
*/
async consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, timeoutMillis, maxConsumeRetryTimes) {
const requestHeader = {
offset: msg.commitLogOffset,
group: consumerGroup,
delayLevel,
originMsgId: msg.msgId,
originTopic: msg.topic,
unitMode: false,
maxReconsumeTimes: maxConsumeRetryTimes,
};
const request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
const response = await this.invoke(brokerAddr, request, timeoutMillis);
switch (response.code) {
case ResponseCode.SUCCESS:
return;
default:
this._defaultHandler(request, response);
break;
}
}

// * viewMessage(brokerAddr, phyoffset, timeoutMillis) {
// const requestHeader = {
// offset: phyoffset,
Expand Down
Loading

0 comments on commit 11039f6

Please sign in to comment.