Skip to content

Commit

Permalink
Throttling logging appender (#2384)
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Chédru authored and nickbabcock committed Jun 21, 2018
1 parent 2ffb989 commit c448456
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 9 deletions.
Expand Up @@ -10,24 +10,23 @@
import ch.qos.logback.core.pattern.PatternLayoutBase; import ch.qos.logback.core.pattern.PatternLayoutBase;
import ch.qos.logback.core.spi.DeferredProcessingAware; import ch.qos.logback.core.spi.DeferredProcessingAware;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.dropwizard.jackson.Jackson;
import io.dropwizard.logging.async.AsyncAppenderFactory; import io.dropwizard.logging.async.AsyncAppenderFactory;
import io.dropwizard.logging.filter.FilterFactory; import io.dropwizard.logging.filter.FilterFactory;
import io.dropwizard.logging.layout.DiscoverableLayoutFactory; import io.dropwizard.logging.layout.DiscoverableLayoutFactory;
import io.dropwizard.logging.layout.LayoutFactory; import io.dropwizard.logging.layout.LayoutFactory;
import io.dropwizard.util.Duration;
import io.dropwizard.validation.MaxDuration;
import io.dropwizard.validation.MinDuration;


import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.validation.constraints.Max; import javax.validation.constraints.Max;
import javax.validation.constraints.Min; import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.List; import java.util.List;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.TimeUnit;


import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.base.Strings.nullToEmpty;


Expand Down Expand Up @@ -82,6 +81,17 @@
* </td> * </td>
* </tr> * </tr>
* <tr> * <tr>
* <td>{@code messageRate}</td>
* <td>
* Maximum message rate: average duration between messages. Extra messages are discarded.
* This setting avoids flooding a paid logging service by accident.
* For example, a duration of 100ms allows for a maximum of 10 messages per second and 30s would mean
* 1 message every 30 seconds.
* The maximum acceptable duration is 1 minute.
* By default, this duration is not set and this feature is disabled.
* </td>
* </tr>
* <tr>
* <td>{@code filterFactories}</td> * <td>{@code filterFactories}</td>
* <td>(none)</td> * <td>(none)</td>
* <td> * <td>
Expand Down Expand Up @@ -111,6 +121,11 @@ public abstract class AbstractAppenderFactory<E extends DeferredProcessingAware>


private int discardingThreshold = -1; private int discardingThreshold = -1;


@Nullable
@MinDuration(value = 0, unit = TimeUnit.SECONDS, inclusive = false)
@MaxDuration(value = 1, unit = TimeUnit.MINUTES)
private Duration messageRate;

private boolean includeCallerData = false; private boolean includeCallerData = false;


private ImmutableList<FilterFactory<E>> filterFactories = ImmutableList.of(); private ImmutableList<FilterFactory<E>> filterFactories = ImmutableList.of();
Expand All @@ -137,6 +152,17 @@ public void setDiscardingThreshold(int discardingThreshold) {
this.discardingThreshold = discardingThreshold; this.discardingThreshold = discardingThreshold;
} }


@JsonProperty
@Nullable
public Duration getMessageRate() {
return messageRate;
}

@JsonProperty
public void setMessageRate(Duration messageRate) {
this.messageRate = messageRate;
}

@JsonProperty @JsonProperty
public String getThreshold() { public String getThreshold() {
return threshold.toString(); return threshold.toString();
Expand Down Expand Up @@ -224,7 +250,11 @@ protected Appender<E> wrapAsync(Appender<E> appender, AsyncAppenderFactory<E> as
asyncAppender.addAppender(appender); asyncAppender.addAppender(appender);
asyncAppender.setNeverBlock(neverBlock); asyncAppender.setNeverBlock(neverBlock);
asyncAppender.start(); asyncAppender.start();
return asyncAppender; if (messageRate == null) {
return asyncAppender;
} else {
return new ThrottlingAppenderWrapper(asyncAppender, messageRate);
}
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand Down
@@ -0,0 +1,9 @@
package io.dropwizard.logging;

import ch.qos.logback.core.AsyncAppenderBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;

public interface AsyncAppenderBaseProxy<E extends DeferredProcessingAware> {

AsyncAppenderBase<E> getAppender();
}
Expand Up @@ -8,6 +8,7 @@
import ch.qos.logback.classic.jul.LevelChangePropagator; import ch.qos.logback.classic.jul.LevelChangePropagator;
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender; import ch.qos.logback.core.Appender;
import ch.qos.logback.core.AsyncAppenderBase;
import ch.qos.logback.core.ConsoleAppender; import ch.qos.logback.core.ConsoleAppender;
import ch.qos.logback.core.encoder.LayoutWrappingEncoder; import ch.qos.logback.core.encoder.LayoutWrappingEncoder;
import ch.qos.logback.core.util.StatusPrinter; import ch.qos.logback.core.util.StatusPrinter;
Expand Down Expand Up @@ -182,8 +183,10 @@ public void stop() {
final Logger logger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); final Logger logger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
final ArrayList<Appender<ILoggingEvent>> appenders = Lists.newArrayList(logger.iteratorForAppenders()); final ArrayList<Appender<ILoggingEvent>> appenders = Lists.newArrayList(logger.iteratorForAppenders());
for (Appender<ILoggingEvent> appender : appenders) { for (Appender<ILoggingEvent> appender : appenders) {
if (appender instanceof AsyncAppender) { if (appender instanceof AsyncAppenderBase) {
flushAppender((AsyncAppender) appender); flushAppender((AsyncAppenderBase) appender);
} else if (appender instanceof AsyncAppenderBaseProxy) {
flushAppender(((AsyncAppenderBaseProxy) appender).getAppender());
} }
} }
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
Expand Down Expand Up @@ -221,7 +224,7 @@ public void reset() {
} }
} }


private void flushAppender(AsyncAppender appender) throws InterruptedException { private void flushAppender(AsyncAppenderBase appender) throws InterruptedException {
int timeWaiting = 0; int timeWaiting = 0;
while (timeWaiting < appender.getMaxFlushTime() && appender.getNumberOfElementsInQueue() > 0) { while (timeWaiting < appender.getMaxFlushTime() && appender.getNumberOfElementsInQueue() > 0) {
Thread.sleep(100); Thread.sleep(100);
Expand Down
@@ -0,0 +1,133 @@
package io.dropwizard.logging;

import ch.qos.logback.core.Appender;
import ch.qos.logback.core.AsyncAppenderBase;
import ch.qos.logback.core.Context;
import ch.qos.logback.core.LogbackException;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import ch.qos.logback.core.spi.FilterReply;
import ch.qos.logback.core.status.Status;
import com.google.common.util.concurrent.RateLimiter;
import io.dropwizard.util.Duration;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* An {@link AsyncAppenderBase} that applies throttling to a proxied appender.
* Throttling is defined by an average duration between messages.
* Throttled messages are discarded.
*/
class ThrottlingAppenderWrapper<E extends DeferredProcessingAware> implements Appender<E>, AsyncAppenderBaseProxy<E> {

private final AsyncAppenderBase<E> appender;
private final RateLimiter rateLimiter;

public ThrottlingAppenderWrapper(AsyncAppenderBase<E> delegate, Duration messageRate) {
this.appender = delegate;
this.rateLimiter = RateLimiter.create(1_000_000_000.0 / messageRate.toNanoseconds());
}

@Override
public AsyncAppenderBase<E> getAppender() {
return appender;
}

@Override
public void start() {
appender.start();
}

@Override
public void stop() {
appender.stop();
}

@Override
public boolean isStarted() {
return appender.isStarted();
}

@Override
public void doAppend(E event) throws LogbackException {
if (rateLimiter.tryAcquire()) {
appender.doAppend(event);
}
}

@Override
public String getName() {
return appender.getName();
}

@Override
public void setName(String name) {
appender.setName(name);
}

@Override
public Context getContext() {
return appender.getContext();
}

@Override
public void setContext(Context context) {
appender.setContext(context);
}

@Override
public void addStatus(Status status) {
appender.addStatus(status);
}

@Override
public void addInfo(String msg) {
appender.addInfo(msg);
}

@Override
public void addInfo(String msg, Throwable ex) {
appender.addInfo(msg, ex);
}

@Override
public void addWarn(String msg) {
appender.addWarn(msg);
}

@Override
public void addWarn(String msg, Throwable ex) {
appender.addWarn(msg, ex);
}

@Override
public void addError(String msg) {
appender.addError(msg);
}

@Override
public void addError(String msg, Throwable ex) {
appender.addError(msg, ex);
}

@Override
public void addFilter(Filter<E> newFilter) {
appender.addFilter(newFilter);
}

@Override
public void clearAllFilters() {
appender.clearAllFilters();
}

@Override
public List<Filter<E>> getCopyOfAttachedFiltersList() {
return appender.getCopyOfAttachedFiltersList();
}

@Override
public FilterReply getFilterChainDecision(E event) {
return appender.getFilterChainDecision(event);
}
}
@@ -0,0 +1,130 @@
package io.dropwizard.logging;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import io.dropwizard.configuration.ConfigurationValidationException;
import io.dropwizard.configuration.FileConfigurationSourceProvider;
import io.dropwizard.configuration.SubstitutingSourceProvider;
import io.dropwizard.configuration.YamlConfigurationFactory;
import io.dropwizard.jackson.Jackson;
import io.dropwizard.validation.BaseValidator;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StrSubstitutor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class ThrottlingAppenderTest {
private final ObjectMapper objectMapper = Jackson.newObjectMapper();
private final YamlConfigurationFactory<DefaultLoggingFactory> factory = new YamlConfigurationFactory<>(
DefaultLoggingFactory.class,
BaseValidator.newValidator(),
objectMapper, "dw");


private static File loadResource(String resourceName) throws URISyntaxException {
return new File(Resources.getResource(resourceName).toURI());
}

@Test(expected = ConfigurationValidationException.class)
public void appenderWithZeroMessageRate() throws Exception {
final YamlConfigurationFactory<ConsoleAppenderFactory> factory = new YamlConfigurationFactory<>(
ConsoleAppenderFactory.class, BaseValidator.newValidator(), Jackson.newObjectMapper(), "dw");
final ConsoleAppenderFactory appender = factory.build(loadResource("yaml/appender_with_zero_message_rate.yml"));
}

@Test(expected = ConfigurationValidationException.class)
public void appenderWithInvalidMessageRate() throws Exception {
final YamlConfigurationFactory<ConsoleAppenderFactory> factory = new YamlConfigurationFactory<>(
ConsoleAppenderFactory.class, BaseValidator.newValidator(), Jackson.newObjectMapper(), "dw");
final ConsoleAppenderFactory appender = factory.build(loadResource("yaml/appender_with_invalid_message_rate.yml"));
}

@Rule
public TemporaryFolder folder = new TemporaryFolder();

private File newLog() throws IOException {
return folder.newFile("throttling.log");
}

private DefaultLoggingFactory setup(File defaultLog, String messageRate) throws Exception {
StrSubstitutor substitutor = new StrSubstitutor(ImmutableMap.of(
"default", StringUtils.removeEnd(defaultLog.getAbsolutePath(), ".log"),
"messageRate", messageRate
));
DefaultLoggingFactory config = factory.build(
new SubstitutingSourceProvider(new FileConfigurationSourceProvider(), substitutor),
loadResource("yaml/logging-message-rate.yml").getPath());
config.configure(new MetricRegistry(), "test-logger");
return config;
}

@Test
public void overThrottlingLimit() throws Exception {
File defaultLog = newLog();
DefaultLoggingFactory config = setup(defaultLog, "100ms");
Logger logger = LoggerFactory.getLogger("com.example.app");
Thread.sleep(1000);
for (int i = 0; i < 100; i++) {
logger.info("Application log {}", i);
}
config.stop();
assertThat(Files.readAllLines(defaultLog.toPath())).containsOnly(
"INFO com.example.app: Application log 0",
"INFO com.example.app: Application log 1",
"INFO com.example.app: Application log 2",
"INFO com.example.app: Application log 3",
"INFO com.example.app: Application log 4",
"INFO com.example.app: Application log 5",
"INFO com.example.app: Application log 6",
"INFO com.example.app: Application log 7",
"INFO com.example.app: Application log 8",
"INFO com.example.app: Application log 9",
"INFO com.example.app: Application log 10");
}

@Test
public void belowThrottlingLimit() throws Exception {
File defaultLog = newLog();
DefaultLoggingFactory config = setup(defaultLog, "1ms");
Logger logger = LoggerFactory.getLogger("com.example.app");
Thread.sleep(1000);
for (int i = 0; i < 1000; i++) {
logger.info("Application log {}", i);
}
config.stop();
assertThat(Files.readAllLines(defaultLog.toPath())).size().isEqualTo(1000);
}

@Test
public void overThrottlingLimit2Seconds() throws Exception {
File defaultLog = newLog();
DefaultLoggingFactory config = setup(defaultLog, "100ms");
Logger logger = LoggerFactory.getLogger("com.example.app");
Thread.sleep(1000);
for (int i = 0; i < 100; i++) {
if (i == 50) {
Thread.sleep(1000);
}
logger.info("Application log {}", i);
}
config.stop();
List<String> lines = Files.readAllLines(defaultLog.toPath());
assertThat(lines).hasSize(21);
assertThat(lines.get(0)).isEqualTo("INFO com.example.app: Application log 0");
assertThat(lines.get(20)).isEqualTo("INFO com.example.app: Application log 59");
}

}
@@ -0,0 +1,2 @@
type: console
messageRate: 1d

0 comments on commit c448456

Please sign in to comment.