-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Auto compaction based on parallel indexing #8570
Conversation
public class SegmentsSplitHintSpec implements SplitHintSpec | ||
{ | ||
public static final String TYPE = "segments"; | ||
public static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Access can be private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed.
@JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask | ||
) | ||
{ | ||
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have to handle -1 as null for new specs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This parameter was added in #7048 and it doesn't count -1 as null. IMO, handling -1 as null is a legacy behavior and new parameters shouldn't do that.
docs/configuration/index.md
Outdated
@@ -815,9 +815,11 @@ If you see this problem, it's recommended to set `skipOffsetFromLatest` to some | |||
|`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1000000)| | |||
|`maxBytesInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (1/6 of max JVM memory)| | |||
|`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 20000000)| | |||
|`splitHintSpec`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = null| |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: null -> null)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, thanks.
return new ParallelIndexTuningConfig( | ||
null, | ||
null, | ||
getMaxRowsInMemory(), | ||
getMaxBytesInMemory(), | ||
null, | ||
null, | ||
splitHintSpec, | ||
partitionsSpec, | ||
getIndexSpec(), | ||
getIndexSpecForIntermediatePersists(), | ||
getMaxPendingPersists(), | ||
isForceGuaranteedRollup(), | ||
isReportParseExceptions(), | ||
getPushTimeout(), | ||
getSegmentWriteOutMediumFactory(), | ||
null, | ||
maxNumConcurrentSubTasks, | ||
maxRetry, | ||
taskStatusCheckPeriodMs, | ||
chatHandlerTimeout, | ||
chatHandlerNumRetries, | ||
maxNumSegmentsToMerge, | ||
totalNumMergeTasks, | ||
isLogParseExceptions(), | ||
getMaxParseExceptions(), | ||
getMaxSavedParseExceptions() | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly make consistent to use either getters for all parameters or direct access for all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to use getters.
); | ||
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream | ||
.range(0, ingestionSpecs.size()) | ||
.mapToObj(i -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool! Didn't know about IntStream
.
* compaction tasks, we should count the sub tasks of parallel indexing task as well. However, we currently | ||
* don't have a way to get the number of current running sub tasks except poking each supervisor task, | ||
* which is complex to handle all kinds of failures. | ||
* Here, instead, we compute a rough number of running sub tasks by summing maxNumConcurrentSubTasks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is misleading as the summation is done in the caller of this method. Perhaps the comment should be moved/reworded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, forgot to update it after I moved this javadoc. Fixed.
if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) { | ||
// The actual number of subtasks might be smaller than the configured max. | ||
// However, we use the max to simplify the estimation here. | ||
return tuningConfig.getMaxNumConcurrentSubTasks(); | ||
} else { | ||
return 0; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tuningConfig
defaults to a value of 1
for maxNumConcurrentSubTasks
. Is that inconsistent with this method returning a value of 0
if tuningConfig
/maxNumConcurrentSubTasks
is missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If maxNumConcurrentSubTasks
is 1, the supervisor task runs in the sequential mode and processes data by itself instead of spawning sub tasks.
@@ -193,6 +222,7 @@ private CoordinatorStats doRun( | |||
taskId, | |||
Iterables.transform(segmentsToCompact, DataSegment::getId) | |||
); | |||
numSubmittedTasks += findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment for the + 1
(similar to what you have for line 116) may be useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
|
||
private static ParallelIndexTuningConfig newTuningConfig() | ||
{ | ||
return new ParallelIndexTuningConfig( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unfortunate that there isn't a builder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah 😞
@@ -54,11 +59,15 @@ public static ClientCompactQueryTuningConfig from( | |||
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(), | |||
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(), | |||
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxTotalRows(), | |||
userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getSplitHintSpec(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be more readable if the check for null userCompactionTaskQueryTuningConfig
is moved up:
if (userCompactionTaskQueryTuningConfig == null) {
return new ClientCompactQueryTuningConfig(
maxRowsPerSegment,
null,
null,
...
)
} else {
return new ClientCompactQueryTuningConfig(
maxRowsPerSegment,
userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(),
userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(),
...
)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Fixed.
LOG.warn( | ||
"maxNumConcurrentSubTasks is 1. Running sequentially. " | ||
+ "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode." | ||
"maxNumConcurrentSubTasks[%s] is less than 1. Running sequentially. Please set maxNumConcurrentSubTasks " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Message should say "less than or equal to 1"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 after resolve conflicts (and CI)
{ | ||
public static final String TYPE = "segments"; | ||
|
||
private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this value be larger maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, increased to 500MB.
@ccaominh @clintropolis thank you for the review! |
Does this still create a |
Description
This PR is to allow for auto compaction to use the parallel indexing task. This will be useful when there are too many or large segments in a single time chunk.
New/changed configurations
splitHintSpec
, in the tuningConfig to allow for operators to give a hint to control the amount of data that each first phase sub task reads.SegmentsSplitHintSpec
is the only available option for now which is used only forIngestSegmentFirehose
.ParallelIndexTuningConfig
.maxNumConcurrentSubTasks
andsplitHintSpec
.This PR has:
This change is