Add SegmentAllocationQueue to batch allocation actions #13369
Conversation
final Set<DataSegment> usedSegments = retrieveUsedSegments(requestKey);
final int successCount = allocateSegmentsForBatch(requestBatch, usedSegments);

emitBatchMetric("task/action/batch/retries", 1L, requestKey);
nit: task/action/batch/attempts seems more accurate
Thanks! Yeah, I had originally used attempts but then moved the code around a bit and this got missed.
return true;
}

// Requeue the batch only if used segments have changed
Could you please explain a bit more about this?
This is to retain behaviour from the existing flow. If the set of used segments has changed since we started processing the request, it's possible that retrying might make it pass.
Line 223 in 79df11c:
if (!newUsedSegmentsForRow.equals(usedSegmentsForRow)) {
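For illustration, a minimal sketch of this retry condition, with hypothetical names (not the PR's actual code): the batch is requeued only when the snapshot of used segments has actually changed since the last attempt.

```java
import java.util.Set;

class RequeuePolicySketch
{
  /**
   * Requeue a failed batch only if the set of used segments has changed since the last
   * attempt; if nothing changed, an immediate retry would just fail the same way.
   */
  static <T> boolean shouldRequeue(Set<T> usedSegmentsAtLastAttempt, Set<T> usedSegmentsNow)
  {
    return !usedSegmentsNow.equals(usedSegmentsAtLastAttempt);
  }
}
```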
holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
}

// TODO: for failed allocations, cleanup newly created locks from the posse map
Does this need to be addressed as part of these changes?
Yeah, let me check this. If not done, this might leave some garbage locks in the posse map.
emitTaskMetric("task/action/success/count", 1L, request);
requestToFuture.remove(request).complete(result.getSegmentId());
} else if (request.canRetry()) {
  log.debug(
This should be info or warn, I think. If the retry is causing a delay, we will only know it from logs.
holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
}

// TODO: for failed allocations, cleanup newly created locks from the posse map
can this todo be resolved?
I need to verify if this is an actual issue. Also, this behaviour is the same in the old flow too.
I think I should tackle this in an immediate follow up PR, so that this PR just adds the new flow without modifying any behaviour.
log.debug("Added a new batch [%s] to queue.", batch.key); | ||
return true; | ||
} else { | ||
log.warn("Cannot add batch [%s] as queue is full. Failing [%d] requests.", batch.key, batch.size()); |
What happens as a result of this? I assume it will happen when the metadata store is slow. Can there be any other reason? Should we add some possible corrective action here, like checking metadata store sizing?
It can also happen when things keep getting retried, due to frequent updates to the set of used segments. It would be up to the client to retry in such cases.
The corrective action can be to look at the emitted metrics batch/runTime and batch/attempts. A greater value of the first would be caused by metadata slowness, of the second by retries. The user can then determine if the metadata store needs resizing. What do you think?
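As a rough illustration of the full-queue behaviour discussed here (class and method names are made up for this sketch, not the PR's implementation):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedAllocationQueueSketch<B>
{
  private final BlockingQueue<B> queue;

  BoundedAllocationQueueSketch(int capacity)
  {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  /**
   * Non-blocking add: when the queue is full (e.g. due to a slow metadata store or
   * repeated retries), the batch is rejected and its requests fail immediately,
   * leaving the retry decision to the client.
   */
  boolean offerBatch(B batch)
  {
    return queue.offer(batch);
  }
}
```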
if (!isLeader.get()) {
  return;
}
is this necessary? what if some items were added to the queue just before line 225?
I guess that is not likely to happen. New items are rejected if we are not the leader.
But maybe it could happen if the leader is flapping?
- t1: we are not leader, we clear the queue
- t2: become leader
- t3: add a request
- t4: stop being leader
- t5: we are not leader, return
The request added by t3 will thus remain stuck in the queue until we become leader again and process it. The future will time out after 5 minutes.
A simpler alternative to all of this can be to have the leadership check done for each batch separately in processBatch (this check should probably exist anyway). If not leader, fail it there.
The differences would be:
- even non-leaders would keep polling the queue
- requests would wait until their due time to be failed
Please let me know what you think.
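A minimal sketch of the per-batch leadership check suggested above; the names and the future-based result are assumptions for illustration only:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class PerBatchLeadershipCheckSketch
{
  private final AtomicBoolean isLeader = new AtomicBoolean(false);

  /**
   * Instead of clearing the queue eagerly on leadership loss, check leadership when a
   * batch comes due and fail just that batch if we are no longer the leader.
   */
  <R> void processBatch(CompletableFuture<R> batchResult, Runnable doAllocate)
  {
    if (!isLeader.get()) {
      batchResult.completeExceptionally(new IllegalStateException("Not leader anymore"));
      return;
    }
    doAllocate.run();
  }
}
```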
++numProcessedBatches;
}
catch (Throwable t) {
  processed = true;
Well, there could be DB failures, for example. If we don't retry, we should surface the failure to the caller.
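One way to surface such a failure to every waiting caller, sketched here with assumed types, is to complete the pending futures exceptionally rather than swallowing the Throwable:

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

class FailureSurfacingSketch
{
  /** Propagate an unexpected batch-processing failure (e.g. a DB error) to all callers. */
  static <R> void failAll(Collection<CompletableFuture<R>> pendingResults, Throwable cause)
  {
    for (CompletableFuture<R> result : pendingResults) {
      result.completeExceptionally(cause);
    }
  }
}
```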
for (SegmentCreateRequest request : requests) {
  // Check if the required segment has already been created in this batch
  final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck);
When can this occur?
Also, SegmentCreateRequest states that requests must be treated distinctly even if all fields are the same.
Multiple requests for the same segment, in the case of task replicas, I should think.
SegmentCreateRequest states that requests must be treated distinctly even if all fields are the same.
The SegmentCreateRequest objects are different; the segment id created for them is not.
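The reuse being described can be sketched roughly as below: distinct request objects that hash to the same sequence key (e.g. task replicas) receive the segment id created earlier in the batch. The types and helper functions are placeholders, not the actual Druid code.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BatchDedupSketch
{
  /**
   * Create at most one segment id per sequence hash within a batch; later requests
   * with the same hash reuse the id instead of creating a new pending segment.
   */
  static <R> Map<R, String> allocate(
      List<R> requests,
      Function<R, String> sequenceHashOf,
      Function<R, String> createSegmentIdFor
  )
  {
    final Map<String, String> createdIdByHash = new HashMap<>();
    final Map<R, String> allocatedIds = new LinkedHashMap<>();
    for (R request : requests) {
      final String hash = sequenceHashOf.apply(request);
      final String segmentId = createdIdByHash.computeIfAbsent(hash, h -> createSegmentIdFor.apply(request));
      allocatedIds.put(request, segmentId);
    }
    return allocatedIds;
  }
}
```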
// If there is an existing chunk, find the max id with the same version as the existing chunk.
// There may still be a pending segment with a higher version (but no corresponding used segments)
// which may generate a clash with an existing segment once the new id is generated
final SegmentIdWithShardSpec overallMaxId =
The overallMaxId is computed assuming that committedMaxId has already been added to the set of pendingSegments.
I think it's better to not make that assumption and to add committedMaxId explicitly in this method.
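In other words, something along the lines of the sketch below, where the committed max id is considered explicitly instead of assuming it is already among the pending segment ids (illustrative only; the real comparison in Druid is not a simple natural ordering):

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

class OverallMaxIdSketch
{
  /** Pick the overall max from the pending segment ids plus the committed max id, ignoring nulls. */
  static <I extends Comparable<I>> I overallMaxId(Set<I> pendingSegmentIds, I committedMaxId)
  {
    return Stream.concat(pendingSegmentIds.stream(), Stream.of(committedMaxId))
                 .filter(Objects::nonNull)
                 .max(Comparator.<I>naturalOrder())
                 .orElse(null);
  }
}
```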
@AmatyaAvadhanula, @abhishekagarwal87, thanks for the review. I have addressed your feedback. Changes made:

Changes in follow-up PR:
final Map<Interval, List<SegmentAllocateRequest>> overlapIntervalToRequests = new HashMap<>();

for (SegmentAllocateRequest request : allRequests) {
  // If there is an overlapping used segment, the interval of the used segment
The allocation for a single segment considers overlap with queryGranularity.bucket(row.timestamp()).
This code fetches all used segments overlapping with segmentGranularity.bucket(row.timestamp()). I think this could lead to a case where the row is allocated to a used segment interval to which it doesn't belong.
The used segments are fetched for the allocate interval (segmentGranularity.bucket(row.timestamp())) so that we don't have to make the metadata call multiple times.
The logic hasn't changed because we use the rowInterval (queryGranularity.bucket(row.timestamp())) to find the actual overlapping interval. Please look at the line after this.
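A simplified sketch of that flow, with a hypothetical Interval interface standing in for Joda-Time intervals: used segments are fetched once for the wider allocate interval, and each request's own row interval decides the actual overlap.

```java
import java.util.List;
import java.util.stream.Collectors;

class OverlapSketch
{
  interface Interval
  {
    boolean overlaps(Interval other);
  }

  /**
   * usedSegmentIntervals were retrieved in one metadata call for the whole allocate
   * interval (segment granularity bucket); the row interval (query granularity bucket)
   * is what determines which used segments a particular row actually overlaps.
   */
  static List<Interval> findOverlapping(List<Interval> usedSegmentIntervals, Interval rowInterval)
  {
    return usedSegmentIntervals.stream()
                               .filter(used -> used.overlaps(rowInterval))
                               .collect(Collectors.toList());
  }
}
```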
All unit and integration tests have run successfully with segment allocation enabled in test PR #13484.
Thank you @kfaraz. LGTM
Merging as build is green. Latest commit is only a comment change.
Description
In a cluster with a large number of streaming tasks (~1000), SegmentAllocateActions on the overlord can often take very long intervals of time to finish, thus causing spikes in the `task/action/run/time`. This may result in lag building up while a task waits for a segment to get allocated.

The root causes are:
- large number of metadata calls made to the segments and pending segments tables
- `giant` lock held in `TaskLockbox.tryLock()` to acquire task locks and allocate segments

Since the contention typically arises when several tasks of the same datasource try to allocate segments for the same interval/granularity, the allocation run times can be improved by batching the requests together.
Changes

Broad strokes
- Add flags (a sample config is sketched after this list):
  - `druid.indexer.tasklock.batchSegmentAllocation` (default `false`)
  - `druid.indexer.tasklock.batchAllocationMaxWaitTime` (in millis) (default `1000`)
- Process a batch after `batchAllocationMaxWaitTime`
- Add a `SegmentAllocationQueue` to queue and batch allocate actions
- Batch requests by: `dataSource`, `groupId`, `preferredAllocationInterval`, `skipSegmentLineageCheck`, `useNonRootGenerationPartitionSpace` (as a proxy for `PartialShardSpec.getClass()`) and `lockGranularity`
- Acquire the `giant` lock just once per batch in `TaskLockbox`
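For example, an overlord's runtime.properties might enable the feature like this (the values are illustrative; only the property names come from this PR):

```properties
# Enable batched segment allocation (disabled by default)
druid.indexer.tasklock.batchSegmentAllocation=true
# Maximum time (in millis) a batch waits before being processed (default 1000)
druid.indexer.tasklock.batchAllocationMaxWaitTime=500
```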
Adding a request
- Add methods `canPerformAsync` and `performAsync` to `TaskAction` (a rough dispatch sketch follows this list)
- `performAsync` currently throws an `UnsupportedOperationException` for other action types
- Submit each allocate action to the `SegmentAllocationQueue`, and add it to the correct batch
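A rough sketch of how a caller could branch on the new async capability; the interface shape here is assumed for illustration and is not the exact TaskAction API:

```java
import java.util.concurrent.CompletableFuture;

class AsyncActionDispatchSketch
{
  // Assumed shape: the real TaskAction in Druid has more methods and generics.
  interface Action<R>
  {
    boolean canPerformAsync();

    CompletableFuture<R> performAsync(); // e.g. enqueue into the SegmentAllocationQueue

    R perform(); // existing synchronous path
  }

  static <R> CompletableFuture<R> run(Action<R> action)
  {
    if (action.canPerformAsync()) {
      return action.performAsync();
    }
    // Fall back to the synchronous path for action types that do not support batching
    return CompletableFuture.completedFuture(action.perform());
  }
}
```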
Processing a request batch
- Wait for `batchAllocationMaxWaitTime` before processing a batch
- Acquire the `giant` lock just once per batch in `TaskLockbox` to acquire task locks and allocate segments

Allocating segments
- Add `TaskLockbox.allocateSegments()`, retaining the logic from `TaskLockbox.tryLock()`
- Add `IndexerMetadataStorageCoordinator.allocatePendingSegments()`, retaining the logic from `allocatePendingSegment()`

A simplified sketch of this processing flow follows.
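The sketch below puts the pieces together as a highly simplified processing loop; the queue and batch shapes are assumptions for illustration, not the actual SegmentAllocationQueue internals.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;

class ProcessingLoopSketch
{
  interface Batch extends Delayed
  {
    // Stands in for the single per-batch call into TaskLockbox that acquires
    // locks and allocates segments for every request in the batch.
    void allocateUnderSingleLock();
  }

  /** Each batch becomes eligible only after its max wait time has elapsed. */
  static void runLoop(DelayQueue<Batch> dueBatches) throws InterruptedException
  {
    while (!Thread.currentThread().isInterrupted()) {
      final Batch batch = dueBatches.take(); // blocks until the head batch is due
      batch.allocateUnderSingleLock();
    }
  }
}
```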
New metrics

Per batch (dims: `dataSource`, `taskActionType=segmentAllocate`):
- `task/action/batch/runTime`
- `task/action/batch/queueTime`
- `task/action/batch/size`
- `task/action/batch/attempts`

Per request (dims: `taskId`, `taskType`, `dataSource`, `taskActionType=segmentAllocate`):
- `task/action/failed/count`
- `task/action/success/count`
Pending changes

Further improvements
- … `HttpServletRequest.startAsync()`. This would allow batching of even more requests.
- … `HttpServletRequest.startAsync()`.
- … `skipLineageCheck = false`.

Release note
Speed up segment allocation and reduce metadata calls by clubbing multiple requests together.
Set `druid.indexer.tasklock.batchSegmentAllocation=true` to enable this feature.