diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java new file mode 100644 index 0000000000000..b29e12acb9bf1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertNotNull; + +public class RetryTopicTest extends ProducerConsumerBase { + + private static final Logger log = LoggerFactory.getLogger(RetryTopicTest.class); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testRetryTopic() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + + final int maxRedeliveryCount = 2; + + final int sendMessages = 100; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + newPulsarClient.close(); + } + + /** + * The test is disabled {@link https://github.com/apache/pulsar/issues/2647}. + * @throws Exception + */ + @Test + public void testRetryTopicWithMultiTopic() throws Exception { + final String topic1 = "persistent://my-property/my-ns/retry-topic-1"; + final String topic2 = "persistent://my-property/my-ns/retry-topic-2"; + + final int maxRedeliveryCount = 2; + + int sendMessages = 100; + + // subscribe to the original topics before publish + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + // subscribe to the DLQ topics before consuming original topics + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer1 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic1) + .create(); + + Producer producer2 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic2) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer1.send(String.format("Hello Pulsar [%d]", i).getBytes()); + producer2.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + sendMessages = sendMessages * 2; + + producer1.close(); + producer2.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {} - total = {}", + message.getMessageId(), new String(message.getData()), ++totalReceived); + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } + + @Test + public void testRetryTopicByCustomTopicName() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + final int maxRedeliveryCount = 2; + final int sendMessages = 100; + + // subscribe before publish + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .receiverQueueSize(100) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .build()) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + deadLetterConsumer.close(); + consumer.close(); + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + newPulsarClient.close(); + newPulsarClient1.close(); + checkConsumer.close(); + } + +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index b77d1b1e2e58b..1ee5bfa23f43b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -243,6 +243,53 @@ public interface Consumer extends Closeable { */ void negativeAcknowledge(Messages messages); + /** + * reconsumeLater the consumption of {@link Messages}. + * + *

When a message is "reconsumeLater" it will be marked for redelivery after + * some custom delay. + * + *

Example of usage: + *


+     * while (true) {
+     *     Message<String> msg = consumer.receive();
+     *
+     *     try {
+     *          // Process message...
+     *
+     *          consumer.acknowledge(msg);
+     *     } catch (Throwable t) {
+     *          log.warn("Failed to process message");
+     *          consumer.reconsumeLater(msg, 1000 , TimeUnit.MILLISECONDS);
+     *     }
+     * }
+     * 
+ * + * @param message + * the {@code Message} to be reconsumeLater + * @param delayTime + * the amount of delay before the message will be delivered + * @param unit + * the time unit for the delay + * @throws PulsarClientException.AlreadyClosedException + * if the consumer was already closed + */ + void reconsumeLater(Message message, long delayTime, TimeUnit unit) throws PulsarClientException; + + /** + * reconsumeLater the consumption of {@link Messages}. + * + * @param messages + * the {@code messages} to be reconsumeLater + * @param delayTime + * the amount of delay before the message will be delivered + * @param unit + * the time unit for the delay + * @throws PulsarClientException.AlreadyClosedException + * if the consumer was already closed + */ + void reconsumeLater(Messages messages, long delayTime, TimeUnit unit) throws PulsarClientException; + /** * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. * @@ -277,6 +324,20 @@ public interface Consumer extends Closeable { */ void acknowledgeCumulative(MessageId messageId) throws PulsarClientException; + /** + * reconsumeLater the reception of all the messages in the stream up to (and including) the provided message. + * + * @param message + * The {@code message} to be cumulatively reconsumeLater + * @param delayTime + * the amount of delay before the message will be delivered + * @param unit + * the time unit for the delay + * @throws PulsarClientException.AlreadyClosedException + * if the consumer was already closed + */ + void reconsumeLaterCumulative(Message message, long delayTime, TimeUnit unit) throws PulsarClientException; + /** * Asynchronously acknowledge the consumption of a single message. * @@ -304,6 +365,32 @@ public interface Consumer extends Closeable { */ CompletableFuture acknowledgeAsync(Messages messages); + /** + * Asynchronously reconsumeLater the consumption of a single message. + * + * @param message + * The {@code Message} to be reconsumeLater + * @param delayTime + * the amount of delay before the message will be delivered + * @param unit + * the time unit for the delay + * @return a future that can be used to track the completion of the operation + */ + CompletableFuture reconsumeLaterAsync(Message message, long delayTime, TimeUnit unit); + + /** + * Asynchronously reconsumeLater the consumption of {@link Messages}. + * + * @param messages + * The {@link Messages} to be reconsumeLater + * @param delayTime + * the amount of delay before the message will be delivered + * @param unit + * the time unit for the delay + * @return a future that can be used to track the completion of the operation + */ + CompletableFuture reconsumeLaterAsync(Messages messages, long delayTime, TimeUnit unit); + /** * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided * message. @@ -328,6 +415,22 @@ public interface Consumer extends Closeable { */ CompletableFuture acknowledgeCumulativeAsync(MessageId messageId); + /** + * Asynchronously ReconsumeLater the reception of all the messages in the stream up to (and including) the provided + * message. + * + *

Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared. + * + * @param message + * The {@code message} to be cumulatively reconsumeLater + * @param delayTime + * the amount of delay before the message will be delivered + * @param unit + * the time unit for the delay + * @return a future that can be used to track the completion of the operation + */ + CompletableFuture reconsumeLaterCumulativeAsync(Message message, long delayTime, TimeUnit unit); + /** * Get statistics for the consumer. *

    diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 1e467fb167f40..461ba70a20299 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -599,4 +599,14 @@ public interface ConsumerBuilder extends Cloneable { * */ ConsumerBuilder batchReceivePolicy(BatchReceivePolicy batchReceivePolicy); + + /** + * If enabled, the consumer will auto retry message. + * default unabled. + * + * @param retryEnable + * whether to auto retry message + */ + ConsumerBuilder enableRetry(boolean retryEnable); + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java index bf69d9c51ecb8..84e1faffff35d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java @@ -36,7 +36,12 @@ public class DeadLetterPolicy { private int maxRedeliverCount; /** - * Name of the topic where the failing messages will be sent. + * Name of the retry topic where the failing messages will be sent. + */ + private String retryLetterTopic; + + /** + * Name of the dead topic where the failing messages will be sent. */ private String deadLetterTopic; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 59d40417779a3..8f53df98c330c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -198,6 +198,32 @@ public void acknowledge(Messages messages) throws PulsarClientException { } } + @Override + public void reconsumeLater(Message message, long delayTime, TimeUnit unit) throws PulsarClientException { + if (conf.isRetryEnable() == false) { + throw new PulsarClientException("reconsumeLater method not support!"); + } + try { + reconsumeLaterAsync(message, delayTime, unit).get(); + } catch (Exception e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } + } + + @Override + public void reconsumeLater(Messages messages, long delayTime, TimeUnit unit) throws PulsarClientException { + try { + reconsumeLaterAsync(messages, delayTime, unit).get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + @Override public void acknowledgeCumulative(Message message) throws PulsarClientException { try { @@ -216,6 +242,15 @@ public void acknowledgeCumulative(MessageId messageId) throws PulsarClientExcept } } + @Override + public void reconsumeLaterCumulative(Message message, long delayTime, TimeUnit unit) throws PulsarClientException { + try { + reconsumeLaterCumulativeAsync(message, delayTime, unit).get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + @Override public CompletableFuture acknowledgeAsync(Message message) { try { @@ -235,6 +270,28 @@ public CompletableFuture acknowledgeAsync(Messages messages) { } } + @Override + public CompletableFuture reconsumeLaterAsync(Message message, long delayTime, TimeUnit unit) { + if (conf.isRetryEnable() == false) { + return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!")); + } + try { + return doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit); + } catch (NullPointerException npe) { + return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage())); + } + } + + @Override + public CompletableFuture reconsumeLaterAsync(Messages messages, long delayTime, TimeUnit unit) { + try { + messages.forEach(message -> reconsumeLaterAsync(message,delayTime, unit)); + return CompletableFuture.completedFuture(null); + } catch (NullPointerException npe) { + return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage())); + } + } + @Override public CompletableFuture acknowledgeCumulativeAsync(Message message) { try { @@ -244,6 +301,18 @@ public CompletableFuture acknowledgeCumulativeAsync(Message message) { } } + @Override + public CompletableFuture reconsumeLaterCumulativeAsync(Message message, long delayTime, TimeUnit unit) { + if (conf.isRetryEnable() == false) { + return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!")); + } + if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) { + return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException( + "Cannot use cumulative acks on a non-exclusive subscription")); + } + return doReconsumeLater(message, AckType.Cumulative, Collections.emptyMap(), delayTime, unit); + } + @Override public CompletableFuture acknowledgeAsync(MessageId messageId) { return acknowledgeAsync(messageId, null); @@ -306,6 +375,12 @@ protected CompletableFuture doAcknowledgeWithTxn(MessageId messageId, AckT protected abstract CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, TransactionImpl txn); + + protected abstract CompletableFuture doReconsumeLater(Message message, AckType ackType, + Map properties, + long delayTime, + TimeUnit unit); + @Override public void negativeAcknowledge(Messages messages) { messages.forEach(this::negativeAcknowledge); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 111aaead8a2dd..a1ef384a1079a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -52,6 +52,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; @@ -116,7 +118,26 @@ public CompletableFuture> subscribeAsync() { return FutureUtil.failedFuture( new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription")); } - + if(conf.isRetryEnable() == true && conf.getTopicNames().size() > 0 ) { + TopicName topicFisrt = TopicName.get(conf.getTopicNames().iterator().next()); + String retryLetterTopic = topicFisrt.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; + String deadLetterTopic = topicFisrt.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + if(conf.getDeadLetterPolicy() == null) { + conf.setDeadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES) + .retryLetterTopic(retryLetterTopic) + .deadLetterTopic(deadLetterTopic) + .build()); + } else { + if (StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) { + conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic); + } + if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) { + conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic); + } + } + conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic()); + } return interceptorList == null || interceptorList.size() == 0 ? client.subscribeAsync(conf, schema, null) : client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList)); @@ -375,4 +396,11 @@ public ConsumerBuilder keySharedPolicy(KeySharedPolicy keySharedPolicy) { conf.setKeySharedPolicy(keySharedPolicy); return this; } + + @Override + public ConsumerBuilder enableRetry(boolean retryEnable) { + conf.setRetryEnable(retryEnable); + return this; + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5c59bca0ef0ac..04983d9055406 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -34,11 +34,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -67,10 +70,12 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; @@ -153,6 +158,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private Producer deadLetterProducer; + private volatile Producer retryLetterProducer; + private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); + protected volatile boolean paused; private final boolean createTopicIfDoesNotExist; @@ -277,9 +285,17 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat } else { this.deadLetterPolicy = DeadLetterPolicy.builder() .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()) - .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription)) + .deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, topic, subscription)) .build(); } + + if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) { + this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic()); + } else { + this.deadLetterPolicy.setRetryLetterTopic(String.format("%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX, + topic, subscription)); + } + } else { deadLetterPolicy = null; possibleSendToDeadLetterTopicMessages = null; @@ -511,6 +527,129 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return sendAcknowledge(messageId, ackType, properties, txnImpl); } + @SuppressWarnings("unchecked") + @Override + protected CompletableFuture doReconsumeLater(Message message, AckType ackType, + Map properties, + long delayTime, + TimeUnit unit) { + MessageId messageId = message.getMessageId(); + if(messageId instanceof TopicMessageIdImpl) { + messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId(); + } + checkArgument(messageId instanceof MessageIdImpl); + if (getState() != State.Ready && getState() != State.Connecting) { + stats.incrementNumAcksFailed(); + PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); + if (AckType.Individual.equals(ackType)) { + onAcknowledge(messageId, exception); + } else if (AckType.Cumulative.equals(ackType)) { + onAcknowledgeCumulative(messageId, exception); + } + return FutureUtil.failedFuture(exception); + } + if (delayTime < 0) { + delayTime = 0; + } + if (retryLetterProducer == null) { + try { + createProducerLock.writeLock().lock(); + if (retryLetterProducer == null) { + retryLetterProducer = client.newProducer(schema) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .enableBatching(false) + .blockIfQueueFull(false) + .create(); + } + } catch (Exception e) { + log.error("Create retry letter producer exception with topic: {}", deadLetterPolicy.getRetryLetterTopic(), e); + } finally { + createProducerLock.writeLock().unlock(); + } + } + if (retryLetterProducer != null) { + try { + MessageImpl retryMessage = null; + String originMessageIdStr = null; + String originTopicNameStr = null; + if (message instanceof TopicMessageImpl) { + retryMessage = (MessageImpl) ((TopicMessageImpl) message).getMessage(); + originMessageIdStr = ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString(); + originTopicNameStr = ((TopicMessageIdImpl) message.getMessageId()).getTopicName(); + } else if (message instanceof MessageImpl) { + retryMessage = (MessageImpl) message; + originMessageIdStr = ((MessageImpl) message).getMessageId().toString(); + originTopicNameStr = ((MessageImpl) message).getTopicName(); + } + SortedMap propertiesMap = new TreeMap<>(); + int reconsumetimes = 1; + if (message.getProperties() != null) { + propertiesMap.putAll(message.getProperties()); + } + + if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { + reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)); + reconsumetimes = reconsumetimes + 1; + + } else { + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr); + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr); + } + + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes)); + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime))); + + if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) { + processPossibleToDLQ((MessageIdImpl)messageId); + if (deadLetterProducer == null) { + try { + if (deadLetterProducer == null) { + createProducerLock.writeLock().lock(); + deadLetterProducer = client.newProducer(schema) + .topic(this.deadLetterPolicy + .getDeadLetterTopic()) + .blockIfQueueFull(false) + .create(); + } + } catch (Exception e) { + log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e); + } finally { + createProducerLock.writeLock().unlock(); + } + } + if (deadLetterProducer != null) { + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr); + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr); + TypedMessageBuilder typedMessageBuilderNew = deadLetterProducer.newMessage() + .value(retryMessage.getValue()) + .properties(propertiesMap); + typedMessageBuilderNew.send(); + return doAcknowledge(messageId, ackType, properties, null); + } + } else { + TypedMessageBuilder typedMessageBuilderNew = retryLetterProducer.newMessage() + .value(retryMessage.getValue()) + .properties(propertiesMap); + if (delayTime > 0) { + typedMessageBuilderNew.deliverAfter(delayTime, unit); + } + if (message.hasKey()) { + typedMessageBuilderNew.key(message.getKey()); + } + typedMessageBuilderNew.send(); + return doAcknowledge(messageId, ackType, properties, null); + } + } catch (Exception e) { + log.error("Send to retry letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e); + Set messageIds = new HashSet<>(); + messageIds.add(messageId); + unAckedMessageTracker.remove(messageId); + redeliverUnacknowledgedMessages(messageIds); + } + } + return CompletableFuture.completedFuture(null); + } + // TODO: handle transactional acknowledgements. private CompletableFuture sendAcknowledge(MessageId messageId, AckType ackType, Map properties, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index e6c990ceeaa7a..e2016a95317d1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -471,6 +471,34 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack } } + @Override + protected CompletableFuture doReconsumeLater(Message message, AckType ackType, + Map properties, + long delayTime, + TimeUnit unit) { + MessageId messageId = message.getMessageId(); + checkArgument(messageId instanceof TopicMessageIdImpl); + TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + if (getState() != State.Ready) { + return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); + } + + if (ackType == AckType.Cumulative) { + Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); + if (individualConsumer != null) { + MessageId innerId = topicMessageId.getInnerMessageId(); + return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit); + } else { + return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); + } + } else { + ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); + MessageId innerId = topicMessageId.getInnerMessageId(); + return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit) + .thenRun(() ->unAckedMessageTracker.remove(topicMessageId)); + } + } + @Override public void negativeAcknowledge(MessageId messageId) { checkArgument(messageId instanceof TopicMessageIdImpl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index bc93bf29c70f4..1314327d19ca0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -105,6 +105,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private DeadLetterPolicy deadLetterPolicy; + private boolean retryEnable = false; + @JsonIgnore private BatchReceivePolicy batchReceivePolicy; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java new file mode 100644 index 0000000000000..588ec96086748 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.util; + +public class RetryMessageUtil { + + public final static String SYSTEM_PROPERTY_RECONSUMETIMES = "RECONSUMETIMES"; + public final static String SYSTEM_PROPERTY_DELAY_TIME = "DELAY_TIME"; + public final static String SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC"; + public final static String SYSTEM_PROPERTY_RETRY_TOPIC = "RETRY_TOPIC"; + public final static String SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_IDY_TIME"; + + public final static int MAX_RECONSUMETIMES = 16; + public final static String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY"; + public final static String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumer.java index 93ee4d22c050a..305f42259f9cf 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumer.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.tutorial; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; @@ -41,7 +43,12 @@ public static void main(String[] args) throws PulsarClientException, Interrupted } // Acknowledge the consumption of all messages at once - consumer.acknowledgeCumulative(msg); + try { + consumer.acknowledgeCumulative(msg); + } catch (Exception e) { + consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS); + } + pulsarClient.close(); } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java index 88fcadc5ee8d5..cd39d96e247f2 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java @@ -566,6 +566,38 @@ public MessageId getLastMessageId() throws PulsarClientException { public CompletableFuture getLastMessageIdAsync() { return null; } + + @Override + public void reconsumeLater(Message message, long delayTime, TimeUnit unit) throws PulsarClientException { + + } + + @Override + public void reconsumeLater(Messages messages, long delayTime, TimeUnit unit) throws PulsarClientException { + + } + + @Override + public void reconsumeLaterCumulative(Message message, long delayTime, TimeUnit unit) + throws PulsarClientException { + + } + + @Override + public CompletableFuture reconsumeLaterAsync(Message message, long delayTime, TimeUnit unit) { + return null; + } + + @Override + public CompletableFuture reconsumeLaterAsync(Messages messages, long delayTime, TimeUnit unit) { + return null; + } + + @Override + public CompletableFuture reconsumeLaterCumulativeAsync(Message message, long delayTime, + TimeUnit unit) { + return null; + } } private static List createMessages(int startIndex, int numMessages) {