Skip to content

Commit

Permalink
review: made ImmutableConnectionFailurand ImmutableClientDisconnected…
Browse files Browse the repository at this point in the history
… package private, pulled static creators to interfaces

* disabled parallel stream processing in UserIndicatedErrors and made the underlying structure a List instead of an Iterable
* stabilized KafkaClientActorTest

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 29, 2021
1 parent c921a93 commit 65883ef
Show file tree
Hide file tree
Showing 22 changed files with 139 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLoggerRegistry;
Expand Down Expand Up @@ -597,7 +596,7 @@ private FSM.State<BaseClientState, BaseClientData> failConnectionDueToPubSubExce
"publish/subscribe infrastructure. Failing the connection to try again later.";
getSelf().tell(
// not setting "cause" to put the description literally in the error log
ImmutableConnectionFailure.internal(null, exception.asDittoRuntimeException(), description),
ConnectionFailure.internal(null, exception.asDittoRuntimeException(), description),
getSelf());
}
return stay();
Expand Down Expand Up @@ -1143,7 +1142,7 @@ private State<BaseClientState, BaseClientData> tunnelClosed(final SshTunnelActor
final BaseClientData data) {
logger.info("SSH tunnel closed: {}", tunnelClosed.getMessage());
final var failure =
ImmutableConnectionFailure.userRelated(null, tunnelClosed.getError(), tunnelClosed.getMessage());
ConnectionFailure.userRelated(null, tunnelClosed.getError(), tunnelClosed.getMessage());
getSelf().tell(failure, getSelf());
final SshTunnelState closedState = data.getSshTunnelState().failed(tunnelClosed.getError());
return stay().using(data.setSshTunnelState(closedState));
Expand Down Expand Up @@ -1852,7 +1851,7 @@ private SupervisorStrategy createSupervisorStrategy(final ActorRef self) {
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
})
.matchAny(error -> {
self.tell(ImmutableConnectionFailure.of(getSender(), error, "exception in child"), self);
self.tell(ConnectionFailure.of(getSender(), error, "exception in child"), self);
if (getSender().equals(tunnelActor)) {
logger.debug("Restarting tunnel actor after failure: {}", error.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.restart();
Expand Down Expand Up @@ -2134,7 +2133,7 @@ public static InitializationResult success() {
}

public static InitializationResult failed(@Nullable final Throwable throwable) {
return new InitializationResult(ImmutableConnectionFailure.of(null, throwable,
return new InitializationResult(ConnectionFailure.of(null, throwable,
"Exception during client actor initialization."));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringLoggerConfig;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
Expand Down Expand Up @@ -563,7 +562,7 @@ protected static Charset getCharsetFromMessage(final ExternalMessage message) {
* @param description description of the failure.
*/
protected void escalate(final Throwable error, final String description) {
final ConnectionFailure failure = ImmutableConnectionFailure.of(getSelf(), error, description);
final ConnectionFailure failure = ConnectionFailure.of(getSelf(), error, description);
getContext().getParent().tell(failure, getSelf());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.connectivity.service.mapping.ConnectivitySignalEnrichmentProvider;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
Expand Down Expand Up @@ -431,7 +430,7 @@ private OutboundSignalWithSender recoverFromEnrichmentError(final OutboundSignal
.error(dittoRuntimeException, "Enrichment of <{}> failed due to <{}>.", outboundSignal,
dittoRuntimeException);
final ConnectionFailure connectionFailure =
ImmutableConnectionFailure.internal(getSelf(), dittoRuntimeException, "Signal enrichment failed");
ConnectionFailure.internal(getSelf(), dittoRuntimeException, "Signal enrichment failed");
clientActor.tell(connectionFailure, getSelf());
}
return outboundSignal.setTargets(Collections.singletonList(target));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;

Expand All @@ -25,14 +24,14 @@
import com.typesafe.config.ConfigException;

/**
* Allows to find out if a given {@link Throwable} matches this list.
* Allows finding out if a given {@link Throwable} matches the configured list of user indicated errors.
*/
final class UserIndicatedErrors {

private static final String USER_INDICATED_ERRORS = "ditto.connectivity.user-indicated-errors";
private final Iterable<ErrorDefinition> errorDefinitions;
private final List<ErrorDefinition> errorDefinitions;

private UserIndicatedErrors(final Iterable<ErrorDefinition> errorDefinitions) {
private UserIndicatedErrors(final List<ErrorDefinition> errorDefinitions) {
this.errorDefinitions = errorDefinitions;
}

Expand Down Expand Up @@ -60,14 +59,17 @@ private static List<? extends Config> getConfiguredListOrEmpty(final Config conf
}

/**
* Checks whether the passed {@code throwable} matches against any of the configured error definitions indicating
* that the Throwable is an error indicated by a user and not an internal one.
*
* @param throwable the throwable that should be checked.
* @return True if the throwable matches and {@link ErrorDefinition} contained in this list.
*/
boolean matches(@Nullable final Throwable throwable) {
if (throwable == null) {
return false;
}
return StreamSupport.stream(errorDefinitions.spliterator(), true)
return errorDefinitions.stream()
.anyMatch(definition -> definition.matches(throwable));
}

Expand All @@ -90,7 +92,7 @@ private ErrorDefinition(final String exceptionName, final Pattern messagePattern
*
* @param config the config which is expected to have {@link UserIndicatedErrors.ErrorDefinition#NAME}
* and {@link UserIndicatedErrors.ErrorDefinition#PATTERN} as key.
* @return the new blame definition
* @return the new error definition
*/
private static ErrorDefinition of(final Config config) {
try {
Expand All @@ -104,8 +106,11 @@ private static ErrorDefinition of(final Config config) {
}

/**
* Matches the passed {@code throwable}'s class name and message against the configured ones of this instace
* and returns {@code true} if they match.
*
* @param throwable the throwable that should be checked.
* @return True if the throwable matches this definition and false if not.
* @return {@code true} if the throwable matches this definition and false if not.
*/
boolean matches(final Throwable throwable) {
return exceptionName.equals(throwable.getClass().getName()) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectClient;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.DisconnectClient;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RecoverSession;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
Expand Down Expand Up @@ -603,8 +602,7 @@ private FSM.State<BaseClientState, BaseClientData> handleSessionRecovered(

publisherReady.thenRun(() -> connectionLogger.success("Session has been recovered successfully."))
.exceptionally(t -> {
final ImmutableConnectionFailure failure = ImmutableConnectionFailure.of(null, t,
"failed to recover session");
final ConnectionFailure failure = ConnectionFailure.of(null, t, "failed to recover session");
getSelf().tell(failure, getSelf());
return null;
});
Expand Down Expand Up @@ -833,7 +831,7 @@ public void onConnectionEstablished(final URI remoteURI) {
public void onConnectionFailure(final Throwable error) {
connectionLogger.failure("Connection failure: {0}", error.getMessage());
logger.warning("Connection Failure: {}", error.getMessage());
final ConnectionFailure failure = ImmutableConnectionFailure.of(ActorRef.noSender(), error, null);
final ConnectionFailure failure = ConnectionFailure.of(ActorRef.noSender(), error, null);
self.tell(ConnectionFailureStatusReport.get(failure), ActorRef.noSender());
}

Expand All @@ -842,7 +840,7 @@ public void onConnectionInterrupted(final URI remoteURI) {
connectionLogger.failure("Connection was interrupted.");
logger.warning("Connection interrupted: {}", remoteURI);
final ConnectionFailure failure =
ImmutableConnectionFailure.userRelated(ActorRef.noSender(), null, "JMS Interrupted");
ConnectionFailure.userRelated(ActorRef.noSender(), null, "JMS Interrupted");
self.tell(ConnectionFailureStatusReport.get(failure), ActorRef.noSender());
}

Expand All @@ -863,7 +861,7 @@ public void onSessionClosed(final Session session, final Throwable cause) {
connectionLogger.failure("Session was closed: {0}", cause.getMessage());
logger.warning("Session closed: {} - {}", session, cause.getMessage());
final ConnectionFailure failure =
ImmutableConnectionFailure.of(ActorRef.noSender(), cause, "JMS Session closed");
ConnectionFailure.of(ActorRef.noSender(), cause, "JMS Session closed");
self.tell(SessionClosedStatusReport.get(failure, session), ActorRef.noSender());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.eclipse.ditto.connectivity.service.messaging.LegacyBaseConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.amqp.status.ConsumerClosedStatusReport;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
Expand Down Expand Up @@ -180,7 +179,7 @@ public void preStart() throws Exception {
initMessageConsumer();
} catch (final Exception e) {
final var failure =
ImmutableConnectionFailure.of(getSelf(), e, "Failed to initialize message consumers.");
ConnectionFailure.of(getSelf(), e, "Failed to initialize message consumers.");
getContext().getParent().tell(failure, getSelf());
getContext().stop(getSelf());
}
Expand Down Expand Up @@ -296,7 +295,7 @@ private void messageConsumerCreated(final CreateMessageConsumerResponse response

private void messageConsumerFailed(final Status.Failure failure) {
// escalate to parent
final ConnectionFailure connectionFailed = ImmutableConnectionFailure.of(getSelf(), failure.cause(),
final ConnectionFailure connectionFailed = ConnectionFailure.of(getSelf(), failure.cause(),
"Failed to recreate closed message consumer");
getContext().getParent().tell(connectionFailed, getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.eclipse.ditto.connectivity.service.messaging.amqp.status.ProducerClosedStatusReport;
import org.eclipse.ditto.connectivity.service.messaging.backoff.BackOffActor;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;

import akka.Done;
Expand Down Expand Up @@ -222,14 +221,14 @@ private void handleStartProducer(final Object startProducer) {
// target producer not creatable; stop self and request restart by parent
final String errorMessage = "Failed to create target";
logger.error(jmsException, errorMessage);
final ConnectionFailure failure = ImmutableConnectionFailure.of(getSelf(), jmsException, errorMessage);
final ConnectionFailure failure = ConnectionFailure.of(getSelf(), jmsException, errorMessage);
final ActorContext context = getContext();
context.getParent().tell(failure, getSelf());
context.stop(getSelf());
} catch (final Exception e) {
logger.warning("Failed to create static target producers: {}", e.getMessage());
getContext().getParent()
.tell(ImmutableConnectionFailure.of(null, e, "failed to initialize static producers"), getSelf());
.tell(ConnectionFailure.of(null, e, "failed to initialize static producers"), getSelf());
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionUnauthorizedException;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableClientDisconnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

Expand Down Expand Up @@ -218,18 +218,18 @@ private void handleRecoverSession(final AmqpClientActor.JmsRecoverSession recove
log.debug("Session of connection <{}> recovered successfully.",
connectionContext.getConnection().getId());
} catch (final ConnectionFailedException e) {
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
sender.tell(ConnectionFailure.of(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final ConnectionUnauthorizedException e) {
sender.tell(ImmutableConnectionFailure.userRelated(origin, e, e.getMessage()), self);
sender.tell(ConnectionFailure.userRelated(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final Exception e) {
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
sender.tell(ConnectionFailure.of(origin, e, e.getMessage()), self);
log.error("Unexpected error: {}", e.getMessage());
}
} else {
log.info("Recovering session failed, no connection available.");
sender.tell(ImmutableConnectionFailure.of(origin, null,
sender.tell(ConnectionFailure.of(origin, null,
"Session recovery failed, no connection available."), self);
}
}
Expand All @@ -244,7 +244,7 @@ private void handleDisconnect(final AmqpClientActor.JmsDisconnect disconnect) {
disconnectAndTell(connectionOpt.get(), disconnect.getOrigin().orElse(null),
disconnect.isShutdownAfterDisconnect());
} else {
final Object answer = new ImmutableClientDisconnected(disconnect.getOrigin().orElse(null),
final Object answer = ClientDisconnected.of(disconnect.getOrigin().orElse(null),
disconnect.isShutdownAfterDisconnect());
getSender().tell(answer, getSelf());
}
Expand All @@ -260,13 +260,13 @@ private void maybeConnectAndTell(final ActorRef sender, @Nullable final ActorRef
sender.tell(connectedMessage, self);
log.debug("Connection <{}> established successfully.", connectionContext.getConnection().getId());
} catch (final ConnectionFailedException e) {
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
sender.tell(ConnectionFailure.of(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final ConnectionUnauthorizedException e) {
sender.tell(ImmutableConnectionFailure.userRelated(origin, e, e.getMessage()), self);
sender.tell(ConnectionFailure.userRelated(origin, e, e.getMessage()), self);
log.warning(e.getMessage());
} catch (final Exception e) {
sender.tell(ImmutableConnectionFailure.of(origin, e, e.getMessage()), self);
sender.tell(ConnectionFailure.of(origin, e, e.getMessage()), self);
log.error("Unexpected error: {}", e.getMessage());
}
}
Expand Down Expand Up @@ -436,7 +436,7 @@ private void disconnectAndTell(final javax.jms.Connection connection, @Nullable
terminateConnection(connection);
log.info("Connection <{}> closed.", this.connectionContext.getConnection().getId());

getSender().tell(new ImmutableClientDisconnected(origin, shutdownAfterDisconnect), getSelf());
getSender().tell(ClientDisconnected.of(origin, shutdownAfterDisconnect), getSelf());
}


Expand Down
Loading

0 comments on commit 65883ef

Please sign in to comment.