Skip to content

Commit

Permalink
spring-projectsGH-8638: Kafka: Send All Fails to Failure Channel
Browse files Browse the repository at this point in the history
Resolves spring-projects#8638

Previously, immediate failures (e.g. timeout getting metadata) were
only thrown as exceptions, and not sent to the failure channel, if present.

**cherry-pick to all supported branches**
  • Loading branch information
garyrussell committed Jun 8, 2023
1 parent 95f1eb8 commit c4ee551
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -438,6 +438,7 @@ public String getComponentType() {
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
}

@Nullable
protected MessageChannel getSendFailureChannel() {
if (this.sendFailureChannel != null) {
return this.sendFailureChannel;
Expand Down Expand Up @@ -515,19 +516,27 @@ protected Object handleRequestMessage(final Message<?> message) {
}
CompletableFuture<SendResult<K, V>> sendFuture;
RequestReplyFuture<K, V, Object> gatewayFuture = null;
if (this.isGateway && (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate).sendAndReceive(producerRecord);
sendFuture = gatewayFuture.getSendFuture();
}
else {
if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) {
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
try {
if (this.isGateway
&& (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate)
.sendAndReceive(producerRecord);
sendFuture = gatewayFuture.getSendFuture();
}
else {
sendFuture = this.kafkaTemplate.send(producerRecord);
if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) {
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
}
else {
sendFuture = this.kafkaTemplate.send(producerRecord);
}
}
}
catch (RuntimeException rtex) {
sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
throw rtex;
}
sendFutureIfRequested(sendFuture, futureToken);
if (flush) {
this.kafkaTemplate.flush();
Expand Down Expand Up @@ -699,10 +708,8 @@ public void processSendResult(final Message<?> message, final ProducerRecord<K,
.build());
}
}
else if (failureChannel != null) {
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
new KafkaSendFailureException(message, producerRecord, exception), null));
else {
sendFailure(message, producerRecord, failureChannel, exception);
}
});
}
Expand Down Expand Up @@ -730,6 +737,16 @@ else if (failureChannel != null) {
}
}

private void sendFailure(final Message<?> message, final ProducerRecord<K, V> producerRecord,
@Nullable MessageChannel failureChannel, Throwable exception) {

if (failureChannel != null) {
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
new KafkaSendFailureException(message, producerRecord, exception), null));
}
}

private Future<?> processReplyFuture(@Nullable RequestReplyFuture<?, ?, Object> future) {
if (future == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,7 @@
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand Down Expand Up @@ -93,6 +94,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.InstanceOfAssertFactories.throwable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -344,6 +346,35 @@ protected CompletableFuture<SendResult<Integer, String>> doSend(
producerFactory.destroy();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void immediateFailure() {
Producer producer = mock(Producer.class);
CompletableFuture cf = new CompletableFuture();
RuntimeException rte = new RuntimeException("test.immediate");
cf.completeExceptionally(rte);
given(producer.send(any(), any())).willReturn(cf);
ProducerFactory pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(producer);
KafkaTemplate template = new KafkaTemplate(pf);
template.setDefaultTopic("foo");
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(template);
QueueChannel fails = new QueueChannel();
handler.setSendFailureChannel(fails);
assertThatExceptionOfType(MessageHandlingException.class).isThrownBy(
() -> handler.handleMessage(new GenericMessage<>("")))
.withCauseExactlyInstanceOf(KafkaException.class)
.withStackTraceContaining("test.immediate");
Message<?> fail = fails.receive(0);
assertThat(fail).isNotNull();
assertThat(fail.getPayload())
.asInstanceOf(throwable(KafkaSendFailureException.class))
.cause()
.isInstanceOf(KafkaException.class)
.cause()
.isEqualTo(rte);
}

@Test
void testOutboundWithCustomHeaderMapper() {
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(
Expand Down
1 change: 1 addition & 0 deletions src/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
value="org.assertj.core.api.Assertions.*,
org.xmlunit.assertj3.XmlAssert.*,
org.assertj.core.api.Assumptions.*,
org.assertj.core.api.InstanceOfAssertFactories.*,
org.awaitility.Awaitility.*,
org.mockito.Mockito.*,
org.mockito.BDDMockito.*,
Expand Down

0 comments on commit c4ee551

Please sign in to comment.