From 045771be4dbcc8134ab2b0caa7e44b96b8830aec Mon Sep 17 00:00:00 2001 From: dezhiliu Date: Sun, 5 Apr 2020 10:58:05 +0800 Subject: [PATCH] add Tests --- .../pulsar/client/api/RetryTopicTest.java | 274 ++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 4 +- 2 files changed, 276 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java new file mode 100644 index 00000000000000..820c813877499f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertNotNull; + +public class RetryTopicTest extends ProducerConsumerBase { + + private static final Logger log = LoggerFactory.getLogger(RetryTopicTest.class); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testRetryTopic() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + + final int maxRedeliveryCount = 2; + + final int sendMessages = 100; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + newPulsarClient.close(); + } + + /** + * The test is disabled {@link https://github.com/apache/pulsar/issues/2647}. + * @throws Exception + */ + @Test + public void testDeadLetterTopicWithMultiTopic() throws Exception { + final String topic1 = "persistent://my-property/my-ns/retry-topic-1"; + final String topic2 = "persistent://my-property/my-ns/retry-topic-2"; + final String topic3 = "persistent://my-property/my-ns/retry-topic-3"; + + final int maxRedeliveryCount = 2; + + int sendMessages = 50; + + // subscribe to the original topics before publish + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2, topic3) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + // subscribe to the DLQ topics before consuming original topics + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer1 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic1) + .create(); + + Producer producer2 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic2) + .create(); + + Producer producer3 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic3) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer1.send(String.format("Hello Pulsar [%d]", i).getBytes()); + producer2.send(String.format("Hello Pulsar [%d]", i).getBytes()); + producer3.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + sendMessages = sendMessages * 3; + + producer1.close(); + producer2.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {} - total = {}", + message.getMessageId(), new String(message.getData()), ++totalReceived); + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2, topic3) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } + + @Test + public void testDeadLetterTopicByCustomTopicName() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + final int maxRedeliveryCount = 2; + final int sendMessages = 100; + + // subscribe before publish + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .receiverQueueSize(100) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .build()) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + deadLetterConsumer.close(); + consumer.close(); + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + newPulsarClient.close(); + newPulsarClient1.close(); + checkConsumer.close(); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4be829deda879a..a9b05a23ee4d46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -270,14 +270,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat } else { this.deadLetterPolicy = DeadLetterPolicy.builder() .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()) - .deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX, topic, subscription)) + .deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, topic, subscription)) .build(); } if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) { this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic()); } else { - this.deadLetterPolicy.setRetryLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, + this.deadLetterPolicy.setRetryLetterTopic(String.format("%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX, topic, subscription)); }