Skip to content

Commit

Permalink
added MQTT exceptions to "user-indicated-errors"
Browse files Browse the repository at this point in the history
* harmonized "ConnectionStatusDetails" string by pulling out static helper to ConnectionFailure interface
* improved on formatting of the "ConnectionStatusDetails"
* for manually set "misconfigured" status, create a more helpful "ConnectionStatusDetails" message

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Aug 3, 2021
1 parent e72a779 commit 6246dde
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,26 @@
*/
package org.eclipse.ditto.concierge.service.actors.cleanup.persistenceids;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.concierge.service.common.PersistenceIdsConfig;
import org.eclipse.ditto.connectivity.model.ConnectivityConstants;
import org.eclipse.ditto.policies.model.PolicyConstants;
import org.eclipse.ditto.things.model.ThingConstants;
import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.connectivity.model.ConnectivityConstants;
import org.eclipse.ditto.internal.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.models.streaming.LowerBound;
import org.eclipse.ditto.internal.models.streaming.SudoStreamPids;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.internal.utils.akka.controlflow.ResumeSource;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.model.PolicyConstants;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.model.ThingConstants;

import akka.NotUsed;
import akka.actor.ActorRef;
Expand All @@ -46,11 +47,17 @@
*/
public final class PersistenceIdSource {

private static final Map<String, EntityType> PERSISTENCE_STREAMING_ACTOR_PATHS =
Map.of(ThingsMessagingConstants.THINGS_STREAM_PROVIDER_ACTOR_PATH, ThingConstants.ENTITY_TYPE,
PoliciesMessagingConstants.POLICIES_STREAM_PROVIDER_ACTOR_PATH, PolicyConstants.ENTITY_TYPE,
ConnectivityMessagingConstants.STREAM_PROVIDER_ACTOR_PATH,
ConnectivityConstants.ENTITY_TYPE);
private static final Map<String, EntityType> PERSISTENCE_STREAMING_ACTOR_PATHS;

static {
PERSISTENCE_STREAMING_ACTOR_PATHS = new LinkedHashMap<>();
PERSISTENCE_STREAMING_ACTOR_PATHS.put(PoliciesMessagingConstants.POLICIES_STREAM_PROVIDER_ACTOR_PATH,
PolicyConstants.ENTITY_TYPE);
PERSISTENCE_STREAMING_ACTOR_PATHS.put(ThingsMessagingConstants.THINGS_STREAM_PROVIDER_ACTOR_PATH,
ThingConstants.ENTITY_TYPE);
PERSISTENCE_STREAMING_ACTOR_PATHS.put(ConnectivityMessagingConstants.STREAM_PROVIDER_ACTOR_PATH,
ConnectivityConstants.ENTITY_TYPE);
}

private PersistenceIdSource() {
throw new AssertionError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,8 @@ private FSM.State<BaseClientState, BaseClientData> doOpenConnection(final BaseCl
sender.tell(new Status.Failure(error), getSelf());
return goToConnecting(connectingTimeout)
.using(data.setConnectionStatus(ConnectivityStatus.MISCONFIGURED)
.setConnectionStatusDetails(error.getMessage())
.setConnectionStatusDetails(
ConnectionFailure.determineFailureDescription(Instant.now(), error, null))
.resetSession());
}
}
Expand Down Expand Up @@ -1082,7 +1083,8 @@ private FSM.State<BaseClientState, BaseClientData> connectionTimedOut(final Base
} catch (final ConnectionFailedException e){
return goToConnecting(reconnectTimeoutStrategy.getNextTimeout())
.using(data.setConnectionStatus(ConnectivityStatus.MISCONFIGURED)
.setConnectionStatusDetails(e.getMessage())
.setConnectionStatusDetails(
ConnectionFailure.determineFailureDescription(Instant.now(), e, null))
.resetSession());
}
}
Expand Down Expand Up @@ -1128,7 +1130,8 @@ private State<BaseClientState, BaseClientData> connectAfterTunnelStarted(final C
} catch (final ConnectionFailedException e){
return goToConnecting(reconnectTimeoutStrategy.getNextTimeout())
.using(data.setConnectionStatus(ConnectivityStatus.MISCONFIGURED)
.setConnectionStatusDetails(e.getMessage())
.setConnectionStatusDetails(
ConnectionFailure.determineFailureDescription(Instant.now(), e, null))
.resetSession());
}
} else {
Expand Down Expand Up @@ -1674,11 +1677,11 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ConnectionContext con
throw dre;
}

final Connection connection = connectionContext.getConnection();
final int processorPoolSize = connection.getProcessorPoolSize();
final Connection theConnection = connectionContext.getConnection();
final int processorPoolSize = theConnection.getProcessorPoolSize();
logger.debug("Starting mapping processor actors with pool size of <{}>.", processorPoolSize);
final Props outboundMappingProcessorActorProps =
OutboundMappingProcessorActor.props(getSelf(), outboundMappingProcessor, connection, processorPoolSize);
OutboundMappingProcessorActor.props(getSelf(), outboundMappingProcessor, theConnection, processorPoolSize);

final ActorRef processorActor =
getContext().actorOf(outboundMappingProcessorActorProps, OutboundMappingProcessorActor.ACTOR_NAME);
Expand Down Expand Up @@ -1971,7 +1974,7 @@ private static void pipeConnectionContextToSelfAndRegisterForChanges(
connectionContextProvider.getConnectionContext(connection, dittoHeaders)
.thenCompose(context ->
connectionContextProvider.registerForConnectivityConfigChanges(context, self)
.<Object>thenApply(_void -> context)
.<Object>thenApply(theVoid -> context)
)
.exceptionally(throwable -> {
if (throwable instanceof RuntimeException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,10 @@ private FSM.State<BaseClientState, BaseClientData> handleConnectionFailure(
final BaseClientData currentData) {

final ConnectionFailure failure = statusReport.getFailure();
final String message = MessageFormat.format("Failure: {0}, Description: {1}",
failure.getFailure().cause(), failure.getFailureDescription());
connectionLogger.failure(message);
connectionLogger.failure(failure.getFailureDescription());
final ConnectivityStatus newStatus = connectivityStatusResolver.resolve(failure);
return stay().using(currentData.setConnectionStatus(newStatus)
.setConnectionStatusDetails(message));
.setConnectionStatusDetails(failure.getFailureDescription()));
}

private FSM.State<BaseClientState, BaseClientData> handleConsumerClosed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.internal;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;

import akka.actor.ActorRef;
Expand All @@ -37,7 +42,7 @@ public interface ConnectionFailure extends WithOrigin {
*/
static ConnectionFailure of(@Nullable final ActorRef origin, @Nullable final Throwable cause,
@Nullable final String description) {
return new ImmutableConnectionFailure(origin, cause, description, null);
return new ImmutableConnectionFailure(origin, getRealCause(cause), description, null);
}

/**
Expand All @@ -50,7 +55,7 @@ static ConnectionFailure of(@Nullable final ActorRef origin, @Nullable final Thr
*/
static ConnectionFailure internal(@Nullable final ActorRef origin, @Nullable final Throwable cause,
@Nullable final String description) {
return new ImmutableConnectionFailure(origin, cause, description, ConnectivityStatus.FAILED);
return new ImmutableConnectionFailure(origin, getRealCause(cause), description, ConnectivityStatus.FAILED);
}


Expand All @@ -67,7 +72,43 @@ static ConnectionFailure internal(@Nullable final ActorRef origin, @Nullable fin
static ConnectionFailure userRelated(@Nullable final ActorRef origin,
@Nullable final Throwable cause,
@Nullable final String description) {
return new ImmutableConnectionFailure(origin, cause, description, ConnectivityStatus.MISCONFIGURED);
return new ImmutableConnectionFailure(origin, getRealCause(cause), description,
ConnectivityStatus.MISCONFIGURED);
}

/**
* Determines a nicely formatted failure description string based on the based in optional {@code cause}, an
* optional {@code description} and the {@code time}.
*
* @param time the time to include in the description.
* @param cause the optional cause to extract {@code message} and (if it was a {@code DittoRuntimeException}
* {@code description} from.
* @param description an optional additional description to include in the created failure description.
* @return the created nicely formatted failure description.
*/
static String determineFailureDescription(final Instant time,
@Nullable final Throwable cause,
@Nullable final String description) {
String responseStr = "";
if (cause != null) {
if (description != null) {
responseStr = description + " - cause ";
}
responseStr += String.format("<%s>: %s", cause.getClass().getSimpleName(), cause.getMessage());
if (cause instanceof DittoRuntimeException) {
if (!responseStr.endsWith(".")) {
responseStr += ".";
}
responseStr += ((DittoRuntimeException) cause).getDescription().map(d -> " " + d).orElse("");
}
} else {
responseStr = Objects.requireNonNullElse(description, "unknown failure");
}
if (!responseStr.endsWith(".")) {
responseStr += ".";
}
responseStr += " At " + time;
return responseStr;
}

/**
Expand All @@ -84,4 +125,17 @@ static ConnectionFailure userRelated(@Nullable final ActorRef origin,
* @return the Failure containing the cause.
*/
Status.Failure getFailure();

@Nullable
private static Throwable getRealCause(@Nullable final Throwable cause) {
final Throwable realCause;
if (cause instanceof CompletionException) {
realCause = cause.getCause();
} else if (cause instanceof ExecutionException) {
realCause = cause.getCause();
} else {
realCause = cause;
}
return realCause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -52,22 +51,7 @@ public Status.Failure getFailure() {

@Override
public String getFailureDescription() {
String responseStr = "";
if (cause != null) {
if (description != null) {
responseStr = description + " - cause ";
}
responseStr += cause.getClass().getSimpleName() + ": " + cause.getMessage();
if (cause instanceof DittoRuntimeException) {
responseStr += " / " + ((DittoRuntimeException) cause).getDescription().orElse("");
}
} else if (description != null) {
responseStr = description;
} else {
responseStr = "unknown failure";
}
responseStr += " at " + time;
return responseStr;
return ConnectionFailure.determineFailureDescription(time, cause, description);
}

@Override
Expand Down
17 changes: 17 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,27 @@ ditto {
connectivity {

user-indicated-errors = [
# Kafka
{exceptionName: "org.apache.kafka.common.errors.SaslAuthenticationException", messagePattern: ".*"},
{exceptionName: "org.apache.kafka.common.errors.UnsupportedSaslMechanismException", messagePattern: ".*"},
{exceptionName: "org.apache.kafka.common.errors.IllegalSaslStateException", messagePattern: ".*"},
{exceptionName: "org.apache.kafka.common.errors.SslAuthenticationException", messagePattern: ".*"}

# MQTT 3.1.1 - treat error codes from the broker as user indicated errors
{exceptionName: "com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3DisconnectException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3PubAckException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3PubRecException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3SubAckException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3UnsubAckException", messagePattern: ".*"}
# MQTT 5 - treat error codes from the broker as user indicated errors
{exceptionName: "com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5AuthException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5DisconnectException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException", messagePattern: ".*"}
{exceptionName: "com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5UnsubAckException", messagePattern: ".*"}
]

connection {
Expand Down

0 comments on commit 6246dde

Please sign in to comment.