Skip to content

Commit

Permalink
More memory limiting for HttpPostEmitter
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-wei committed Jan 26, 2018
1 parent 8041975 commit 4181785
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts
|`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification|
|`druid.emitter.http.flushTimeOut`|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|the minimum of (10% of JVM heap size divided by 50) or (5191680 (i. e. 5 MB))|
|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|50|
|`druid.emitter.http.minHttpTimeoutMillis`|If the speed of filling batches imposes timeout smaller than that, not even trying to send batch to endpoint, because it will likely fail, not being able to send the data that fast. Configure this depending based on emitter/successfulSending/minTimeMs metric. Reasonable values are 10ms..100ms.|0|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,28 @@ public class BaseHttpEmittingConfig
{
public static final long DEFAULT_FLUSH_MILLIS = 60 * 1000;
public static final int DEFAULT_FLUSH_COUNTS = 500;
public static final int DEFAULT_MAX_BATCH_SIZE = 5 * 1024 * 1024;

/** ensure the event buffers don't use more than 10% of memory by default */
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT = 50;
public static final int DEFAULT_MAX_BATCH_SIZE;
static {
long memoryLimit = Runtime.getRuntime().maxMemory() / 10;
long batchSize = 5 * 1024 * 1024;

if (batchSize * DEFAULT_BATCH_QUEUE_SIZE_LIMIT > memoryLimit) {
batchSize = memoryLimit / DEFAULT_BATCH_QUEUE_SIZE_LIMIT;
}

DEFAULT_MAX_BATCH_SIZE = (int) batchSize;
}

/**
* Do not time out in case flushTimeOut is not set
*/
public static final long DEFAULT_FLUSH_TIME_OUT = Long.MAX_VALUE;
public static final String DEFAULT_BASIC_AUTHENTICATION = null;
public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY;
public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null;
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT = 50;
public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 2.0f;
/**
* The default value effective doesn't set the min timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ private void writeLargeEvent(byte[] eventBytes)
);
} else {
largeEventsToEmit.add(eventBytes);
approximateBuffersToEmitCount.incrementAndGet();
approximateLargeEventsToEmitCount.incrementAndGet();
approximateEventsToEmitCount.incrementAndGet();
}
Expand Down Expand Up @@ -553,6 +554,7 @@ private void emitLargeEvents()
largeEventsToEmit.add(LARGE_EVENTS_STOP);
for (byte[] largeEvent; (largeEvent = largeEventsToEmit.poll()) != LARGE_EVENTS_STOP; ) {
emitLargeEvent(largeEvent);
approximateBuffersToEmitCount.decrementAndGet();
approximateLargeEventsToEmitCount.decrementAndGet();
approximateEventsToEmitCount.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public class EmitterTest
.accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Yay".getBytes(StandardCharsets.UTF_8)), true))
.build();

public static final Response BAD_RESPONSE = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)
.accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Not yay".getBytes(StandardCharsets.UTF_8)), true))
.build();

private static Response.ResponseBuilder responseBuilder(HttpVersion version, HttpResponseStatus status)
{
return new Response.ResponseBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -140,4 +143,120 @@ public void run()
}
}
}

/** check that we don't get a heap OOM from too many queued large events */
@Test
public void testLargeEventsQueueLimit() throws InterruptedException, IOException
{
ObjectMapper mapper = new ObjectMapper();

HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());

emitter.start();

httpClient.setGoHandler(new GoHandler() {
@Override
protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});

char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);

Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.build("metric", 10)
.build("qwerty", "asdfgh");

int bigEventSz = mapper.writeValueAsBytes(bigEvent).length;
long maxMemory = Runtime.getRuntime().maxMemory();
long maxBigEvents = (maxMemory / bigEventSz) + 1;
for (int i = 0; i < maxBigEvents; i++) {
emitter.emit(bigEvent);
}

emitter.flush();
}

@Test
public void testLargeAndSmallEventsQueueLimit() throws InterruptedException, IOException
{
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());

emitter.start();

httpClient.setGoHandler(new GoHandler() {
@Override
protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});

char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);

Event smallEvent = ServiceMetricEvent.builder()
.setFeed("smallEvents")
.setDimension("test", "hi")
.build("metric", 10)
.build("qwerty", "asdfgh");

Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.build("metric", 10)
.build("qwerty", "asdfgh");

final CountDownLatch threadsCompleted = new CountDownLatch(2);
new Thread() {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {

emitter.emit(smallEvent);

Assert.assertTrue(emitter.getFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
}.start();
new Thread() {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {

emitter.emit(bigEvent);

Assert.assertTrue(emitter.getFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
}.start();
threadsCompleted.await();
emitter.flush();
}
}

0 comments on commit 4181785

Please sign in to comment.