diff --git a/.gitignore b/.gitignore index 52bdb09..4b16f25 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ node_modules example/.config.js .nyc_output/ +.idea diff --git a/lib/consumer/mq_push_consumer.js b/lib/consumer/mq_push_consumer.js index 1dd05aa..beaeca5 100644 --- a/lib/consumer/mq_push_consumer.js +++ b/lib/consumer/mq_push_consumer.js @@ -6,6 +6,7 @@ const utils = require('../utils'); const logger = require('../logger'); const MixAll = require('../mix_all'); const MQClient = require('../mq_client'); +const MQProducer = require('../producer/mq_producer'); const sleep = require('mz-modules/sleep'); const ClientConfig = require('../client_config'); const ProcessQueue = require('../process_queue'); @@ -19,6 +20,8 @@ const LocalFileOffsetStore = require('../store/local_file'); const LocalMemoryOffsetStore = require('../store/local_memory'); const RemoteBrokerOffsetStore = require('../store/remote_broker'); const AllocateMessageQueueAveragely = require('./rebalance/allocate_message_queue_averagely'); +const Message = require('../message/message'); +const MessageConst = require('../message/message_const'); const defaultOptions = { logger, @@ -41,14 +44,19 @@ const defaultOptions = { pullInterval: 0, // 拉取消息的频率, 如果为了降低拉取速度,可以设置大于0的值 consumeMessageBatchMaxSize: 1, // 消费一批消息,最大数 pullBatchSize: 32, // 拉消息,一次拉多少条 + parallelConsumeLimit: 1, // 并发消费消息限制 postSubscriptionWhenPull: true, // 是否每次拉消息时,都上传订阅关系 allocateMessageQueueStrategy: new AllocateMessageQueueAveragely(), // 队列分配算法,应用可重写 + maxReconsumeTimes: 16, // 最大重试次数 }; class MQPushConsumer extends ClientConfig { constructor(options) { assert(options && options.consumerGroup, '[MQPushConsumer] options.consumerGroup is required'); - super(Object.assign({ initMethod: 'init' }, defaultOptions, options)); + const mergedOptions = Object.assign({ initMethod: 'init' }, defaultOptions, options); + assert(mergedOptions.parallelConsumeLimit <= mergedOptions.pullBatchSize, + '[MQPushConsumer] options.parallelConsumeLimit must lte pullBatchSize'); + super(mergedOptions); // @example: // pullFromWhichNodeTable => { @@ -85,6 +93,10 @@ class MQPushConsumer extends ClientConfig { return this._processQueueTable; } + get parallelConsumeLimit() { + return this.options.parallelConsumeLimit; + } + get consumerGroup() { return this.options.consumerGroup; } @@ -107,10 +119,30 @@ class MQPushConsumer extends ClientConfig { async init() { this._mqClient.registerConsumer(this.consumerGroup, this); + await MQProducer.getDefaultProducer(this.options); await this._mqClient.ready(); await this._offsetStore.load(); this.logger.info('[mq:consumer] consumer started'); this._inited = true; + + // 订阅重试 TOPIC + if (this.messageModel === MessageModel.CLUSTERING) { + const retryTopic = MixAll.getRetryTopic(this.consumerGroup); + this.subscribe(retryTopic, '*', async msg => { + const originTopic = msg.retryTopic; + const originMsgId = msg.originMessageId; + const subscription = this._subscriptions.get(originTopic) || {}; + const handler = subscription.handler; + if (!MixAll.isRetryTopic(originTopic) && handler) { + await handler(msg); + } else { + this.logger.warn('[MQPushConsumer] retry message no handler, originTopic: %s, originMsgId: %s, msgId: %s', + originTopic, + originMsgId, + msg.msgId); + } + }); + } } /** @@ -148,79 +180,118 @@ class MQPushConsumer extends ClientConfig { * @return {void} */ subscribe(topic, subExpression, handler) { - const _self = this; - if (arguments.length === 2) { handler = subExpression; subExpression = null; } assert(is.asyncFunction(handler), '[MQPushConsumer] handler should be a asyncFunction'); - assert(!_self.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`); + assert(!this.subscriptions.has(topic), `[MQPushConsumer] ONLY one handler allowed for topic=${topic}`); - const subscriptionData = _self.buildSubscriptionData(_self.consumerGroup, topic, subExpression); + const subscriptionData = this.buildSubscriptionData(this.consumerGroup, topic, subExpression); const tagsSet = subscriptionData.tagsSet; const needFilter = !!tagsSet.length; - _self.subscriptions.set(topic, { + this.subscriptions.set(topic, { handler, subscriptionData, }); (async () => { try { - await _self.ready(); + await this.ready(); // 如果 topic 没有路由信息,先更新一下 - if (!_self._topicSubscribeInfoTable.has(topic)) { - await _self._mqClient.updateAllTopicRouterInfo(); - await _self._mqClient.sendHeartbeatToAllBroker(); - await _self._mqClient.doRebalance(); + if (!this._topicSubscribeInfoTable.has(topic)) { + await this._mqClient.updateAllTopicRouterInfo(); + await this._mqClient.sendHeartbeatToAllBroker(); + await this._mqClient.doRebalance(); } - while (!_self._isClosed && _self.subscriptions.has(topic)) { - const mqList = _self._topicSubscribeInfoTable.get(topic); - let hasMsg = false; - if (mqList && mqList.length) { - for (const mq of mqList) { - const item = _self._processQueueTable.get(mq.key); - if (item) { - const pq = item.processQueue; - while (pq.msgCount) { - hasMsg = true; - const msg = pq.msgList[0]; - if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { - try { - await handler(msg, mq, pq); - } catch (err) { - err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; - _self.emit('error', err); - if (_self.messageModel === MessageModel.CLUSTERING) { - // TODO: support retry message - msg.reconsumeTimes++; - await _self._sleep(5000); - continue; - } else { - _self.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId); - } - } - } else { - _self.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags); - } - const offset = pq.remove(); - if (offset >= 0) { - _self._offsetStore.updateOffset(mq, offset, true); - } - } + // 消息消费循环 + while (!this._isClosed && this.subscriptions.has(topic)) { + await this._consumeMessageLoop(topic, needFilter, tagsSet, subExpression); + } + + } catch (err) { + this._handleError(err); + } + })(); + } + + async _consumeMessageLoop(topic, needFilter, tagsSet, subExpression) { + const mqList = this._topicSubscribeInfoTable.get(topic); + let hasMsg = false; + if (mqList && mqList.length) { + for (const mq of mqList) { + const item = this._processQueueTable.get(mq.key); + if (item) { + const pq = item.processQueue; + while (pq.msgCount) { + hasMsg = true; + let msgs; + if (this.parallelConsumeLimit > pq.msgCount) { + msgs = pq.msgList.slice(0, pq.msgCount); + } else { + msgs = pq.msgList.slice(0, this.parallelConsumeLimit); + } + // 并发消费任务 + const consumeTasks = []; + for (const msg of msgs) { + const handler = this._subscriptions.get(msg.topic).handler; + if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { + consumeTasks.push(this.consumeSingleMsg(handler, msg, mq, pq)); + } else { + this.logger.debug('[MQPushConsumer] message filter by tags=, msg.tags=%s', subExpression, msg.tags); } } + // 必须全部成功 + try { + await Promise.all(consumeTasks); + } catch (err) { + continue; + } + // 注意这里必须是批量确认 + const offset = pq.remove(msgs.length); + if (offset >= 0) { + this._offsetStore.updateOffset(mq, offset, true); + } } + } + } + } - if (!hasMsg) { - await _self.await(`topic_${topic}_changed`); + if (!hasMsg) { + await this.await(`topic_${topic}_changed`); + } + } + + async consumeSingleMsg(handler, msg, mq, pq) { + // 集群消费模式下,如果消费失败,反复重试 + while (!this._isClosed) { + try { + await handler(msg, mq, pq); + return; + } catch (err) { + err.message = `process mq message failed, topic: ${msg.topic}, msgId: ${msg.msgId}, ${err.message}`; + this.emit('error', err); + if (this.messageModel === MessageModel.CLUSTERING) { + // 发送重试消息 + try { + // delayLevel 为 0 代表由服务端控制重试间隔 + await this.sendMessageBack(msg, 0, mq.brokerName, this.consumerGroup); + return; + } catch (err) { + this.emit('error', err); + this.logger.error('[MQPushConsumer] send reconsume message failed, fallback to local retry, msgId: %s', msg.msgId); + // 重试消息发送失败,本地重试 + await this._sleep(5000); } + // 本地重试情况下需要给 reconsumeTimes +1 + msg.reconsumeTimes++; + } else { + this.logger.warn('[MQPushConsumer] BROADCASTING consume message failed, drop it, msgId: %s', msg.msgId); + return; } - } catch (err) { - _self._handleError(err); } - })(); + } } /** @@ -342,8 +413,8 @@ class MQPushConsumer extends ClientConfig { case PullStatus.FOUND: { const pullRT = Date.now() - processQueue.lastPullTimestamp; - this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, cost: %dms.', - pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, pullRT); + this.logger.info('[MQPushConsumer] pull message success, found new message size: %d, topic: %s, consumerGroup: %s, messageQueue: %s cost: %dms.', + pullResult.msgFoundList.length, messageQueue.topic, this.consumerGroup, messageQueue.key, pullRT); // submit to consumer processQueue.putMessage(pullResult.msgFoundList); @@ -417,7 +488,10 @@ class MQPushConsumer extends ClientConfig { } correctTagsOffset(pullRequest) { - this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true); + // 仅当已拉下的消息消费完的情况下才更新 offset + if (pullRequest.processQueue.msgCount === 0) { + this._offsetStore.updateOffset(pullRequest.messageQueue, pullRequest.nextOffset, true); + } } updatePullFromWhichNode(messageQueue, brokerId) { @@ -639,6 +713,41 @@ class MQPushConsumer extends ClientConfig { // todo: consume later ? } + + async sendMessageBack(msg, delayLevel, brokerName, consumerGroup) { + const brokerAddr = brokerName ? this._mqClient.findBrokerAddressInPublish(brokerName) + : msg.storeHost; + const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup; + try { + await this._mqClient.consumerSendMessageBack( + brokerAddr, + msg, + thatConsumerGroup, + delayLevel, + 3000, + this.options.maxReconsumeTimes); + } catch (err) { + err.mesasge = 'sendMessageBack() occurred an exception, ' + thatConsumerGroup + ', ' + err.mesasge; + this._handleError(err); + + let newMsg; + if (MixAll.isRetryTopic(msg.topic)) { + newMsg = msg; + } else { + newMsg = new Message(MixAll.getRetryTopic(thatConsumerGroup), '', msg.body); + newMsg.flag = msg.flag; + newMsg.properties = msg.properties; + newMsg.originMessageId = msg.originMessageId || msg.msgId; + newMsg.retryTopic = msg.topic; + newMsg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES] = this.options.maxReconsumeTimes; + } + + newMsg.properties[MessageConst.PROPERTY_RECONSUME_TIME] = String(msg.reconsumeTimes + 1); + newMsg.delayTimeLevel = 3 + msg.reconsumeTimes; + await (await MQProducer.getDefaultProducer()).send(newMsg); + } + } + // * viewMessage(msgId) { // const info = MessageDecoder.decodeMessageId(msgId); // return yield this._mqClient.viewMessage(info.address, Number(info.offset.toString()), 3000); diff --git a/lib/message/message.js b/lib/message/message.js index 35956ab..07eb25a 100644 --- a/lib/message/message.js +++ b/lib/message/message.js @@ -67,6 +67,30 @@ class Message { this.properties[MessageConst.PROPERTY_KEYS] = val.join(MessageConst.KEY_SEPARATOR).trim(); } + /** + * 原始消息 Id + * @property {String} Message#originMessageId + */ + get originMessageId() { + return this.properties && this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID]; + } + + set originMessageId(val) { + this.properties[MessageConst.PROPERTY_ORIGIN_MESSAGE_ID] = val; + } + + /** + * 重试 Topic + * @property {String} Message#retryTopic + */ + get retryTopic() { + return this.properties && this.properties[MessageConst.PROPERTY_RETRY_TOPIC]; + } + + set retryTopic(val) { + this.properties[MessageConst.PROPERTY_RETRY_TOPIC] = val; + } + /** * 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义) * @property {Number} Message#delayTimeLevel diff --git a/lib/message/message_const.js b/lib/message/message_const.js index 12217d1..e25f7f5 100644 --- a/lib/message/message_const.js +++ b/lib/message/message_const.js @@ -23,5 +23,6 @@ exports.PROPERTY_TRANSFER_FLAG = 'TRANSFER_FLAG'; exports.PROPERTY_CORRECTION_FLAG = 'CORRECTION_FLAG'; exports.PROPERTY_MQ2_FLAG = 'MQ2_FLAG'; exports.PROPERTY_RECONSUME_TIME = 'RECONSUME_TIME'; +exports.PROPERTY_MAX_RECONSUME_TIMES = 'MAX_RECONSUME_TIMES'; exports.KEY_SEPARATOR = ' '; diff --git a/lib/mix_all.js b/lib/mix_all.js index fc6005b..afa4b57 100644 --- a/lib/mix_all.js +++ b/lib/mix_all.js @@ -3,6 +3,7 @@ exports.DEFAULT_TOPIC = 'TBW102'; exports.DEFAULT_PRODUCER_GROUP = 'DEFAULT_PRODUCER'; exports.DEFAULT_CONSUMER_GROUP = 'DEFAULT_CONSUMER'; +exports.CLIENT_INNER_PRODUCER_GROUP = 'CLIENT_INNER_PRODUCER'; exports.DEFAULT_CHARSET = 'UTF-8'; exports.MASTER_ID = 0; @@ -11,6 +12,20 @@ exports.RETRY_GROUP_TOPIC_PREFIX = '%RETRY%'; // 为每个Consumer Group建立一个默认的Topic,前缀 + GroupName,用来保存重试多次都失败,接下来不再重试的消息 exports.DLQ_GROUP_TOPIC_PREFIX = '%DLQ%'; +/** + * 获取 RETRY_TOPIC + * @param {string} consumerGroup consumerGroup + * @return {string} %RETRY%+consumerGroup + */ exports.getRetryTopic = consumerGroup => { return exports.RETRY_GROUP_TOPIC_PREFIX + consumerGroup; }; + +/** + * 判断是否为 RETRY_TOPIC + * @param {String} topic topic + * @return {boolean} ret + */ +exports.isRetryTopic = topic => { + return topic && topic.startsWith(exports.RETRY_GROUP_TOPIC_PREFIX); +}; diff --git a/lib/mq_client.js b/lib/mq_client.js index 1c621b3..77d7022 100644 --- a/lib/mq_client.js +++ b/lib/mq_client.js @@ -114,7 +114,7 @@ class MQClient extends MQClientAPI { /** * regitser consumer * @param {String} group - consumer group name - * @param {Comsumer} consumer - consumer instance + * @param {Consumer} consumer - consumer instance * @return {void} */ registerConsumer(group, consumer) { diff --git a/lib/mq_client_api.js b/lib/mq_client_api.js index 894d121..3f3a258 100644 --- a/lib/mq_client_api.js +++ b/lib/mq_client_api.js @@ -466,6 +466,7 @@ class MQClientAPI extends RemotingClient { i: requestHeader.properties, j: requestHeader.reconsumeTimes, k: requestHeader.unitMode, + l: requestHeader.maxReconsumeTimes, }; const request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); request.body = msg.body; @@ -500,6 +501,36 @@ class MQClientAPI extends RemotingClient { }; } + /** + * consumer send message back + * @param {String} brokerAddr - broker address + * @param {Message} msg - message object + * @param {String} consumerGroup - consumer group + * @param {Number} delayLevel - delay level + * @param {Number} timeoutMillis - timeout in millis + * @param {Number} maxConsumeRetryTimes - max retry times + */ + async consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, timeoutMillis, maxConsumeRetryTimes) { + const requestHeader = { + offset: msg.commitLogOffset, + group: consumerGroup, + delayLevel, + originMsgId: msg.msgId, + originTopic: msg.topic, + unitMode: false, + maxReconsumeTimes: maxConsumeRetryTimes, + }; + const request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); + const response = await this.invoke(brokerAddr, request, timeoutMillis); + switch (response.code) { + case ResponseCode.SUCCESS: + return; + default: + this._defaultHandler(request, response); + break; + } + } + // * viewMessage(brokerAddr, phyoffset, timeoutMillis) { // const requestHeader = { // offset: phyoffset, diff --git a/lib/producer/mq_producer.js b/lib/producer/mq_producer.js index 6d1e5ad..aae860d 100644 --- a/lib/producer/mq_producer.js +++ b/lib/producer/mq_producer.js @@ -78,6 +78,7 @@ class MQProducer extends ClientConfig { */ async init() { // this._topicPublishInfoTable.set(this.createTopicKey, new TopicPublishInfo()); + await MQProducer.getDefaultProducer(this.options); this._mqClient.registerProducer(this.producerGroup, this); await this._mqClient.ready(); this.logger.info('[mq:producer] producer started'); @@ -295,6 +296,11 @@ class MQProducer extends ClientConfig { requestHeader.reconsumeTimes = parseInt(reconsumeTimes, 10); delete msg.properties[MessageConst.PROPERTY_RECONSUME_TIME]; } + const maxReconsumeTimes = msg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES]; + if (maxReconsumeTimes) { + requestHeader.maxReconsumeTimes = parseInt(maxReconsumeTimes, 10); + delete msg.properties[MessageConst.PROPERTY_MAX_RECONSUME_TIMES]; + } } return await this._mqClient.sendMessage(brokerAddr, messageQueue.brokerName, msg, requestHeader, this.options.sendMsgTimeout); @@ -323,6 +329,22 @@ class MQProducer extends ClientConfig { } return false; } + + /** + * 获取默认 Producer 实例 + * @param {Object} options 配置 + * @return {MQProducer} 默认 Producer 实例 + */ + static async getDefaultProducer(options = {}) { + if (!MQProducer.defaultProducer) { + const opts = Object.assign({}, options); + opts.producerGroup = MixAll.CLIENT_INNER_PRODUCER_GROUP; + MQProducer.defaultProducer = new MQProducer(opts); + await MQProducer.defaultProducer.ready(); + return MQProducer.defaultProducer; + } + return MQProducer.defaultProducer; + } } module.exports = MQProducer; diff --git a/lib/utils/index.js b/lib/utils/index.js index d7fe447..715abad 100644 --- a/lib/utils/index.js +++ b/lib/utils/index.js @@ -3,6 +3,7 @@ /* eslint no-bitwise:0 */ const zlib = require('zlib'); +const MixAll = require('../mix_all'); const is = require('is-type-of'); // 压缩 @@ -53,6 +54,14 @@ exports.hashCode = function(str) { return hash & 4294967295; }; +// 重置重试消息的 Topic +exports.resetRetryTopic = function(msg, consumerGroup) { + const groupTopic = MixAll.getRetryTopic(consumerGroup); + if (msg.topic === groupTopic) { + msg.topic = msg.retryTopic; + } +}; + function zeroize(value, length) { if (!length) { length = 2; diff --git a/test/index.test.js b/test/index.test.js index 85b9e17..38529bd 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -12,6 +12,7 @@ const Producer = require('../').Producer; const sleep = require('mz-modules/sleep'); const rimraf = require('mz-modules/rimraf'); const config = require('../example/config'); +const MixAll = require('../lib/mix_all'); const TOPIC = 'GXCSOCCER'; @@ -146,6 +147,7 @@ describe('test/index.test.js', () => { }); it('should updateProcessQueueTableInRebalance ok', async () => { + await sleep(3000); await consumer.rebalanceByTopic(TOPIC); const size = consumer.processQueueTable.size; assert(size > 0); @@ -160,6 +162,8 @@ describe('test/index.test.js', () => { await consumer.updateProcessQueueTableInRebalance(TOPIC, []); assert(consumer.processQueueTable.size === 0); + await consumer.updateProcessQueueTableInRebalance(MixAll.getRetryTopic(consumer.consumerGroup), []); + assert(consumer.processQueueTable.size === 0); }); it('should computePullFromWhere ok', async () => { @@ -356,7 +360,7 @@ describe('test/index.test.js', () => { describe('process exception', () => { let consumer; let producer; - before(async () => { + beforeEach(async () => { consumer = new Consumer(Object.assign({ httpclient, rebalanceInterval: 2000, @@ -368,15 +372,96 @@ describe('test/index.test.js', () => { await producer.ready(); }); - after(async () => { + afterEach(async () => { await producer.close(); await consumer.close(); }); - it('should retry if process failed', async () => { + it('should not correctTags if process queue not empty', () => { + let done = false; + mm(consumer._offsetStore, 'updateOffset', () => { + done = true; + }); + consumer.correctTagsOffset({ + processQueue: { + msgCount: 0, + }, + }); + assert(done); + done = true; + mm.restore(); + mm(consumer._offsetStore, 'updateOffset', () => { + done = false; + }); + consumer.correctTagsOffset({ + processQueue: { + msgCount: 1, + }, + }); + assert(done); + mm.restore(); + }); + + it('should retry(consume later) if process failed', async () => { let msgId; consumer.subscribe(TOPIC, '*', async msg => { - console.log('message receive ------------> ', msg.body.toString()); + console.warn('message receive ------------> ', msg.body.toString()); + if (msg.msgId === msgId || msg.originMessageId === msgId) { + assert(msg.body.toString() === 'Hello MetaQ !!! '); + if (msg.reconsumeTimes === 0) { + throw new Error('mock error'); + } + consumer.emit('*'); + } + }); + + await sleep(5000); + + const msg = new Message(TOPIC, // topic + 'TagA', // tag + 'Hello MetaQ !!! ' // body + ); + const sendResult = await producer.send(msg); + assert(sendResult && sendResult.msgId); + msgId = sendResult.msgId; + await consumer.await('*'); + }); + + it('should retry(retry message) if process failed', async () => { + let msgId; + mm(consumer._mqClient, 'consumerSendMessageBack', async () => { + throw new Error('mock error'); + }); + consumer.subscribe(TOPIC, '*', async msg => { + console.warn('message receive ------------> ', msg.body.toString()); + if (msg.msgId === msgId || msg.originMessageId === msgId) { + assert(msg.body.toString() === 'Hello MetaQ !!! '); + if (msg.reconsumeTimes === 0) { + throw new Error('mock error'); + } + consumer.emit('*'); + } + }); + + await sleep(5000); + + const msg = new Message(TOPIC, // topic + 'TagA', // tag + 'Hello MetaQ !!! ' // body + ); + const sendResult = await producer.send(msg); + assert(sendResult && sendResult.msgId); + msgId = sendResult.msgId; + await consumer.await('*'); + }); + + it('should retry(fall to local) if process failed', async () => { + let msgId; + mm(consumer, 'sendMessageBack', async () => { + throw new Error('mock error'); + }); + consumer.subscribe(TOPIC, '*', async msg => { + console.warn('message receive ------------> ', msg.body.toString()); if (msg.msgId === msgId) { assert(msg.body.toString() === 'Hello MetaQ !!! '); if (msg.reconsumeTimes === 0) { diff --git a/test/mq_client.test.js b/test/mq_client.test.js index f7cff04..626bdb1 100644 --- a/test/mq_client.test.js +++ b/test/mq_client.test.js @@ -12,12 +12,13 @@ describe('test/mq_client.test.js', () => { const config = new ClientConfig(Object.assign({ httpclient }, require('../example/config'))); let client; - before(() => { + before(async () => { client = MQClient.getAndCreateMQClient(config); + client.unregisterProducer('CLIENT_INNER_PRODUCER'); return client.ready(); }); - after(async () => client.close()); + after(() => client.close()); it('should mqclient is singleton', () => { assert(MQClient.getAndCreateMQClient(config) === client); @@ -71,7 +72,7 @@ describe('test/mq_client.test.js', () => { it('should updateAllTopicRouterInfo ok', async () => { const subscriptions = new Map(); - subscriptions.set(consumerGroup, { + subscriptions.set('TopicTest', { topic: 'TopicTest', subString: '*', classFilterMode: false, @@ -93,11 +94,15 @@ describe('test/mq_client.test.js', () => { }); client.registerProducer('xxx', null); - client.updateAllTopicRouterInfo(); + await client.updateAllTopicRouterInfo(); await client.unregisterConsumer(consumerGroup); await client.unregisterProducer(producerGroup); - await client.unregisterConsumer('xxx'); + try { + await client.unregisterConsumer('xxx'); + } catch (err) { + assert(err.message.includes('resource xxx not created')); + } await client.unregisterProducer('xxx'); }); diff --git a/test/utils/index.test.js b/test/utils/index.test.js index f904ad4..42d04e5 100644 --- a/test/utils/index.test.js +++ b/test/utils/index.test.js @@ -8,4 +8,12 @@ describe('test/utils/index.test.js', () => { const d = util.parseDate('2018112000000'); assert.equal(d.getTime() - d.getTimezoneOffset() * 60 * 1000, 1542672000000); }); + + it('should getRetryTopic ok', () => { + const msg = { + retryTopic: 'xxx', + }; + util.resetRetryTopic(msg, 'yyy'); + assert.equal(msg.retryTopic, 'xxx'); + }); });