Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2881] Compact the file group with larger log files to reduce wr… #4152

Merged
merged 3 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Amount of MBs to spend during compaction run for the LogFileSizeBasedCompactionStrategy. "
+ "This value helps bound ingestion latency while compaction is run inline mode.");

public static final ConfigProperty<Long> COMPACTION_LOG_FILE_SIZE_THRESHOLD = ConfigProperty
.key("hoodie.compaction.logfile.size.threshold")
.defaultValue(0L)
.withDocumentation("Only if the log file size is greater than the threshold in bytes,"
+ " the file group will be compacted.");

public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty
.key("hoodie.compaction.strategy")
.defaultValue(LogFileSizeBasedCompactionStrategy.class.getName())
Expand Down Expand Up @@ -588,6 +594,11 @@ public Builder withTargetPartitionsPerDayBasedCompaction(int targetPartitionsPer
return this;
}

public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold) {
compactionConfig.setValue(COMPACTION_LOG_FILE_SIZE_THRESHOLD, String.valueOf(logFileSizeThreshold));
return this;
}

public Builder withCommitsArchivalBatchSize(int batchSize) {
compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,10 @@ public Long getTargetIOPerCompactionInMB() {
return getLong(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB);
}

public Long getCompactionLogFileSizeThreshold() {
return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD);
}

public Boolean getCompactionLazyBlockReadEnabled() {
return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.stream.Collectors;

/**
* LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size and limits the
* LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size,
* filters the file group which log files size is greater than the threshold and limits the
* compactions within a configured IO bound.
*
* @see BoundedIOCompactionStrategy
Expand All @@ -39,8 +40,12 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat
@Override
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
// Filter the file group which log files size is greater than the threshold in bytes.
// Order the operations based on the reverse size of the logs and limit them by the IO
return super.orderAndFilter(writeConfig, operations.stream().sorted(this).collect(Collectors.toList()),
long threshold = writeConfig.getCompactionLogFileSizeThreshold();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here i think the default value of "hoodie.compaction.logfile.size.threshold" should be 0 to keep compatibility and once set it to other value and then take effect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already fix

return super.orderAndFilter(writeConfig, operations.stream()
.filter(e -> e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) >= threshold)
.sorted(this).collect(Collectors.toList()),
pendingCompactionPlans);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,20 @@ public void testLogFileSizeCompactionSimple() {
sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
.withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions");
assertEquals(1, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction");
assertEquals(2, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 2 compaction");
// Total size of all the log files
Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
.map(Double::longValue).reduce(Long::sum).orElse(0L);
assertEquals(1204, (long) returnedSize,
"Should chose the first 2 compactions which should result in a total IO of 690 MB");
assertEquals(1594, (long) returnedSize,
"Should chose the first 2 compactions which should result in a total IO of 1594 MB");
}

@Test
Expand Down