Skip to content

Commit

Permalink
Allow to define user indicated errors which can be resolved to a
Browse files Browse the repository at this point in the history
connectivity status by BaseClientActor

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 26, 2021
1 parent 8b2d667 commit d4b109e
Show file tree
Hide file tree
Showing 19 changed files with 365 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private final Materializer materializer;
protected final ConnectionLogger connectionLogger;
private final boolean dryRun;
private final ConnectivityStatusResolver connectivityStatusResolver;

private final ConnectionContextProvider connectionContextProvider;
protected ConnectionContext connectionContext;
Expand All @@ -212,13 +213,14 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta

protected BaseClientActor(final Connection connection, @Nullable final ActorRef proxyActor,
final ActorRef connectionActor, final DittoHeaders dittoHeaders) {
materializer = Materializer.createMaterializer(getContext().getSystem());
final ActorSystem system = getContext().getSystem();
materializer = Materializer.createMaterializer(system);
this.connection = checkNotNull(connection, "connection");
this.connectionActor = connectionActor;
// this is retrieve via the extension for each baseClientActor in order to not pass it as constructor arg
// as all constructor arguments need to be serializable as the BaseClientActor is started behind a cluster
// router
this.dittoProtocolSub = DittoProtocolSub.get(getContext().getSystem());
this.dittoProtocolSub = DittoProtocolSub.get(system);
actorUUID = UUID.randomUUID().toString();

final var connectionId = connection.getId();
Expand All @@ -228,12 +230,14 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef
logger.info("Using default client ID <{}>", getDefaultClientId());

proxyActorSelection = getLocalActorOfSamePath(proxyActor);
connectionContextProvider = ConnectionContextProviderFactory.getInstance(getContext().getSystem());
connectionContextProvider = ConnectionContextProviderFactory.getInstance(system);

final ConnectivityConfig staticConnectivityConfig = ConnectivityConfig.forActorSystem(getContext().getSystem());
final UserIndicatedErrors userIndicatedErrors = UserIndicatedErrors.of(system.settings().config());
connectivityStatusResolver = ConnectivityStatusResolver.of(userIndicatedErrors);
final ConnectivityConfig staticConnectivityConfig = ConnectivityConfig.forActorSystem(system);
final ClientConfig staticClientConfig = staticConnectivityConfig.getClientConfig();
final var protocolAdapterProvider =
ProtocolAdapterProvider.load(staticConnectivityConfig.getProtocolConfig(), getContext().getSystem());
ProtocolAdapterProvider.load(staticConnectivityConfig.getProtocolConfig(), system);
protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);
connectionContext = DittoConnectionContext.of(connection, staticConnectivityConfig);

Expand Down Expand Up @@ -1326,7 +1330,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
connectionLogger.failure(errorMessage, event.getFailureDescription());
logger.info("Connection failed: {}. Reconnect after {}.", event, nextBackoff);
return goToConnecting(nextBackoff).using(data.resetSession()
.setConnectionStatus(event.getStatus())
.setConnectionStatus(connectivityStatusResolver.resolve(event))
.setConnectionStatusDetails(event.getFailureDescription()));
}
} else {
Expand All @@ -1339,7 +1343,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect

// stay in UNKNOWN state until re-opened manually
return goTo(INITIALIZED).using(data.resetSession()
.setConnectionStatus(event.getStatus())
.setConnectionStatus(connectivityStatusResolver.resolve(event))
.setConnectionStatusDetails(event.getFailureDescription()
+ " Reached maximum retries and thus will not try to reconnect any longer."));
}
Expand All @@ -1348,7 +1352,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
connectionLogger.failure("Connection failed due to: {0}.", event.getFailureDescription());
return goTo(INITIALIZED)
.using(data.resetSession()
.setConnectionStatus(event.getStatus())
.setConnectionStatus(connectivityStatusResolver.resolve(event))
.setConnectionStatusDetails(event.getFailureDescription())
);
}
Expand Down Expand Up @@ -1848,10 +1852,7 @@ private SupervisorStrategy createSupervisorStrategy(final ActorRef self) {
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
})
.matchAny(error -> {
self.tell(
ImmutableConnectionFailure.internal(getSender(), error, "exception in child"),
self
);
self.tell(ImmutableConnectionFailure.of(getSender(), error, "exception in child"), self);
if (getSender().equals(tunnelActor)) {
logger.debug("Restarting tunnel actor after failure: {}", error.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.restart();
Expand Down Expand Up @@ -2133,7 +2134,7 @@ public static InitializationResult success() {
}

public static InitializationResult failed(@Nullable final Throwable throwable) {
return new InitializationResult(ImmutableConnectionFailure.internal(null, throwable,
return new InitializationResult(ImmutableConnectionFailure.of(null, throwable,
"Exception during client actor initialization."));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ protected static Charset getCharsetFromMessage(final ExternalMessage message) {
* @param description description of the failure.
*/
protected void escalate(final Throwable error, final String description) {
final ConnectionFailure failure = ImmutableConnectionFailure.internal(getSelf(), error, description);
final ConnectionFailure failure = ImmutableConnectionFailure.of(getSelf(), error, description);
getContext().getParent().tell(failure, getSelf());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;

/**
* Resolves the correct {@link ConnectivityStatus} from a given {@link ConnectionFailure}.
*/
final class ConnectivityStatusResolver {

private final UserIndicatedErrors userIndicatedErrors;

private ConnectivityStatusResolver(final UserIndicatedErrors userIndicatedErrors) {
this.userIndicatedErrors = userIndicatedErrors;
}

/**
* Creates a new instance of {@link ConnectivityStatusResolver}.
*
* @param userIndicatedErrors the list of errors that should be treated as user indicated errors.
* @return the new status resolver.
*/
static ConnectivityStatusResolver of(final UserIndicatedErrors userIndicatedErrors) {
return new ConnectivityStatusResolver(userIndicatedErrors);
}

/**
* Resolves the correct {@link ConnectivityStatus} from a given {@link ConnectionFailure}.
*
* @param connectionFailure the failure.
* @return the resolved status.
*/
ConnectivityStatus resolve(final ConnectionFailure connectionFailure) {
return connectionFailure.getStatus()
.orElseGet(() -> userIndicatedErrors.matches(connectionFailure.getFailure().cause()) ?
ConnectivityStatus.MISCONFIGURED :
ConnectivityStatus.FAILED);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.eclipse.ditto.internal.utils.config.DittoConfigError;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;

/**
* Allows to find out if a given {@link Throwable} matches this list.
*/
final class UserIndicatedErrors {

private static final String USER_INDICATED_ERRORS = "ditto.connectivity.user-indicated-errors";
private final Iterable<ErrorDefinition> errorDefinitions;

private UserIndicatedErrors(final Iterable<ErrorDefinition> errorDefinitions) {
this.errorDefinitions = errorDefinitions;
}

/**
* Reads the configured list from the hocon config and creates a new {@link UserIndicatedErrors} based on this config.
*
* @param config the hocon config holding the list of known user indicated errors.
* @return the new error list.
*/
static UserIndicatedErrors of(final Config config) {
final List<? extends Config> configList = getConfiguredListOrEmpty(config);
final List<ErrorDefinition> definitionList = configList.stream()
.map(Config.class::cast)
.map(ErrorDefinition::of)
.collect(Collectors.toList());
return new UserIndicatedErrors(definitionList);
}

private static List<? extends Config> getConfiguredListOrEmpty(final Config config) {
try {
return config.getConfigList(USER_INDICATED_ERRORS);
} catch (final ConfigException.Missing | ConfigException.WrongType e) {
return List.of();
}
}

/**
* @param throwable the throwable that should be checked.
* @return True if the throwable matches and {@link ErrorDefinition} contained in this list.
*/
boolean matches(final Throwable throwable) {
return StreamSupport.stream(errorDefinitions.spliterator(), true)
.anyMatch(definition -> definition.matches(throwable));
}

private static final class ErrorDefinition {

private static final String NAME = "exceptionName";
private static final String PATTERN = "messagePattern";

private final String exceptionName;
private final Pattern messagePattern;

private ErrorDefinition(final String exceptionName, final Pattern messagePattern) {
this.exceptionName = exceptionName;
this.messagePattern = messagePattern;
}

/**
* Creates a new {@link UserIndicatedErrors.ErrorDefinition} from a config which is expected to have {@link UserIndicatedErrors.ErrorDefinition#NAME}
* and {@link UserIndicatedErrors.ErrorDefinition#PATTERN} as key.
*
* @param config the config which is expected to have {@link UserIndicatedErrors.ErrorDefinition#NAME}
* and {@link UserIndicatedErrors.ErrorDefinition#PATTERN} as key.
* @return the new blame definition
*/
private static ErrorDefinition of(final Config config) {
try {
final var exceptionName = config.getString(NAME);
final var regex = config.getString(PATTERN);
final var pattern = Pattern.compile(regex);
return new ErrorDefinition(exceptionName, pattern);
} catch (final ConfigException.Missing | ConfigException.WrongType e) {
throw new DittoConfigError(e);
}
}

/**
* @param throwable the throwable that should be checked.
* @return True if the throwable matches this definition and false if not.
*/
boolean matches(final Throwable throwable) {
return exceptionName.equals(throwable.getClass().getName()) &&
messagePattern.matcher(String.valueOf(throwable.getMessage())).matches();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ private FSM.State<BaseClientState, BaseClientData> handleSessionRecovered(

publisherReady.thenRun(() -> connectionLogger.success("Session has been recovered successfully."))
.exceptionally(t -> {
final ImmutableConnectionFailure failure = ImmutableConnectionFailure.internal(null, t,
final ImmutableConnectionFailure failure = ImmutableConnectionFailure.of(null, t,
"failed to recover session");
getSelf().tell(failure, getSelf());
return null;
Expand Down Expand Up @@ -832,7 +832,7 @@ public void onConnectionEstablished(final URI remoteURI) {
public void onConnectionFailure(final Throwable error) {
connectionLogger.failure("Connection failure: {0}", error.getMessage());
logger.warning("Connection Failure: {}", error.getMessage());
final ConnectionFailure failure = ImmutableConnectionFailure.internal(ActorRef.noSender(), error, null);
final ConnectionFailure failure = ImmutableConnectionFailure.of(ActorRef.noSender(), error, null);
self.tell(ConnectionFailureStatusReport.get(failure), ActorRef.noSender());
}

Expand Down Expand Up @@ -862,7 +862,7 @@ public void onSessionClosed(final Session session, final Throwable cause) {
connectionLogger.failure("Session was closed: {0}", cause.getMessage());
logger.warning("Session closed: {} - {}", session, cause.getMessage());
final ConnectionFailure failure =
ImmutableConnectionFailure.internal(ActorRef.noSender(), cause, "JMS Session closed");
ImmutableConnectionFailure.of(ActorRef.noSender(), cause, "JMS Session closed");
self.tell(SessionClosedStatusReport.get(failure, session), ActorRef.noSender());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void preStart() throws Exception {
initMessageConsumer();
} catch (final Exception e) {
final var failure =
ImmutableConnectionFailure.internal(getSelf(), e, "Failed to initialize message consumers.");
ImmutableConnectionFailure.of(getSelf(), e, "Failed to initialize message consumers.");
getContext().getParent().tell(failure, getSelf());
getContext().stop(getSelf());
}
Expand Down Expand Up @@ -296,7 +296,7 @@ private void messageConsumerCreated(final CreateMessageConsumerResponse response

private void messageConsumerFailed(final Status.Failure failure) {
// escalate to parent
final ConnectionFailure connectionFailed = ImmutableConnectionFailure.internal(getSelf(), failure.cause(),
final ConnectionFailure connectionFailed = ImmutableConnectionFailure.of(getSelf(), failure.cause(),
"Failed to recreate closed message consumer");
getContext().getParent().tell(connectionFailed, getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,14 @@ private void handleStartProducer(final Object startProducer) {
// target producer not creatable; stop self and request restart by parent
final String errorMessage = "Failed to create target";
logger.error(jmsException, errorMessage);
final ConnectionFailure failure =
ImmutableConnectionFailure.internal(getSelf(), jmsException, errorMessage);
final ConnectionFailure failure = ImmutableConnectionFailure.of(getSelf(), jmsException, errorMessage);
final ActorContext context = getContext();
context.getParent().tell(failure, getSelf());
context.stop(getSelf());
} catch (final Exception e) {
logger.warning("Failed to create static target producers: {}", e.getMessage());
getContext().getParent()
.tell(ImmutableConnectionFailure.internal(null, e, "failed to initialize static producers"),
getSelf());
.tell(ImmutableConnectionFailure.of(null, e, "failed to initialize static producers"), getSelf());
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,19 @@ private void handleRecoverSession(final AmqpClientActor.JmsRecoverSession recove
sender.tell(r, self);
log.debug("Session of connection <{}> recovered successfully.",
connectionContext.getConnection().getId());
} catch (final ConnectionFailedException | ConnectionUnauthorizedException e) {
sender.tell(ImmutableConnectionFailure.userRelated(origin, e, e.getMessage()),
self);
} catch (final ConnectionFailedException e) {
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final ConnectionUnauthorizedException e) {
sender.tell(ImmutableConnectionFailure.userRelated(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final Exception e) {
sender.tell(ImmutableConnectionFailure.internal(origin, e, e.getMessage()), self);
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
log.error("Unexpected error: {}", e.getMessage());
}
} else {
log.info("Recovering session failed, no connection available.");
sender.tell(ImmutableConnectionFailure.internal(origin, null,
sender.tell(ImmutableConnectionFailure.of(origin, null,
"Session recovery failed, no connection available."), self);
}
}
Expand Down Expand Up @@ -258,13 +260,13 @@ private void maybeConnectAndTell(final ActorRef sender, @Nullable final ActorRef
sender.tell(connectedMessage, self);
log.debug("Connection <{}> established successfully.", connectionContext.getConnection().getId());
} catch (final ConnectionFailedException e) {
sender.tell(ImmutableConnectionFailure.internal(origin, e, e.getMessage()), self);
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final ConnectionUnauthorizedException e) {
sender.tell(ImmutableConnectionFailure.userRelated(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final Exception e) {
sender.tell(ImmutableConnectionFailure.internal(origin, e, e.getMessage()), self);
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
log.error("Unexpected error: {}", e.getMessage());
}
}
Expand Down
Loading

0 comments on commit d4b109e

Please sign in to comment.