Skip to content

Commit cdaec92

Browse files
thetumbledsrinath-ctds
authored andcommitted
[clean][client] Clean code for the construction of retry/dead letter topic name (apache#24082)
(cherry picked from commit a608810) (cherry picked from commit cf10ac9)
1 parent 3c6b12a commit cdaec92

File tree

5 files changed

+20
-19
lines changed

5 files changed

+20
-19
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21-
import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
2221
import static org.testng.Assert.assertTrue;
2322
import static org.testng.Assert.fail;
2423
import lombok.extern.slf4j.Slf4j;
2524
import org.apache.pulsar.broker.BrokerTestUtil;
25+
import org.apache.pulsar.client.util.RetryMessageUtil;
2626
import org.testng.annotations.AfterClass;
2727
import org.testng.annotations.BeforeClass;
2828
import org.testng.annotations.Test;
@@ -54,7 +54,7 @@ protected void doInitConf() throws Exception {
5454
public void testClearErrorIfRetryTopicNotExists() throws Exception {
5555
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
5656
final String subName = "sub";
57-
final String retryTopicName = topicName + "-" + subName + RETRY_GROUP_TOPIC_SUFFIX;
57+
final String retryTopicName = RetryMessageUtil.getRetryTopic(topicName, subName);
5858
admin.topics().createNonPartitionedTopic(topicName);
5959
Consumer consumer = null;
6060
try {

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1500,8 +1500,7 @@ public void testSendTxnAckMessageToDLQ() throws Exception {
15001500

15011501
@Cleanup
15021502
Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
1503-
.topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
1504-
topic, subName))
1503+
.topic(RetryMessageUtil.getDLQTopic(topic, subName))
15051504
.subscriptionType(SubscriptionType.Shared)
15061505
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
15071506
.subscriptionName("test")
@@ -1536,8 +1535,7 @@ public void testSendTxnAckMessageToDLQ() throws Exception {
15361535
consumer.close();
15371536
deadLetterConsumer.close();
15381537
producer.close();
1539-
admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
1540-
topic, subName), true);
1538+
admin.topics().delete(RetryMessageUtil.getDLQTopic(topic, subName), true);
15411539
admin.topics().delete(topic, true);
15421540
}
15431541

@@ -1564,8 +1562,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception {
15641562

15651563
@Cleanup
15661564
Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
1567-
.topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
1568-
topic, subName))
1565+
.topic(RetryMessageUtil.getDLQTopic(topic, subName))
15691566
.subscriptionType(SubscriptionType.Shared)
15701567
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
15711568
.subscriptionName("test")
@@ -1611,8 +1608,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception {
16111608
consumer.close();
16121609
deadLetterConsumer.close();
16131610
producer.close();
1614-
admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
1615-
topic, subName), true);
1611+
admin.topics().delete(RetryMessageUtil.getDLQTopic(topic, subName), true);
16161612
admin.topics().delete(topic, true);
16171613
}
16181614

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,10 @@ public CompletableFuture<Consumer<T>> subscribeAsync() {
165165
CompletableFuture<Boolean> deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic);
166166
applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
167167
.thenAccept(__ -> {
168-
String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
169-
+ RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
170-
String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
171-
+ RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
168+
String retryLetterTopic = RetryMessageUtil.getRetryTopic(topicFirst.toString(),
169+
conf.getSubscriptionName());
170+
String deadLetterTopic = RetryMessageUtil.getDLQTopic(topicFirst.toString(),
171+
conf.getSubscriptionName());
172172
if (retryLetterTopicMetadata.join()) {
173173
retryLetterTopic = oldRetryLetterTopic;
174174
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -395,16 +395,13 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
395395
} else {
396396
this.deadLetterPolicy = DeadLetterPolicy.builder()
397397
.maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
398-
.deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
399-
topic, subscription))
398+
.deadLetterTopic(RetryMessageUtil.getDLQTopic(topic, subscription))
400399
.build();
401400
}
402401
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
403402
this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
404403
} else {
405-
this.deadLetterPolicy.setRetryLetterTopic(String.format(
406-
"%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX,
407-
topic, subscription));
404+
this.deadLetterPolicy.setRetryLetterTopic(RetryMessageUtil.getRetryTopic(topic, subscription));
408405
}
409406
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getInitialSubscriptionName())) {
410407
this.deadLetterPolicy.setInitialSubscriptionName(

pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,12 @@ public class RetryMessageUtil {
3131
public static final int MAX_RECONSUMETIMES = 16;
3232
public static final String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY";
3333
public static final String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
34+
35+
public static String getRetryTopic(String topic, String subscription) {
36+
return topic + "-" + subscription + RETRY_GROUP_TOPIC_SUFFIX;
37+
}
38+
39+
public static String getDLQTopic(String topic, String subscription) {
40+
return topic + "-" + subscription + DLQ_GROUP_TOPIC_SUFFIX;
41+
}
3442
}

0 commit comments

Comments
 (0)