Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ private FlinkOptions() {
+ "usually with delta time compaction strategy that is long enough, for e.g, one week;\n"
+ "2) changelog mode is enabled, this option is a solution to keep data integrity");

// NOTE: this option is experimental and may change in a future release.
// When true (the default), replace-commit instants produced by clustering are
// filtered out of the streaming read so the reader does not see the re-written
// (duplicate) file groups; see IncrementalInputSplits#filterInstantsByCondition.
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = ConfigOptions
    .key("read.streaming.skip_clustering")
    .booleanType()
    .defaultValue(true)
    .withDescription("Whether to skip clustering instants for streaming read,\n"
        + "to avoid reading duplicates");

public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
.key("read.start-commit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -95,20 +96,24 @@ public class IncrementalInputSplits implements Serializable {
// the partition paths to read; null means no partition restriction
private final Set<String> requiredPartitions;
// whether compaction instants are filtered out of the instants to read
private final boolean skipCompaction;
// whether clustering instants are filtered out of the instants to read
private final boolean skipClustering;

/**
 * Private constructor; instances are created through the {@code Builder}.
 *
 * @param conf                       The flink configuration
 * @param path                       The table base path
 * @param rowType                    The row type of the data to read
 * @param maxCompactionMemoryInBytes The maximum memory in bytes used for compaction reading
 * @param requiredPartitions         The partitions to read, null means no restriction
 * @param skipCompaction             Whether to skip the compaction instants
 * @param skipClustering             Whether to skip the clustering instants
 */
private IncrementalInputSplits(
    Configuration conf,
    Path path,
    RowType rowType,
    long maxCompactionMemoryInBytes,
    @Nullable Set<String> requiredPartitions,
    boolean skipCompaction,
    boolean skipClustering) {
  this.conf = conf;
  this.path = path;
  this.rowType = rowType;
  this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
  this.requiredPartitions = requiredPartitions;
  this.skipCompaction = skipCompaction;
  this.skipClustering = skipClustering;
}

/**
Expand Down Expand Up @@ -446,7 +451,7 @@ private List<HoodieCommitMetadata> getArchivedMetadata(
HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
if (!archivedCompleteTimeline.empty()) {
Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
return maySkipCompaction(instantStream)
return filterInstantsByCondition(instantStream, archivedTimeline)
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
}
}
Expand All @@ -466,7 +471,7 @@ private List<HoodieInstant> filterInstantsWithRange(
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
if (issuedInstant != null) {
// returns early for streaming mode
return maySkipCompaction(completedTimeline.getInstants())
return filterInstantsByCondition(completedTimeline.getInstants(), commitTimeline)
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
}
Expand All @@ -482,13 +487,20 @@ private List<HoodieInstant> filterInstantsWithRange(
final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit));
}
return maySkipCompaction(instantStream).collect(Collectors.toList());
return filterInstantsByCondition(instantStream, commitTimeline).collect(Collectors.toList());
}

private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
return this.skipCompaction
? instants.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
: instants;
/**
 * Filters out the unnecessary instants by user specified condition:
 * compaction instants when {@code skipCompaction} is set,
 * clustering instants when {@code skipClustering} is set.
 *
 * @param instants The instants to filter
 * @param timeline The timeline the instants belong to, used to resolve clustering metadata
 *
 * @return the filtered instants
 */
private Stream<HoodieInstant> filterInstantsByCondition(Stream<HoodieInstant> instants, HoodieTimeline timeline) {
  // This method is only invoked on completed timelines, where a finished compaction is
  // recorded under the 'commit' action; filter on COMMIT_ACTION — matching the replaced
  // maySkipCompaction logic — because COMPACTION_ACTION would never match a completed
  // instant and would make the skip a no-op.
  return instants.filter(instant -> !this.skipCompaction || !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
      .filter(instant -> !this.skipClustering || !ClusteringUtil.isClusteringInstant(instant, timeline));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!this.skipClustering|| -> !this.skipClustering ||


private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
Expand Down Expand Up @@ -544,6 +556,8 @@ public static class Builder {
private Set<String> requiredPartitions;
// skip compaction
private boolean skipCompaction = false;
// skip clustering
private boolean skipClustering = true;

/**
 * Default constructor; all options keep their field defaults until set through
 * the fluent setters.
 */
public Builder() {
}
Expand Down Expand Up @@ -578,10 +592,15 @@ public Builder skipCompaction(boolean skipCompaction) {
return this;
}

/**
 * Sets whether to skip the clustering instants for reading, default {@code true}.
 *
 * @param skipClustering true to filter out clustering instants
 * @return this builder, for call chaining
 */
public Builder skipClustering(boolean skipClustering) {
  this.skipClustering = skipClustering;
  return this;
}

/**
 * Builds the {@code IncrementalInputSplits} instance.
 *
 * @return the configured instance
 * @throws NullPointerException if {@code conf}, {@code path} or {@code rowType} was not set
 */
public IncrementalInputSplits build() {
  return new IncrementalInputSplits(
      Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
      this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction, this.skipClustering);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public StreamReadMonitoringFunction(
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(requiredPartitionPaths)
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
.skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@
package org.apache.hudi.util;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable;

import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -77,4 +82,18 @@ public static void rollbackClustering(HoodieFlinkTable<?> table, HoodieFlinkWrit
table.getMetaClient().reloadActiveTimeline();
});
}

/**
 * Returns whether the given instant {@code instant} represents a clustering operation,
 * i.e. a replace-commit whose commit metadata carries the {@code CLUSTER} operation type.
 *
 * @param instant  The instant to inspect
 * @param timeline The timeline used to resolve the instant's commit metadata
 * @return true if the instant is a clustering replace-commit
 */
public static boolean isClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) {
  // Only replace-commits can be clustering; anything else is ruled out up front.
  if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
    try {
      // Distinguish clustering from other replace operations (e.g. insert overwrite)
      // by the operation type recorded in the commit metadata.
      return TimelineUtils.getCommitMetadata(instant, timeline).getOperationType().equals(WriteOperationType.CLUSTER);
    } catch (IOException e) {
      throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
    }
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,35 @@ void testStreamWriteReadSkippingCompaction() throws Exception {
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}

@Test
void testStreamWriteReadSkippingClustering() throws Exception {
  // create filesystem table named source
  String createSource = TestConfigurations.getFileSourceDDL("source", 4);
  streamTableEnv.executeSql(createSource);

  String hoodieTableDDL = sql("t1")
      .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
      .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE)
      .option(FlinkOptions.READ_AS_STREAMING, true)
      .option(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true)
      // schedule clustering aggressively so the streaming read has instants to skip
      .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
      .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
      .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)
      .option(FlinkOptions.CLUSTERING_TASKS, 1)
      .end();
  streamTableEnv.executeSql(hoodieTableDDL);
  String insertInto = "insert into t1 select * from source";
  execInsertSql(streamTableEnv, insertInto);

  // start the streaming read from the second completed instant
  String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);

  streamTableEnv.getConfig().getConfiguration()
      .setBoolean("table.dynamic-table-options.enabled", true);
  final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
  List<Row> rows = execSelectSql(streamTableEnv, query, 10);
  // clustering replace-commits must be skipped, so no duplicate rows are expected
  assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}

@Test
void testStreamWriteWithCleaning() {
// create filesystem table named source
Expand Down