Skip to content

Commit

Permalink
add debug log to postStop method;
Browse files Browse the repository at this point in the history
fix sonar warnings;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Jun 16, 2021
1 parent e2e9690 commit 3ed9b8c
Showing 1 changed file with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef
this.dittoProtocolSub = DittoProtocolSub.get(getContext().getSystem());
actorUUID = UUID.randomUUID().toString();

final ConnectionId connectionId = connection.getId();
final var connectionId = connection.getId();
logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId());
// log the default client ID for tracing
Expand All @@ -226,7 +226,7 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef
proxyActorSelection = getLocalActorOfSamePath(proxyActor);
connectivityConfigProvider = ConnectivityConfigProviderFactory.getInstance(getContext().getSystem());

final ProtocolAdapterProvider protocolAdapterProvider =
final var protocolAdapterProvider =
ProtocolAdapterProvider.load(connectivityConfig.getProtocolConfig(), getContext().getSystem());
protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);

Expand All @@ -237,7 +237,7 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef
.tag("id", connectionId.toString())
.tag("type", connection.getConnectionType().getName());

final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
final var monitoringConfig = connectivityConfig.getMonitoringConfig();
connectionCounterRegistry = ConnectivityCounterRegistry.newInstance();
connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());

Expand Down Expand Up @@ -347,6 +347,7 @@ public void postStop() {
clientGauge.reset();
clientConnectingGauge.reset();
stopChildActor(tunnelActor);
logger.debug("Stopped client with id - <{}>", getDefaultClientId());
try {
super.postStop();
} catch (final Exception e) {
Expand All @@ -358,7 +359,7 @@ public void postStop() {
public void onConnectivityConfigModified(final ConnectivityConfig modifiedConfig) {
if (hasInboundMapperConfigChanged(modifiedConfig)) {
logger.debug("Config changed for InboundMappingProcessor, recreating it.");
final InboundMappingProcessor inboundMappingProcessor =
final var inboundMappingProcessor =
InboundMappingProcessor.of(connection.getId(), connection.getConnectionType(),
connection.getPayloadMappingDefinition(),
getContext().getSystem(), modifiedConfig,
Expand All @@ -367,7 +368,7 @@ public void onConnectivityConfigModified(final ConnectivityConfig modifiedConfig
}
if (hasOutboundMapperConfigChanged(modifiedConfig)) {
logger.debug("Config changed for OutboundMappingProcessor, recreating it.");
final OutboundMappingProcessor outboundMappingProcessor =
final var outboundMappingProcessor =
OutboundMappingProcessor.of(connection, getContext().getSystem(), modifiedConfig,
protocolAdapter, logger);
outboundMappingProcessorActor.tell(new ReplaceOutboundMappingProcessor(outboundMappingProcessor),
Expand Down Expand Up @@ -401,15 +402,15 @@ protected String getDefaultClientId() {
}

private boolean hasInboundMapperConfigChanged(final ConnectivityConfig connectivityConfig) {
final MapperLimitsConfig currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig();
final MapperLimitsConfig modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig();
final var currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig();
final var modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig();
return currentConfig.getMaxMappedInboundMessages() != modifiedConfig.getMaxMappedInboundMessages()
|| currentConfig.getMaxSourceMappers() != modifiedConfig.getMaxSourceMappers();
}

private boolean hasOutboundMapperConfigChanged(final ConnectivityConfig connectivityConfig) {
final MapperLimitsConfig currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig();
final MapperLimitsConfig modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig();
final var currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig();
final var modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig();
return currentConfig.getMaxMappedOutboundMessages() != modifiedConfig.getMaxMappedOutboundMessages()
|| currentConfig.getMaxTargetMappers() != modifiedConfig.getMaxTargetMappers();
}
Expand Down Expand Up @@ -544,7 +545,7 @@ protected final ActorRef getInboundMappingProcessorActor() {

/**
* Escapes the passed actorName in a actorName valid way. Actor name should be a valid URL with ASCII letters, see
* also {@code akka.actor.ActorPath#isValidPathElement}, therefor we encode the name as an ASCII URL.
* also {@code akka.actor.ActorPath#isValidPathElement}, therefore we encode the name as an ASCII URL.
*
* @param name the actorName to escape.
* @return the escaped name.
Expand Down Expand Up @@ -982,13 +983,13 @@ private FSM.State<BaseClientState, BaseClientData> testConnection(final TestConn

final ActorRef self = getSelf();
final ActorRef sender = getSender();
final Connection connectionToBeTested = testConnection.getConnection();
final var connectionToBeTested = testConnection.getConnection();

if (stateData().getSshTunnelState().isEnabled() && !stateData().getSshTunnelState().isEstablished()) {
logger.info("Connection requires SSH tunnel, starting tunnel.");
tellTunnelActor(SshTunnelActor.TunnelControl.START_TUNNEL);
} else if (!canConnectViaSocket(connectionToBeTested)) {
final ConnectionFailedException connectionFailedException =
final var connectionFailedException =
newConnectionFailedException(testConnection.getDittoHeaders());
final Status.Status failure = new Status.Failure(connectionFailedException);
getSelf().tell(failure, self);
Expand Down Expand Up @@ -1112,7 +1113,7 @@ private State<BaseClientState, BaseClientData> testConnectionAfterTunnelStarted(
private State<BaseClientState, BaseClientData> tunnelClosed(final SshTunnelActor.TunnelClosed tunnelClosed,
final BaseClientData data) {
logger.info("SSH tunnel closed: {}", tunnelClosed.getMessage());
final ImmutableConnectionFailure failure =
final var failure =
new ImmutableConnectionFailure(null, tunnelClosed.getError(), tunnelClosed.getMessage());
getSelf().tell(failure, getSelf());
final SshTunnelState closedState = data.getSshTunnelState().failed(tunnelClosed.getError());
Expand Down Expand Up @@ -1289,7 +1290,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
if (ConnectivityStatus.OPEN.equals(data.getDesiredConnectionStatus())) {
if (reconnectTimeoutStrategy.canReconnect()) {
final Duration nextBackoff = reconnectTimeoutStrategy.getNextBackoff();
final String errorMessage =
final var errorMessage =
String.format("Connection failed due to: {0}. Will reconnect after %s.", nextBackoff);
connectionLogger.failure(errorMessage, event.getFailureDescription());
logger.info("Connection failed: {}. Reconnect after {}.", event, nextBackoff);
Expand Down Expand Up @@ -1343,7 +1344,7 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina

// send to all children (consumers, publishers, except mapping actor)
getContext().getChildren().forEach(child -> {
final String childName = child.path().name();
final var childName = child.path().name();
if (!NO_ADDRESS_REPORTING_CHILD_NAMES.contains(childName)) {
logger.withCorrelationId(command)
.debug("Forwarding RetrieveAddressStatus to child <{}>.", child.path());
Expand Down Expand Up @@ -1371,15 +1372,15 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionMetrics(
logger.withCorrelationId(command)
.debug("Received RetrieveConnectionMetrics message for connection <{}>. Gathering metrics.",
command.getEntityId());
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final var dittoHeaders = command.getDittoHeaders();

final SourceMetrics sourceMetrics = connectionCounterRegistry.aggregateSourceMetrics(connectionId());
final TargetMetrics targetMetrics = connectionCounterRegistry.aggregateTargetMetrics(connectionId());
final var sourceMetrics = connectionCounterRegistry.aggregateSourceMetrics(connectionId());
final var targetMetrics = connectionCounterRegistry.aggregateTargetMetrics(connectionId());

final ConnectionMetrics connectionMetrics =
final var connectionMetrics =
connectionCounterRegistry.aggregateConnectionMetrics(sourceMetrics, targetMetrics);

final RetrieveConnectionMetricsResponse retrieveConnectionMetricsResponse =
final var retrieveConnectionMetricsResponse =
RetrieveConnectionMetricsResponse.getBuilder(connectionId(), dittoHeaders)
.connectionMetrics(connectionMetrics)
.sourceMetrics(sourceMetrics)
Expand All @@ -1402,7 +1403,7 @@ private FSM.State<BaseClientState, BaseClientData> resetConnectionMetrics(final
}

private FSM.State<BaseClientState, BaseClientData> enableConnectionLogs(final EnableConnectionLogs command) {
final ConnectionId connectionId = command.getEntityId();
final var connectionId = command.getEntityId();
logger.withCorrelationId(command)
.debug("Received EnableConnectionLogs message for connection <{}>. Enabling logs.", connectionId);

Expand All @@ -1412,7 +1413,7 @@ private FSM.State<BaseClientState, BaseClientData> enableConnectionLogs(final En
}

private FSM.State<BaseClientState, BaseClientData> checkLoggingActive(final CheckConnectionLogsActive command) {
final ConnectionId connectionId = command.getEntityId();
final var connectionId = command.getEntityId();
logger.withCorrelationId(command)
.debug("Received checkLoggingActive message for connection <{}>." +
" Checking if logging for connection is expired.", connectionId);
Expand Down Expand Up @@ -1488,9 +1489,9 @@ private DittoRuntimeException unhandledExceptionForSignalInState(final Object si
}

protected boolean canConnectViaSocket(final Connection connection) {
final SshTunnelState tunnelState = stateData().getSshTunnelState();
final var tunnelState = stateData().getSshTunnelState();
if (tunnelState.isEnabled()) {
final URI uri = connection.getSshTunnel().map(SshTunnel::getUri).map(URI::create).orElseThrow();
final var uri = connection.getSshTunnel().map(SshTunnel::getUri).map(URI::create).orElseThrow();
final String sshHost = uri.getHost();
final int sshPort = uri.getPort();
final int localTunnelPort = tunnelState.getLocalPort();
Expand Down Expand Up @@ -1548,7 +1549,7 @@ private FSM.State<BaseClientState, BaseClientData> handleInboundSignal(final Inb

private static Optional<EntityId> tryExtractEntityId(Signal<?> signal) {
if (signal instanceof WithEntityId) {
final WithEntityId withEntityId = (WithEntityId) signal;
final var withEntityId = (WithEntityId) signal;
return Optional.of(withEntityId.getEntityId());
} else {
return Optional.empty();
Expand All @@ -1558,7 +1559,7 @@ private static Optional<EntityId> tryExtractEntityId(Signal<?> signal) {
private void dispatchSearchCommand(final WithSubscriptionId<?> searchCommand) {
final String subscriptionId = searchCommand.getSubscriptionId();
if (subscriptionId.length() > subscriptionIdPrefixLength) {
final String prefix = subscriptionId.substring(0, subscriptionIdPrefixLength);
final var prefix = subscriptionId.substring(0, subscriptionIdPrefixLength);
final Optional<Integer> index = parseHexString(prefix);
if (index.isPresent()) {
final ActorRef receiver = clientActorRefs.get(index.get()).orElseThrow();
Expand Down Expand Up @@ -1620,7 +1621,7 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final Connection connection
final OutboundMappingSettings settings;
final OutboundMappingProcessor outboundMappingProcessor;
try {
ConnectivityConfig retrievedConnectivityConfig =
var retrievedConnectivityConfig =
connectivityConfigProvider.getConnectivityConfig(connection.getId());
// this one throws DittoRuntimeExceptions when the mapper could not be configured
settings = OutboundMappingSettings.of(connection,
Expand Down Expand Up @@ -1685,7 +1686,7 @@ private ActorRef startInboundMappingProcessorActor(final Connection connection,

final InboundMappingProcessor inboundMappingProcessor;
try {
ConnectivityConfig retrievedConnectivityConfig =
var retrievedConnectivityConfig =
connectivityConfigProvider.getConnectivityConfig(connection.getId());

// this one throws DittoRuntimeExceptions when the mapper could not be configured
Expand Down Expand Up @@ -1721,8 +1722,8 @@ private ActorRef startInboundMappingProcessorActor(final Connection connection,
*/
private ActorRef startSubscriptionManager(final ActorSelection proxyActor) {
final ActorRef pubSubMediator = DistributedPubSub.get(getContext().getSystem()).mediator();
final Materializer mat = Materializer.createMaterializer(this::getContext);
final Props props = SubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(), pubSubMediator,
final var mat = Materializer.createMaterializer(this::getContext);
final var props = SubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(), pubSubMediator,
proxyActor, mat);
return getContext().actorOf(props, SubscriptionManager.ACTOR_NAME);
}
Expand Down Expand Up @@ -1777,7 +1778,7 @@ private void scheduleStateTimeout(final Duration duration) {
private Status.Status getStatusToReport(final Status.Status status, final DittoHeaders dittoHeaders) {
final Status.Status answerToPublish;
if (status instanceof Status.Failure) {
final Status.Failure failure = (Status.Failure) status;
final var failure = (Status.Failure) status;
if (!(failure.cause() instanceof DittoRuntimeException)) {
final DittoRuntimeException error = ConnectionFailedException.newBuilder(connectionId())
.description(describeEventualCause(failure.cause()))
Expand All @@ -1802,7 +1803,7 @@ private static String describeEventualCause(@Nullable final Throwable throwable)
if (null == throwable) {
return "Unknown cause.";
}
final Throwable cause = throwable.getCause();
final var cause = throwable.getCause();
if (cause == null || cause.equals(throwable)) {
final String message =
throwable.getMessage() != null
Expand Down Expand Up @@ -2004,7 +2005,7 @@ public Duration getNextBackoff() {
}

private void increaseTimeoutAfterRecovery() {
final Instant now = Instant.now();
final var now = Instant.now();
performRecovery(now);
currentTimeout = minDuration(maxTimeout, currentTimeout.multipliedBy(2L));
++currentTries;
Expand Down

0 comments on commit 3ed9b8c

Please sign in to comment.