-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kinesis adaptive memory management #15360
Changes from 15 commits
85f25ed
013b9d7
39545aa
ff77302
97a7ae3
7b155f6
74e0ad2
9547251
d7f9c26
fd57dfb
6666bc8
8bd7c69
f3bac06
b6349fb
0300074
f4c0665
a4b3b35
bca26e9
127bf0d
07840a7
0287eb1
329a4d6
1a6d83a
bce0530
047c266
765efc4
30e0148
5e5a84e
48425e4
40f4f9e
cb1fe85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,10 @@ | |
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity> | ||
{ | ||
private static final String TYPE = "index_kinesis"; | ||
|
||
// GetRecords returns a maximum of 10MB per call | ||
// (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) | ||
private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L; | ||
private static final Logger log = new Logger(KinesisIndexTask.class); | ||
|
||
private final boolean useListShards; | ||
|
@@ -78,6 +82,10 @@ public KinesisIndexTask( | |
); | ||
this.useListShards = useListShards; | ||
this.awsCredentialsConfig = awsCredentialsConfig; | ||
if (tuningConfig.getRecordBufferSizeConfigured() != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please move these two checks to There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch. Moved. |
||
log.warn("The 'recordBufferSize' config property of the kinesis tuning config has been deprecated. " | ||
+ "Please use 'recordBufferSizeBytes'."); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -105,21 +113,18 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) | |
{ | ||
KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig); | ||
KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig); | ||
final int recordBufferSizeBytes = | ||
tuningConfig.getRecordBufferSizeBytesOrDefault(runtimeInfo.getMaxHeapSizeBytes()); | ||
final int fetchThreads = computeFetchThreads(runtimeInfo, tuningConfig.getFetchThreads()); | ||
final int recordsPerFetch = ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(), fetchThreads); | ||
final int recordBufferSize = | ||
tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(), ioConfig.isDeaggregate()); | ||
final int maxRecordsPerPoll = tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()); | ||
final int maxRecordsPerPoll = tuningConfig.getMaxRecordsPerPollOrDefault(); | ||
|
||
log.info( | ||
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], recordsPerFetch [%d], " | ||
+ "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].", | ||
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], " | ||
+ "recordBufferSizeBytes [%d], maxRecordsPerPoll [%d]", | ||
fetchThreads, | ||
ioConfig.getFetchDelayMillis(), | ||
recordsPerFetch, | ||
recordBufferSize, | ||
maxRecordsPerPoll, | ||
ioConfig.isDeaggregate() | ||
recordBufferSizeBytes, | ||
maxRecordsPerPoll | ||
); | ||
|
||
return new KinesisRecordSupplier( | ||
|
@@ -129,11 +134,9 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) | |
ioConfig.getAwsAssumedRoleArn(), | ||
ioConfig.getAwsExternalId() | ||
), | ||
recordsPerFetch, | ||
ioConfig.getFetchDelayMillis(), | ||
fetchThreads, | ||
ioConfig.isDeaggregate(), | ||
recordBufferSize, | ||
recordBufferSizeBytes, | ||
tuningConfig.getRecordBufferOfferTimeout(), | ||
tuningConfig.getRecordBufferFullWait(), | ||
maxRecordsPerPoll, | ||
|
@@ -179,15 +182,36 @@ AWSCredentialsConfig getAwsCredentialsConfig() | |
} | ||
|
||
@VisibleForTesting | ||
static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads) | ||
static int computeFetchThreads( | ||
final RuntimeInfo runtimeInfo, | ||
final Integer configuredFetchThreads | ||
) | ||
{ | ||
final int fetchThreads; | ||
int fetchThreads; | ||
if (configuredFetchThreads != null) { | ||
fetchThreads = configuredFetchThreads; | ||
} else { | ||
fetchThreads = runtimeInfo.getAvailableProcessors() * 2; | ||
} | ||
|
||
// Each fetch thread can return up to 10MB at a time | ||
// (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), so cap fetchThreads so that | ||
// we don't use more than the lesser of 100MB or 5% of heap at a time. Don't fail if the specified fetchThreads | ||
// is greater than this, so as not to break older configurations; instead, log a warning and lower | ||
// fetchThreads implicitly. | ||
final long memoryToUse = Math.min( | ||
KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY, | ||
(long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION) | ||
); | ||
int maxFetchThreads = Math.max( | ||
1, | ||
(int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL) | ||
); | ||
if (fetchThreads > maxFetchThreads) { | ||
log.warn("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This warning should only get logged if There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch, updated. |
||
fetchThreads = maxFetchThreads; | ||
} | ||
|
||
Preconditions.checkArgument( | ||
fetchThreads > 0, | ||
"Must have at least one background fetch thread for the record supplier" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be higher? I wonder if this is too low in the case of non-aggregated records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wondered the same, actually. To be honest, I'm not sure. I think validating this requires extensive performance testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it so that it polls for at least one record, and for at most 1_000_000 bytes when there is more than one record, which is what we were targeting before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So does that mean we should update the
maxRecordsPerPoll: 1
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated