Skip to content

Commit

Permalink
flush logs of FluentPublishingConnectionLogger by
Browse files Browse the repository at this point in the history
* making ConnectionLogger extend Closeable
* invoking close() which flushed the logs on actor stop
* invoking close() also when re-initializing the logger

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Oct 29, 2021
1 parent fcffe67 commit 4d80591
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ protected void init() {

if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
tunnelActor = startChildActor(SshTunnelActor.ACTOR_NAME, SshTunnelActor.props(connection,
connectivityStatusResolver, connectivityConfig));
connectivityStatusResolver, connectionLogger));
} else {
tunnelActor = null;
}
Expand Down Expand Up @@ -358,6 +358,12 @@ public void postStop() {
clientConnectingGauge.reset();
stopChildActor(tunnelActor);
logger.debug("Stopped client with id - <{}>", getDefaultClientId());
try {
connectionLogger.close();
} catch (final IOException e) {
logger.warning("Exception during closing connection logger <{}>: {}", e.getClass().getSimpleName(),
e.getMessage());
}
try {
super.postStop();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs;

import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -105,6 +106,13 @@ public boolean isMuted() {
.allMatch(MuteableConnectionLogger::isMuted);
}

@Override
public void close() throws IOException {
for (final var connectionLogger : connectionLoggers) {
connectionLogger.close();
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@

package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs;

import java.io.Closeable;
import java.util.Collection;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.LogEntry;
import org.eclipse.ditto.connectivity.service.config.MonitoringLoggerConfig;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.base.model.signals.Signal;

/**
* Logger for connections that provides log messages for end users.
*/
public interface ConnectionLogger {
public interface ConnectionLogger extends Closeable {

/**
* Gets the connection logger for the given connection ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
Expand Down Expand Up @@ -300,8 +301,17 @@ private void invalidateLoggers(final ConnectionId connectionId) {
final Set<MapKey> mapsKeysToDelete = LOGGERS.keySet().stream()
.filter(mapKey -> mapKey.connectionId.equals(connectionId))
.collect(Collectors.toSet());
// TODO TJ flush logs before removing from loggers (next task)
mapsKeysToDelete.forEach(LOGGERS::remove);

mapsKeysToDelete.forEach(loggerKey -> {
// flush logs before removing from loggers:
try {
LOGGERS.get(loggerKey).close();
} catch (final IOException e) {
LOGGER.warn("Exception during closing logger <{}>: <{}>: {}", loggerKey, e.getClass().getSimpleName(),
e.getMessage());
}
LOGGERS.remove(loggerKey);
});
}

private void initLogger(final ConnectionId connectionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -141,6 +142,11 @@ public void clear() {
wrapInExceptionHandling(() -> delegate.clear());
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public Collection<LogEntry> getLogs() {
if (active) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
Expand Down Expand Up @@ -104,6 +105,11 @@ public void clear() {
failureLogs.clear();
}

@Override
public void close() throws IOException {
clear();
}

@Override
public Collection<LogEntry> getLogs() {
final Collection<LogEntry> logs = new ArrayList<>(successLogs.size() + failureLogs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.monitoring.logs;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -103,6 +104,11 @@ public void clear() {
logger.trace("Not clearing logs since logger is exceptional.");
}

@Override
public void close() throws IOException {
clear();
}

@Override
public Collection<LogEntry> getLogs() {
logger.trace("Returning empty logs since logger is exceptional.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ public void clear() {
// no-op
}

@Override
public void close() throws IOException {
LOGGER.info("Flushing and closing Fluency forwarder...");
// fluencyForwarder.close also flushes:
fluencyForwarder.close();
try {
fluencyForwarder.waitUntilAllBufferFlushed(10); // TODO TJ do we want this? if yes, make configurable
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@Override
public void success(final ConnectionMonitor.InfoProvider infoProvider, final String message,
final Object... messageArguments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@
import org.eclipse.ditto.connectivity.model.SshTunnel;
import org.eclipse.ditto.connectivity.model.UserPasswordCredentials;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLoggerRegistry;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
Expand Down Expand Up @@ -90,19 +87,16 @@ public final class SshTunnelActor extends AbstractActorWithTimers implements Cre
@Nullable Throwable error = null;

@SuppressWarnings("unused")
private SshTunnelActor(final Connection connection, final ConnectivityStatusResolver connectivityStatusResolver,
final ConnectivityConfig connectivityConfig) {
private SshTunnelActor(final Connection connection,
final ConnectivityStatusResolver connectivityStatusResolver,
final ConnectionLogger connectionLogger) {
logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId());
this.connection = connection;
this.connectivityStatusResolver = connectivityStatusResolver;
this.connectionLogger = connectionLogger;
sshClient = SshClientProvider.get(getContext().getSystem()).getSshClient();

final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
final ConnectionLoggerRegistry connectionLoggerRegistry =
ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
connectionLogger = connectionLoggerRegistry.forConnection(connection.getId());

final SshTunnel sshTunnel = this.connection.getSshTunnel()
.orElseThrow(() -> ConnectionFailedException
.newBuilder(connection.getId())
Expand All @@ -127,13 +121,13 @@ private SshTunnelActor(final Connection connection, final ConnectivityStatusReso
* @param connection the connection to create the SSH tunnel for.
* @param connectivityStatusResolver connectivity status resolver to resolve the SSH tunnel status based on
* exceptions.
* @param connectivityConfig the connectivity configuration including potential overwrites.
* @param connectionLogger the connection logger to use for logging.
* @return the Props object.
*/
public static Props props(final Connection connection,
final ConnectivityStatusResolver connectivityStatusResolver,
final ConnectivityConfig connectivityConfig) {
return Props.create(SshTunnelActor.class, connection, connectivityStatusResolver, connectivityConfig);
final ConnectionLogger connectionLogger) {
return Props.create(SshTunnelActor.class, connection, connectivityStatusResolver, connectionLogger);
}

@Override
Expand Down

0 comments on commit 4d80591

Please sign in to comment.