Skip to content

Commit

Permalink
make logLevels to publish configurable via Ditto config
Browse files Browse the repository at this point in the history
* also make "logHeadersAndPayload" configurable
* added new FluentPublishingConnectionLoggerContext holding all static information for building FluentPublishingConnectionLogger

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Oct 27, 2021
1 parent a0603e3 commit fcffe67
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.connectivity.model.LogLevel;
import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;

import com.typesafe.config.Config;
Expand All @@ -32,13 +36,20 @@ public final class DefaultLoggerPublisherConfig implements LoggerPublisherConfig
private static final String CONFIG_PATH = "publisher";

private final boolean enabled;
private final Set<LogLevel> logLevels;
private final boolean logHeadersAndPayload;
@Nullable private final String logTag;
private final Map<String, Object> additionalLogContext;
private final FluencyLoggerPublisherConfig fluencyLoggerPublisherConfig;

private DefaultLoggerPublisherConfig(final ConfigWithFallback config) {
enabled = config.getBoolean(ConfigValue.ENABLED.getConfigPath());
logTag = config.getStringOrNull(ConfigValue.LOG_TAG);
logLevels = Stream.of(config.getString(ConfigValue.LOG_LEVELS.getConfigPath()).split(","))
.map(LogLevel::forLevel)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
logHeadersAndPayload = config.getBoolean(ConfigValue.LOG_HEADERS_AND_PAYLOAD.getConfigPath());
additionalLogContext = config.getObject(ConfigValue.ADDITIONAL_LOG_CONTEXT.getConfigPath()).unwrapped();
fluencyLoggerPublisherConfig = DefaultFluencyLoggerPublisherConfig.of(config);
}
Expand All @@ -60,6 +71,16 @@ public boolean isEnabled() {
return enabled;
}

@Override
public Set<LogLevel> getLogLevels() {
return logLevels;
}

@Override
public boolean isLogHeadersAndPayload() {
return logHeadersAndPayload;
}

@Override
public Optional<String> getLogTag() {
return Optional.ofNullable(logTag);
Expand All @@ -85,21 +106,26 @@ public boolean equals(@Nullable final Object o) {
}
final DefaultLoggerPublisherConfig that = (DefaultLoggerPublisherConfig) o;
return enabled == that.enabled &&
Objects.equals(logLevels, that.logLevels) &&
logHeadersAndPayload == that.logHeadersAndPayload &&
Objects.equals(logTag, that.logTag) &&
Objects.equals(additionalLogContext, that.additionalLogContext) &&
Objects.equals(fluencyLoggerPublisherConfig, that.fluencyLoggerPublisherConfig);
}

@Override
public int hashCode() {
return Objects.hash(enabled, logTag, additionalLogContext, fluencyLoggerPublisherConfig);
return Objects.hash(enabled, logLevels, logTag, logHeadersAndPayload, additionalLogContext,
fluencyLoggerPublisherConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"enabled=" + enabled +
", logLevels=" + logLevels +
", logTag=" + logTag +
", logHeadersAndPayload=" + logHeadersAndPayload +
", additionalLogContext=" + additionalLogContext +
", fluencyLoggerPublisherConfig=" + fluencyLoggerPublisherConfig +
"]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.connectivity.model.LogLevel;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

/**
Expand All @@ -29,10 +31,24 @@ public interface LoggerPublisherConfig {
/**
* Indicates whether publishing connection logs to a fluentd/fluentbit endpoint should be enabled.
*
* @return {@code true} if connection logs should be published, {@code false} else.
* @return {@code true} if connection logs should be published.
*/
boolean isEnabled();

/**
* Returns the log levels to include for the publisher logs.
*
* @return the log levels to include for the publisher logs.
*/
Set<LogLevel> getLogLevels();

/**
* Indicates whether the published log entries should contain headers and payloads if those were available.
*
* @return {@code true} if published connection logs should contain headers and payloads.
*/
boolean isLogHeadersAndPayload();

/**
* Returns a specific log-tag to use for the published logs. If empty, a default log-tag will be used:
* {@code connection:<connection-id>}.
Expand Down Expand Up @@ -66,6 +82,16 @@ enum ConfigValue implements KnownConfigValue {
*/
ENABLED("enabled", false),

/**
* The log levels to include for the publisher logs.
*/
LOG_LEVELS("logLevels", LogLevel.SUCCESS.getLevel() + "," + LogLevel.FAILURE.getLevel()),

/**
* Whether to log headers and payload for publisher logs.
*/
LOG_HEADERS_AND_PAYLOAD("logHeadersAndPayload", false),

/**
* The optional specific log-tag to use for published logs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.text.MessageFormat;
import java.util.Collection;
import java.util.Map;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.LogCategory;
import org.eclipse.ditto.connectivity.model.LogLevel;
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.komamitsu.fluency.Fluency;
Expand Down Expand Up @@ -97,6 +99,28 @@ static MuteableConnectionLogger newMuteableLogger(final ConnectionId connectionI
return new DefaultMuteableConnectionLogger(connectionId, checkNotNull(delegate));
}

/**
* Creates a new {@link FluentPublishingConnectionLoggerContext} used by
* {@link #newPublishingLogger(ConnectionId, LogCategory, LogType, String, FluentPublishingConnectionLoggerContext)}
* as connection static context information.
*
* @param fluencyForwarder the fluency forwarder for the logger.
* @param logLevels the log levels which should be included when publishing logs.
* @param logHeadersAndPayload whether to also include headers and payload information in published logs.
* @param logTag an optional log-tag to use and overwrite the default one: {@code connection:<connection-id>}
* @param additionalLogContext additional log context to include in each logged entry.
* @return a new fluent publishing connection logger context.
*/
static FluentPublishingConnectionLoggerContext newPublishingLoggerContext(final Fluency fluencyForwarder,
final Collection<LogLevel> logLevels,
final boolean logHeadersAndPayload,
@Nullable final CharSequence logTag,
final Map<String, Object> additionalLogContext) {

return new FluentPublishingConnectionLoggerContext(fluencyForwarder, logLevels, logHeadersAndPayload,
logTag, additionalLogContext);
}

/**
* Creates a new {@link FluentPublishingConnectionLogger} that is used to forward all connection logs to a fluentd
* or fluentbit endpoint.
Expand All @@ -105,24 +129,23 @@ static MuteableConnectionLogger newMuteableLogger(final ConnectionId connectionI
* @param logCategory the log category for which the logger is created.
* @param logType the log type for which the logger is created.
* @param address the address for which the logger is created.
* @param fluencyForwarder the fluency forwarder for the logger.
* @param logTag an optional log-tag to use and overwrite the default one: {@code connection:<connection-id>}
* @param additionalLogContext additional log context to include in each logged entry.
* @param context the connection static context information for creating the publishing connection logger.
* @return a new fluent publishing connection logger instance.
*/
static FluentPublishingConnectionLogger newPublishingLogger(final ConnectionId connectionId,
final LogCategory logCategory, final LogType logType, @Nullable final String address,
final Fluency fluencyForwarder, @Nullable final CharSequence logTag,
final Map<String, Object> additionalLogContext) {
final FluentPublishingConnectionLoggerContext context) {

final FluentPublishingConnectionLogger.Builder builder = FluentPublishingConnectionLogger
.newBuilder(connectionId, logCategory, logType, fluencyForwarder)
.newBuilder(connectionId, logCategory, logType, context.getFluencyForwarder())
.withAddress(address)
.withAdditionalLogContext(additionalLogContext)
.withAdditionalLogContext(context.getAdditionalLogContext())
.withLogLevels(context.getLogLevels())
.withInstanceIdentifier(InstanceIdentifierSupplier.getInstance().get());
if (null != logTag) {
builder.withFluentTag(logTag);
if (context.isLogHeadersAndPayload()) {
builder.logHeadersAndPayload();
}
context.getLogTag().ifPresent(builder::withFluentTag);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ public final class ConnectionLoggerRegistry implements ConnectionMonitorRegistry
private final int failureCapacity;
private final TemporalAmount loggingDuration;
private final long maximumLogSizeInByte;
@Nullable private final Fluency fluencyForwarder;
@Nullable private final String logTag;
private final Map<String, Object> additionalLogContext;
@Nullable private final FluentPublishingConnectionLoggerContext fluentPublishingConnectionLoggerContext;

private ConnectionLoggerRegistry(final int successCapacity,
final int failureCapacity,
Expand All @@ -91,13 +89,15 @@ private ConnectionLoggerRegistry(final int successCapacity,

if (loggerPublisherConfig.isEnabled()) {
final FluencyLoggerPublisherConfig fluencyConfig = loggerPublisherConfig.getFluencyLoggerPublisherConfig();
fluencyForwarder = fluencyConfig.buildFluencyLoggerPublisher();
logTag = loggerPublisherConfig.getLogTag().orElse(null);
additionalLogContext = Map.copyOf(loggerPublisherConfig.getAdditionalLogContext());
final Fluency fluency = fluencyConfig.buildFluencyLoggerPublisher();
fluentPublishingConnectionLoggerContext = ConnectionLoggerFactory.newPublishingLoggerContext(fluency,
loggerPublisherConfig.getLogLevels(),
loggerPublisherConfig.isLogHeadersAndPayload(),
loggerPublisherConfig.getLogTag().orElse(null),
loggerPublisherConfig.getAdditionalLogContext()
);
} else {
fluencyForwarder = null;
logTag = null;
additionalLogContext = Map.of();
fluentPublishingConnectionLoggerContext = null;
}
}

Expand Down Expand Up @@ -480,9 +480,9 @@ private ConnectionLogger newLogger(final ConnectionId connectionId, final LogCat
final MuteableConnectionLogger muteableLogger = ConnectionLoggerFactory.newMuteableLogger(
connectionId, evictingLogger);

if (null != fluencyForwarder) {
if (null != fluentPublishingConnectionLoggerContext) {
final FluentPublishingConnectionLogger publishingLogger = ConnectionLoggerFactory.newPublishingLogger(
connectionId, logCategory, logType, address, fluencyForwarder, logTag, additionalLogContext);
connectionId, logCategory, logType, address, fluentPublishingConnectionLoggerContext);
return new CompoundConnectionLogger(List.of(muteableLogger, publishingLogger));
} else {
return muteableLogger;
Expand All @@ -502,15 +502,13 @@ public boolean equals(@Nullable final Object o) {
failureCapacity == that.failureCapacity &&
maximumLogSizeInByte == that.maximumLogSizeInByte &&
Objects.equals(loggingDuration, that.loggingDuration) &&
Objects.equals(fluencyForwarder, that.fluencyForwarder) &&
Objects.equals(logTag, that.logTag) &&
Objects.equals(additionalLogContext, that.additionalLogContext);
Objects.equals(fluentPublishingConnectionLoggerContext, that.fluentPublishingConnectionLoggerContext);
}

@Override
public int hashCode() {
return Objects.hash(successCapacity, failureCapacity, loggingDuration, maximumLogSizeInByte, fluencyForwarder,
logTag, additionalLogContext);
return Objects.hash(successCapacity, failureCapacity, loggingDuration, maximumLogSizeInByte,
fluentPublishingConnectionLoggerContext);
}

@Override
Expand All @@ -520,9 +518,7 @@ public String toString() {
", failureCapacity=" + failureCapacity +
", loggingDuration=" + loggingDuration +
", maximumLogSizeInByte=" + maximumLogSizeInByte +
", fluencyForwarder=" + fluencyForwarder +
", logTag=" + logTag +
", additionalLogContext=" + additionalLogContext +
", fluentPublishingConnectionLoggerContext=" + fluentPublishingConnectionLoggerContext +
"]";
}

Expand Down
Loading

0 comments on commit fcffe67

Please sign in to comment.