Skip to content

Commit

Permalink
fix: namespace logic (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer committed Apr 16, 2019
1 parent ac0a146 commit cb385e3
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 8 deletions.
2 changes: 1 addition & 1 deletion example/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ module.exports = {
consumerGroup: 'GID_alions',
topic: 'TP_alions_test_topic',
// https://help.aliyun.com/document_detail/102895.html 阿里云产品更新,支持实例化
// nameSrv: '112.124.141.191:80',
nameSrv: 'onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80',
onsAddr: 'http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet',
};
9 changes: 9 additions & 0 deletions lib/client_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Base = require('sdk-base');
const address = require('address');
const MixAll = require('./mix_all');

const defaultOptions = {
instanceName: 'DEFAULT',
Expand Down Expand Up @@ -67,6 +68,14 @@ class ClientConfig extends Base {
return this.options.namespace;
}

formatTopic(topic) {
if (this.namespace && (!topic.startsWith(this.namespace) &&
!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
topic = `${this.namespace}%${topic}`;
}
return topic;
}

/**
* 将实例名修改为进程号
*/
Expand Down
4 changes: 1 addition & 3 deletions lib/consumer/mq_push_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ class MQPushConsumer extends ClientConfig {
*/
subscribe(topic, subExpression, handler) {
// 添加 namespace 前缀
if (this.namespace && !topic.startsWith(this.namespace)) {
topic = `${this.namespace}%${topic}`;
}
topic = this.formatTopic(topic);

if (arguments.length === 2) {
handler = subExpression;
Expand Down
4 changes: 1 addition & 3 deletions lib/producer/mq_producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,7 @@ class MQProducer extends ClientConfig {
throw new Error(`no publish router data for topic: ${msg.topic}`);
}

if (this.namespace && (!msg.topic.startsWith(this.namespace) || !msg.topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
msg.topic = `${this.namespace}%${msg.topic}`;
}
msg.topic = this.formatTopic(msg.topic);

const maxTimeout = this.options.sendMsgTimeout + 1000;
const timesTotal = 1 + this.options.retryTimesWhenSendFailed;
Expand Down
14 changes: 13 additions & 1 deletion test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,18 @@ describe('test/index.test.js', () => {
});
assert(offset === -1);
});

it('should support namespace', () => {
mm(producer, 'namespace', 'xxx');
mm(consumer, 'namespace', 'xxx');

assert(consumer.consumerGroup === `xxx%${config.consumerGroup}`);
assert(producer.producerGroup === `xxx%${config.producerGroup}`);

assert(consumer.formatTopic(`%RETRY%${consumer.consumerGroup}`) === `%RETRY%${consumer.consumerGroup}`);
assert(producer.formatTopic('TEST_TOPIC') === 'xxx%TEST_TOPIC');
assert(consumer.formatTopic('TEST_TOPIC') === 'xxx%TEST_TOPIC');
});
});

// 广播消费
Expand Down Expand Up @@ -593,7 +605,7 @@ describe('test/index.test.js', () => {
let consumer;
let consumeTime = 0;
// 允许的误差时间
const deviationTime = 3000;
const deviationTime = 4000;

before(async () => {
producer = new Producer(Object.assign({
Expand Down

0 comments on commit cb385e3

Please sign in to comment.