diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index a0dda56213..66f0f58346 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -214,7 +214,7 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef this.dittoProtocolSub = DittoProtocolSub.get(getContext().getSystem()); actorUUID = UUID.randomUUID().toString(); - final ConnectionId connectionId = connection.getId(); + final var connectionId = connection.getId(); logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this) .withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId()); // log the default client ID for tracing @@ -226,7 +226,7 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef proxyActorSelection = getLocalActorOfSamePath(proxyActor); connectivityConfigProvider = ConnectivityConfigProviderFactory.getInstance(getContext().getSystem()); - final ProtocolAdapterProvider protocolAdapterProvider = + final var protocolAdapterProvider = ProtocolAdapterProvider.load(connectivityConfig.getProtocolConfig(), getContext().getSystem()); protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null); @@ -237,7 +237,7 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef .tag("id", connectionId.toString()) .tag("type", connection.getConnectionType().getName()); - final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig(); + final var monitoringConfig = connectivityConfig.getMonitoringConfig(); connectionCounterRegistry = ConnectivityCounterRegistry.newInstance(); connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger()); @@ -347,6 +347,7 @@ public void postStop() { clientGauge.reset(); clientConnectingGauge.reset(); stopChildActor(tunnelActor); + logger.debug("Stopped client with id - <{}>", getDefaultClientId()); try { super.postStop(); } catch (final Exception e) { @@ -358,7 +359,7 @@ public void postStop() { public void onConnectivityConfigModified(final ConnectivityConfig modifiedConfig) { if (hasInboundMapperConfigChanged(modifiedConfig)) { logger.debug("Config changed for InboundMappingProcessor, recreating it."); - final InboundMappingProcessor inboundMappingProcessor = + final var inboundMappingProcessor = InboundMappingProcessor.of(connection.getId(), connection.getConnectionType(), connection.getPayloadMappingDefinition(), getContext().getSystem(), modifiedConfig, @@ -367,7 +368,7 @@ public void onConnectivityConfigModified(final ConnectivityConfig modifiedConfig } if (hasOutboundMapperConfigChanged(modifiedConfig)) { logger.debug("Config changed for OutboundMappingProcessor, recreating it."); - final OutboundMappingProcessor outboundMappingProcessor = + final var outboundMappingProcessor = OutboundMappingProcessor.of(connection, getContext().getSystem(), modifiedConfig, protocolAdapter, logger); outboundMappingProcessorActor.tell(new ReplaceOutboundMappingProcessor(outboundMappingProcessor), @@ -401,15 +402,15 @@ protected String getDefaultClientId() { } private boolean hasInboundMapperConfigChanged(final ConnectivityConfig connectivityConfig) { - final MapperLimitsConfig currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig(); - final MapperLimitsConfig modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig(); + final var currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig(); + final var modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig(); return currentConfig.getMaxMappedInboundMessages() != modifiedConfig.getMaxMappedInboundMessages() || currentConfig.getMaxSourceMappers() != modifiedConfig.getMaxSourceMappers(); } private boolean hasOutboundMapperConfigChanged(final ConnectivityConfig connectivityConfig) { - final MapperLimitsConfig currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig(); - final MapperLimitsConfig modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig(); + final var currentConfig = this.connectivityConfig.getMappingConfig().getMapperLimitsConfig(); + final var modifiedConfig = connectivityConfig.getMappingConfig().getMapperLimitsConfig(); return currentConfig.getMaxMappedOutboundMessages() != modifiedConfig.getMaxMappedOutboundMessages() || currentConfig.getMaxTargetMappers() != modifiedConfig.getMaxTargetMappers(); } @@ -544,7 +545,7 @@ protected final ActorRef getInboundMappingProcessorActor() { /** * Escapes the passed actorName in a actorName valid way. Actor name should be a valid URL with ASCII letters, see - * also {@code akka.actor.ActorPath#isValidPathElement}, therefor we encode the name as an ASCII URL. + * also {@code akka.actor.ActorPath#isValidPathElement}, therefore we encode the name as an ASCII URL. * * @param name the actorName to escape. * @return the escaped name. @@ -982,13 +983,13 @@ private FSM.State testConnection(final TestConn final ActorRef self = getSelf(); final ActorRef sender = getSender(); - final Connection connectionToBeTested = testConnection.getConnection(); + final var connectionToBeTested = testConnection.getConnection(); if (stateData().getSshTunnelState().isEnabled() && !stateData().getSshTunnelState().isEstablished()) { logger.info("Connection requires SSH tunnel, starting tunnel."); tellTunnelActor(SshTunnelActor.TunnelControl.START_TUNNEL); } else if (!canConnectViaSocket(connectionToBeTested)) { - final ConnectionFailedException connectionFailedException = + final var connectionFailedException = newConnectionFailedException(testConnection.getDittoHeaders()); final Status.Status failure = new Status.Failure(connectionFailedException); getSelf().tell(failure, self); @@ -1112,7 +1113,7 @@ private State testConnectionAfterTunnelStarted( private State tunnelClosed(final SshTunnelActor.TunnelClosed tunnelClosed, final BaseClientData data) { logger.info("SSH tunnel closed: {}", tunnelClosed.getMessage()); - final ImmutableConnectionFailure failure = + final var failure = new ImmutableConnectionFailure(null, tunnelClosed.getError(), tunnelClosed.getMessage()); getSelf().tell(failure, getSelf()); final SshTunnelState closedState = data.getSshTunnelState().failed(tunnelClosed.getError()); @@ -1289,7 +1290,7 @@ private State backoffAfterFailure(final Connect if (ConnectivityStatus.OPEN.equals(data.getDesiredConnectionStatus())) { if (reconnectTimeoutStrategy.canReconnect()) { final Duration nextBackoff = reconnectTimeoutStrategy.getNextBackoff(); - final String errorMessage = + final var errorMessage = String.format("Connection failed due to: {0}. Will reconnect after %s.", nextBackoff); connectionLogger.failure(errorMessage, event.getFailureDescription()); logger.info("Connection failed: {}. Reconnect after {}.", event, nextBackoff); @@ -1343,7 +1344,7 @@ private FSM.State retrieveConnectionStatus(fina // send to all children (consumers, publishers, except mapping actor) getContext().getChildren().forEach(child -> { - final String childName = child.path().name(); + final var childName = child.path().name(); if (!NO_ADDRESS_REPORTING_CHILD_NAMES.contains(childName)) { logger.withCorrelationId(command) .debug("Forwarding RetrieveAddressStatus to child <{}>.", child.path()); @@ -1371,15 +1372,15 @@ private FSM.State retrieveConnectionMetrics( logger.withCorrelationId(command) .debug("Received RetrieveConnectionMetrics message for connection <{}>. Gathering metrics.", command.getEntityId()); - final DittoHeaders dittoHeaders = command.getDittoHeaders(); + final var dittoHeaders = command.getDittoHeaders(); - final SourceMetrics sourceMetrics = connectionCounterRegistry.aggregateSourceMetrics(connectionId()); - final TargetMetrics targetMetrics = connectionCounterRegistry.aggregateTargetMetrics(connectionId()); + final var sourceMetrics = connectionCounterRegistry.aggregateSourceMetrics(connectionId()); + final var targetMetrics = connectionCounterRegistry.aggregateTargetMetrics(connectionId()); - final ConnectionMetrics connectionMetrics = + final var connectionMetrics = connectionCounterRegistry.aggregateConnectionMetrics(sourceMetrics, targetMetrics); - final RetrieveConnectionMetricsResponse retrieveConnectionMetricsResponse = + final var retrieveConnectionMetricsResponse = RetrieveConnectionMetricsResponse.getBuilder(connectionId(), dittoHeaders) .connectionMetrics(connectionMetrics) .sourceMetrics(sourceMetrics) @@ -1402,7 +1403,7 @@ private FSM.State resetConnectionMetrics(final } private FSM.State enableConnectionLogs(final EnableConnectionLogs command) { - final ConnectionId connectionId = command.getEntityId(); + final var connectionId = command.getEntityId(); logger.withCorrelationId(command) .debug("Received EnableConnectionLogs message for connection <{}>. Enabling logs.", connectionId); @@ -1412,7 +1413,7 @@ private FSM.State enableConnectionLogs(final En } private FSM.State checkLoggingActive(final CheckConnectionLogsActive command) { - final ConnectionId connectionId = command.getEntityId(); + final var connectionId = command.getEntityId(); logger.withCorrelationId(command) .debug("Received checkLoggingActive message for connection <{}>." + " Checking if logging for connection is expired.", connectionId); @@ -1488,9 +1489,9 @@ private DittoRuntimeException unhandledExceptionForSignalInState(final Object si } protected boolean canConnectViaSocket(final Connection connection) { - final SshTunnelState tunnelState = stateData().getSshTunnelState(); + final var tunnelState = stateData().getSshTunnelState(); if (tunnelState.isEnabled()) { - final URI uri = connection.getSshTunnel().map(SshTunnel::getUri).map(URI::create).orElseThrow(); + final var uri = connection.getSshTunnel().map(SshTunnel::getUri).map(URI::create).orElseThrow(); final String sshHost = uri.getHost(); final int sshPort = uri.getPort(); final int localTunnelPort = tunnelState.getLocalPort(); @@ -1548,7 +1549,7 @@ private FSM.State handleInboundSignal(final Inb private static Optional tryExtractEntityId(Signal signal) { if (signal instanceof WithEntityId) { - final WithEntityId withEntityId = (WithEntityId) signal; + final var withEntityId = (WithEntityId) signal; return Optional.of(withEntityId.getEntityId()); } else { return Optional.empty(); @@ -1558,7 +1559,7 @@ private static Optional tryExtractEntityId(Signal signal) { private void dispatchSearchCommand(final WithSubscriptionId searchCommand) { final String subscriptionId = searchCommand.getSubscriptionId(); if (subscriptionId.length() > subscriptionIdPrefixLength) { - final String prefix = subscriptionId.substring(0, subscriptionIdPrefixLength); + final var prefix = subscriptionId.substring(0, subscriptionIdPrefixLength); final Optional index = parseHexString(prefix); if (index.isPresent()) { final ActorRef receiver = clientActorRefs.get(index.get()).orElseThrow(); @@ -1620,7 +1621,7 @@ private Pair startOutboundActors(final Connection connection final OutboundMappingSettings settings; final OutboundMappingProcessor outboundMappingProcessor; try { - ConnectivityConfig retrievedConnectivityConfig = + var retrievedConnectivityConfig = connectivityConfigProvider.getConnectivityConfig(connection.getId()); // this one throws DittoRuntimeExceptions when the mapper could not be configured settings = OutboundMappingSettings.of(connection, @@ -1685,7 +1686,7 @@ private ActorRef startInboundMappingProcessorActor(final Connection connection, final InboundMappingProcessor inboundMappingProcessor; try { - ConnectivityConfig retrievedConnectivityConfig = + var retrievedConnectivityConfig = connectivityConfigProvider.getConnectivityConfig(connection.getId()); // this one throws DittoRuntimeExceptions when the mapper could not be configured @@ -1721,8 +1722,8 @@ private ActorRef startInboundMappingProcessorActor(final Connection connection, */ private ActorRef startSubscriptionManager(final ActorSelection proxyActor) { final ActorRef pubSubMediator = DistributedPubSub.get(getContext().getSystem()).mediator(); - final Materializer mat = Materializer.createMaterializer(this::getContext); - final Props props = SubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(), pubSubMediator, + final var mat = Materializer.createMaterializer(this::getContext); + final var props = SubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(), pubSubMediator, proxyActor, mat); return getContext().actorOf(props, SubscriptionManager.ACTOR_NAME); } @@ -1777,7 +1778,7 @@ private void scheduleStateTimeout(final Duration duration) { private Status.Status getStatusToReport(final Status.Status status, final DittoHeaders dittoHeaders) { final Status.Status answerToPublish; if (status instanceof Status.Failure) { - final Status.Failure failure = (Status.Failure) status; + final var failure = (Status.Failure) status; if (!(failure.cause() instanceof DittoRuntimeException)) { final DittoRuntimeException error = ConnectionFailedException.newBuilder(connectionId()) .description(describeEventualCause(failure.cause())) @@ -1802,7 +1803,7 @@ private static String describeEventualCause(@Nullable final Throwable throwable) if (null == throwable) { return "Unknown cause."; } - final Throwable cause = throwable.getCause(); + final var cause = throwable.getCause(); if (cause == null || cause.equals(throwable)) { final String message = throwable.getMessage() != null @@ -2004,7 +2005,7 @@ public Duration getNextBackoff() { } private void increaseTimeoutAfterRecovery() { - final Instant now = Instant.now(); + final var now = Instant.now(); performRecovery(now); currentTimeout = minDuration(maxTimeout, currentTimeout.multipliedBy(2L)); ++currentTries;