diff --git a/example/config.js b/example/config.js index 2d5dd3d..41abbfb 100644 --- a/example/config.js +++ b/example/config.js @@ -12,6 +12,6 @@ module.exports = { consumerGroup: 'GID_alions', topic: 'TP_alions_test_topic', // https://help.aliyun.com/document_detail/102895.html 阿里云产品更新,支持实例化 - // nameSrv: '112.124.141.191:80', + nameSrv: 'onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80', onsAddr: 'http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet', }; diff --git a/lib/client_config.js b/lib/client_config.js index bff3e73..73ff67d 100644 --- a/lib/client_config.js +++ b/lib/client_config.js @@ -2,6 +2,7 @@ const Base = require('sdk-base'); const address = require('address'); +const MixAll = require('./mix_all'); const defaultOptions = { instanceName: 'DEFAULT', @@ -67,6 +68,14 @@ class ClientConfig extends Base { return this.options.namespace; } + formatTopic(topic) { + if (this.namespace && (!topic.startsWith(this.namespace) && + !topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) { + topic = `${this.namespace}%${topic}`; + } + return topic; + } + /** * 将实例名修改为进程号 */ diff --git a/lib/consumer/mq_push_consumer.js b/lib/consumer/mq_push_consumer.js index 540126c..21676a4 100644 --- a/lib/consumer/mq_push_consumer.js +++ b/lib/consumer/mq_push_consumer.js @@ -184,9 +184,7 @@ class MQPushConsumer extends ClientConfig { */ subscribe(topic, subExpression, handler) { // 添加 namespace 前缀 - if (this.namespace && !topic.startsWith(this.namespace)) { - topic = `${this.namespace}%${topic}`; - } + topic = this.formatTopic(topic); if (arguments.length === 2) { handler = subExpression; diff --git a/lib/producer/mq_producer.js b/lib/producer/mq_producer.js index f89d4bd..b8c9558 100644 --- a/lib/producer/mq_producer.js +++ b/lib/producer/mq_producer.js @@ -193,9 +193,7 @@ class MQProducer extends ClientConfig { throw new Error(`no publish router data for topic: ${msg.topic}`); } - if (this.namespace && (!msg.topic.startsWith(this.namespace) || !msg.topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) { - msg.topic = `${this.namespace}%${msg.topic}`; - } + msg.topic = this.formatTopic(msg.topic); const maxTimeout = this.options.sendMsgTimeout + 1000; const timesTotal = 1 + this.options.retryTimesWhenSendFailed; diff --git a/test/index.test.js b/test/index.test.js index 5780484..0ea0349 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -212,6 +212,18 @@ describe('test/index.test.js', () => { }); assert(offset === -1); }); + + it('should support namespace', () => { + mm(producer, 'namespace', 'xxx'); + mm(consumer, 'namespace', 'xxx'); + + assert(consumer.consumerGroup === `xxx%${config.consumerGroup}`); + assert(producer.producerGroup === `xxx%${config.producerGroup}`); + + assert(consumer.formatTopic(`%RETRY%${consumer.consumerGroup}`) === `%RETRY%${consumer.consumerGroup}`); + assert(producer.formatTopic('TEST_TOPIC') === 'xxx%TEST_TOPIC'); + assert(consumer.formatTopic('TEST_TOPIC') === 'xxx%TEST_TOPIC'); + }); }); // 广播消费 @@ -593,7 +605,7 @@ describe('test/index.test.js', () => { let consumer; let consumeTime = 0; // 允许的误差时间 - const deviationTime = 3000; + const deviationTime = 4000; before(async () => { producer = new Producer(Object.assign({