Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
import org.apache.iceberg.expressions.Expression;
Expand Down Expand Up @@ -59,6 +60,7 @@ public static class Builder extends MaintenanceTaskBuilder<RewriteDataFiles.Buil
private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6);
private long maxRewriteBytes = Long.MAX_VALUE;
private Expression filter = Expressions.alwaysTrue();
private String branch = SnapshotRef.MAIN_BRANCH;

@Override
String maintenanceTaskName() {
Expand Down Expand Up @@ -218,6 +220,18 @@ public Builder filter(Expression newFilter) {
return this;
}

/**
 * Sets the branch to compact. The planner reads the files from the snapshot this branch points
 * to, and the rewritten files are committed back to the same branch. When this method is not
 * called, {@link SnapshotRef#MAIN_BRANCH} is used.
 *
 * @param newBranch the branch name, must not be null
 * @return this for method chaining
 * @throws NullPointerException if {@code newBranch} is null
 */
public Builder branch(String newBranch) {
  // Fail at the call site: the planner and committer operators reject a null branch as well,
  // but only later, when the task is appended to the stream.
  this.branch = java.util.Objects.requireNonNull(newBranch, "Branch should not be null");
  return this;
}

/**
* Configures the properties for the rewriter.
*
Expand Down Expand Up @@ -262,7 +276,8 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
partialProgressEnabled ? partialProgressMaxCommits : 1,
maxRewriteBytes,
rewriteOptions,
filter))
filter,
branch))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
Expand All @@ -282,7 +297,8 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
.transform(
operatorName(COMMIT_TASK_NAME),
TypeInformation.of(Trigger.class),
new DataFileRewriteCommitter(tableName(), taskName(), index(), tableLoader()))
new DataFileRewriteCommitter(
tableName(), taskName(), index(), tableLoader(), branch))
.uid(COMMIT_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,6 +54,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
private final String taskName;
private final int taskIndex;
private final TableLoader tableLoader;
private final String branch;

private transient Table table;
private transient CommitService commitService;
Expand All @@ -62,15 +65,17 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
private transient Counter removedDataFileSizeCounter;

public DataFileRewriteCommitter(
String tableName, String taskName, int taskIndex, TableLoader tableLoader) {
String tableName, String taskName, int taskIndex, TableLoader tableLoader, String branch) {
Preconditions.checkNotNull(tableName, "Table name should no be null");
Preconditions.checkNotNull(taskName, "Task name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(branch, "Branch should not be null");

this.tableName = tableName;
this.taskName = taskName;
this.taskIndex = taskIndex;
this.tableLoader = tableLoader;
this.branch = branch;
}

@Override
Expand Down Expand Up @@ -103,7 +108,7 @@ public void processElement(StreamRecord<DataFileRewriteRunner.ExecutedGroup> str

FlinkRewriteDataFilesCommitManager commitManager =
new FlinkRewriteDataFilesCommitManager(
table, executedGroup.snapshotId(), streamRecord.getTimestamp());
table, executedGroup.snapshotId(), streamRecord.getTimestamp(), branch);
this.commitService = commitManager.service(executedGroup.groupsPerCommit());
commitService.start();
}
Expand Down Expand Up @@ -164,8 +169,14 @@ public void close() throws IOException {
private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitManager {
private final long timestamp;

FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId, long timestamp) {
super(table, startingSnapshotId);
FlinkRewriteDataFilesCommitManager(
Table table, long startingSnapshotId, long timestamp, String branch) {
super(
table,
startingSnapshotId,
RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER_DEFAULT,
ImmutableMap.of(),
branch);
this.timestamp = timestamp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class DataFileRewritePlanner
private final Map<String, String> rewriterOptions;
private transient Counter errorCounter;
private final Expression filter;
private final String branch;

public DataFileRewritePlanner(
String tableName,
Expand All @@ -71,12 +73,14 @@ public DataFileRewritePlanner(
int newPartialProgressMaxCommits,
long maxRewriteBytes,
Map<String, String> rewriterOptions,
Expression filter) {
Expression filter,
String branch) {

Preconditions.checkNotNull(tableName, "Table name should no be null");
Preconditions.checkNotNull(taskName, "Task name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(rewriterOptions, "Options map should no be null");
Preconditions.checkNotNull(branch, "Branch should no be null");

this.tableName = tableName;
this.taskName = taskName;
Expand All @@ -86,6 +90,7 @@ public DataFileRewritePlanner(
this.maxRewriteBytes = maxRewriteBytes;
this.rewriterOptions = rewriterOptions;
this.filter = filter;
this.branch = branch;
}

@Override
Expand All @@ -108,7 +113,8 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
try {
SerializableTable table =
(SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
if (table.currentSnapshot() == null) {
Snapshot snapshot = table.snapshot(branch);
if (snapshot == null) {
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
tableName,
Expand All @@ -118,7 +124,8 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
return;
}

BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table, filter);
BinPackRewriteFilePlanner planner =
new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(), false);
planner.init(rewriterOptions);

FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup>
Expand Down Expand Up @@ -164,7 +171,7 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
taskIndex,
ctx.timestamp(),
group);
out.collect(new PlannedGroup(table, groupsPerCommit, group));
out.collect(new PlannedGroup(table, groupsPerCommit, group, branch));
}
} catch (Exception e) {
LOG.warn(
Expand All @@ -189,11 +196,14 @@ public static class PlannedGroup {
private final SerializableTable table;
private final int groupsPerCommit;
private final RewriteFileGroup group;
private final String branch;

private PlannedGroup(SerializableTable table, int groupsPerCommit, RewriteFileGroup group) {
private PlannedGroup(
SerializableTable table, int groupsPerCommit, RewriteFileGroup group, String branch) {
this.table = table;
this.groupsPerCommit = groupsPerCommit;
this.group = group;
this.branch = branch;
}

SerializableTable table() {
Expand All @@ -207,5 +217,9 @@ int groupsPerCommit() {
RewriteFileGroup group() {
return group;
}

/** Returns the branch name that was supplied when this planned group was created. */
String branch() {
return branch;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
value.group().setOutputFiles(dataFiles);
out.collect(
new ExecutedGroup(
value.table().currentSnapshot().snapshotId(),
value.table().snapshot(value.branch()).snapshotId(),
value.groupsPerCommit(),
value.group()));
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,10 @@ IcebergSink build() {
if (flinkWriteConf.compactMode()) {
RewriteDataFilesConfig rewriteDataFilesConfig =
flinkMaintenanceConfig.createRewriteDataFilesConfig();
maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
maintenanceTasks.add(
RewriteDataFiles.builder()
.branch(flinkWriteConf.branch())
.config(rewriteDataFilesConfig));
}

if (flinkWriteConf.expireSnapshotsMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,41 @@ void testRewriteWithFilter() throws Exception {
createRecord(4, "d")));
}

@Test
void testBranch() throws Exception {
  Table table = createTable();
  insert(table, 1, "a");
  insert(table, 2, "b");

  // Fork the branch from the state that holds the two inserts above
  String branchName = "test-branch";
  table.manageSnapshots().createBranch(branchName).commit();

  // One more file lands on main only, so main holds 3 files while the branch keeps 2
  insert(table, 3, "c");

  appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true).branch(branchName));

  runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());

  table.refresh();

  // Count the data file entries reachable from the branch snapshot's data manifests
  long branchFileCount =
      table.snapshot(branchName).dataManifests(table.io()).stream()
          .flatMap(
              manifest ->
                  StreamSupport.stream(
                      ManifestFiles.read(manifest, table.io(), table.specs()).spliterator(),
                      false))
          .count();

  // The two branch files are expected to be compacted into a single one
  assertThat(branchFileCount).isEqualTo(1);
  SimpleDataUtil.assertTableRecords(
      table, ImmutableList.of(createRecord(1, "a"), createRecord(2, "b")), branchName);

  // Main is expected to still have its 3 files
  assertFileNum(table, 3, 0);
}

private void appendRewriteDataFiles() {
appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.TableLoader;
Expand Down Expand Up @@ -55,7 +56,8 @@ static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
11,
10_000_000L,
rewriterOptions,
Expressions.alwaysTrue()))) {
Expressions.alwaysTrue(),
SnapshotRef.MAIN_BRANCH))) {
testHarness.open();

OperatorTestBase.trigger(testHarness);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -225,7 +226,8 @@ private OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, T
OperatorTestBase.DUMMY_TABLE_NAME,
OperatorTestBase.DUMMY_TABLE_NAME,
0,
tableLoader()));
tableLoader(),
SnapshotRef.MAIN_BRANCH));
}

private static DataFileRewriteRunner.ExecutedGroup setBatchSizeToTwo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.maintenance.api.Trigger;
Expand Down Expand Up @@ -106,7 +107,8 @@ void testError() throws Exception {
11,
1L,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
Expressions.alwaysTrue()))) {
Expressions.alwaysTrue(),
SnapshotRef.MAIN_BRANCH))) {
testHarness.open();

// Cause an exception
Expand Down Expand Up @@ -172,7 +174,8 @@ void testMaxRewriteBytes() throws Exception {
11,
maxRewriteBytes,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
Expressions.alwaysTrue()))) {
Expressions.alwaysTrue(),
SnapshotRef.MAIN_BRANCH))) {
testHarness.open();

OperatorTestBase.trigger(testHarness);
Expand Down Expand Up @@ -202,6 +205,44 @@ void testMaxFileGroupCount() throws Exception {
assertThat(planWithMaxFileGroupCount).hasSize(3);
}

@Test
void testBranch() throws Exception {
  Table table = createTable();
  insert(table, 1, "a");
  insert(table, 2, "b");

  String branchName = "test-branch";
  table.manageSnapshots().createBranch(branchName).commit();

  // This insert goes to main only; the branch created above keeps the first two files
  insert(table, 3, "c");

  // Planner configured to plan against the branch instead of main
  DataFileRewritePlanner branchPlanner =
      new DataFileRewritePlanner(
          OperatorTestBase.DUMMY_TABLE_NAME,
          OperatorTestBase.DUMMY_TABLE_NAME,
          0,
          tableLoader(),
          11,
          10_000_000L,
          ImmutableMap.of(MIN_INPUT_FILES, "2"),
          Expressions.alwaysTrue(),
          branchName);

  try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> harness =
      ProcessFunctionTestHarnesses.forProcessFunction(branchPlanner)) {
    harness.open();

    trigger(harness);

    assertThat(harness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();

    List<DataFileRewritePlanner.PlannedGroup> plannedGroups = harness.extractOutputValues();
    assertThat(plannedGroups).hasSize(1);

    DataFileRewritePlanner.PlannedGroup onlyGroup = plannedGroups.get(0);
    // Branch has 2 files, main has 3
    assertThat(onlyGroup.group().fileScanTasks()).hasSize(2);
    assertThat(onlyGroup.branch()).isEqualTo(branchName);
  }
}

void assertRewriteFileGroup(
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) {
assertThat(plannedGroup.table().currentSnapshot().snapshotId())
Expand Down
Loading
Loading