Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kinesis adaptive memory management #15360

Merged
merged 31 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
85f25ed
* do stuff
zachjsh Oct 31, 2023
013b9d7
* move existing MemoryBoundLinkedBlockingQueue to druid core and exte…
zachjsh Nov 10, 2023
39545aa
* fix configs, tests
zachjsh Nov 14, 2023
ff77302
* add tests
zachjsh Nov 14, 2023
97a7ae3
Merge remote-tracking branch 'apache/master' into kinesis-adaptive-me…
zachjsh Nov 14, 2023
7b155f6
* fix serde
zachjsh Nov 14, 2023
74e0ad2
* minor fix to logging
zachjsh Nov 14, 2023
9547251
* update comment
zachjsh Nov 14, 2023
d7f9c26
* remove references to removed config properties from documentation a…
zachjsh Nov 15, 2023
fd57dfb
* fix spellcheck
zachjsh Nov 16, 2023
6666bc8
* dont throw away the rest of the GetRecords result after recordBuff…
zachjsh Nov 17, 2023
8bd7c69
* address review comments
zachjsh Nov 27, 2023
f3bac06
Merge remote-tracking branch 'apache/master' into kinesis-adaptive-me…
zachjsh Nov 27, 2023
b6349fb
* fix integration test compilation failure
zachjsh Nov 27, 2023
0300074
* fix code scan failure
zachjsh Nov 28, 2023
f4c0665
* review comments
zachjsh Nov 29, 2023
a4b3b35
Merge remote-tracking branch 'apache/master' into kinesis-adaptive-me…
zachjsh Nov 29, 2023
bca26e9
* fix ingestion-spec.tsx
zachjsh Nov 29, 2023
127bf0d
* change back to ArrayList
zachjsh Nov 29, 2023
07840a7
* fix ingestion-spec.tsx again
zachjsh Nov 29, 2023
0287eb1
* more review comments
zachjsh Dec 1, 2023
329a4d6
Merge remote-tracking branch 'apache/master' into kinesis-adaptive-me…
zachjsh Dec 1, 2023
1a6d83a
* make blocking queue actually block for time specified
zachjsh Dec 15, 2023
bce0530
* fix checkstyle
zachjsh Dec 15, 2023
047c266
* fix failing test
zachjsh Dec 15, 2023
765efc4
* signal not empty
zachjsh Dec 15, 2023
30e0148
* fix test
zachjsh Dec 18, 2023
5e5a84e
* fix test finally
zachjsh Dec 18, 2023
48425e4
Merge remote-tracking branch 'apache/master' into kinesis-adaptive-me…
zachjsh Jan 11, 2024
40f4f9e
Merge remote-tracking branch 'apache/master' into kinesis-adaptive-me…
zachjsh Jan 17, 2024
cb1fe85
Merge remote-tracking branch 'apache/master' into kinesis-adaptive-me…
zachjsh Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 41 additions & 48 deletions docs/development/extensions-core/kinesis-ingestion.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -173,7 +173,7 @@ private void sendSegmentMetadataToKafka()

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectToSend;
try {
while (true) {
objectToSend = recordQueue.take();
Expand All @@ -199,7 +199,7 @@ public void emit(final Event event)

String resultJson = jsonMapper.writeValueAsString(map);

ObjectContainer<String> objectContainer = new ObjectContainer<>(
MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectContainer = new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(
resultJson,
StringUtils.toUtf8(resultJson).length
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
private static final String TYPE = "index_kinesis";

// GetRecords returns maximum 10MB per call
// (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
private static final Logger log = new Logger(KinesisIndexTask.class);

private final boolean useListShards;
Expand Down Expand Up @@ -78,6 +82,14 @@ public KinesisIndexTask(
);
this.useListShards = useListShards;
this.awsCredentialsConfig = awsCredentialsConfig;
if (tuningConfig.getRecordBufferSizeConfigured() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move these two checks to run rather than the constructor, because we don't need to log this stuff every time a task object is constructed. (That happens at various points on the Overlord due to various API calls and internal machinations, and will create a log of log spam.)

Copy link
Contributor Author

@zachjsh zachjsh Dec 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Moved.

log.warn("The 'recordBufferSize' config property of the kinesis tuning config has been deprecated. "
+ "Please use 'recordBufferSizeBytes'.");
}
if (tuningConfig.getMaxRecordsPerPollConfigured() != null) {
log.warn("The 'maxRecordsPerPoll' config property of the kinesis tuning config has been deprecated. "
+ "Please use 'maxBytesPerPoll'.");
}
}

@Override
Expand Down Expand Up @@ -105,21 +117,18 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
{
KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig);
KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig);
final int recordBufferSizeBytes =
tuningConfig.getRecordBufferSizeBytesOrDefault(runtimeInfo.getMaxHeapSizeBytes());
final int fetchThreads = computeFetchThreads(runtimeInfo, tuningConfig.getFetchThreads());
final int recordsPerFetch = ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(), fetchThreads);
final int recordBufferSize =
tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(), ioConfig.isDeaggregate());
final int maxRecordsPerPoll = tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate());
final int maxBytesPerPoll = tuningConfig.getMaxBytesPerPollOrDefault();

log.info(
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], recordsPerFetch [%d], "
+ "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].",
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], "
+ "recordBufferSizeBytes [%d], maxBytesPerPoll [%d]",
fetchThreads,
ioConfig.getFetchDelayMillis(),
recordsPerFetch,
recordBufferSize,
maxRecordsPerPoll,
ioConfig.isDeaggregate()
recordBufferSizeBytes,
maxBytesPerPoll
);

return new KinesisRecordSupplier(
Expand All @@ -129,14 +138,12 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
recordsPerFetch,
ioConfig.getFetchDelayMillis(),
fetchThreads,
ioConfig.isDeaggregate(),
recordBufferSize,
recordBufferSizeBytes,
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
maxRecordsPerPoll,
maxBytesPerPoll,
false,
useListShards
);
Expand Down Expand Up @@ -179,15 +186,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
}

@VisibleForTesting
static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
static int computeFetchThreads(
final RuntimeInfo runtimeInfo,
final Integer configuredFetchThreads
)
{
final int fetchThreads;
int fetchThreads;
if (configuredFetchThreads != null) {
fetchThreads = configuredFetchThreads;
} else {
fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
}

// Each fetchThread can return upto 10MB at a time
// (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
// we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
// is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
// fetchThreads implicitly.
final long memoryToUse = Math.min(
KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
(long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
);
int maxFetchThreads = Math.max(
1,
(int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
);
if (fetchThreads > maxFetchThreads) {
log.warn("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This warning should only get logged if configuredFetchThreads != null. There's no reason to log it if runtimeInfo.getAvailableProcessors() * 2 is lower than maxFetchThreads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, updated.

fetchThreads = maxFetchThreads;
}

Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
Expand All @@ -41,21 +40,19 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
* Together with {@link KinesisIndexTaskTuningConfig#MAX_RECORD_BUFFER_MEMORY}, don't take up more than 200MB
* per task.
*/
private static final int MAX_RECORD_FETCH_MEMORY = 100_000_000;
public static final int MAX_RECORD_FETCH_MEMORY = 100_000_000;

/**
* Together with {@link KinesisIndexTaskTuningConfig#RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION}, don't take up more
* than 15% of the heap.
*/
private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;
public static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

private final String endpoint;
private final Integer recordsPerFetch;
private final int fetchDelayMillis;

private final String awsAssumedRoleArn;
private final String awsExternalId;
private final boolean deaggregate;

@JsonCreator
public KinesisIndexTaskIOConfig(
Expand All @@ -79,11 +76,9 @@ public KinesisIndexTaskIOConfig(
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("endpoint") String endpoint,
@JsonProperty("recordsPerFetch") Integer recordsPerFetch,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
@JsonProperty("deaggregate") boolean deaggregate
@JsonProperty("awsExternalId") String awsExternalId
)
{
super(
Expand All @@ -105,11 +100,9 @@ public KinesisIndexTaskIOConfig(
);

this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint");
this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis != null ? fetchDelayMillis : DEFAULT_FETCH_DELAY_MILLIS;
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
this.deaggregate = deaggregate;
}

public KinesisIndexTaskIOConfig(
Expand All @@ -122,11 +115,9 @@ public KinesisIndexTaskIOConfig(
DateTime maximumMessageTime,
InputFormat inputFormat,
String endpoint,
Integer recordsPerFetch,
Integer fetchDelayMillis,
String awsAssumedRoleArn,
String awsExternalId,
boolean deaggregate
String awsExternalId
)
{
this(
Expand All @@ -142,11 +133,9 @@ public KinesisIndexTaskIOConfig(
maximumMessageTime,
inputFormat,
endpoint,
recordsPerFetch,
fetchDelayMillis,
awsAssumedRoleArn,
awsExternalId,
deaggregate
awsExternalId
);
}

Expand Down Expand Up @@ -215,32 +204,6 @@ public String getEndpoint()
return endpoint;
}

@Nullable
@JsonProperty("recordsPerFetch")
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getRecordsPerFetchConfigured()
{
return recordsPerFetch;
}

public int getRecordsPerFetchOrDefault(final long maxHeapSize, final int fetchThreads)
{
if (recordsPerFetch != null) {
return recordsPerFetch;
} else {
final long memoryToUse = Math.min(
MAX_RECORD_FETCH_MEMORY,
(long) (maxHeapSize * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
);

final int assumedRecordSize = deaggregate
? KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE_AGGREGATE
: KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE;

return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize / fetchThreads));
}
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getFetchDelayMillis()
Expand All @@ -262,13 +225,6 @@ public String getAwsExternalId()
return awsExternalId;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isDeaggregate()
{
return deaggregate;
}

@Override
public String toString()
{
Expand All @@ -280,11 +236,9 @@ public String toString()
", minimumMessageTime=" + getMinimumMessageTime() +
", maximumMessageTime=" + getMaximumMessageTime() +
", endpoint='" + endpoint + '\'' +
", recordsPerFetch=" + recordsPerFetch +
", fetchDelayMillis=" + fetchDelayMillis +
", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' +
", awsExternalId='" + awsExternalId + '\'' +
", deaggregate=" + deaggregate +
'}';
}
}
Loading
Loading