From 8172127b57fd0ee92edda462dabdf2f2e2341602 Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Sat, 20 Nov 2021 10:23:01 +0800
Subject: [PATCH] [HUDI-2804] Add option to skip compaction instants for
 streaming read

---
 .../hudi/configuration/FlinkOptions.java       | 11 ++++++++
 .../hudi/source/IncrementalInputSplits.java    | 25 ++++++++++++++++---
 .../source/StreamReadMonitoringFunction.java   |  4 ++-
 .../hudi/table/HoodieDataSourceITCase.java     | 22 ++++++++++++++++
 4 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 65a95ed7c2b0..daeeb561d7a3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -196,6 +196,17 @@ private FlinkOptions() {
       .defaultValue(60)// default 1 minute
       .withDescription("Check interval for streaming read of SECOND, default 1 minute");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = ConfigOptions
+      .key("read.streaming.skip_compaction")
+      .booleanType()
+      .defaultValue(false)// default false, do not skip compaction instants
+      .withDescription("Whether to skip compaction instants for streaming read,\n"
+          + "there are two cases in which this option can be used to avoid reading duplicates:\n"
+          + "1) you are sure that the consumer reads faster than any compaction instants, "
+          + "usually with a delta time compaction strategy that is long enough, e.g. one week;\n"
+          + "2) changelog mode is enabled; this option is a solution to keep data integrity");
+
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
       .key("read.start-commit")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index d0fcc854dbe4..fbb77c63057f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -80,16 +80,20 @@ public class IncrementalInputSplits implements Serializable {
   private final long maxCompactionMemoryInBytes;
   // for partition pruning
   private final Set<String> requiredPartitions;
+  // skip compaction
+  private final boolean skipCompaction;
 
   private IncrementalInputSplits(
       Configuration conf,
       Path path,
       long maxCompactionMemoryInBytes,
-      @Nullable Set<String> requiredPartitions) {
+      @Nullable Set<String> requiredPartitions,
+      boolean skipCompaction) {
     this.conf = conf;
     this.path = path;
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
     this.requiredPartitions = requiredPartitions;
+    this.skipCompaction = skipCompaction;
   }
 
   /**
@@ -262,7 +266,7 @@ private List<HoodieCommitMetadata> getArchivedMetadata(
         final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
         archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
       }
-      return instantStream
+      return maySkipCompaction(instantStream)
           .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
     }
   }
@@ -299,7 +303,13 @@ private List<HoodieInstant> filterInstantsWithRange(
       final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
       instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit));
     }
-    return instantStream.collect(Collectors.toList());
+    return maySkipCompaction(instantStream).collect(Collectors.toList());
+  }
+
+  private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
+    return this.skipCompaction
+        ? instants.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
+        : instants;
   }
 
   private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
@@ -352,6 +362,8 @@ public static class Builder {
     private long maxCompactionMemoryInBytes;
     // for partition pruning
     private Set<String> requiredPartitions;
+    // skip compaction
+    private boolean skipCompaction = false;
 
     public Builder() {
     }
@@ -376,9 +388,14 @@ public Builder requiredPartitions(@Nullable Set<String> requiredPartitions) {
       return this;
     }
 
+    public Builder skipCompaction(boolean skipCompaction) {
+      this.skipCompaction = skipCompaction;
+      return this;
+    }
+
     public IncrementalInputSplits build() {
       return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path),
-          this.maxCompactionMemoryInBytes, this.requiredPartitions);
+          this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction);
     }
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index c7bcc399ebc9..8138e931e54e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -107,7 +107,9 @@ public StreamReadMonitoringFunction(
         .conf(conf)
         .path(path)
         .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
-        .requiredPartitions(requiredPartitionPaths).build();
+        .requiredPartitions(requiredPartitionPaths)
+        .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+        .build();
   }
 
   @Override
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 7d00a658b7f6..7d54a98018da 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -242,6 +242,28 @@ void testStreamWriteBatchReadOptimized() {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @Test
+  void testStreamWriteReadSkippingCompaction() throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+        .option(FlinkOptions.COMPACTION_TASKS, 1)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+  }
+
   @Test
   void testStreamWriteWithCleaning() {
     // create filesystem table named source
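
Note (not part of the patch): a minimal usage sketch of the new option from the Flink Table API, mirroring the connector options exercised in testStreamWriteReadSkippingCompaction. The table name, schema, and path below are illustrative placeholders, and the hudi-flink bundle is assumed to be on the classpath.

// Usage sketch only, assuming a MERGE_ON_READ table; not taken from this patch.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SkipCompactionStreamingReadExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // MERGE_ON_READ table with streaming read enabled and compaction instants skipped,
    // so the incremental reader only consumes delta commits.
    tEnv.executeSql(
        "CREATE TABLE t1 (\n"
            + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
            + "  name VARCHAR(10),\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(20)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '/tmp/hudi/t1',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'read.streaming.enabled' = 'true',\n"
            + "  'read.streaming.skip_compaction' = 'true',\n"
            + "  'compaction.delta_commits' = '1',\n"
            + "  'compaction.tasks' = '1'\n"
            + ")");

    // Continuous streaming read; compaction commits are filtered out of the
    // incremental timeline by maySkipCompaction, which avoids re-reading
    // records that were already consumed from the delta commits.
    tEnv.executeSql("SELECT * FROM t1").print();
  }
}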