Skip to content

Commit

Permalink
Add new value "MISCONFIGURED" for ConnectivityStatus enum
Browse files Browse the repository at this point in the history
* This value should indicate a failure which is most likely caused by
  anything that is not in the responsibility of ditto.
  For example: Wrong authentication, downtime of the broker, ...

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 26, 2021
1 parent 8a4df6f commit 01ce446
Show file tree
Hide file tree
Showing 17 changed files with 128 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import java.util.Arrays;
import java.util.Optional;

import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.json.Jsonifiable;

/**
* An enumeration of status of connectivity resource.
Expand All @@ -42,6 +42,11 @@ public enum ConnectivityStatus implements CharSequence, Jsonifiable<JsonObject>
*/
FAILED("failed"),

/**
* Indicates a failed {@code Connection} due to wrong configuration.
*/
MISCONFIGURED("misconfigured"),

/**
* Indicates an unknown status.
*/
Expand Down Expand Up @@ -123,4 +128,8 @@ public String toString() {
return name;
}

public boolean isFailure() {
return this == FAILED || this == MISCONFIGURED;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ private FSM.State<BaseClientState, BaseClientData> failConnectionDueToPubSubExce
"publish/subscribe infrastructure. Failing the connection to try again later.";
getSelf().tell(
// not setting "cause" to put the description literally in the error log
new ImmutableConnectionFailure(null, exception.asDittoRuntimeException(), description),
ImmutableConnectionFailure.internal(null, exception.asDittoRuntimeException(), description),
getSelf());
}
return stay();
Expand Down Expand Up @@ -972,7 +972,7 @@ private FSM.State<BaseClientState, BaseClientData> doOpenConnection(final BaseCl
final DittoRuntimeException error = newConnectionFailedException(dittoHeaders);
sender.tell(new Status.Failure(error), getSelf());
return goToConnecting(connectingTimeout)
.using(data.setConnectionStatus(ConnectivityStatus.FAILED)
.using(data.setConnectionStatus(ConnectivityStatus.MISCONFIGURED)
.setConnectionStatusDetails(error.getMessage())
.resetSession());
}
Expand Down Expand Up @@ -1139,7 +1139,7 @@ private State<BaseClientState, BaseClientData> tunnelClosed(final SshTunnelActor
final BaseClientData data) {
logger.info("SSH tunnel closed: {}", tunnelClosed.getMessage());
final var failure =
new ImmutableConnectionFailure(null, tunnelClosed.getError(), tunnelClosed.getMessage());
ImmutableConnectionFailure.userRelated(null, tunnelClosed.getError(), tunnelClosed.getMessage());
getSelf().tell(failure, getSelf());
final SshTunnelState closedState = data.getSshTunnelState().failed(tunnelClosed.getError());
return stay().using(data.setSshTunnelState(closedState));
Expand Down Expand Up @@ -1314,7 +1314,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
dittoProtocolSub.removeSubscriber(getSelf());
if (ConnectivityStatus.OPEN.equals(data.getDesiredConnectionStatus())) {
if (reconnectTimeoutStrategy.canReconnect()) {
if (ConnectivityStatus.FAILED.equals(data.getConnectionStatus())) {
if (data.getConnectionStatus().isFailure()) {
connectionLogger.failure("Connection failed due to: {0}. Reconnect was already triggered.",
event.getFailureDescription());
logger.info("Connection failed: {}. Reconnect was already triggered.", event);
Expand All @@ -1326,7 +1326,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
connectionLogger.failure(errorMessage, event.getFailureDescription());
logger.info("Connection failed: {}. Reconnect after {}.", event, nextBackoff);
return goToConnecting(nextBackoff).using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatus(event.getStatus())
.setConnectionStatusDetails(event.getFailureDescription()));
}
} else {
Expand All @@ -1339,7 +1339,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect

// stay in UNKNOWN state until re-opened manually
return goTo(INITIALIZED).using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatus(event.getStatus())
.setConnectionStatusDetails(event.getFailureDescription()
+ " Reached maximum retries and thus will not try to reconnect any longer."));
}
Expand All @@ -1348,7 +1348,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
connectionLogger.failure("Connection failed due to: {0}.", event.getFailureDescription());
return goTo(INITIALIZED)
.using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatus(event.getStatus())
.setConnectionStatusDetails(event.getFailureDescription())
);
}
Expand Down Expand Up @@ -1848,7 +1848,10 @@ private SupervisorStrategy createSupervisorStrategy(final ActorRef self) {
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
})
.matchAny(error -> {
self.tell(new ImmutableConnectionFailure(getSender(), error, "exception in child"), self);
self.tell(
ImmutableConnectionFailure.internal(getSender(), error, "exception in child"),
self
);
if (getSender().equals(tunnelActor)) {
logger.debug("Restarting tunnel actor after failure: {}", error.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.restart();
Expand Down Expand Up @@ -2130,7 +2133,7 @@ public static InitializationResult success() {
}

public static InitializationResult failed(@Nullable final Throwable throwable) {
return new InitializationResult(new ImmutableConnectionFailure(null, throwable,
return new InitializationResult(ImmutableConnectionFailure.internal(null, throwable,
"Exception during client actor initialization."));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ protected static Charset getCharsetFromMessage(final ExternalMessage message) {
* @param description description of the failure.
*/
protected void escalate(final Throwable error, final String description) {
final ConnectionFailure failure = new ImmutableConnectionFailure(getSelf(), error, description);
final ConnectionFailure failure = ImmutableConnectionFailure.internal(getSelf(), error, description);
getContext().getParent().tell(failure, getSelf());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,18 @@ protected OutboundSignalWithSender mapMessage(final OutboundSignal message) {
protected Sink<OutboundSignalWithSender, ?> createSink() {
// Enrich outbound signals by extra fields if necessary.
// Targets attached to the OutboundSignal are pre-selected by authorization, topic and filter sans enrichment.
final Flow<OutboundSignalWithSender, OutboundSignal.MultiMapped, ?> flow = Flow.<OutboundSignalWithSender>create()
.mapAsync(processorPoolSize, outbound -> toMultiMappedOutboundSignal(
outbound,
Source.single(outbound)
.via(splitByTargetExtraFieldsFlow())
.mapAsync(mappingConfig.getParallelism(), this::enrichAndFilterSignal)
.mapConcat(x -> x)
.map(this::handleOutboundSignal)
.flatMapConcat(x -> x)
))
.mapConcat(x -> x);
final Flow<OutboundSignalWithSender, OutboundSignal.MultiMapped, ?> flow =
Flow.<OutboundSignalWithSender>create()
.mapAsync(processorPoolSize, outbound -> toMultiMappedOutboundSignal(
outbound,
Source.single(outbound)
.via(splitByTargetExtraFieldsFlow())
.mapAsync(mappingConfig.getParallelism(), this::enrichAndFilterSignal)
.mapConcat(x -> x)
.map(this::handleOutboundSignal)
.flatMapConcat(x -> x)
))
.mapConcat(x -> x);
return flow.to(Sink.foreach(this::forwardToPublisherActor));
}

Expand Down Expand Up @@ -381,11 +382,11 @@ private CompletionStage<Collection<OutboundSignalWithSender>> enrichAndFilterSig
.filter(ThingId.class::isInstance)
.map(ThingId.class::cast)
.map(thingId ->
signalEnrichmentFacade.retrievePartialThing(
thingId,
extraFields,
headers,
outboundSignal.getSource())
signalEnrichmentFacade.retrievePartialThing(
thingId,
extraFields,
headers,
outboundSignal.getSource())
)
.map(partialThingCompletionStage -> partialThingCompletionStage.thenApply(outboundSignal::setExtra))
.orElse(CompletableFuture.completedStage(outboundSignal))
Expand Down Expand Up @@ -420,15 +421,17 @@ private OutboundSignalWithSender recoverFromEnrichmentError(final OutboundSignal
// show enrichment failure in service logs according to severity
if (dittoRuntimeException instanceof ThingNotAccessibleException) {
// This error should be rare but possible due to user action; log on INFO level
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource()).info("Enrichment of <{}> failed due to <{}>.",
outboundSignal.getSource().getClass(), dittoRuntimeException);
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource())
.info("Enrichment of <{}> failed due to <{}>.",
outboundSignal.getSource().getClass(), dittoRuntimeException);
} else {
// This error should not have happened during normal operation.
// There is a (possibly transient) problem with the Ditto cluster. Request parent to restart.
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource())
.error(dittoRuntimeException, "Enrichment of <{}> failed due to <{}>.", outboundSignal, dittoRuntimeException);
.error(dittoRuntimeException, "Enrichment of <{}> failed due to <{}>.", outboundSignal,
dittoRuntimeException);
final ConnectionFailure connectionFailure =
new ImmutableConnectionFailure(getSelf(), dittoRuntimeException, "Signal enrichment failed");
ImmutableConnectionFailure.internal(getSelf(), dittoRuntimeException, "Signal enrichment failed");
clientActor.tell(connectionFailure, getSelf());
}
return outboundSignal.setTargets(Collections.singletonList(target));
Expand Down Expand Up @@ -463,8 +466,9 @@ private Object handleErrorResponse(final DittoRuntimeException exception, final
private Object handleCommandResponse(final CommandResponse<?> response,
@Nullable final DittoRuntimeException exception, final ActorRef sender) {

final ThreadSafeDittoLoggingAdapter l = dittoLoggingAdapter.isDebugEnabled() ? dittoLoggingAdapter.withCorrelationId(response) :
dittoLoggingAdapter;
final ThreadSafeDittoLoggingAdapter l =
dittoLoggingAdapter.isDebugEnabled() ? dittoLoggingAdapter.withCorrelationId(response) :
dittoLoggingAdapter;
recordResponse(response, exception);
if (!response.isOfExpectedResponseType()) {
l.debug("Requester did not require response (via DittoHeader '{}') - not mapping back to ExternalMessage.",
Expand Down Expand Up @@ -501,7 +505,8 @@ private void recordResponse(final CommandResponse<?> response, @Nullable final D
}

private void forwardToPublisherActor(final OutboundSignal.MultiMapped mappedEnvelop) {
clientActor.tell(new BaseClientActor.PublishMappedMessage(mappedEnvelop), mappedEnvelop.getSender().orElse(null));
clientActor.tell(new BaseClientActor.PublishMappedMessage(mappedEnvelop),
mappedEnvelop.getSender().orElse(null));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ private FSM.State<BaseClientState, BaseClientData> handleSessionRecovered(

publisherReady.thenRun(() -> connectionLogger.success("Session has been recovered successfully."))
.exceptionally(t -> {
final ImmutableConnectionFailure failure = new ImmutableConnectionFailure(null, t,
final ImmutableConnectionFailure failure = ImmutableConnectionFailure.internal(null, t,
"failed to recover session");
getSelf().tell(failure, getSelf());
return null;
Expand Down Expand Up @@ -832,8 +832,7 @@ public void onConnectionEstablished(final URI remoteURI) {
public void onConnectionFailure(final Throwable error) {
connectionLogger.failure("Connection failure: {0}", error.getMessage());
logger.warning("Connection Failure: {}", error.getMessage());
final ConnectionFailure failure =
new ImmutableConnectionFailure(ActorRef.noSender(), error, null);
final ConnectionFailure failure = ImmutableConnectionFailure.internal(ActorRef.noSender(), error, null);
self.tell(ConnectionFailureStatusReport.get(failure), ActorRef.noSender());
}

Expand All @@ -842,7 +841,7 @@ public void onConnectionInterrupted(final URI remoteURI) {
connectionLogger.failure("Connection was interrupted.");
logger.warning("Connection interrupted: {}", remoteURI);
final ConnectionFailure failure =
new ImmutableConnectionFailure(ActorRef.noSender(), null, "JMS Interrupted");
ImmutableConnectionFailure.userRelated(ActorRef.noSender(), null, "JMS Interrupted");
self.tell(ConnectionFailureStatusReport.get(failure), ActorRef.noSender());
}

Expand All @@ -863,7 +862,7 @@ public void onSessionClosed(final Session session, final Throwable cause) {
connectionLogger.failure("Session was closed: {0}", cause.getMessage());
logger.warning("Session closed: {} - {}", session, cause.getMessage());
final ConnectionFailure failure =
new ImmutableConnectionFailure(ActorRef.noSender(), cause, "JMS Session closed");
ImmutableConnectionFailure.internal(ActorRef.noSender(), cause, "JMS Session closed");
self.tell(SessionClosedStatusReport.get(failure, session), ActorRef.noSender());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigModifiedBehavior;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;
import org.eclipse.ditto.connectivity.service.messaging.BaseConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.LegacyBaseConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.amqp.status.ConsumerClosedStatusReport;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
Expand All @@ -70,7 +69,6 @@
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
Expand Down Expand Up @@ -181,8 +179,8 @@ public void preStart() throws Exception {
try {
initMessageConsumer();
} catch (final Exception e) {
final var failure = new ImmutableConnectionFailure(getSelf(), e,
"Failed to initialize message consumers.");
final var failure =
ImmutableConnectionFailure.internal(getSelf(), e, "Failed to initialize message consumers.");
getContext().getParent().tell(failure, getSelf());
getContext().stop(getSelf());
}
Expand Down Expand Up @@ -298,7 +296,7 @@ private void messageConsumerCreated(final CreateMessageConsumerResponse response

private void messageConsumerFailed(final Status.Failure failure) {
// escalate to parent
final ConnectionFailure connectionFailed = new ImmutableConnectionFailure(getSelf(), failure.cause(),
final ConnectionFailure connectionFailed = ImmutableConnectionFailure.internal(getSelf(), failure.cause(),
"Failed to recreate closed message consumer");
getContext().getParent().tell(connectionFailed, getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,16 @@ private void handleStartProducer(final Object startProducer) {
// target producer not creatable; stop self and request restart by parent
final String errorMessage = "Failed to create target";
logger.error(jmsException, errorMessage);
final ConnectionFailure failure = new ImmutableConnectionFailure(getSelf(), jmsException, errorMessage);
final ConnectionFailure failure =
ImmutableConnectionFailure.internal(getSelf(), jmsException, errorMessage);
final ActorContext context = getContext();
context.getParent().tell(failure, getSelf());
context.stop(getSelf());
} catch (final Exception e) {
logger.warning("Failed to create static target producers: {}", e.getMessage());
getContext().getParent().tell(new ImmutableConnectionFailure(null, e,
"failed to initialize static producers"), getSelf());
getContext().getParent()
.tell(ImmutableConnectionFailure.internal(null, e, "failed to initialize static producers"),
getSelf());
throw e;
}
}
Expand Down
Loading

0 comments on commit 01ce446

Please sign in to comment.