Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More memory limiting for HttpPostEmitter #5300

Merged
merged 4 commits into from
Jan 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts
|`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification|
|`druid.emitter.http.flushTimeOut`|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)|
|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|50|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|the minimum of (10% of JVM heap size divided by 2) or (5191680 (i. e. 5 MB))|
|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|the maximum of (2) or (10% of the JVM heap size divided by 5MB)|
|`druid.emitter.http.minHttpTimeoutMillis`|If the speed of filling batches imposes timeout smaller than that, not even trying to send batch to endpoint, because it will likely fail, not being able to send the data that fast. Configure this depending based on emitter/successfulSending/minTimeMs metric. Reasonable values are 10ms..100ms.|0|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,56 @@
package io.druid.java.util.emitter.core;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.Pair;

import javax.validation.constraints.Min;

public class BaseHttpEmittingConfig
{
public static final long DEFAULT_FLUSH_MILLIS = 60 * 1000;
public static final int DEFAULT_FLUSH_COUNTS = 500;
public static final int DEFAULT_MAX_BATCH_SIZE = 5 * 1024 * 1024;

/** ensure the event buffers don't use more than 10% of memory by default */
public static final int DEFAULT_MAX_BATCH_SIZE;
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT;
static {
Pair<Integer, Integer> batchConfigPair = getDefaultBatchSizeAndLimit(Runtime.getRuntime().maxMemory());
DEFAULT_MAX_BATCH_SIZE = batchConfigPair.lhs;
DEFAULT_BATCH_QUEUE_SIZE_LIMIT = batchConfigPair.rhs;
}

/**
* Do not time out in case flushTimeOut is not set
*/
public static final long DEFAULT_FLUSH_TIME_OUT = Long.MAX_VALUE;
public static final String DEFAULT_BASIC_AUTHENTICATION = null;
public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY;
public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null;
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT = 50;
public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 2.0f;
/**
* The default value effective doesn't set the min timeout
*/
public static final int DEFAULT_MIN_HTTP_TIMEOUT_MILLIS = 0;

public static Pair<Integer, Integer> getDefaultBatchSizeAndLimit(long maxMemory)
{
long memoryLimit = maxMemory / 10;
long batchSize = 5 * 1024 * 1024;
long queueLimit = 50;

if (batchSize * queueLimit > memoryLimit) {
queueLimit = memoryLimit / batchSize;
}

// make room for at least two queue items
if (queueLimit < 2) {
queueLimit = 2;
batchSize = memoryLimit / queueLimit;
}

return new Pair<>((int) batchSize, (int) queueLimit);
}

@Min(1)
@JsonProperty
long flushMillis = DEFAULT_FLUSH_MILLIS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ private void writeLargeEvent(byte[] eventBytes)
);
} else {
largeEventsToEmit.add(eventBytes);
approximateBuffersToEmitCount.incrementAndGet();
approximateLargeEventsToEmitCount.incrementAndGet();
approximateEventsToEmitCount.incrementAndGet();
}
Expand Down Expand Up @@ -553,6 +554,7 @@ private void emitLargeEvents()
largeEventsToEmit.add(LARGE_EVENTS_STOP);
for (byte[] largeEvent; (largeEvent = largeEventsToEmit.poll()) != LARGE_EVENTS_STOP; ) {
emitLargeEvent(largeEvent);
approximateBuffersToEmitCount.decrementAndGet();
approximateLargeEventsToEmitCount.decrementAndGet();
approximateEventsToEmitCount.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public class EmitterTest
.accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Yay".getBytes(StandardCharsets.UTF_8)), true))
.build();

public static final Response BAD_RESPONSE = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)
.accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Not yay".getBytes(StandardCharsets.UTF_8)), true))
.build();

private static Response.ResponseBuilder responseBuilder(HttpVersion version, HttpResponseStatus status)
{
return new Response.ResponseBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.java.util.emitter.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.java.util.common.Pair;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -44,9 +45,12 @@ public void testDefaults()
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
);
Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize());
Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
Assert.assertEquals(50, config.getBatchQueueSizeLimit());
Assert.assertEquals(2.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
Assert.assertEquals(0, config.getMinHttpTimeoutMillis());
}
Expand All @@ -65,9 +69,12 @@ public void testDefaultsLegacy()
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
);
Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize());
Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
Assert.assertEquals(50, config.getBatchQueueSizeLimit());
Assert.assertEquals(2.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
Assert.assertEquals(0, config.getMinHttpTimeoutMillis());
}
Expand Down Expand Up @@ -134,4 +141,32 @@ public void testSettingEverythingLegacy()
Assert.assertEquals(3.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
Assert.assertEquals(100, config.getMinHttpTimeoutMillis());
}

@Test
public void testMemoryLimits()
{
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
64 * 1024 * 1024
);
Assert.assertEquals(3355443, batchConfigPair.lhs.intValue());
Assert.assertEquals(2, batchConfigPair.rhs.intValue());

Pair<Integer, Integer> batchConfigPair2 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
128 * 1024 * 1024
);
Assert.assertEquals(5242880, batchConfigPair2.lhs.intValue());
Assert.assertEquals(2, batchConfigPair2.rhs.intValue());

Pair<Integer, Integer> batchConfigPair3 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
256 * 1024 * 1024
);
Assert.assertEquals(5242880, batchConfigPair3.lhs.intValue());
Assert.assertEquals(5, batchConfigPair3.rhs.intValue());

Pair<Integer, Integer> batchConfigPair4 = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Long.MAX_VALUE
);
Assert.assertEquals(5242880, batchConfigPair4.lhs.intValue());
Assert.assertEquals(50, batchConfigPair4.rhs.intValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -140,4 +143,117 @@ public void run()
}
}
}

@Test
public void testLargeEventsQueueLimit() throws InterruptedException, IOException
{
ObjectMapper mapper = new ObjectMapper();

HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());

emitter.start();

httpClient.setGoHandler(new GoHandler() {
@Override
protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});

char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);

Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.build("metric", 10)
.build("qwerty", "asdfgh");

for (int i = 0; i < 1000; i++) {
emitter.emit(bigEvent);
Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
}

emitter.flush();
}

@Test
public void testLargeAndSmallEventsQueueLimit() throws InterruptedException, IOException
{
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());

emitter.start();

httpClient.setGoHandler(new GoHandler() {
@Override
protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});

char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);

Event smallEvent = ServiceMetricEvent.builder()
.setFeed("smallEvents")
.setDimension("test", "hi")
.build("metric", 10)
.build("qwerty", "asdfgh");

Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.build("metric", 10)
.build("qwerty", "asdfgh");

final CountDownLatch threadsCompleted = new CountDownLatch(2);
new Thread() {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {

emitter.emit(smallEvent);

Assert.assertTrue(emitter.getFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
}.start();
new Thread() {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {

emitter.emit(bigEvent);

Assert.assertTrue(emitter.getFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
}.start();
threadsCompleted.await();
emitter.flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.java.util.emitter.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.java.util.common.Pair;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -41,7 +42,11 @@ public void testDefaults()
Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
);
Assert.assertEquals(batchConfigPair.lhs.intValue(), config.getMaxBatchSize());
Assert.assertEquals(batchConfigPair.rhs.intValue(), config.getBatchQueueSizeLimit());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
}

Expand Down