[HUDI-2804] Add option to skip compaction instants for streaming read
danny0405 committed Nov 20, 2021
1 parent 0230d40 commit 8172127
Showing 4 changed files with 57 additions and 5 deletions.
@@ -196,6 +196,17 @@ private FlinkOptions() {
.defaultValue(60)// default 1 minute
.withDescription("Check interval for streaming read of SECOND, default 1 minute");

// this option is experimental
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = ConfigOptions
.key("read.streaming.skip_compaction")
.booleanType()
.defaultValue(false)// default: do not skip compaction instants
.withDescription("Whether to skip compaction instants for streaming read,\n"
+ "there are two cases that this option can be used to avoid reading duplicates:\n"
+ "1) you are definitely sure that the consumer reads faster than any compaction instants, "
+ "usually with delta time compaction strategy that is long enough, for e.g, one week;\n"
+ "2) changelog mode is enabled, this option is a solution to keep data integrity");

public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
.key("read.start-commit")
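For context, a minimal sketch of how a consumer might enable the new option from the Flink Table API. Only the option keys 'table.type', 'read.streaming.enabled' and 'read.streaming.skip_compaction' come from FlinkOptions (the last one added by this commit); the table name, schema and path are hypothetical and used for illustration only:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SkipCompactionReadExample {
  public static void main(String[] args) {
    // Requires the hudi-flink bundle on the classpath.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Hypothetical MERGE_ON_READ table; with skip_compaction enabled the streaming read
    // ignores the compaction commits and only consumes the delta commits.
    tableEnv.executeSql(
        "create table t1 (\n"
        + "  uuid varchar(20) primary key not enforced,\n"
        + "  name varchar(10),\n"
        + "  ts timestamp(3)\n"
        + ") with (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = 'file:///tmp/hudi/t1',\n"
        + "  'table.type' = 'MERGE_ON_READ',\n"
        + "  'read.streaming.enabled' = 'true',\n"
        + "  'read.streaming.skip_compaction' = 'true'\n"
        + ")");

    // Continuous streaming query over the table.
    tableEnv.executeSql("select * from t1").print();
  }
}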
@@ -80,16 +80,20 @@ public class IncrementalInputSplits implements Serializable {
private final long maxCompactionMemoryInBytes;
// for partition pruning
private final Set<String> requiredPartitions;
// skip compaction
private final boolean skipCompaction;

private IncrementalInputSplits(
Configuration conf,
Path path,
long maxCompactionMemoryInBytes,
@Nullable Set<String> requiredPartitions) {
@Nullable Set<String> requiredPartitions,
boolean skipCompaction) {
this.conf = conf;
this.path = path;
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.requiredPartitions = requiredPartitions;
this.skipCompaction = skipCompaction;
}

/**
@@ -262,7 +266,7 @@ private List<HoodieCommitMetadata> getArchivedMetadata(
final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
}
return instantStream
return maySkipCompaction(instantStream)
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
}
}
@@ -299,7 +303,13 @@ private List<HoodieInstant> filterInstantsWithRange(
final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit));
}
return instantStream.collect(Collectors.toList());
return maySkipCompaction(instantStream).collect(Collectors.toList());
}

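/**
 * Filters out the compaction instants when 'skipCompaction' is enabled.
 *
 * <p>On a MERGE_ON_READ table a completed compaction is written as a COMMIT instant,
 * while the streaming writes produce DELTA_COMMIT instants, so dropping the COMMIT
 * actions leaves only the delta commits for the streaming read.
 */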
private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
return this.skipCompaction
? instants.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
: instants;
}

private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
@@ -352,6 +362,8 @@ public static class Builder {
private long maxCompactionMemoryInBytes;
// for partition pruning
private Set<String> requiredPartitions;
// skip compaction
private boolean skipCompaction = false;

public Builder() {
}
@@ -376,9 +388,14 @@ public Builder requiredPartitions(@Nullable Set<String> requiredPartitions) {
return this;
}

public Builder skipCompaction(boolean skipCompaction) {
this.skipCompaction = skipCompaction;
return this;
}

public IncrementalInputSplits build() {
return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path),
this.maxCompactionMemoryInBytes, this.requiredPartitions);
this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction);
}
}
}
@@ -107,7 +107,9 @@ public StreamReadMonitoringFunction(
.conf(conf)
.path(path)
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(requiredPartitionPaths).build();
.requiredPartitions(requiredPartitionPaths)
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
.build();
}

@Override
@@ -242,6 +242,28 @@ void testStreamWriteBatchReadOptimized() {
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}

@Test
void testStreamWriteReadSkippingCompaction() throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);

String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.option(FlinkOptions.COMPACTION_TASKS, 1)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);

List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}

@Test
void testStreamWriteWithCleaning() {
// create filesystem table named source
