Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,72 +209,73 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {

// Collect all data files
SingleOutputStreamOperator<MetadataTablePlanner.SplitInfo> splits =
trigger
.process(
new MetadataTablePlanner(
taskName(),
index(),
tableLoader(),
FILE_PATH_SCAN_CONTEXT,
MetadataTableType.ALL_FILES,
planningWorkerPoolSize))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
setSlotSharingGroup(
trigger
.process(
new MetadataTablePlanner(
taskName(),
index(),
tableLoader(),
FILE_PATH_SCAN_CONTEXT,
MetadataTableType.ALL_FILES,
planningWorkerPoolSize))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
.forceNonParallel());

// Read the records and get all data files
SingleOutputStreamOperator<String> tableDataFiles =
splits
.rebalance()
.process(
new FileNameReader(
taskName(),
index(),
tableLoader(),
FILE_PATH_SCHEMA,
FILE_PATH_SCAN_CONTEXT,
MetadataTableType.ALL_FILES))
.name(operatorName(READER_TASK_NAME))
.uid(READER_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
setSlotSharingGroup(
splits
.rebalance()
.process(
new FileNameReader(
taskName(),
index(),
tableLoader(),
FILE_PATH_SCHEMA,
FILE_PATH_SCAN_CONTEXT,
MetadataTableType.ALL_FILES))
.name(operatorName(READER_TASK_NAME))
.uid(READER_TASK_NAME + uidSuffix())
.setParallelism(parallelism()));

// Collect all meta data files
SingleOutputStreamOperator<String> tableMetadataFiles =
trigger
.process(new ListMetadataFiles(taskName(), index(), tableLoader()))
.name(operatorName(METADATA_FILES_TASK_NAME))
.uid(METADATA_FILES_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
setSlotSharingGroup(
trigger
.process(new ListMetadataFiles(taskName(), index(), tableLoader()))
.name(operatorName(METADATA_FILES_TASK_NAME))
.uid(METADATA_FILES_TASK_NAME + uidSuffix())
.forceNonParallel());

// List all the file system files
SingleOutputStreamOperator<String> allFsFiles =
trigger
.process(
new ListFileSystemFiles(
taskName(),
index(),
tableLoader(),
location,
minAge.toMillis(),
usePrefixListing))
.name(operatorName(FILESYSTEM_FILES_TASK_NAME))
.uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
setSlotSharingGroup(
trigger
.process(
new ListFileSystemFiles(
taskName(),
index(),
tableLoader(),
location,
minAge.toMillis(),
usePrefixListing))
.name(operatorName(FILESYSTEM_FILES_TASK_NAME))
.uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
.forceNonParallel());

SingleOutputStreamOperator<String> filesToDelete =
tableMetadataFiles
.union(tableDataFiles)
.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities))
.connect(allFsFiles.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities)))
.process(new OrphanFilesDetector(prefixMismatchMode, equalSchemes, equalAuthorities))
.slotSharingGroup(slotSharingGroup())
.name(operatorName(FILTER_FILES_TASK_NAME))
.uid(FILTER_FILES_TASK_NAME + uidSuffix())
.setParallelism(parallelism());
setSlotSharingGroup(
tableMetadataFiles
.union(tableDataFiles)
.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities))
.connect(allFsFiles.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities)))
.process(
new OrphanFilesDetector(prefixMismatchMode, equalSchemes, equalAuthorities))
.name(operatorName(FILTER_FILES_TASK_NAME))
.uid(FILTER_FILES_TASK_NAME + uidSuffix())
.setParallelism(parallelism()));

DataStream<Exception> errorStream =
tableMetadataFiles
Expand All @@ -287,38 +288,38 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {

// Stop deleting the files if there is an error
SingleOutputStreamOperator<String> filesOrSkip =
filesToDelete
.connect(errorStream)
.transform(
operatorName(SKIP_ON_ERROR_TASK_NAME),
TypeInformation.of(String.class),
new SkipOnError())
.uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
setSlotSharingGroup(
filesToDelete
.connect(errorStream)
.transform(
operatorName(SKIP_ON_ERROR_TASK_NAME),
TypeInformation.of(String.class),
new SkipOnError())
.uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
.forceNonParallel());

// delete the files
filesOrSkip
.rebalance()
.transform(
operatorName(DELETE_FILES_TASK_NAME),
TypeInformation.of(Void.class),
new DeleteFilesProcessor(
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
.uid(DELETE_FILES_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
setSlotSharingGroup(
filesOrSkip
.rebalance()
.transform(
operatorName(DELETE_FILES_TASK_NAME),
TypeInformation.of(Void.class),
new DeleteFilesProcessor(
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
.uid(DELETE_FILES_TASK_NAME + uidSuffix())
.setParallelism(parallelism()));

// Ignore the file deletion result and return the DataStream<TaskResult> directly
return trigger
.connect(errorStream)
.transform(
operatorName(AGGREGATOR_TASK_NAME),
TypeInformation.of(TaskResult.class),
new TaskResultAggregator(tableName(), taskName(), index()))
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
return setSlotSharingGroup(
trigger
.connect(errorStream)
.transform(
operatorName(AGGREGATOR_TASK_NAME),
TypeInformation.of(TaskResult.class),
new TaskResultAggregator(tableName(), taskName(), index()))
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
.forceNonParallel());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,30 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");

SingleOutputStreamOperator<TaskResult> result =
trigger
.process(
new ExpireSnapshotsProcessor(
tableLoader(),
maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
numSnapshots,
planningWorkerPoolSize,
cleanExpiredMetadata))
.name(operatorName(EXECUTOR_OPERATOR_NAME))
.uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();

result
.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
.rebalance()
.transform(
operatorName(DELETE_FILES_OPERATOR_NAME),
TypeInformation.of(Void.class),
new DeleteFilesProcessor(
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
setSlotSharingGroup(
trigger
.process(
new ExpireSnapshotsProcessor(
tableLoader(),
maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
numSnapshots,
planningWorkerPoolSize,
cleanExpiredMetadata))
.name(operatorName(EXECUTOR_OPERATOR_NAME))
.uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
.forceNonParallel());

setSlotSharingGroup(
result
.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
.rebalance()
.transform(
operatorName(DELETE_FILES_OPERATOR_NAME),
TypeInformation.of(Void.class),
new DeleteFilesProcessor(
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
.setParallelism(parallelism()));

// Ignore the file deletion result and return the DataStream<TaskResult> directly
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkConfParser;

Expand Down Expand Up @@ -60,7 +59,7 @@ public class FlinkMaintenanceConfig {
public static final ConfigOption<String> SLOT_SHARING_GROUP_OPTION =
ConfigOptions.key(SLOT_SHARING_GROUP)
.stringType()
.defaultValue(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
.noDefaultValue()
.withDescription(
"The slot sharing group for maintenance tasks. "
+ "Determines which operators can share slots in the Flink execution environment.");
Expand Down Expand Up @@ -114,8 +113,7 @@ public String slotSharingGroup() {
.stringConf()
.option(SLOT_SHARING_GROUP)
.flinkConfig(SLOT_SHARING_GROUP_OPTION)
.defaultValue(SLOT_SHARING_GROUP_OPTION.defaultValue())
.parse();
.parseOptional();
}

public RewriteDataFilesConfig createRewriteDataFilesConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,8 @@ DataStream<TaskResult> append(

return append(sourceStream);
}

/**
 * Applies the configured slot sharing group to the given operator, if one was configured.
 *
 * <p>When no slot sharing group is set ({@code slotSharingGroup == null}), the operator is
 * returned unchanged and keeps Flink's default slot sharing group.
 *
 * @param operator the operator to configure
 * @return the operator, with the slot sharing group applied when one is configured
 */
<O> SingleOutputStreamOperator<O> setSlotSharingGroup(SingleOutputStreamOperator<O> operator) {
  if (slotSharingGroup == null) {
    // Nothing configured; leave the operator in Flink's default slot sharing group.
    return operator;
  }

  return operator.slotSharingGroup(slotSharingGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,58 +290,58 @@ public Builder config(RewriteDataFilesConfig rewriteDataFilesConfig) {
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
trigger
.process(
new DataFileRewritePlanner(
tableName(),
taskName(),
index(),
tableLoader(),
partialProgressEnabled ? partialProgressMaxCommits : 1,
maxRewriteBytes,
rewriteOptions,
filterSupplier,
branch))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
setSlotSharingGroup(
trigger
.process(
new DataFileRewritePlanner(
tableName(),
taskName(),
index(),
tableLoader(),
partialProgressEnabled ? partialProgressMaxCommits : 1,
maxRewriteBytes,
rewriteOptions,
filterSupplier,
branch))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
.forceNonParallel());

SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup> rewritten =
planned
.rebalance()
.process(new DataFileRewriteRunner(tableName(), taskName(), index()))
.name(operatorName(REWRITE_TASK_NAME))
.uid(REWRITE_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
setSlotSharingGroup(
planned
.rebalance()
.process(new DataFileRewriteRunner(tableName(), taskName(), index()))
.name(operatorName(REWRITE_TASK_NAME))
.uid(REWRITE_TASK_NAME + uidSuffix())
.setParallelism(parallelism()));

SingleOutputStreamOperator<Trigger> updated =
rewritten
.transform(
operatorName(COMMIT_TASK_NAME),
TypeInformation.of(Trigger.class),
new DataFileRewriteCommitter(
tableName(), taskName(), index(), tableLoader(), branch))
.uid(COMMIT_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
setSlotSharingGroup(
rewritten
.transform(
operatorName(COMMIT_TASK_NAME),
TypeInformation.of(Trigger.class),
new DataFileRewriteCommitter(
tableName(), taskName(), index(), tableLoader(), branch))
.uid(COMMIT_TASK_NAME + uidSuffix())
.forceNonParallel());

return trigger
.union(updated)
.connect(
planned
.getSideOutput(TaskResultAggregator.ERROR_STREAM)
.union(
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
.transform(
operatorName(AGGREGATOR_TASK_NAME),
TypeInformation.of(TaskResult.class),
new TaskResultAggregator(tableName(), taskName(), index()))
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
return setSlotSharingGroup(
trigger
.union(updated)
.connect(
planned
.getSideOutput(TaskResultAggregator.ERROR_STREAM)
.union(
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
.transform(
operatorName(AGGREGATOR_TASK_NAME),
TypeInformation.of(TaskResult.class),
new TaskResultAggregator(tableName(), taskName(), index()))
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
.forceNonParallel());
}
}
}
Loading