Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.log;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.status.StatusLogger;

import java.nio.charset.StandardCharsets;

/**
* Log4j2 filter that suppresses repeated log lines using a Guava {@link BloomFilter}.
* <p>
* Deduplication key is {@code level + formattedMessage}. The first occurrence of a key
* yields {@link Result#NEUTRAL}; subsequent occurrences yield {@link Result#DENY}.
* </p>
Comment thread
andsel marked this conversation as resolved.
* <p>
* Example {@code log4j2.properties} wiring on a logger:
* </p>
* <pre>
* logger.periodic_flusher.name = org.logstash.execution.PeriodicFlush
* logger.periodic_flusher.level = DEBUG
* logger.periodic_flusher.filter.dedup.type = DeduplicationFilter
* </pre>
Comment thread
andsel marked this conversation as resolved.
*/
@Plugin(name = "DeduplicationFilter", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
public final class DeduplicationFilter extends AbstractFilter {

static final double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.01;
private static final int DEFAULT_EXPECTED_INSERTIONS = 1_000_000;
private static final Logger STATUS_LOGGER = StatusLogger.getLogger();

private final BloomFilter<String> seenKeys;

@PluginFactory
public static DeduplicationFilter createFilter(
@PluginAttribute(value = "falsePositiveProbability", defaultDouble = DEFAULT_FALSE_POSITIVE_PROBABILITY)
final double falsePositiveProbability) {
return new DeduplicationFilter(resolveFalsePositiveProbability(falsePositiveProbability));
}

private DeduplicationFilter(final double falsePositiveProbability) {
seenKeys = BloomFilter.create(
Funnels.stringFunnel(StandardCharsets.UTF_8),
DEFAULT_EXPECTED_INSERTIONS,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logstash processes tend to be up for a significant amount of time, and can log heavily, especially in debug mode. Should the bloom filter regularly reset itself to avoid false positives once > 1 million log entries have been written?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could associate a counter that when passes an high watermark value (say 90% of the DEFAULT_EXPECTED_INSERTIONS) re-instantiates the Bloom filter. However, this lead to duplicated lines.
We could think to index in the filter only the format strings (if possible) and not the formatted ones, that should help to reduce a lot the total number of items.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern here would be to avoid falsely flagging log entries as duplicates where they are not - we cannot skip logging unique log entries.

With this approach, are we able to determine how often a log entry is flagged as duplicate? Blanket de-duplication without understanding how prevalent a log is could potentially mask problems

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide to go with the "format string" path, I wouldn't expect that we have that much of single format strings, but to be sure I should extract some data from the source code and dependencies that we have.

If that value is small, we don't either need the Bloom filter and use a map/set.

Maybe before moving in that direction we should be sure to be able to grab the "formatted string" from the log events, and how many of those strings Logstash can use.

Do you think that such kind of analysis would help?

falsePositiveProbability
);
}

static double resolveFalsePositiveProbability(final double falsePositiveProbability) {
if (falsePositiveProbability > 0.0 && falsePositiveProbability <= 0.5) {
return falsePositiveProbability;
}
STATUS_LOGGER.warn("falsePositiveProbability is expected to be in the range (0, 5%] but was {}, defaulting to {}",
falsePositiveProbability, DEFAULT_FALSE_POSITIVE_PROBABILITY);
return DEFAULT_FALSE_POSITIVE_PROBABILITY;
Comment thread
andsel marked this conversation as resolved.
}

@Override
public Result filter(final LogEvent event) {
final String key = dedupKey(event);
synchronized (seenKeys) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a concern for heavy multi-threaded logging?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, surely it impacts the multi-threaded logging. However, from Guava 23 the Bloom filter is thread safe and lock-free, uses internal CAS operations and atomic. The problem is that if a log line is not in the filter we want it to be logged once and inserted for future denial. If we could accept some duplication, presumably small but not quantifiable, we could completely remove the critical section (and the implicit lock) and leverage the idempotent property of put (if I add once or million for the same key, then obtain the same result).

if (!seenKeys.mightContain(key)) {
seenKeys.put(key);
return Result.NEUTRAL;
}
return Result.DENY;
}
}
Comment thread
andsel marked this conversation as resolved.

private static String dedupKey(final LogEvent event) {
return event.getLevel().name() + '\0' + event.getMessage().getFormattedMessage();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.log;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.message.SimpleMessage;
import org.apache.logging.log4j.status.StatusData;
import org.apache.logging.log4j.status.StatusListener;
import org.apache.logging.log4j.status.StatusLogger;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;

public class DeduplicationFilterTest {

@Test
public void givenAStringAtInfoLevelWhenAppearsFirstTimeThenIsForwarded() {
final DeduplicationFilter filter = DeduplicationFilter.createFilter(
DeduplicationFilter.DEFAULT_FALSE_POSITIVE_PROBABILITY);

final Filter.Result result = filter.filter(logEvent(Level.INFO, "duplicate me"));

assertEquals(Filter.Result.NEUTRAL, result);
}

@Test
public void givenAStringAtWarnLevelWhenAppearsMultipleTimesThenIsDenied() {
final DeduplicationFilter filter = DeduplicationFilter.createFilter(
DeduplicationFilter.DEFAULT_FALSE_POSITIVE_PROBABILITY);

filter.filter(logEvent(Level.WARN, "same line"));
final Filter.Result result = filter.filter(logEvent(Level.WARN, "same line"));

assertEquals(Filter.Result.DENY, result);
}

@Test
public void givenAStringWhenAppearsAtDifferentLevelThenIsForwarded() {
final DeduplicationFilter filter = DeduplicationFilter.createFilter(
DeduplicationFilter.DEFAULT_FALSE_POSITIVE_PROBABILITY);

filter.filter(logEvent(Level.INFO, "shared text"));
final Filter.Result result = filter.filter(logEvent(Level.ERROR, "shared text"));

assertEquals(Filter.Result.NEUTRAL, result);
}

@Test
public void invalidFalsePositiveProbabilityFallsBackToDefault() {
assertEquals(
DeduplicationFilter.DEFAULT_FALSE_POSITIVE_PROBABILITY,
DeduplicationFilter.resolveFalsePositiveProbability(0.0),
0.0
);
assertEquals(
DeduplicationFilter.DEFAULT_FALSE_POSITIVE_PROBABILITY,
DeduplicationFilter.resolveFalsePositiveProbability(1.0),
0.0
);
assertEquals(
DeduplicationFilter.DEFAULT_FALSE_POSITIVE_PROBABILITY,
DeduplicationFilter.resolveFalsePositiveProbability(-0.5),
0.0
);
}

@Test
public void validFalsePositiveProbabilityIsPreserved() {
assertEquals(0.001, DeduplicationFilter.resolveFalsePositiveProbability(0.001), 0.0);
}

private static LogEvent logEvent(final Level level, final String message) {
return Log4jLogEvent.newBuilder()
.setLevel(level)
.setMessage(new SimpleMessage(message))
.build();
}

private static class SpyListener implements StatusListener {

private final List<StatusData> spiedMessages = new ArrayList<>();

@Override
public void log(StatusData data) {
spiedMessages.add(data);
}

@Override
public Level getStatusLevel() {
return Level.WARN;
}

@Override
public void close() throws IOException {

}
}

@Test
public void givenFalsePositiveProbabilitySetToValueOutsideExpectedRangeThenOverrideToDefaultAndLog() throws IOException {
// setup
try (SpyListener loggerSpy = new SpyListener()) {
StatusLogger.getLogger().registerListener(loggerSpy);

// Exercise
DeduplicationFilter.resolveFalsePositiveProbability(1.0);

// Verify
final StatusData data = loggerSpy.spiedMessages.get(0);
assertEquals(Level.WARN, data.getLevel());
assertThat(data.getMessage().getFormattedMessage(), containsString("falsePositiveProbability is expected to be in the range (0, 5%] but was"));

// teardown
StatusLogger.getLogger().removeListener(loggerSpy);
}
}
}
Loading