[HUDI-2881] Compact the file group with larger log files to reduce write amplification (#4152)
minihippo committed Dec 2, 2021
1 parent f4c25ba commit 5284730
Showing 4 changed files with 27 additions and 6 deletions.
HoodieCompactionConfig.java
@@ -171,6 +171,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Amount of MBs to spend during compaction run for the LogFileSizeBasedCompactionStrategy. "
           + "This value helps bound ingestion latency while compaction is run in inline mode.");
 
+  public static final ConfigProperty<Long> COMPACTION_LOG_FILE_SIZE_THRESHOLD = ConfigProperty
+      .key("hoodie.compaction.logfile.size.threshold")
+      .defaultValue(0L)
+      .withDocumentation("Only if the log file size is greater than the threshold in bytes,"
+          + " the file group will be compacted.");
+
   public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty
       .key("hoodie.compaction.strategy")
       .defaultValue(LogFileSizeBasedCompactionStrategy.class.getName())
@@ -598,6 +604,11 @@ public Builder withTargetPartitionsPerDayBasedCompaction(int targetPartitionsPer
       return this;
     }
 
+    public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold) {
+      compactionConfig.setValue(COMPACTION_LOG_FILE_SIZE_THRESHOLD, String.valueOf(logFileSizeThreshold));
+      return this;
+    }
+
     public Builder withCommitsArchivalBatchSize(int batchSize) {
       compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
       return this;
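For readers wiring this up, the following is a minimal sketch of how the new builder option could be used end to end; it mirrors the updated test below rather than prescribing defaults, and the 100 MB threshold, the 500 MB IO target, the table path, and the import locations are illustrative assumptions, not part of this commit.

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;

public class CompactionThresholdExample {
  public static void main(String[] args) {
    // Only file groups whose total log file size reaches 100 MB are considered for compaction.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table") // illustrative base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCompactionStrategy(new LogFileSizeBasedCompactionStrategy())
            .withTargetIOPerCompactionInMB(500) // existing IO bound, illustrative value
            .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024) // new threshold, in bytes
            .build())
        .build();

    // Equivalent to setting hoodie.compaction.logfile.size.threshold = 104857600 directly.
    System.out.println(writeConfig.getCompactionLogFileSizeThreshold());
  }
}

Since the property defaults to 0 bytes, every file group passes the new filter unless a threshold is configured, so existing jobs keep their previous behavior.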
HoodieWriteConfig.java
@@ -1133,6 +1133,10 @@ public Long getTargetIOPerCompactionInMB() {
     return getLong(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB);
   }
 
+  public Long getCompactionLogFileSizeThreshold() {
+    return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD);
+  }
+
   public Boolean getCompactionLazyBlockReadEnabled() {
     return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
   }
LogFileSizeBasedCompactionStrategy.java
@@ -27,7 +27,8 @@
 import java.util.stream.Collectors;
 
 /**
- * LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size and limits the
+ * LogFileSizeBasedCompactionStrategy orders the compactions based on the total log file size,
+ * keeps only the file groups whose log file size meets or exceeds the configured threshold, and limits the
  * compactions within a configured IO bound.
 *
 * @see BoundedIOCompactionStrategy
@@ -39,8 +40,12 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrategy
   @Override
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
       List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    // Keep only the file groups whose total log file size (in bytes) meets or exceeds the threshold.
     // Order the operations based on the reverse size of the logs and limit them by the IO
-    return super.orderAndFilter(writeConfig, operations.stream().sorted(this).collect(Collectors.toList()),
+    long threshold = writeConfig.getCompactionLogFileSizeThreshold();
+    return super.orderAndFilter(writeConfig, operations.stream()
+        .filter(e -> e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) >= threshold)
+        .sorted(this).collect(Collectors.toList()),
         pendingCompactionPlans);
   }
 
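To make the filter-then-order step concrete in isolation, here is a small self-contained sketch in plain Java (no Hudi types); the file group sizes are made-up values, while the real strategy reads each operation's TOTAL_LOG_FILE_SIZE metric and then hands the surviving, size-ordered list to BoundedIOCompactionStrategy for IO capping, as in the diff above.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class ThresholdFilterSketch {
  public static void main(String[] args) {
    long thresholdBytes = 100L * 1024 * 1024; // 100 MB, mirroring the test configuration

    // Hypothetical total log sizes (in bytes) for four file groups.
    List<Long> totalLogSizes = Arrays.asList(
        60L * 1024 * 1024,    // below the threshold: skipped, avoiding a low-value base file rewrite
        250L * 1024 * 1024,
        120L * 1024 * 1024,
        1024L * 1024 * 1024);

    // Same shape as the strategy change: drop small-log file groups, then order by size descending.
    List<Long> candidates = totalLogSizes.stream()
        .filter(size -> size >= thresholdBytes)
        .sorted(Comparator.reverseOrder())
        .collect(Collectors.toList());

    // Prints [1073741824, 262144000, 125829120]; the 60 MB file group is excluded.
    System.out.println(candidates);
  }
}

Skipping file groups with little log data is what reduces write amplification: compaction rewrites the whole base file, so doing that work for a handful of log bytes yields little benefit.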
TestHoodieCompactionStrategy.java
@@ -99,19 +99,20 @@ public void testLogFileSizeCompactionSimple() {
     sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
     LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
-        HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
+        HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
+            .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
         .build();
     List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
     List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
 
     assertTrue(returned.size() < operations.size(),
         "LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions");
-    assertEquals(1, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction");
+    assertEquals(2, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 2 compactions");
     // Total size of all the log files
     Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
         .map(Double::longValue).reduce(Long::sum).orElse(0L);
-    assertEquals(1204, (long) returnedSize,
-        "Should chose the first 2 compactions which should result in a total IO of 690 MB");
+    assertEquals(1594, (long) returnedSize,
+        "Should choose the first 2 compactions, which should result in a total IO of 1594 MB");
   }
 
   @Test
