Skip to content

Commit

Permalink
Merge pull request #466 from zhangjidi2016/fix_broadcast
Browse files Browse the repository at this point in the history
[ISSUE #458]Fixed the problem of cannot consume previous messages in broadcast consumption mode
  • Loading branch information
panzhi33 committed Jul 23, 2022
2 parents 1aae0cd + a11049d commit c3b851f
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,6 @@ private void initRocketMQPushConsumer() throws MQClientException {
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));

String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
Expand All @@ -639,9 +638,11 @@ private void initRocketMQPushConsumer() throws MQClientException {
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
consumer.setInstanceName(Long.toString(nameServer.hashCode()));
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public static String getInstanceName(String identify) {
instanceName.append(identify, 0, maxLength)
.append(identify.hashCode());
} else {
instanceName.append(identify);
instanceName.append(identify.hashCode());
}
instanceName.append(separator).append(UtilAll.getPid())
.append(separator).append(System.nanoTime());
Expand All @@ -312,7 +312,6 @@ public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameS
litePullConsumer = new DefaultLitePullConsumer(groupName);
}
litePullConsumer.setNamesrvAddr(nameServer);
litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
litePullConsumer.setPullBatchSize(pullBatchSize);
if (accessChannel != null) {
litePullConsumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
Expand All @@ -322,9 +321,11 @@ public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameS
switch (messageModel) {
case BROADCASTING:
litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
litePullConsumer.setInstanceName(Long.toString(nameServer.hashCode()));
break;
case CLUSTERING:
litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ public void testConvertToSpringMessage() {
@Test
public void testGetInstanceName() {
String nameServer = "127.0.0.1:9876";
String expected = "127.0.0.1:9876@";
assertEquals(expected + UtilAll.getPid(), removeNanoTime(RocketMQUtil.getInstanceName(nameServer)));
String expected = "127.0.0.1:9876";
assertEquals(expected.hashCode() + "@" + UtilAll.getPid(), removeNanoTime(RocketMQUtil.getInstanceName(nameServer)));

nameServer = "I-am-a-very-very-long-domain-name-1:9876;I-am-a-very-very-long-domain-name-2:9876;I-am-a-very-very-long-domain-name-3:9876";
expected = "I-am-a-very-very-long-domain-name-1:9876;I-am-a-very-very-long-domain-name-2:9876;I-am-a-very-very-l-335144505@";
Expand Down

0 comments on commit c3b851f

Please sign in to comment.