Skip to content

Commit

Permalink
Issue #106: Use appropriate logger for handling AddConnectionLogEntry.
Browse files Browse the repository at this point in the history
Using always the logger for category 'CONNECTION' may discard log entries if the log entry has a different category.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Dec 16, 2021
1 parent 73a333b commit 988962c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -153,7 +154,7 @@ private List<LogEntry> aggregateLogsForActiveConnection(final Logger logger,
final ConnectionId connectionId) {

logger.trace("Logging is enabled, will aggregate logs for connection <{}>", connectionId);
final List<LogEntry> allLogs = streamLoggers(connectionId)
final List<LogEntry> allLogs = loggersForConnectionId(connectionId)
.map(ConnectionLogger::getLogs)
.flatMap(Collection::stream)
.sorted(Comparator.comparing(LogEntry::getTimestamp).reversed())
Expand Down Expand Up @@ -193,11 +194,7 @@ private List<LogEntry> restrictMaxLogEntriesLength(final List<LogEntry> original
* @return true if logging is currently enabled for the connection.
*/
static boolean isActiveForConnection(final ConnectionId connectionId) {
final boolean muted = streamLoggers(connectionId)
.filter(MuteableConnectionLogger.class::isInstance)
.anyMatch(logger -> ((MuteableConnectionLogger) logger).isMuted());

return !muted;
return muteableLoggersForConnectionId(connectionId).anyMatch(Predicate.not(MuteableConnectionLogger::isMuted));
}

/**
Expand Down Expand Up @@ -230,9 +227,7 @@ public static void muteForConnection(final ConnectionId connectionId) {
.info("Muting loggers for connection <{}>.", connectionId);

try {
streamLoggers(connectionId)
.filter(MuteableConnectionLogger.class::isInstance)
.forEach(logger -> ((MuteableConnectionLogger) logger).mute());
muteableLoggersForConnectionId(connectionId).forEach(MuteableConnectionLogger::mute);
stopMetadata(connectionId);
resetForConnectionId(connectionId);
} catch (final Exception e) {
Expand All @@ -250,22 +245,43 @@ public static void muteForConnection(final ConnectionId connectionId) {
public void unmuteForConnection(final ConnectionId connectionId) {
LOGGER.withMdcEntry(MDC_CONNECTION_ID, connectionId)
.info("Unmuting loggers for connection <{}>.", connectionId);
tryToUnmuteLoggersForConnection(connectionId);
startMetadata(connectionId);
}

private static void tryToUnmuteLoggersForConnection(final ConnectionId connectionId) {
try {
streamLoggers(connectionId)
.filter(MuteableConnectionLogger.class::isInstance)
.forEach(logger -> ((MuteableConnectionLogger) logger).unmute());
startMetadata(connectionId);
unmuteLoggersForConnection(connectionId);
} catch (final Exception e) {
LOGGER.withMdcEntry(MDC_CONNECTION_ID, connectionId)
.error("Failed unmuting loggers for connection <{}>. Reason: <{}>.", connectionId, e);
}
}

private static Stream<ConnectionLogger> streamLoggers(final ConnectionId connectionId) {
return LOGGERS.entrySet()
.stream()
.filter(e -> e.getKey().connectionId.equals(connectionId))
private static void unmuteLoggersForConnection(final ConnectionId connectionId) {
final var amountUnmutedLoggers = muteableLoggersForConnectionId(connectionId)
.mapToInt(logger -> {
logger.unmute();
return 1;
})
.sum();
LOGGER.withMdcEntry(MDC_CONNECTION_ID, connectionId)
.info("Unmuted <{}> loggers for connection <{}>.", amountUnmutedLoggers, connectionId);
}

private static Stream<MuteableConnectionLogger> muteableLoggersForConnectionId(final ConnectionId connectionId) {
return loggersForConnectionId(connectionId)
.filter(MuteableConnectionLogger.class::isInstance)
.map(MuteableConnectionLogger.class::cast);
}

private static Stream<ConnectionLogger> loggersForConnectionId(final ConnectionId connectionId) {
final var connectionLoggerEntries = LOGGERS.entrySet();
return connectionLoggerEntries.stream()
.filter(e -> {
final var connectionLoggerMapKey = e.getKey();
return connectionId.equals(connectionLoggerMapKey.connectionId);
})
.map(Map.Entry::getValue);
}

Expand Down Expand Up @@ -347,7 +363,7 @@ public void resetForConnection(final Connection connection) {
}

private static void resetForConnectionId(final ConnectionId connectionId) {
streamLoggers(connectionId)
loggersForConnectionId(connectionId)
.forEach(ConnectionLogger::clear);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.eclipse.ditto.connectivity.model.ConnectionMetrics;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.LogEntry;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommandInterceptor;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
Expand Down Expand Up @@ -170,6 +171,7 @@ public final class ConnectionPersistenceActor
private final ClientActorPropsFactory propsFactory;
private final ActorRef pubSubMediator;
private final boolean allClientActorsOnOneNode;
private final ConnectionLoggerRegistry connectionLoggerRegistry;
private final ConnectionLogger connectionLogger;
private final Duration clientActorAskTimeout;
private final Duration checkLoggingActiveInterval;
Expand Down Expand Up @@ -209,9 +211,8 @@ public final class ConnectionPersistenceActor
connectionPriorityProvider = connectionPriorityProviderFactory.newProvider(self(), log);
clientActorAskTimeout = connectionConfig.getClientActorAskTimeout();
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
final ConnectionLoggerRegistry loggerRegistry =
ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
connectionLogger = loggerRegistry.forConnection(connectionId);
connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
connectionLogger = connectionLoggerRegistry.forConnection(connectionId);

loggingEnabledDuration = monitoringConfig.logger().logDuration();
checkLoggingActiveInterval = monitoringConfig.logger().loggingActiveCheckInterval();
Expand Down Expand Up @@ -626,7 +627,15 @@ protected Receive matchAnyAfterInitialization() {
private void handleAddConnectionLogEntry(final AddConnectionLogEntry addConnectionLogEntry) {
final var logEntry = addConnectionLogEntry.getLogEntry();
log.withCorrelationId(logEntry.getCorrelationId()).debug("Handling <{}>.", addConnectionLogEntry);
connectionLogger.logEntry(logEntry);
final var logger = getAppropriateLogger(addConnectionLogEntry.getEntityId(), logEntry);
logger.logEntry(logEntry);
}

private ConnectionLogger getAppropriateLogger(final ConnectionId connectionId, final LogEntry logEntry) {
return connectionLoggerRegistry.getLogger(connectionId,
logEntry.getLogCategory(),
logEntry.getLogType(),
logEntry.getAddress().orElse(null));
}

@Override
Expand Down

0 comments on commit 988962c

Please sign in to comment.