diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index 7c1ae5593d..115e66ceb8 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -326,7 +326,7 @@ protected void init() { if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) { tunnelActor = startChildActor(SshTunnelActor.ACTOR_NAME, SshTunnelActor.props(connection, - connectivityStatusResolver, connectivityConfig)); + connectivityStatusResolver, connectionLogger)); } else { tunnelActor = null; } @@ -358,6 +358,12 @@ public void postStop() { clientConnectingGauge.reset(); stopChildActor(tunnelActor); logger.debug("Stopped client with id - <{}>", getDefaultClientId()); + try { + connectionLogger.close(); + } catch (final IOException e) { + logger.warning("Exception during closing connection logger <{}>: {}", e.getClass().getSimpleName(), + e.getMessage()); + } try { super.postStop(); } catch (final Exception e) { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/CompoundConnectionLogger.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/CompoundConnectionLogger.java index e6e1266408..0c55569377 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/CompoundConnectionLogger.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/CompoundConnectionLogger.java @@ -12,6 +12,7 @@ */ package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs; +import java.io.IOException; import java.util.Collection; import java.util.Objects; import java.util.stream.Collectors; @@ -105,6 +106,13 @@ public boolean isMuted() { .allMatch(MuteableConnectionLogger::isMuted); } + @Override + public void close() throws IOException { + for (final var connectionLogger : connectionLoggers) { + connectionLogger.close(); + } + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLogger.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLogger.java index 2e186487af..a097aaaf53 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLogger.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLogger.java @@ -13,21 +13,22 @@ package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs; +import java.io.Closeable; import java.util.Collection; import javax.annotation.Nullable; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; +import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.LogEntry; import org.eclipse.ditto.connectivity.service.config.MonitoringLoggerConfig; import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor; -import org.eclipse.ditto.base.model.signals.Signal; /** * Logger for connections that provides log messages for end users. */ -public interface ConnectionLogger { +public interface ConnectionLogger extends Closeable { /** * Gets the connection logger for the given connection ID. diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java index e799c1ef0b..8eaa86430e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java @@ -14,6 +14,7 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; +import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.time.temporal.TemporalAmount; @@ -300,8 +301,17 @@ private void invalidateLoggers(final ConnectionId connectionId) { final Set mapsKeysToDelete = LOGGERS.keySet().stream() .filter(mapKey -> mapKey.connectionId.equals(connectionId)) .collect(Collectors.toSet()); - // TODO TJ flush logs before removing from loggers (next task) - mapsKeysToDelete.forEach(LOGGERS::remove); + + mapsKeysToDelete.forEach(loggerKey -> { + // flush logs before removing from loggers: + try { + LOGGERS.get(loggerKey).close(); + } catch (final IOException e) { + LOGGER.warn("Exception during closing logger <{}>: <{}>: {}", loggerKey, e.getClass().getSimpleName(), + e.getMessage()); + } + LOGGERS.remove(loggerKey); + }); } private void initLogger(final ConnectionId connectionId) { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/DefaultMuteableConnectionLogger.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/DefaultMuteableConnectionLogger.java index 3803852640..3facf2ba20 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/DefaultMuteableConnectionLogger.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/DefaultMuteableConnectionLogger.java @@ -13,6 +13,7 @@ package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Objects; @@ -141,6 +142,11 @@ public void clear() { wrapInExceptionHandling(() -> delegate.clear()); } + @Override + public void close() throws IOException { + delegate.close(); + } + @Override public Collection getLogs() { if (active) { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/EvictingConnectionLogger.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/EvictingConnectionLogger.java index 8f2b25b22a..832631a1bd 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/EvictingConnectionLogger.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/EvictingConnectionLogger.java @@ -13,6 +13,7 @@ package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Objects; @@ -104,6 +105,11 @@ public void clear() { failureLogs.clear(); } + @Override + public void close() throws IOException { + clear(); + } + @Override public Collection getLogs() { final Collection logs = new ArrayList<>(successLogs.size() + failureLogs.size()); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ExceptionalConnectionLogger.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ExceptionalConnectionLogger.java index 4587eb37f1..cc4815e6df 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ExceptionalConnectionLogger.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ExceptionalConnectionLogger.java @@ -12,6 +12,7 @@ */ package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs; +import java.io.IOException; import java.text.MessageFormat; import java.util.Collection; import java.util.Collections; @@ -103,6 +104,11 @@ public void clear() { logger.trace("Not clearing logs since logger is exceptional."); } + @Override + public void close() throws IOException { + clear(); + } + @Override public Collection getLogs() { logger.trace("Returning empty logs since logger is exceptional."); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/FluentPublishingConnectionLogger.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/FluentPublishingConnectionLogger.java index b9be20b1e4..165b9149ac 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/FluentPublishingConnectionLogger.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/FluentPublishingConnectionLogger.java @@ -106,6 +106,18 @@ public void clear() { // no-op } + @Override + public void close() throws IOException { + LOGGER.info("Flushing and closing Fluency forwarder..."); + // fluencyForwarder.close also flushes: + fluencyForwarder.close(); + try { + fluencyForwarder.waitUntilAllBufferFlushed(10); // TODO TJ do we want this? if yes, make configurable + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + @Override public void success(final ConnectionMonitor.InfoProvider infoProvider, final String message, final Object... messageArguments) { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/tunnel/SshTunnelActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/tunnel/SshTunnelActor.java index 6e449aa3cd..21f9a211c1 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/tunnel/SshTunnelActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/tunnel/SshTunnelActor.java @@ -42,13 +42,10 @@ import org.eclipse.ditto.connectivity.model.SshTunnel; import org.eclipse.ditto.connectivity.model.UserPasswordCredentials; import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; -import org.eclipse.ditto.connectivity.service.config.MonitoringConfig; import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver; import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure; import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus; import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger; -import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLoggerRegistry; import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier; @@ -90,19 +87,16 @@ public final class SshTunnelActor extends AbstractActorWithTimers implements Cre @Nullable Throwable error = null; @SuppressWarnings("unused") - private SshTunnelActor(final Connection connection, final ConnectivityStatusResolver connectivityStatusResolver, - final ConnectivityConfig connectivityConfig) { + private SshTunnelActor(final Connection connection, + final ConnectivityStatusResolver connectivityStatusResolver, + final ConnectionLogger connectionLogger) { logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this) .withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId()); this.connection = connection; this.connectivityStatusResolver = connectivityStatusResolver; + this.connectionLogger = connectionLogger; sshClient = SshClientProvider.get(getContext().getSystem()).getSshClient(); - final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig(); - final ConnectionLoggerRegistry connectionLoggerRegistry = - ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger()); - connectionLogger = connectionLoggerRegistry.forConnection(connection.getId()); - final SshTunnel sshTunnel = this.connection.getSshTunnel() .orElseThrow(() -> ConnectionFailedException .newBuilder(connection.getId()) @@ -127,13 +121,13 @@ private SshTunnelActor(final Connection connection, final ConnectivityStatusReso * @param connection the connection to create the SSH tunnel for. * @param connectivityStatusResolver connectivity status resolver to resolve the SSH tunnel status based on * exceptions. - * @param connectivityConfig the connectivity configuration including potential overwrites. + * @param connectionLogger the connection logger to use for logging. * @return the Props object. */ public static Props props(final Connection connection, final ConnectivityStatusResolver connectivityStatusResolver, - final ConnectivityConfig connectivityConfig) { - return Props.create(SshTunnelActor.class, connection, connectivityStatusResolver, connectivityConfig); + final ConnectionLogger connectionLogger) { + return Props.create(SshTunnelActor.class, connection, connectivityStatusResolver, connectionLogger); } @Override