Skip to content

Commit

Permalink
Kinesis adaptive memory management (#15360)
Browse files Browse the repository at this point in the history
### Description

Our Kinesis consumer works by using the [GetRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) in some number of `fetchThreads`, each fetching some number of records (`recordsPerFetch`) and each inserting into a shared buffer that can hold a `recordBufferSize` number of records. The logic is described in our documentation at: https://druid.apache.org/docs/27.0.0/development/extensions-core/kinesis-ingestion/#determine-fetch-settings 

There is a problem with the logic that this pr fixes: the memory limits rely on a hard-coded “estimated record size” that is `10 KB` if `deaggregate: false` and `1 MB` if `deaggregate: true`. There have been cases where a supervisor had `deaggregate: true` set even though it wasn’t needed, leading to under-utilization of memory and poor ingestion performance.

Users don’t always know if their records are aggregated or not. Also, even if they could figure it out, it’s better to not have to. So we’d like to eliminate the `deaggregate` parameter, which means we need to do memory management more adaptively based on the actual record sizes.

We take advantage of the fact that GetRecords doesn’t return more than 10MB (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html ):

This pr: 

eliminates `recordsPerFetch`, always use the max limit of 10000 records (the default limit if not set)

eliminate `deaggregate`, always have it true

cap `fetchThreads` to ensure that if each fetch returns the max (`10MB`) then we don't exceed our budget (`100MB` or `5% of heap`). In practice this means `fetchThreads` will never be more than `10`. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments

add `recordBufferSizeBytes` as a bytes-based limit rather than records-based limit for the shared queue. We do know the byte size of kinesis records by at this point. Default should be `100MB` or `10% of heap`, whichever is smaller.

add `maxBytesPerPoll` as a bytes-based limit for how much data we poll from shared buffer at a time. Default is `1000000` bytes.

deprecate `recordBufferSize`, use `recordBufferSizeBytes` instead. Warning is logged if `recordBufferSize` is specified

deprecate `maxRecordsPerPoll`, use `maxBytesPerPoll` instead. Warning is logged if maxRecordsPerPoll` is specified

Fixed issue that when the record buffer is full, the fetchRecords logic throws away the rest of the GetRecords result after `recordBufferOfferTimeout` and starts a new shard iterator. This seems excessively churny. Instead,  wait an unbounded amount of time for queue to stop being full. If the queue remains full, we’ll end up right back waiting for it after the restarted fetch.

There was also a call to `newQ::offer` without check in `filterBufferAndResetBackgroundFetch`, which seemed like it could cause data loss. Now checking return value here, and failing if false.

### Release Note

Kinesis ingestion memory tuning config has been greatly simplified, and a more adaptive approach is now taken for the configuration. Here is a summary of the changes made:

eliminates `recordsPerFetch`, always use the max limit of 10000 records (the default limit if not set)

eliminate `deaggregate`, always have it true

cap `fetchThreads` to ensure that if each fetch returns the max (`10MB`) then we don't exceed our budget (`100MB` or `5% of heap`). In practice this means `fetchThreads` will never be more than `10`. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments

add `recordBufferSizeBytes` as a bytes-based limit rather than records-based limit for the shared queue. We do know the byte size of kinesis records by at this point. Default should be `100MB` or `10% of heap`, whichever is smaller.

add `maxBytesPerPoll` as a bytes-based limit for how much data we poll from shared buffer at a time. Default is `1000000` bytes.

deprecate `recordBufferSize`, use `recordBufferSizeBytes` instead. Warning is logged if `recordBufferSize` is specified

deprecate `maxRecordsPerPoll`, use `maxBytesPerPoll` instead. Warning is logged if maxRecordsPerPoll` is specified
  • Loading branch information
zachjsh committed Jan 19, 2024
1 parent 38c1def commit 9d4e805
Show file tree
Hide file tree
Showing 23 changed files with 907 additions and 600 deletions.
27 changes: 10 additions & 17 deletions docs/development/extensions-core/kinesis-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,9 @@ The following table outlines the configuration options for `ioConfig`:
|`completionTimeout`|ISO 8601 period|The length of time to wait before Druid declares a publishing task has failed and terminates it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|PT6H|
|`lateMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject messages with timestamps earlier than this period before the task is created. For example, if `lateMessageRejectionPeriod` is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, messages with timestamps earlier than `2016-01-01T11:00Z` are dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline.|No||
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject messages with timestamps later than this period after the task reached its `taskDuration`. For example, if `earlyMessageRejectionPeriod` is set to `PT1H`, the `taskDuration` is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`. Messages with timestamps later than `2016-01-01T14:00Z` are dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|No||
|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis.|No| See [Determine fetch settings](#determine-fetch-settings) for defaults.|
|`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent calls to fetch records from Kinesis. See [Determine fetch settings](#determine-fetch-settings).|No|0|
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|No||
|`awsExternalId`|String|The AWS external ID to use for additional permissions.|No||
|`deaggregate`|Boolean|Whether to use the deaggregate function of the Kinesis Client Library (KCL).|No||
|`autoScalerConfig`|Object|Defines autoscaling behavior for Kinesis ingest tasks. See [Task autoscaler properties](#task-autoscaler-properties) for more information.|No|null|

### Task autoscaler properties
Expand Down Expand Up @@ -406,15 +404,15 @@ The following table outlines the configuration options for `tuningConfig`:
|`chatRetries`|Integer|The number of times Druid retries HTTP requests to indexing tasks before considering tasks unresponsive.|No|8|
|`httpTimeout`|ISO 8601 period|The period of time to wait for a HTTP response from an indexing task.|No|PT10S|
|`shutdownTimeout`|ISO 8601 period|The period of time to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|No|PT80S|
|`recordBufferSize`|Integer|The size of the buffer (number of events) Druid uses between the Kinesis fetch threads and the main ingestion thread.|No|See [Determine fetch settings](#determine-fetch-settings) for defaults.|
|`recordBufferSizeBytes`|Integer| The size of the buffer (heap memory bytes) Druid uses between the Kinesis fetch threads and the main ingestion thread.|No| See [Determine fetch settings](#determine-fetch-settings) for defaults.|
|`recordBufferOfferTimeout`|Integer|The number of milliseconds to wait for space to become available in the buffer before timing out.|No|5000|
|`recordBufferFullWait`|Integer|The number of milliseconds to wait for the buffer to drain before Druid attempts to fetch records from Kinesis again.|No|5000|
|`fetchThreads`|Integer|The size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|No| `procs * 2`, where `procs` is the number of processors available to the task.|
|`segmentWriteOutMediumFactory`|Object|The segment write-out medium to use when creating segments See [Additional Peon configuration: SegmentWriteOutMediumFactory](../../configuration/index.md#segmentwriteoutmediumfactory) for explanation and available options.|No|If not specified, Druid uses the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type`.|
|`logParseExceptions`|Boolean|If `true`, Druid logs an error message when a parsing exception occurs, containing information about the row where the error occurred.|No|`false`|
|`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|No|unlimited|
|`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid keeps track of the most recent parse exceptions. `maxSavedParseExceptions` limits the number of saved exception instances. These saved exceptions are available after the task finishes in the [task completion report](../../ingestion/tasks.md#task-reports). Overridden if `reportParseExceptions` is set.|No|0|
|`maxRecordsPerPoll`|Integer|The maximum number of records to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`.|No| See [Determine fetch settings](#determine-fetch-settings) for defaults.|
|`maxBytesPerPoll`|Integer| The maximum number of bytes to be fetched from buffer per poll. At least one record is polled from the buffer regardless of this config.|No| 1000000 bytes|
|`repartitionTransitionDuration`|ISO 8601 period|When shards are split or merged, the supervisor recomputes shard to task group mappings. The supervisor also signals any running tasks created under the old mappings to stop early at current time + `repartitionTransitionDuration`. Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split or merge, which helps avoid issues with [empty shard handling](https://github.com/apache/druid/issues/7600).|No|PT2M|
|`offsetFetchPeriod`|ISO 8601 period|Determines how often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value of PT5S, the supervisor ignores the value and uses the minimum value instead.|No|PT30S|
|`useListShards`|Boolean|Indicates if `listShards` API of AWS Kinesis SDK can be used to prevent `LimitExceededException` during ingestion. You must set the necessary `IAM` permissions.|No|`false`|
Expand Down Expand Up @@ -656,25 +654,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi

Kinesis indexing tasks fetch records using `fetchThreads` threads.
If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
of `fetchDelayMillis`.
The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.

When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
each of these parameters refers to aggregated records rather than individual records.

The default values for these parameters are:

- `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
particular server).
particular server). This value is further limited so that the total data record data fetched at a given time does not
exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
implicitly lowered to the max allowed by this constraint.
- `fetchDelayMillis`: 0 (no delay between fetches).
- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
- `maxBytesPerPoll`: 1000000.

Kinesis places the following restrictions on calls to fetch records:

Expand All @@ -697,8 +692,6 @@ Kinesis stream.
The Kinesis indexing service supports de-aggregation of multiple rows packed into a single record by the Kinesis
Producer Library's aggregate method for more efficient data transfer.

To enable this feature, set `deaggregate` to true in your `ioConfig` when submitting a supervisor spec.

## Resharding

[Resharding](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html) is an advanced operation that lets you adjust the number of shards in a stream to adapt to changes in the rate of data flowing through a stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -176,7 +176,7 @@ private void sendSegmentMetadataToKafka()

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectToSend;
try {
while (true) {
objectToSend = recordQueue.take();
Expand Down Expand Up @@ -206,7 +206,7 @@ public void emit(final Event event)

String resultJson = jsonMapper.writeValueAsString(map);

ObjectContainer<String> objectContainer = new ObjectContainer<>(
MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectContainer = new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(
resultJson,
StringUtils.toUtf8(resultJson).length
);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
private static final String TYPE = "index_kinesis";

// GetRecords returns maximum 10MB per call
// (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
private static final Logger log = new Logger(KinesisIndexTask.class);

private final boolean useListShards;
Expand Down Expand Up @@ -84,6 +88,14 @@ public KinesisIndexTask(
public TaskStatus runTask(TaskToolbox toolbox)
{
this.runtimeInfo = toolbox.getAdjustedRuntimeInfo();
if (getTuningConfig().getRecordBufferSizeConfigured() != null) {
log.warn("The 'recordBufferSize' config property of the kinesis tuning config has been deprecated. "
+ "Please use 'recordBufferSizeBytes'.");
}
if (getTuningConfig().getMaxRecordsPerPollConfigured() != null) {
log.warn("The 'maxRecordsPerPoll' config property of the kinesis tuning config has been deprecated. "
+ "Please use 'maxBytesPerPoll'.");
}
return super.runTask(toolbox);
}

Expand All @@ -105,21 +117,18 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
{
KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig);
KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig);
final int recordBufferSizeBytes =
tuningConfig.getRecordBufferSizeBytesOrDefault(runtimeInfo.getMaxHeapSizeBytes());
final int fetchThreads = computeFetchThreads(runtimeInfo, tuningConfig.getFetchThreads());
final int recordsPerFetch = ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(), fetchThreads);
final int recordBufferSize =
tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(), ioConfig.isDeaggregate());
final int maxRecordsPerPoll = tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate());
final int maxBytesPerPoll = tuningConfig.getMaxBytesPerPollOrDefault();

log.info(
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], recordsPerFetch [%d], "
+ "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].",
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], "
+ "recordBufferSizeBytes [%d], maxBytesPerPoll [%d]",
fetchThreads,
ioConfig.getFetchDelayMillis(),
recordsPerFetch,
recordBufferSize,
maxRecordsPerPoll,
ioConfig.isDeaggregate()
recordBufferSizeBytes,
maxBytesPerPoll
);

return new KinesisRecordSupplier(
Expand All @@ -129,19 +138,24 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
recordsPerFetch,
ioConfig.getFetchDelayMillis(),
fetchThreads,
ioConfig.isDeaggregate(),
recordBufferSize,
recordBufferSizeBytes,
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
maxRecordsPerPoll,
maxBytesPerPoll,
false,
useListShards
);
}

@Override
@JsonProperty
public KinesisIndexTaskTuningConfig getTuningConfig()
{
return (KinesisIndexTaskTuningConfig) super.getTuningConfig();
}

@Override
@JsonProperty("ioConfig")
public KinesisIndexTaskIOConfig getIOConfig()
Expand Down Expand Up @@ -179,15 +193,38 @@ AWSCredentialsConfig getAwsCredentialsConfig()
}

@VisibleForTesting
static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
static int computeFetchThreads(
final RuntimeInfo runtimeInfo,
final Integer configuredFetchThreads
)
{
final int fetchThreads;
int fetchThreads;
if (configuredFetchThreads != null) {
fetchThreads = configuredFetchThreads;
} else {
fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
}

// Each fetchThread can return upto 10MB at a time
// (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
// we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
// is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
// fetchThreads implicitly.
final long memoryToUse = Math.min(
KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
(long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
);
int maxFetchThreads = Math.max(
1,
(int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
);
if (fetchThreads > maxFetchThreads) {
if (configuredFetchThreads != null) {
log.warn("fetchThreads [%d] being lowered to [%d]", configuredFetchThreads, maxFetchThreads);
}
fetchThreads = maxFetchThreads;
}

Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
Expand Down
Loading

0 comments on commit 9d4e805

Please sign in to comment.