Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.function.SerializableSupplier;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
Expand Down Expand Up @@ -59,7 +60,7 @@ public static class Builder extends MaintenanceTaskBuilder<RewriteDataFiles.Buil
org.apache.iceberg.actions.RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT;
private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6);
private long maxRewriteBytes = Long.MAX_VALUE;
private Expression filter = Expressions.alwaysTrue();
private SerializableSupplier<Expression> filterSupplier = Expressions::alwaysTrue;
private String branch = SnapshotRef.MAIN_BRANCH;

@Override
Expand Down Expand Up @@ -214,9 +215,32 @@ public Builder maxFilesToRewrite(int maxFilesToRewrite) {
*
* @param newFilter the filter expression to apply
* @return this for method chaining
* @deprecated will be removed in 1.12.0. Use {@link #filter(SerializableSupplier)} instead
*/
@Deprecated
public Builder filter(Expression newFilter) {
this.filter = newFilter;
this.filterSupplier = () -> newFilter;
return this;
}

/**
 * Sets a supplier of the filter expression that determines which files are considered by the
 * rewrite strategy.
 *
 * <p>The supplier is handed to the planner and evaluated there on every compaction trigger, so a
 * fresh filter is produced for each compaction run instead of a single filter fixed at the time
 * the job is built.
 *
 * <p>This is particularly useful for time-relative filters. For example, a supplier such as
 * {@code () -> Expressions.greaterThanOrEqual("ts",
 * LocalDateTime.now(ZoneOffset.UTC).minus(Duration.ofDays(3)).toString())} ensures that each
 * compaction rewrites files from the last 3 days relative to the time the compaction is
 * planned, rather than relative to when the job was started.
 *
 * @param newFilterSupplier the supplier providing the filter expression to apply, must not be
 *     null
 * @return this for method chaining
 * @throws NullPointerException if {@code newFilterSupplier} is null
 */
public Builder filter(SerializableSupplier<Expression> newFilterSupplier) {
  // Fail fast while the job is being built: a null supplier would otherwise only surface when
  // the planner calls get() on it for a trigger.
  this.filterSupplier =
      java.util.Objects.requireNonNull(newFilterSupplier, "Filter supplier should not be null");
  return this;
}

Expand Down Expand Up @@ -276,7 +300,7 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
partialProgressEnabled ? partialProgressMaxCommits : 1,
maxRewriteBytes,
rewriteOptions,
filter,
filterSupplier,
branch))
.name(operatorName(PLANNER_TASK_NAME))
.uid(PLANNER_TASK_NAME + uidSuffix())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.SerializableSupplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
Expand Down Expand Up @@ -62,8 +63,8 @@ public class DataFileRewritePlanner
private final long maxRewriteBytes;
private final Map<String, String> rewriterOptions;
private transient Counter errorCounter;
private final Expression filter;
private final String branch;
private final SerializableSupplier<Expression> filterSupplier;

public DataFileRewritePlanner(
String tableName,
Expand All @@ -73,7 +74,7 @@ public DataFileRewritePlanner(
int newPartialProgressMaxCommits,
long maxRewriteBytes,
Map<String, String> rewriterOptions,
Expression filter,
SerializableSupplier<Expression> filterSupplier,
String branch) {

Preconditions.checkNotNull(tableName, "Table name should no be null");
Expand All @@ -89,8 +90,8 @@ public DataFileRewritePlanner(
this.partialProgressMaxCommits = newPartialProgressMaxCommits;
this.maxRewriteBytes = maxRewriteBytes;
this.rewriterOptions = rewriterOptions;
this.filter = filter;
this.branch = branch;
this.filterSupplier = filterSupplier;
}

@Override
Expand Down Expand Up @@ -125,7 +126,7 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
}

BinPackRewriteFilePlanner planner =
new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(), false);
new BinPackRewriteFilePlanner(table, filterSupplier.get(), snapshot.snapshotId(), false);
planner.init(rewriterOptions);

FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Instant;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
Expand Down Expand Up @@ -529,6 +530,57 @@ void testRewriteWithFilter() throws Exception {
createRecord(4, "d")));
}

/**
 * Verifies that the filter supplier is evaluated when the compaction is planned, not when the
 * builder is configured.
 *
 * <p>The supplier produces the filter {@code id < <current epoch second>}. The last record is
 * inserted after the builder has been configured and uses the epoch second at insertion time as
 * its id. If the filter were created together with the builder, that record could not match the
 * filter and not all files could be compacted. If the filter is created later, in the planner,
 * every record matches and all files can be compacted.
 */
@Test
void testRewriteWithFilterSupplier() throws Exception {
Table table = createTable();

appendRewriteDataFiles(
RewriteDataFiles.builder()
.parallelism(2)
.deleteFileThreshold(10)
.targetFileSizeBytes(1_000_000L)
.maxFileGroupSizeBytes(10_000_000L)
.maxFileSizeBytes(2_000_000L)
.minFileSizeBytes(500_000L)
.minInputFiles(2)
// Match only records whose id is less than the epoch second at the moment the supplier is
// evaluated
.filter(() -> Expressions.lessThan("id", (int) Instant.now().getEpochSecond()))
.partialProgressEnabled(true)
.partialProgressMaxCommits(1)
.maxRewriteBytes(100_000L)
.rewriteAll(false));

insert(table, 1, "a");
insert(table, 2, "b");
insert(table, 3, "c");

// This id is taken after the builder above was configured, so it can only satisfy the filter if
// the supplier is evaluated at a later point in time
int epochSecond = (int) Instant.now().getEpochSecond();
insert(table, epochSecond, "d");

assertFileNum(table, 4, 0);

// Wait one second so that a supplier evaluated from here on sees an epoch second strictly
// greater than epochSecond, which makes the last record match the filter as well
Thread.sleep(1_000L);
runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());

// All four files match a filter that is evaluated at planning time, so all of them are expected
// to be rewritten into a single file.
assertFileNum(table, 1, 0);

SimpleDataUtil.assertTableRecords(
table,
ImmutableList.of(
createRecord(1, "a"),
createRecord(2, "b"),
createRecord(3, "c"),
createRecord(epochSecond, "d")));
}

@Test
void testBranch() throws Exception {
Table table = createTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.execution.JobClient;
Expand Down Expand Up @@ -79,6 +82,12 @@ public class OperatorTestBase {
ImmutableMap.of(),
ImmutableSet.of(SimpleDataUtil.SCHEMA.columns().get(0).fieldId()));

private static final Schema SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "ts", Types.TimestampType.withoutZone()));

protected static final String UID_SUFFIX = "UID-Dummy";
protected static final String SLOT_SHARING_GROUP = "SlotSharingGroup";
protected static final TriggerLockFactory LOCK_FACTORY = new MemoryLockFactory();
Expand Down Expand Up @@ -142,6 +151,21 @@ protected static Table createTable(int formatVersion) {
"100000"));
}

/**
 * Creates the test table with the {@code id}/{@code data}/{@code ts} schema, identity-partitioned
 * by the {@code ts} column.
 *
 * @return the table created in the test catalog under {@code TestFixtures.TABLE_IDENTIFIER}
 */
protected static Table createTableWithTimestampWithoutZone() {
  PartitionSpec tsIdentitySpec =
      PartitionSpec.builderFor(SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE).identity("ts").build();
  ImmutableMap<String, String> tableProperties =
      ImmutableMap.of(
          TableProperties.FORMAT_VERSION, "2", "flink.max-continuous-empty-commits", "100000");

  // The location argument is left null, as it is not set explicitly for this test table.
  return CATALOG_EXTENSION
      .catalog()
      .createTable(
          TestFixtures.TABLE_IDENTIFIER,
          SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE,
          tsIdentitySpec,
          null,
          tableProperties);
}

protected static Table createTableWithDelete() {
return createTableWithDelete(2);
}
Expand Down Expand Up @@ -194,6 +218,20 @@ protected void insert(Table table, Integer id, String data, String extra) throws
table.refresh();
}

/**
 * Appends a single record with a timestamp-without-zone value to the table as a Parquet file and
 * refreshes the table afterwards.
 *
 * <p>The timestamp is additionally converted to microseconds since the epoch, interpreting the
 * local date-time as UTC, and passed to the appender as the row describing the target partition.
 *
 * @param table the table to append to
 * @param id value for the {@code id} field
 * @param data value for the {@code data} field
 * @param ts value for the {@code ts} field
 * @throws IOException if appending the record fails
 */
protected void insertWithTimestampWithoutZone(
    Table table, Integer id, String data, LocalDateTime ts) throws IOException {
  // Whole seconds and the sub-second part are converted separately and then summed up.
  long wholeSecondMicros = TimeUnit.SECONDS.toMicros(ts.toEpochSecond(ZoneOffset.UTC));
  long subSecondMicros = TimeUnit.NANOSECONDS.toMicros(ts.getNano());
  long tsMicros = wholeSecondMicros + subSecondMicros;

  GenericRecord row = GenericRecord.create(SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE);
  row.setField("id", id);
  row.setField("data", data);
  row.setField("ts", ts);

  GenericAppenderHelper appender =
      new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir);
  appender.appendToTable(TestHelpers.Row.of(tsMicros), Lists.newArrayList(row));
  table.refresh();
}

/**
* For the same identifier column id this methods simulate the following row operations: <tr>
* <li>add an equality delete on oldData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
11,
10_000_000L,
rewriterOptions,
Expressions.alwaysTrue(),
Expressions::alwaysTrue,
SnapshotRef.MAIN_BRANCH))) {
testHarness.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -107,7 +110,7 @@ void testError() throws Exception {
11,
1L,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
Expressions.alwaysTrue(),
Expressions::alwaysTrue,
SnapshotRef.MAIN_BRANCH))) {
testHarness.open();

Expand Down Expand Up @@ -174,7 +177,7 @@ void testMaxRewriteBytes() throws Exception {
11,
maxRewriteBytes,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
Expressions.alwaysTrue(),
Expressions::alwaysTrue,
SnapshotRef.MAIN_BRANCH))) {
testHarness.open();

Expand Down Expand Up @@ -228,7 +231,7 @@ void testBranch() throws Exception {
11,
10_000_000L,
ImmutableMap.of(MIN_INPUT_FILES, "2"),
Expressions.alwaysTrue(),
Expressions::alwaysTrue,
branchName))) {
testHarness.open();

Expand All @@ -243,6 +246,46 @@ void testBranch() throws Exception {
}
}

/**
 * Verifies that a time-relative filter supplier restricts planning to recent data: two records
 * are written with a timestamp 10 days in the past and two with a timestamp 1 hour in the past,
 * while the supplier keeps only rows whose {@code ts} is within the last 3 days.
 */
@Test
void testFilterSupplierWithTimestamp() throws Exception {
  Table table = createTableWithTimestampWithoutZone();

  // Use UTC for the test data as well, so it is consistent with the UTC-based filter below and
  // with the UTC-based partition value conversion in insertWithTimestampWithoutZone.
  LocalDateTime oldTs = LocalDateTime.now(ZoneOffset.UTC).minusDays(10);
  insertWithTimestampWithoutZone(table, 1, "old_a", oldTs);
  insertWithTimestampWithoutZone(table, 2, "old_b", oldTs);

  LocalDateTime recentTs = LocalDateTime.now(ZoneOffset.UTC).minusHours(1);
  insertWithTimestampWithoutZone(table, 3, "new_a", recentTs);
  insertWithTimestampWithoutZone(table, 4, "new_b", recentTs);

  try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup>
      testHarness =
          ProcessFunctionTestHarnesses.forProcessFunction(
              new DataFileRewritePlanner(
                  OperatorTestBase.DUMMY_TABLE_NAME,
                  OperatorTestBase.DUMMY_TABLE_NAME,
                  0,
                  tableLoader(),
                  11,
                  10_000_000L,
                  ImmutableMap.of(MIN_INPUT_FILES, "2"),
                  // Evaluated by the planner for the trigger, relative to the planning time
                  () ->
                      Expressions.greaterThanOrEqual(
                          "ts",
                          LocalDateTime.now(ZoneOffset.UTC).minus(Duration.ofDays(3)).toString()),
                  SnapshotRef.MAIN_BRANCH))) {
    testHarness.open();

    trigger(testHarness);

    assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
    List<DataFileRewritePlanner.PlannedGroup> planned = testHarness.extractOutputValues();

    // Only the two recent inserts pass the filter; they share the same ts value, so a single
    // group with two file scan tasks is expected.
    assertThat(planned).hasSize(1);
    assertThat(planned.get(0).group().fileScanTasks()).hasSize(2);
  }
}

void assertRewriteFileGroup(
DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) {
assertThat(plannedGroup.table().currentSnapshot().snapshotId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void testSplitSize() throws Exception {
"2",
TARGET_FILE_SIZE_BYTES,
String.valueOf(targetFileSize)),
Expressions.alwaysTrue(),
Expressions::alwaysTrue,
SnapshotRef.MAIN_BRANCH))) {
testHarness.open();

Expand Down
Loading
Loading