Skip to content

Commit

Permalink
Issue #1273: Deal with unavailable Kafka broker in KafkaPublisherActor.
Browse files Browse the repository at this point in the history
Until now a Ditto Kafka connection with targets only did not cope properly with the Kafka broker becoming unavailable.
In such cases, publishing timed out and produced appropriate entries in connection log.
Connection live status however, remained always 'open'.
With this commit, a TimeoutException while publishing is handled in a way that a ConnectionFailure is propagated to parent BaseClientActor.
This sets connection live status to 'misconfigured' and triggers reconnect with back-off semantic.
Thus, a potentially unavailable Kafka broker gets appropriately reflected by connection status.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Jan 18, 2022
1 parent f2e7648 commit fc7a047
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
Expand Up @@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver;
import org.eclipse.ditto.connectivity.service.messaging.ExceptionToAcknowledgementConverter;
import org.eclipse.ditto.connectivity.service.messaging.SendResult;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
Expand Down Expand Up @@ -181,6 +183,18 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
.withHeader("ditto-connection-id", connection.getId().toString());

return producerStream.publish(publishTarget, messageWithConnectionIdHeader)
.whenComplete((recordMetadata, throwable) -> {
if (null != throwable) {
final var context = getContext();
final var parent = context.getParent();
final var self = getSelf();
parent.tell(ConnectionFailure.of(self,
throwable,
ConnectionFailure.determineFailureDescription(Instant.now(),
throwable,
"Broker may not be available.")), self);
}
})
.thenApply(callback);
}

Expand Down
3 changes: 2 additions & 1 deletion connectivity/service/src/main/resources/connectivity.conf
Expand Up @@ -20,8 +20,9 @@ ditto {
# Kafka
{exceptionName: "org.apache.kafka.common.errors.SaslAuthenticationException", messagePattern: ".*"}
{exceptionName: "org.apache.kafka.common.errors.UnsupportedSaslMechanismException", messagePattern: ".*"}
{exceptionName: "org.apache.kafka.common.errors.IllegalSaslStateException", messagePattern: ".*"},
{exceptionName: "org.apache.kafka.common.errors.IllegalSaslStateException", messagePattern: ".*"}
{exceptionName: "org.apache.kafka.common.errors.SslAuthenticationException", messagePattern: ".*"}
{exceptionName: "org.apache.kafka.common.errors.TimeoutException", messagePattern: ".*"}
# MQTT
{exceptionName: "com.hivemq.client.mqtt.exceptions.ConnectionClosedException", messagePattern: "Server closed connection without DISCONNECT.*"}
{exceptionName: "com.hivemq.client.mqtt.exceptions.ConnectionClosedException", messagePattern: "io.netty.channel.unix.Errors$NativeIoException.*"}
Expand Down

0 comments on commit fc7a047

Please sign in to comment.