diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaPublisherActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaPublisherActor.java index 18f22c3e36..a6c71e815d 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaPublisherActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaPublisherActor.java @@ -15,6 +15,7 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -53,6 +54,7 @@ import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver; import org.eclipse.ditto.connectivity.service.messaging.ExceptionToAcknowledgementConverter; import org.eclipse.ditto.connectivity.service.messaging.SendResult; +import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure; import org.eclipse.ditto.json.JsonField; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonObjectBuilder; @@ -181,6 +183,18 @@ protected CompletionStage publishMessage(final Signal signal, .withHeader("ditto-connection-id", connection.getId().toString()); return producerStream.publish(publishTarget, messageWithConnectionIdHeader) + .whenComplete((recordMetadata, throwable) -> { + if (null != throwable) { + final var context = getContext(); + final var parent = context.getParent(); + final var self = getSelf(); + parent.tell(ConnectionFailure.of(self, + throwable, + ConnectionFailure.determineFailureDescription(Instant.now(), + throwable, + "Broker may not be available.")), self); + } + }) .thenApply(callback); } diff --git a/connectivity/service/src/main/resources/connectivity.conf b/connectivity/service/src/main/resources/connectivity.conf index 7b2b38f54e..d8f5d69b25 100644 --- a/connectivity/service/src/main/resources/connectivity.conf +++ b/connectivity/service/src/main/resources/connectivity.conf @@ -20,8 +20,9 @@ ditto { # Kafka {exceptionName: "org.apache.kafka.common.errors.SaslAuthenticationException", messagePattern: ".*"} {exceptionName: "org.apache.kafka.common.errors.UnsupportedSaslMechanismException", messagePattern: ".*"} - {exceptionName: "org.apache.kafka.common.errors.IllegalSaslStateException", messagePattern: ".*"}, + {exceptionName: "org.apache.kafka.common.errors.IllegalSaslStateException", messagePattern: ".*"} {exceptionName: "org.apache.kafka.common.errors.SslAuthenticationException", messagePattern: ".*"} + {exceptionName: "org.apache.kafka.common.errors.TimeoutException", messagePattern: ".*"} # MQTT {exceptionName: "com.hivemq.client.mqtt.exceptions.ConnectionClosedException", messagePattern: "Server closed connection without DISCONNECT.*"} {exceptionName: "com.hivemq.client.mqtt.exceptions.ConnectionClosedException", messagePattern: "io.netty.channel.unix.Errors$NativeIoException.*"}