From 2c1f972a6b3953fdf48ec00a4be1c4784ea5005f Mon Sep 17 00:00:00 2001
From: Gary Russell
Date: Wed, 21 Jun 2023 13:45:52 -0400
Subject: [PATCH] GH-2711: Non-Blocking Retries and Custom Publisher

Resolves https://github.com/spring-projects/spring-kafka/issues/2711

Previously, it was not easy to provide custom DLPRs, for example to
override `createProducerRecord`.
---
 .../src/main/asciidoc/retrytopic.adoc         |  23 ++++
 .../DeadLetterPublishingRecoverer.java        |  68 ++++++++----
 .../DeadLetterPublishingRecovererFactory.java | 104 +++++++++++++-----
 ...tryTopicConfigurationIntegrationTests.java |  36 +++++-
 4 files changed, 181 insertions(+), 50 deletions(-)

diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
index cb7b8de676..40dd5d6d7a 100644
--- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
+++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -613,6 +613,29 @@ Starting with version 2.8.4, if you wish to add custom headers (in addition to t
 By default, any headers added will be cumulative - Kafka headers can contain multiple values.
 Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.
 
+[[custom-dlpr]]
+===== Custom DeadLetterPublishingRecoverer
+
+As can be seen in <>, it is possible to customize the default `DeadLetterPublishingRecoverer` instances created by the framework.
+However, for some use cases, it is necessary to subclass the `DeadLetterPublishingRecoverer`, for example to override `createProducerRecord()` to modify the contents sent to the retry (or dead-letter) topics.
+Starting with version 3.0.9, you can override the `RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()` method to provide a `DeadLetterPublisherCreator` instance, for example:
+
+====
+[source, java]
+----
+@Override
+protected Consumer<DeadLetterPublishingRecovererFactory>
+        configureDeadLetterPublishingContainerFactory() {
+
+    return (factory) -> factory.setDeadLetterPublisherCreator(
+            (templateResolver, destinationResolver) ->
+                    new CustomDLPR(templateResolver, destinationResolver));
+}
+----
+====
+
+It is recommended that you use the provided resolvers when constructing the custom instance.
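+
+For example, a minimal sketch of such a subclass follows; the class name mirrors the `CustomDLPR` used elsewhere in this change, the redaction logic is purely illustrative, and the override assumes the `createProducerRecord()` signature present in 3.0.x:
+
+====
+[source, java]
+----
+public class CustomDLPR extends DeadLetterPublishingRecoverer {
+
+    public CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+
+        // Reuse the framework-provided resolvers, as recommended above
+        super(templateResolver, destinationResolver);
+    }
+
+    @Override
+    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
+            TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {
+
+        // Illustrative only: redact the outgoing value before it is published
+        byte[] redacted = "redacted".getBytes(StandardCharsets.UTF_8);
+        return super.createProducerRecord(record, topicPartition, headers, key, redacted);
+    }
+
+}
+----
+====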
+
 [[retry-topic-combine-blocking]]
 ==== Combining Blocking and Non-Blocking Retries
 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
index d9cacecae9..24c8aa6622 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
@@ -81,8 +81,6 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
 
 	private static final long THIRTY = 30L;
 
-	private final HeaderNames headerNames = getHeaderNames();
-
 	private final boolean transactional;
 
 	private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;
@@ -91,6 +89,8 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
 
 	private final EnumSet<HeaderNames.HeadersToAdd> whichHeaders = EnumSet.allOf(HeaderNames.HeadersToAdd.class);
 
+	private HeaderNames headerNames = getHeaderNames();
+
 	private boolean retainExceptionHeader;
 
 	private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction = DEFAULT_HEADERS_FUNCTION;
@@ -115,6 +115,24 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
 
 	private ExceptionHeadersCreator exceptionHeadersCreator = this::addExceptionInfoHeaders;
 
+	private Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
+			.original()
+			.offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
+			.timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
+			.timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
+			.topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
+			.partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
+			.consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
+			.exception()
+			.keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
+			.exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
+			.exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
+			.keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
+			.exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
+			.keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
+			.exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
+			.build();
+
 	/**
 	 * Create an instance with the provided template and a default destination resolving
 	 * function that returns a TopicPartition based on the original topic (appended with ".DLT")
@@ -188,6 +206,23 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<?, ?>> templ
 	}
 
+	/**
+	 * Create an instance with a template resolving function that receives the failed
+	 * consumer record and the exception and returns a {@link KafkaOperations}, and a
+	 * destination resolving function that works the same but returns a
+	 * {@link TopicPartition} instead.
+	 * @param templateResolver the template resolver.
+	 * @param destinationResolver the destination resolver.
+	 * @since 3.0.9
+	 */
+	public DeadLetterPublishingRecoverer(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+			BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+
+		this(templateResolver, false, destinationResolver);
+	}
+
 	/**
 	 * Create an instance with a template resolving function that receives the failed
 	 * consumer record and the exception and returns a {@link KafkaOperations} and a
@@ -487,6 +522,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
 	private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception exception,
 			@Nullable DeserializationException vDeserEx, @Nullable DeserializationException kDeserEx, Headers headers) {
 
+		if (this.headerNames == null) {
+			this.headerNames = this.headerNamesSupplier.get();
+		}
 		if (kDeserEx != null) {
 			if (!this.retainExceptionHeader) {
 				headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
@@ -825,25 +863,17 @@ private String getStackTraceAsString(Throwable cause) {
 	 * in the sent record.
 	 * @return the header names.
 	 * @since 2.7
+	 * @deprecated since 3.0.9 - provide a supplier via {@link #setHeaderNamesSupplier(Supplier)} instead
 	 */
+	@Nullable
+	@Deprecated(since = "3.0.9", forRemoval = true)
 	protected HeaderNames getHeaderNames() {
-		return HeaderNames.Builder
-				.original()
-				.offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
-				.timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
-				.timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
-				.topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
-				.partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
-				.consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
-				.exception()
-				.keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
-				.exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
-				.exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
-				.keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
-				.exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
-				.keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
-				.exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
-				.build();
+		return null;
+	}
+
+	/**
+	 * Set a {@link Supplier} for the {@link HeaderNames} to use when publishing.
+	 * @param supplier the supplier.
+	 * @since 3.0.9
+	 */
+	public void setHeaderNamesSupplier(Supplier<HeaderNames> supplier) {
+		Assert.notNull(supplier, "'supplier' cannot be null");
+		this.headerNamesSupplier = supplier;
 	}
 
 	/**
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java
index de500b29ba..e0c7e45164 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java
@@ -25,6 +25,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,6 +39,7 @@
 import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
 import org.springframework.kafka.listener.SeekUtils;
 import org.springframework.kafka.listener.TimestampedException;
@@ -76,6 +78,8 @@ public class DeadLetterPublishingRecovererFactory {
 
 	private boolean retainAllRetryHeaderValues = true;
 
+	private DeadLetterPublisherCreator dlpCreator = this::create;
+
 	public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
 		this.destinationTopicResolver = destinationTopicResolver;
 	}
@@ -113,6 +117,28 @@ public void setRetainAllRetryHeaderValues(boolean retainAllRetryHeaderValues) {
 		this.retainAllRetryHeaderValues = retainAllRetryHeaderValues;
 	}
 
+	/**
+	 * Provide a {@link DeadLetterPublisherCreator}; used to create a subclass of the
+	 * {@link DeadLetterPublishingRecoverer} instead of the default, for example, to
+	 * modify the published records.
+	 * @param creator the creator.
+	 * @since 3.0.9
+	 */
+	public void setDeadLetterPublisherCreator(DeadLetterPublisherCreator creator) {
+		Assert.notNull(creator, "'creator' cannot be null");
+		this.dlpCreator = creator;
+	}
+
+	/**
+	 * Set a customizer to customize the default {@link DeadLetterPublishingRecoverer}.
+	 * @param customizer the customizer.
+	 * @see #setDeadLetterPublisherCreator(DeadLetterPublisherCreator)
+	 */
+	public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
+		Assert.notNull(customizer, "'customizer' cannot be null");
+		this.recovererCustomizer = customizer;
+	}
+
 	/**
 	 * Add exception type to the default list. By default, the following exceptions will
 	 * not be retried:
@@ -175,31 +201,26 @@ public void alwaysLogListenerException() {
 	@SuppressWarnings("unchecked")
 	public DeadLetterPublishingRecoverer create(String mainListenerId) {
 		Assert.notNull(mainListenerId, "'listenerId' cannot be null");
-		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size
-				templateResolver(mainListenerId), false, destinationResolver(mainListenerId)) {
-
-			@Override
-			protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
-				return DeadLetterPublishingRecoverer.HeaderNames.Builder
-						.original()
-						.offsetHeader(KafkaHeaders.ORIGINAL_OFFSET)
-						.timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP)
-						.timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
-						.topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
-						.partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
-						.consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
-						.exception()
-						.keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
-						.exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
-						.exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
-						.keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
-						.exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
-						.keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
-						.exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE)
-						.build();
-			}
-		};
-
+		Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
+				.original()
+				.offsetHeader(KafkaHeaders.ORIGINAL_OFFSET)
+				.timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP)
+				.timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
+				.topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
+				.partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
+				.consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
+				.exception()
+				.keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
+				.exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
+				.exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
+				.keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
+				.exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
+				.keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
+				.exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE)
+				.build();
+		DeadLetterPublishingRecoverer recoverer = this.dlpCreator.create(templateResolver(mainListenerId),
+				destinationResolver(mainListenerId));
+		recoverer.setHeaderNamesSupplier(headerNamesSupplier);
 		recoverer.setHeadersFunction(
 				(consumerRecord, e) -> addHeaders(mainListenerId, consumerRecord, e, getAttempts(consumerRecord)));
 		if (this.headersFunction != null) {
@@ -215,16 +236,21 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
 		return recoverer;
 	}
 
+	private DeadLetterPublishingRecoverer create(
+			Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+			BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+
+		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templateResolver,
+				destinationResolver);
+		return recoverer;
+	}
+
 	private Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver(String mainListenerId) {
 		return outRecord -> this.destinationTopicResolver
 				.getDestinationTopicByName(mainListenerId, outRecord.topic())
 				.getKafkaOperations();
 	}
 
-	public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
-		this.recovererCustomizer = customizer;
-	}
-
 	private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver(String mainListenerId) {
 		return (cr, ex) -> {
 			if (SeekUtils.isBackoffException(ex)) {
@@ -412,4 +438,24 @@ private enum ListenerExceptionLoggingStrategy {
 		AFTER_RETRIES_EXHAUSTED
 	}
 
+	/**
+	 * Implement this interface to create each {@link DeadLetterPublishingRecoverer}.
+	 *
+	 * @since 3.0.9
+	 */
+	@FunctionalInterface
+	public interface DeadLetterPublisherCreator {
+
+		/**
+		 * Create a {@link DeadLetterPublishingRecoverer} using the supplied properties.
+		 * @param templateResolver the template resolver.
+		 * @param destinationResolver the destination resolver.
+		 * @return the publisher.
+		 */
+		DeadLetterPublishingRecoverer create(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+				BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver);
+
+	}
+
 }
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java
index a74136923e..2ad2ab8c35 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2021-2022 the original author or authors.
+ * Copyright 2021-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,10 +24,15 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -37,12 +42,15 @@
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -66,7 +74,8 @@ class RetryTopicConfigurationIntegrationTests {
 	@Test
 	void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
 			@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config,
-			@Autowired RetryTopicComponentFactory componentFactory) throws InterruptedException {
+			@Autowired RetryTopicComponentFactory componentFactory, @Autowired KafkaListenerEndpointRegistry registry)
+					throws InterruptedException {
 
 		Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
 		Map<String, List<PartitionInfo>> topics = consumer.listTopics();
@@ -76,6 +85,11 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact
 		template.send(TOPIC1, "foo");
 		assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
 		verify(componentFactory).destinationTopicResolver();
+		assertThat(registry.getListenerContainer(TOPIC1))
+				.extracting("commonErrorHandler")
+				.extracting("failureTracker")
+				.extracting("recoverer")
+				.isInstanceOf(CustomDLPR.class);
 	}
 
 	@Configuration(proxyBeanMethods = false)
@@ -139,6 +153,15 @@ RetryTopicConfiguration retryTopicConfiguration1(KafkaTemplate<Integer, String>
 					.create(template);
 		}
 
+		@Override
+		protected java.util.function.Consumer<DeadLetterPublishingRecovererFactory>
+				configureDeadLetterPublishingContainerFactory() {
+
+			return (factory) -> factory.setDeadLetterPublisherCreator(
+					(templateResolver, destinationResolver) ->
+							new CustomDLPR(templateResolver, destinationResolver));
+		}
+
 		@Bean
 		TaskScheduler sched() {
 			return new ThreadPoolTaskScheduler();
@@ -146,4 +169,13 @@ TaskScheduler sched() {
 
 	}
 
+	static class CustomDLPR extends DeadLetterPublishingRecoverer {
+
+		CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+				BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+
+			super(templateResolver, destinationResolver);
+		}
+
+	}
+
 }
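
Because the `HeaderNames` are now resolved lazily through a `Supplier`, a custom recoverer can also change the published header names without subclassing, via the new `setHeaderNamesSupplier()`. A minimal sketch follows, assuming `templateResolver` and `destinationResolver` are resolver functions like the ones shown earlier in this patch; it uses the same staged builder the framework uses, but the header name strings are illustrative only:

====
[source, java]
----
DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(templateResolver, destinationResolver);
// Replace the default DLT header names with custom ones (values illustrative)
recoverer.setHeaderNamesSupplier(() -> DeadLetterPublishingRecoverer.HeaderNames.Builder
        .original()
        .offsetHeader("x-original-offset")
        .timestampHeader("x-original-timestamp")
        .timestampTypeHeader("x-original-timestamp-type")
        .topicHeader("x-original-topic")
        .partitionHeader("x-original-partition")
        .consumerGroupHeader("x-original-consumer-group")
        .exception()
        .keyExceptionFqcn("x-key-exception-fqcn")
        .exceptionFqcn("x-exception-fqcn")
        .exceptionCauseFqcn("x-exception-cause-fqcn")
        .keyExceptionMessage("x-key-exception-message")
        .exceptionMessage("x-exception-message")
        .keyExceptionStacktrace("x-key-exception-stacktrace")
        .exceptionStacktrace("x-exception-stacktrace")
        .build());
----
====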