
Stateful auto compaction #8489

Closed · jihoonson opened this issue Sep 7, 2019 · 4 comments · Fixed by #8573

Comments

@jihoonson (Contributor) commented Sep 7, 2019

Motivation

In auto compaction, the coordinator searches for segments to compact based on their byte size. The algorithm is currently stateless: on each coordinator run, it traverses all segments of all datasources from the newest to the oldest, compares each segment's size against targetCompactionSizeBytes (this comparison is currently missing, which causes #8481), and issues a compaction task whenever it finds segments smaller than targetCompactionSizeBytes.

However, comparing the segment size against targetCompactionSizeBytes alone is not enough to tell whether a given segment needs further compaction, because the segment could have been created with any of several types of partitionsSpec and later compacted with one of them. (As of now, auto compaction supports only maxRowsPerSegment, maxTotalRows, and targetCompactionSizeBytes, but it should support all partitionsSpec types in the future.)

As of now, we have 3 partitionsSpec types, i.e., DynamicPartitionsSpec, HashedPartitionsSpec, and SingleDimensionPartitionsSpec:

  • DynamicPartitionsSpec has maxRowsPerSegment, and maxTotalRows.
  • HashedPartitionsSpec has targetPartitionSize (target number of rows per segment), numShards, and partitionDimensions.
  • SingleDimensionPartitionsSpec has targetPartitionSize (target number of rows per segment), maxPartitionSize (max number of rows per segment), and partitionDimensions.

In the coordinator, most of these configurations are hard to use for finding segments that need compaction because the number of rows per segment is not available in the coordinator. Even if it were, hash- or range-partitioned segments can have row counts that differ substantially from targetPartitionSize, which makes it hard to tell whether a given segment needs compaction. Note that a segment doesn't need further compaction if it has already been compacted with the same partitionsSpec. Auto compaction based on parallel indexing complicates things further: in parallel indexing with DynamicPartitionsSpec, the last segment created by each task can have fewer rows than maxRowsPerSegment.

Proposed changes

To address this issue, I propose storing the state of auto compaction in the metadata store, so that auto compaction can pick its candidates from the segments which have not been compacted yet.

DataSegment change

A new compactionPartitionsSpec field will be added to DataSegment. compactionPartitionsSpec will be populated only in the coordinator.

public class DataSegment
{
  private final Integer binaryVersion;
  private final SegmentId id;
  @Nullable
  private final Map<String, Object> loadSpec;
  private final List<String> dimensions;
  private final List<String> metrics;
  private final ShardSpec shardSpec;
  // New field: the partitionsSpec used by the last compaction of this
  // segment, or null if the segment has never been compacted.
  @Nullable
  private final PartitionsSpec compactionPartitionsSpec;
  private final long size;
}

Changes in publishing segments

When a compaction task creates segments, it can attach its partitionsSpec to them (in Appenderator.add()), as sketched below. Other task types don't add it.
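
A minimal sketch of this step, assuming a hypothetical withCompactionPartitionsSpec() helper in the style of DataSegment's existing with* methods; it is not the actual Druid API:

import java.util.HashSet;
import java.util.Set;

// Sketch: a compaction task tags each segment it publishes with the
// partitionsSpec from its tuningConfig, so the coordinator can later tell
// how (and whether) the segment was last compacted.
Set<DataSegment> tagWithPartitionsSpec(Set<DataSegment> published, PartitionsSpec specUsed)
{
  final Set<DataSegment> tagged = new HashSet<>();
  for (DataSegment segment : published) {
    tagged.add(segment.withCompactionPartitionsSpec(specUsed)); // hypothetical helper
  }
  return tagged;
}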

Changes in the coordinator

SQLMetadataSegmentManager loads DataSegments and keeps them in memory, same as now. Since there is generally little variety among PartitionsSpec values, interning them would be useful to reduce memory usage.
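
As a sketch, Guava's Interner would do; this assumes PartitionsSpec implements equals()/hashCode() consistently (which the comparison below relies on anyway):

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

// Keep one canonical instance per distinct PartitionsSpec value; equal specs
// deserialized from the metadata store collapse onto the same object.
private static final Interner<PartitionsSpec> SPEC_INTERNER = Interners.newWeakInterner();

@Nullable
PartitionsSpec canonicalize(@Nullable PartitionsSpec spec)
{
  return spec == null ? null : SPEC_INTERNER.intern(spec);
}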

DruidCoordinatorSegmentCompactor checks whether compactionPartitionsSpec is available for a given segment. If it's missing, the segment must be a new segment created by a non-compaction task, so it becomes a compaction candidate. If it exists, the coordinator compares the compactionPartitionsSpec with the partitionsSpec in the auto compaction configuration; the segment becomes a compaction candidate if the two differ.
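
A minimal sketch of that check (needsCompaction and getCompactionPartitionsSpec are illustrative names, not the actual DruidCoordinatorSegmentCompactor code):

// A segment is a compaction candidate if it was never compacted, or if it was
// last compacted with a partitionsSpec different from the configured one.
boolean needsCompaction(DataSegment segment, PartitionsSpec configuredSpec)
{
  final PartitionsSpec lastSpec = segment.getCompactionPartitionsSpec();
  if (lastSpec == null) {
    return true; // created by a non-compaction task
  }
  return !lastSpec.equals(configuredSpec);
}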

Rationale

Dropped alternative

The coordinator could get the number of rows of each segment from the system schema to find compaction candidates. However, as mentioned in the Motivation section, the number of rows is not enough to determine whether a given segment needs compaction.

Operational impact

There should be no operational impact.

Test plan

  • Will add unit tests
  • Will test in our internal cluster

Future work

In DataSegment, similar to compactionPartitionsSpec, there are a couple of fields which are loaded only in particular places and are null or some default value elsewhere. Even when they are null or default, each still costs 8 bytes per segment for the reference. It would probably be worth splitting DataSegment into several classes, so that each process loads only the fields it needs, reducing memory usage.
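
For example, a purely hypothetical field-level split might look like this (names invented for illustration):

// Fields that every process needs for every segment.
public class CoreDataSegment
{
  private final SegmentId id;
  private final ShardSpec shardSpec;
  private final long size;
}

// Coordinator-only view carrying the rarely-loaded fields, so other
// processes don't pay the per-segment reference cost for them.
public class CoordinatorDataSegment extends CoreDataSegment
{
  @Nullable
  private final PartitionsSpec compactionPartitionsSpec;
}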

@himanshug (Contributor) commented:

Just trying to wrap my head around compaction; I have a few questions, since it has evolved a lot :)

  1. Does coordinator auto compaction always do "minor compaction" (described in "Introducing minor compaction with segment locking", #7491)?
  2. Is the same CompactionTask used for both "minor" and "major" compaction? Is it major compaction when either the "interval" or "segmentGranularity" field is explicitly provided?

If it exists, the coordinator compares the compactionPartitionsSpec with the partitionsSpec in the auto compaction configuration; the segment becomes a compaction candidate if the two differ.

or else auto/minor compaction would produce exactly the same result because it doesn't change segmentGranularity... right?

@jihoonson (Contributor, Author) commented:

@himanshug thank you for taking a look!

  1. Does coordinator auto compaction always do "minor compaction" (described in "Introducing minor compaction with segment locking", #7491)?

For now, auto compaction doesn't support minor compaction. You can do minor compaction with the compaction task by setting forceGuaranteedRollup = false.

The future plan I'm thinking of is supporting both minor and major compaction in auto compaction: it would do minor compaction for recent segments while doing major compaction for old ones. There are a couple of reasons why we still need major compaction. One is that major compaction can apply secondary partitioning, which can accelerate queries by further pruning segments.

  2. Is the same CompactionTask used for both "minor" and "major" compaction? Is it major compaction when either the "interval" or "segmentGranularity" field is explicitly provided?

Good question. It is major compaction if 1) forceGuaranteedRollup is true or 2) segmentGranularity is provided explicitly. The second condition should really be "if the provided segment granularity is different from that of the input segments", but I was too lazy to implement that in #7547.
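
In code form, the current rule is roughly the following (illustrative names, not the actual CompactionTask implementation):

// Major compaction: guaranteed rollup was requested, or segmentGranularity
// was provided explicitly; everything else is minor compaction.
boolean isMajorCompaction(CompactionTask task)
{
  return task.getTuningConfig().isForceGuaranteedRollup()
         || task.getSegmentGranularity() != null;
}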

If it exists, the coordinator compares the compactionPartitionsSpec with the partitionsSpec in the auto compaction configuration; the segment becomes a compaction candidate if the two differ.

or else auto/minor compaction would produce exactly the same result because it doesn't change segmentGranularity... right?

You mean if the compaction task has exactly the same tuningConfig? Then I think yes.

@himanshug (Contributor) commented:

You mean if the compaction task has exactly the same tuningConfig? Then I think yes.

I see, but then why is a segment not a candidate for compaction when it has the same partitionsSpec? If the tuningConfig is different, it could still produce a different result, no?

Thanks for the clarifications around major/minor compaction. I think the confusion arises because there is no trace of major/minor compaction in the code. Do you think it makes sense to create two new classes, MajorCompactionTask and MinorCompactionTask, both extending CompactionTask but automatically taking care of the conditions/restrictions you noted? Then it would be very easy to see from the code what type of compaction is happening in a given situation.

I totally recognize the need for major compaction, as, I think, minor compaction is limited to compacting segments in the same partition set (i.e., segments with the same dataSource, interval, and version) only.

@jihoonson (Contributor, Author) commented:

I see, but then why is a segment not a candidate for compaction when it has the same partitionsSpec? If the tuningConfig is different, it could still produce a different result, no?

That's a good point! I just checked IndexTuningConfig and ParallelIndexTuningConfig, and it looks like only partitionsSpec and indexSpec can change the result. forceGuaranteedRollup can also change the result, but it requires using HashedPartitionsSpec or SingleDimensionPartitionsSpec. (Perhaps we can just remove forceGuaranteedRollup and rely on partitionsSpec alone.) This means indexSpec probably also needs to be stored for stateful auto compaction.
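
So the stored compaction state might need to hold both; a hypothetical shape:

// Everything that can change the output of a compaction and therefore must be
// compared to decide whether a segment needs re-compacting.
public class CompactionState
{
  private final PartitionsSpec partitionsSpec;
  private final IndexSpec indexSpec;
}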

Thanks for the clarifications around major/minor compaction. I think the confusion arises because there is no trace of major/minor compaction in the code. Do you think it makes sense to create two new classes, MajorCompactionTask and MinorCompactionTask, both extending CompactionTask but automatically taking care of the conditions/restrictions you noted? Then it would be very easy to see from the code what type of compaction is happening in a given situation.

Hmm, yes, that sounds good to me. For now, MinorCompactionTask needs to use DynamicPartitionsSpec in its tuningConfig and forceTimeChunkLock = false in its context (I forgot to mention the task context thing earlier). But in the future, I think it might be useful to support all kinds of partitionsSpec for minor compaction as well. For example, you may want to do range-based partitioning on published segments while new stream data is still arriving in the same time chunk, to take advantage of segment pruning at query time.

I totally recognize the need for major compaction, as, I think, minor compaction is limited to compacting segments in the same partition set (i.e., segments with the same dataSource, interval, and version) only.

That's also correct. Minor compaction can compact only the segments in the same time chunk.
