
Add support minor compaction with segment locking #7547

Merged
merged 71 commits into from
Jul 25, 2019

Conversation

@jihoonson jihoonson commented Apr 25, 2019

Apologies for the huge PR. I tried to split it into sub PRs, but it's not easy to split further because the new changes for segment locking are tightly coupled, and they would be hard to understand in separate PRs.

This PR fixes #7491 and now it's ready for review.

Here are the key classes to be reviewed.

Overshadowable

New interface to represent a class whose instances can overshadow one another, e.g., DataSegment.
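To make the overshadow relation concrete, here is a toy sketch (not the actual Druid interface; the method names and the major/minor version fields are illustrative assumptions based on the description in this PR):

```java
// Toy sketch of the Overshadowable idea (not the actual Druid interface;
// method names and version fields are illustrative assumptions).
interface Overshadowable<T extends Overshadowable<T>> {
    String getMajorVersion();
    int getMinorVersion();

    // True if this instance overshadows `other`: a newer major version wins,
    // and within the same major version a higher minor version wins.
    default boolean overshadows(T other) {
        int cmp = getMajorVersion().compareTo(other.getMajorVersion());
        return cmp > 0 || (cmp == 0 && getMinorVersion() > other.getMinorVersion());
    }
}

final class ToySegment implements Overshadowable<ToySegment> {
    private final String majorVersion;
    private final int minorVersion;

    ToySegment(String majorVersion, int minorVersion) {
        this.majorVersion = majorVersion;
        this.minorVersion = minorVersion;
    }

    @Override
    public String getMajorVersion() { return majorVersion; }

    @Override
    public int getMinorVersion() { return minorVersion; }
}
```

Under this sketch, a compacted segment with the same major version but a higher minor version overshadows the original without requiring a new time-chunk version.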

ShardSpecFactory

New interface to be used to allocate segments remotely in the overlord.

NumberedOverwriteShardSpec

New shardSpec for segments that may overshadow others based on their minor version.

DataSegment

  • getVersion is renamed to getMajorVersion.
  • Added some short-circuit methods to check the overshadow relation between segments.

VersionedIntervalTimeline

Improved to consider the new overshadow relation properly.

OvershadowableManager

A new class that manages the state of AtomicUpdateGroups.

AtomicUpdateGroup

A set of PartitionChunks that must become visible in the timeline atomically: all at once, or not at all.
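A toy sketch of the atomic-visibility idea (not Druid's actual AtomicUpdateGroup; all names are illustrative assumptions): the group only becomes visible once every expected chunk has arrived, so readers never observe a partially overwritten interval.

```java
import java.util.HashSet;
import java.util.Set;

// Toy sketch of atomic visibility (not Druid's actual AtomicUpdateGroup).
// The group of partition chunks becomes visible only once every expected
// chunk has arrived, so readers never see a half-overwritten interval.
final class ToyAtomicUpdateGroup {
    private final int expectedChunks;
    private final Set<Integer> arrivedPartitionIds = new HashSet<>();

    ToyAtomicUpdateGroup(int expectedChunks) {
        this.expectedChunks = expectedChunks;
    }

    void addChunk(int partitionId) {
        arrivedPartitionIds.add(partitionId);
    }

    // Visible all at once, or not at all.
    boolean isVisible() {
        return arrivedPartitionIds.size() == expectedChunks;
    }
}
```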

SegmentTransactionalInsertAction

Added a new parameter, segmentsToBeOverwritten. It must not be empty if the new segments are created with segment locks.

AbstractBatchIndexTask

Abstract class for batch tasks such as the index task or the parallel index task. Provides some methods for easily acquiring task locks.

IndexTaskSegmentAllocator

Segment allocator interface used only by IndexTask. It has three different modes for allocating segments.

The new segment locking is currently available only for the Kafka/Kinesis index tasks, the index task, and the parallel index task.

This PR is marked as Incompatible because it introduces a new shardSpec; it's not possible to roll back after this PR. For a rolling update while the Kafka/Kinesis indexing service is running, any kind of overwriting task (including compaction tasks and index tasks) should be stopped first. Otherwise, Kafka/Kinesis index tasks might fail when they list segments with the new shardSpec generated by overwriting tasks. Native parallel index tasks cannot run on middleManagers of mixed versions, so they should also be stopped before the update.

Currently there is a known issue of a potential race in task locking. If two or more overwriting tasks try to acquire segment locks for overlapping sets of segments, each task acquires locks one by one per segment in the current implementation. This can cause both tasks to fail if each holds locks the other needs. However, I think this happens rarely, so I will fix it in a follow-up PR.
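As a side note, the race described here is the classic lock-ordering problem. One standard remedy (a sketch under assumed names, not the planned follow-up fix) is to have every task acquire its per-segment locks in a single global order, which makes circular waits impossible:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy sketch (not Druid code): if every task sorts the segment ids it needs
// and acquires locks in that order, two tasks can never each hold a lock the
// other is waiting for, because no circular wait can form.
final class ToyLockOrdering {
    static List<String> acquisitionOrder(List<String> segmentIds) {
        List<String> ordered = new ArrayList<>(segmentIds);
        Collections.sort(ordered);
        return ordered;
    }
}
```

Two tasks locking overlapping segment sets would then attempt the shared segments in the same order, so one simply waits for the other instead of deadlocking.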

I've been testing this PR in our in-house cluster since I raised it, and it looks fine so far.

@jihoonson changed the title from "Minor compaction" to "Add support minor compaction with segment locking" on Apr 25, 2019
@clintropolis (Member):
@jihoonson these measurements are great, thanks! I will try to finish up my review sometime this week 😅


/**
* Abstract class for batch tasks like {@link IndexTask}.
* Provides some methods ({@link #determineSegmentGranularity} and {@link #determineSegmentGranularity}) for easily acquiring task
Contributor:

determineSegmentGranularity is mentioned twice here

Contributor Author:

Thanks, fixed javadoc.

{
if (requireLockExistingSegments()) {
if (isPerfectRollup()) {
log.info("Using timeChunk lock for perfrect rollup");
Contributor:

perfrect -> perfect

Contributor Author:

Thanks, fixed.

}

@Nullable
public OverwritingRootGenerationPartitions getOverwritingSegmentMeta(Interval interval)
private static class LockGranularityDeterminResult
Contributor:

LockGranularityDeterminResult -> LockGranularityDetermineResult

Contributor Author:

Thanks, fixed.


for (SegmentIdWithShardSpec segmentIdentifier : idsPerInterval) {
shardSpecMap.computeIfAbsent(interval, k -> new ArrayList<>()).add(segmentIdentifier.getShardSpec());
// The shardSpecs for partitinoing and publishing can be different if isExtendableShardSpecs = true.
Contributor:

partitinoing -> partitioning

Contributor Author:

Thanks, fixed.

if (bucketIntervals.isPresent()) {
// If the granularity spec has explicit intervals, we just need to find the interval (of the segment
// granularity); we already tried to lock it at task startup.
// If granularity spec has explicit intervals, we just need to find the version accociated to the interval.
Contributor:

accociated -> associated

Contributor Author:

Thanks, fixed.

@jon-wei (Contributor) left a comment:

LGTM

/**
* Returns true if this segment overshadows the other segment.
*/
default boolean isOvershadow(T other)
Member:

I think the purpose of this method would be clearer if it were called something like overshadows, doesOvershadow, or willOvershadow. Reading x.overshadows(y) or x.willOvershadow(y) makes it very clear that it's a check of whether y will be overshadowed by x.

I think isOvershadow is too close to "x is overshadowed by y", which is why I keep having to check the javadoc to remember which one is the overshadowed segment. Renaming will also help distinguish it from the timeline's isOvershadowed method.

Contributor Author:

Renamed to overshadows().

final Set<DataSegment> inputSegments = isUseSegmentLock()
? getSegmentLockHelper().getLockedExistingSegments()
: null;
final SegmentsAndMetadata published = awaitPublish(driver.publishAll(inputSegments, publisher), pushTimeout);
Member:

is driver.publishAll going to be sad if inputSegments is null because not using segment lock? could you explain this a bit better in comments?

Contributor Author:

Added a comment.


private boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
// In this case, the intervals to lock must be alighed with segmentGranularity if it's defined
Member:

typo:

... must be aligned with ...

Contributor Author:

Thanks, fixed.

}

final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new ArrayList<>();
while (current != null && rangeOfAug.contains(current.getKey())) {
Member:

this method could maybe use a few more comments to make understanding what is going on a bit easier

Contributor Author:

👍 added comments and javadoc.

@clintropolis (Member) left a comment:

lgtm, 👍

@jihoonson jihoonson merged commit db14946 into apache:master Jul 25, 2019
@jihoonson (Contributor Author):

@jon-wei @clintropolis thank you for the review!!

wengwh commented Jun 5, 2020

Before this version, we used a Hadoop task and a Kafka task to ingest data into the same datasource successfully.
After upgrading to the new version, if a segment is ingested by the Hadoop task, its shardSpec is hashed,
and the Kafka task ingesting into the same segment throws an exception:
org.apache.druid.java.util.common.ISE: Could not allocate segment for row with timestamp

For now we must run a compaction task to resolve the exception.

Is there any new solution to this problem?

Successfully merging this pull request may close these issues.

Introducing minor compaction with segment locking
4 participants