Skip to content

Commit

Permalink
feat: support consume back (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
denghongcai authored and gxcsoccer committed Dec 14, 2018
1 parent f1258c2 commit 9acff9b
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 63 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ node_modules

example/.config.js
.nyc_output/
.idea
215 changes: 162 additions & 53 deletions lib/consumer/mq_push_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const utils = require('../utils');
const logger = require('../logger');
const MixAll = require('../mix_all');
const MQClient = require('../mq_client');
const MQProducer = require('../producer/mq_producer');
const sleep = require('mz-modules/sleep');
const ClientConfig = require('../client_config');
const ProcessQueue = require('../process_queue');
Expand All @@ -19,6 +20,8 @@ const LocalFileOffsetStore = require('../store/local_file');
const LocalMemoryOffsetStore = require('../store/local_memory');
const RemoteBrokerOffsetStore = require('../store/remote_broker');
const AllocateMessageQueueAveragely = require('./rebalance/allocate_message_queue_averagely');
const Message = require('../message/message');
const MessageConst = require('../message/message_const');

const defaultOptions = {
logger,
Expand All @@ -41,14 +44,19 @@ const defaultOptions = {
pullInterval: 0, // 拉取消息的频率, 如果为了降低拉取速度,可以设置大于0的值
consumeMessageBatchMaxSize: 1, // 消费一批消息,最大数
pullBatchSize: 32, // 拉消息,一次拉多少条
parallelConsumeLimit: 1, // 并发消费消息限制
postSubscriptionWhenPull: true, // 是否每次拉消息时,都上传订阅关系
allocateMessageQueueStrategy: new AllocateMessageQueueAveragely(), // 队列分配算法,应用可重写
maxReconsumeTimes: 16, // 最大重试次数
};

class MQPushConsumer extends ClientConfig {
constructor(options) {
assert(options && options.consumerGroup, '[MQPushConsumer] options.consumerGroup is required');
super(Object.assign({ initMethod: 'init' }, defaultOptions, options));
const mergedOptions = Object.assign({ initMethod: 'init' }, defaultOptions, options);
assert(mergedOptions.parallelConsumeLimit <= mergedOptions.pullBatchSize,
'[MQPushConsumer] options.parallelConsumeLimit must lte pullBatchSize');
super(mergedOptions);

// @example:
// pullFromWhichNodeTable => {
Expand Down Expand Up @@ -85,6 +93,10 @@ class MQPushConsumer extends ClientConfig {
return this._processQueueTable;
}

get parallelConsumeLimit() {
return this.options.parallelConsumeLimit;
}

get consumerGroup() {
return this.options.consumerGroup;
}
Expand All @@ -107,10 +119,30 @@ class MQPushConsumer extends ClientConfig {

async init() {
this._mqClient.registerConsumer(this.consumerGroup, this);
await MQProducer.getDefaultProducer(this.options);
await this._mqClient.ready();
await this._offsetStore.load();
this.logger.info('[mq:consumer] consumer started');
this._inited = true;

// 订阅重试 TOPIC
if (this.messageModel === MessageModel.CLUSTERING) {
const retryTopic = MixAll.getRetryTopic(this.consumerGroup);
this.subscribe(retryTopic, '*', async msg => {
const originTopic = msg.retryTopic;
const originMsgId = msg.originMessageId;
const subscription = this._subscriptions.get(originTopic) || {};
const handler = subscription.handler;
if (!MixAll.isRetryTopic(originTopic) && handler) {
await handler(msg);
} else {
this.logger.warn('[MQPushConsumer] retry message no handler, originTopic: %s, originMsgId: %s, msgId: %s',
originTopic,
originMsgId,
msg.msgId);
}
});
}
}

/**
Expand Down Expand Up @@ -148,79 +180,118 @@ class MQPushConsumer extends ClientConfig {
* @return {void}
*/
subscribe(topic, subExpression, handler) {
const _self = this;

if (arguments.length === 2) {
handler = subExpression;
subExpression = null;
}
assert(is.asyncFunction(handler), '[MQPushConsumer] handler should be a asyncFunction');
assert(!_self.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`);
assert(!this.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`);

const subscriptionData = _self.buildSubscriptionData(_self.consumerGroup, topic, subExpression);
const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression);
const tagsSet = subscriptionData.tagsSet;
const needFilter = !!tagsSet.length;
_self.subscriptions.set(topic, {
this.subscriptions.set(topic, {
handler,
subscriptionData,
});

(async () => {
try {
await _self.ready();
await this.ready();
// 如果 topic 没有路由信息,先更新一下
if (!_self._topicSubscribeInfoTable.has(topic)) {
await _self._mqClient.updateAllTopicRouterInfo();
await _self._mqClient.sendHeartbeatToAllBroker();
await _self._mqClient.doRebalance();
if (!this._topicSubscribeInfoTable.has(topic)) {
await this._mqClient.updateAllTopicRouterInfo();
await this._mqClient.sendHeartbeatToAllBroker();
await this._mqClient.doRebalance();
}

while (!_self._isClosed && _self.subscriptions.has(topic)) {
const mqList = _self._topicSubscribeInfoTable.get(topic);
let hasMsg = false;
if (mqList && mqList.length) {
for (const mq of mqList) {
const item = _self._processQueueTable.get(mq.key);
if (item) {
const pq = item.processQueue;
while (pq.msgCount) {
hasMsg = true;
const msg = pq.msgList[0];
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
try {
await handler(msg, mq, pq);
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
_self.emit('error', err);
if (_self.messageModel === MessageModel.CLUSTERING) {
// TODO: support retry message
msg.reconsumeTimes++;
await _self._sleep(5000);
continue;
} else {
_self.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
}
}
} else {
_self.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);
}
const offset = pq.remove();
if (offset >= 0) {
_self._offsetStore.updateOffset(mq, offset, true);
}
}
// 消息消费循环
while (!this._isClosed && this.subscriptions.has(topic)) {
await this._consumeMessageLoop(topic, needFilter, tagsSet, subExpression);
}

} catch (err) {
this._handleError(err);
}
})();
}

async _consumeMessageLoop(topic, needFilter, tagsSet, subExpression) {
const mqList = this._topicSubscribeInfoTable.get(topic);
let hasMsg = false;
if (mqList && mqList.length) {
for (const mq of mqList) {
const item = this._processQueueTable.get(mq.key);
if (item) {
const pq = item.processQueue;
while (pq.msgCount) {
hasMsg = true;
let msgs;
if (this.parallelConsumeLimit > pq.msgCount) {
msgs = pq.msgList.slice(0, pq.msgCount);
} else {
msgs = pq.msgList.slice(0, this.parallelConsumeLimit);
}
// 并发消费任务
const consumeTasks = [];
for (const msg of msgs) {
const handler = this._subscriptions.get(msg.topic).handler;
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) {
consumeTasks.push(this.consumeSingleMsg(handler, msg, mq, pq));
} else {
this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags);
}
}
// 必须全部成功
try {
await Promise.all(consumeTasks);
} catch (err) {
continue;
}
// 注意这里必须是批量确认
const offset = pq.remove(msgs.length);
if (offset >= 0) {
this._offsetStore.updateOffset(mq, offset, true);
}
}
}
}
}

if (!hasMsg) {
await _self.await(`topic_${topic}_changed`);
if (!hasMsg) {
await this.await(`topic_${topic}_changed`);
}
}

async consumeSingleMsg(handler, msg, mq, pq) {
// 集群消费模式下,如果消费失败,反复重试
while (!this._isClosed) {
try {
await handler(msg, mq, pq);
return;
} catch (err) {
err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`;
this.emit('error', err);
if (this.messageModel === MessageModel.CLUSTERING) {
// 发送重试消息
try {
// delayLevel 为 0 代表由服务端控制重试间隔
await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup);
return;
} catch (err) {
this.emit('error', err);
this.logger.error('[MQPushConsumer] send reconsume message failed, fallback to local retry, msgId: %s', msg.msgId);
// 重试消息发送失败,本地重试
await this._sleep(5000);
}
// 本地重试情况下需要给 reconsumeTimes +1
msg.reconsumeTimes++;
} else {
this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId);
return;
}
} catch (err) {
_self._handleError(err);
}
})();
}
}

/**
Expand Down Expand Up @@ -342,8 +413,8 @@ class MQPushConsumer extends ClientConfig {
case PullStatus.FOUND:
{
const pullRT = Date.now() - processQueue.lastPullTimestamp;
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, cost: %dms.',
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, pullRT);
this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, messageQueue: %s cost: %dms.',
pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, messageQueue.key, pullRT);

// submit to consumer
processQueue.putMessage(pullResult.msgFoundList);
Expand Down Expand Up @@ -417,7 +488,10 @@ class MQPushConsumer extends ClientConfig {
}

correctTagsOffset(pullRequest) {
this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true);
// 仅当已拉下的消息消费完的情况下才更新 offset
if (pullRequest.processQueue.msgCount === 0) {
this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true);
}
}

updatePullFromWhichNode(messageQueue, brokerId) {
Expand Down Expand Up @@ -639,6 +713,41 @@ class MQPushConsumer extends ClientConfig {
// todo: consume later ?
}


async sendMessageBack(msg, delayLevel, brokerName, consumerGroup) {
const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName)
: msg.storeHost;
const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup;
try {
await this._mqClient.consumerSendMessageBack(
brokerAddr,
msg,
thatConsumerGroup,
delayLevel,
3000,
this.options.maxReconsumeTimes);
} catch (err) {
err.mesasge = 'sendMessageBack() occurred an exception, ' + thatConsumerGroup + ', ' + err.mesasge;
this._handleError(err);

let newMsg;
if (MixAll.isRetryTopic(msg.topic)) {
newMsg = msg;
} else {
newMsg = new Message(MixAll.getRetryTopic(thatConsumerGroup), '', msg.body);
newMsg.flag = msg.flag;
newMsg.properties = msg.properties;
newMsg.originMessageId = msg.originMessageId || msg.msgId;
newMsg.retryTopic = msg.topic;
newMsg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES] = this.options.maxReconsumeTimes;
}

newMsg.properties[MessageConst.PROPERTY_RECONSUME_TIME] = String(msg.reconsumeTimes + 1);
newMsg.delayTimeLevel = 3 + msg.reconsumeTimes;
await (await MQProducer.getDefaultProducer()).send(newMsg);
}
}

// * viewMessage(msgId) {
// const info = MessageDecoder.decodeMessageId(msgId);
// return yield this._mqClient.viewMessage(info.address, Number(info.offset.toString()), 3000);
Expand Down
24 changes: 24 additions & 0 deletions lib/message/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ class Message {
this.properties[MessageConst.PROPERTY_KEYS] = val.join(MessageConst.KEY_SEPARATOR).trim();
}

/**
* 原始消息 Id
* @property {String} Message#originMessageId
*/
get originMessageId() {
return this.properties && this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID];
}

set originMessageId(val) {
this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID] = val;
}

/**
* 重试 Topic
* @property {String} Message#retryTopic
*/
get retryTopic() {
return this.properties && this.properties[MessageConst.PROPERTY_RETRY_TOPIC];
}

set retryTopic(val) {
this.properties[MessageConst.PROPERTY_RETRY_TOPIC] = val;
}

/**
* 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义)
* @property {Number} Message#delayTimeLevel
Expand Down
1 change: 1 addition & 0 deletions lib/message/message_const.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ exports.PROPERTY_TRANSFER_FLAG = 'TRANSFER_FLAG';
exports.PROPERTY_CORRECTION_FLAG = 'CORRECTION_FLAG';
exports.PROPERTY_MQ2_FLAG = 'MQ2_FLAG';
exports.PROPERTY_RECONSUME_TIME = 'RECONSUME_TIME';
exports.PROPERTY_MAX_RECONSUME_TIMES = 'MAX_RECONSUME_TIMES';

exports.KEY_SEPARATOR = ' ';
15 changes: 15 additions & 0 deletions lib/mix_all.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
exports.DEFAULT_TOPIC = 'TBW102';
exports.DEFAULT_PRODUCER_GROUP = 'DEFAULT_PRODUCER';
exports.DEFAULT_CONSUMER_GROUP = 'DEFAULT_CONSUMER';
exports.CLIENT_INNER_PRODUCER_GROUP = 'CLIENT_INNER_PRODUCER';

exports.DEFAULT_CHARSET = 'UTF-8';
exports.MASTER_ID = 0;
Expand All @@ -11,6 +12,20 @@ exports.RETRY_GROUP_TOPIC_PREFIX = '%RETRY%';
// 为每个Consumer Group建立一个默认的Topic,前缀 + GroupName,用来保存重试多次都失败,接下来不再重试的消息
exports.DLQ_GROUP_TOPIC_PREFIX = '%DLQ%';

/**
* 获取 RETRY_TOPIC
* @param {string} consumerGroup consumerGroup
* @return {string} %RETRY%+consumerGroup
*/
exports.getRetryTopic = consumerGroup => {
return exports.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
};

/**
* 判断是否为 RETRY_TOPIC
* @param {String} topic topic
* @return {boolean} ret
*/
exports.isRetryTopic = topic => {
return topic && topic.startsWith(exports.RETRY_GROUP_TOPIC_PREFIX);
};
2 changes: 1 addition & 1 deletion lib/mq_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class MQClient extends MQClientAPI {
/**
* regitser consumer
* @param {String} group - consumer group name
* @param {Comsumer} consumer - consumer instance
* @param {Consumer} consumer - consumer instance
* @return {void}
*/
registerConsumer(group, consumer) {
Expand Down

0 comments on commit 9acff9b

Please sign in to comment.