diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index 9aeee75b1464..f03f33a3fd81 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.SizeBasedFileRewritePlanner; @@ -59,7 +60,7 @@ public static class Builder extends MaintenanceTaskBuilder rewriteOptions = Maps.newHashMapWithExpectedSize(6); private long maxRewriteBytes = Long.MAX_VALUE; - private Expression filter = Expressions.alwaysTrue(); + private SerializableSupplier filterSupplier = Expressions::alwaysTrue; private String branch = SnapshotRef.MAIN_BRANCH; @Override @@ -214,9 +215,32 @@ public Builder maxFilesToRewrite(int maxFilesToRewrite) { * * @param newFilter the filter expression to apply * @return this for method chaining + * @deprecated will be removed in 1.12.0. Use {@link #filter(SerializableSupplier)} instead */ + @Deprecated public Builder filter(Expression newFilter) { - this.filter = newFilter; + this.filterSupplier = () -> newFilter; + return this; + } + + /** + * A user-provided supplier of a filter expression that determines which files are considered by + * the rewrite strategy. + * + *

The supplier is evaluated by the planner on every compaction trigger, allowing a fresh + * filter to be produced for each compaction run. + * + *

This is particularly useful for time-relative filters. For example, a supplier such as + * {@code () -> Expressions.greaterThanOrEqual("ts", + * LocalDateTime.now(ZoneOffset.UTC).minus(Duration.ofDays(3)).toString())} ensures that each + * compaction rewrites files from the last 3 days relative to the time the compaction is + * planned, rather than relative to when the job was started. + * + * @param newFilterSupplier the supplier providing the filter expression to apply + * @return this for method chaining + */ + public Builder filter(SerializableSupplier newFilterSupplier) { + this.filterSupplier = newFilterSupplier; return this; } @@ -276,7 +300,7 @@ DataStream append(DataStream trigger) { partialProgressEnabled ? partialProgressMaxCommits : 1, maxRewriteBytes, rewriteOptions, - filter, + filterSupplier, branch)) .name(operatorName(PLANNER_TASK_NAME)) .uid(PLANNER_TASK_NAME + uidSuffix()) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index feb2dd26c807..9c3b44b9d544 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.SerializableTable; @@ -62,8 +63,8 @@ public class DataFileRewritePlanner private final long maxRewriteBytes; private final Map rewriterOptions; private transient Counter errorCounter; - private final Expression filter; private final String branch; + private final SerializableSupplier filterSupplier; public DataFileRewritePlanner( String tableName, @@ -73,7 +74,7 @@ public DataFileRewritePlanner( int newPartialProgressMaxCommits, long maxRewriteBytes, Map rewriterOptions, - Expression filter, + SerializableSupplier filterSupplier, String branch) { Preconditions.checkNotNull(tableName, "Table name should no be null"); @@ -89,8 +90,8 @@ public DataFileRewritePlanner( this.partialProgressMaxCommits = newPartialProgressMaxCommits; this.maxRewriteBytes = maxRewriteBytes; this.rewriterOptions = rewriterOptions; - this.filter = filter; this.branch = branch; + this.filterSupplier = filterSupplier; } @Override @@ -125,7 +126,7 @@ public void processElement(Trigger value, Context ctx, Collector o } BinPackRewriteFilePlanner planner = - new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(), false); + new BinPackRewriteFilePlanner(table, filterSupplier.get(), snapshot.snapshotId(), false); planner.init(rewriterOptions); FileRewritePlan diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java index bb53b5265655..97b8b6786545 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -29,6 +29,7 @@ import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Instant; import java.util.List; import java.util.stream.StreamSupport; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; @@ -529,6 +530,57 @@ void testRewriteWithFilter() throws Exception { createRecord(4, "d"))); } + /** + * By verifying that the creation time of the data content in the builder is later than the + * creation time of the filter condition — if the filter condition is actually created in the + * planner, then all files can be compacted; otherwise, not all files can be compacted — we can + * confirm whether the filter condition is actually created in the planner. + */ + @Test + void testRewriteWithFilterSupplier() throws Exception { + Table table = createTable(); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + // Rewrite data files where id is less than current timestamp in planner + .filter(() -> Expressions.lessThan("id", (int) Instant.now().getEpochSecond())) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + insert(table, 1, "a"); + insert(table, 2, "b"); + insert(table, 3, "c"); + + int epochSecond = (int) Instant.now().getEpochSecond(); + insert(table, epochSecond, "d"); + + assertFileNum(table, 4, 0); + + Thread.sleep(1_000L); + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + // There is four files, only id is less than current timestamp will be rewritten. so expect 2 + // files. + assertFileNum(table, 1, 0); + + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + createRecord(1, "a"), + createRecord(2, "b"), + createRecord(3, "c"), + createRecord(epochSecond, "d"))); + } + @Test void testBranch() throws Exception { Table table = createTable(); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 5eecc5a803d3..93291e8cc29a 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -24,7 +24,10 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.execution.JobClient; @@ -79,6 +82,12 @@ public class OperatorTestBase { ImmutableMap.of(), ImmutableSet.of(SimpleDataUtil.SCHEMA.columns().get(0).fieldId())); + private static final Schema SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "ts", Types.TimestampType.withoutZone())); + protected static final String UID_SUFFIX = "UID-Dummy"; protected static final String SLOT_SHARING_GROUP = "SlotSharingGroup"; protected static final TriggerLockFactory LOCK_FACTORY = new MemoryLockFactory(); @@ -142,6 +151,21 @@ protected static Table createTable(int formatVersion) { "100000")); } + protected static Table createTableWithTimestampWithoutZone() { + return CATALOG_EXTENSION + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE, + PartitionSpec.builderFor(SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE).identity("ts").build(), + null, + ImmutableMap.of( + TableProperties.FORMAT_VERSION, + "2", + "flink.max-continuous-empty-commits", + "100000")); + } + protected static Table createTableWithDelete() { return createTableWithDelete(2); } @@ -194,6 +218,20 @@ protected void insert(Table table, Integer id, String data, String extra) throws table.refresh(); } + protected void insertWithTimestampWithoutZone( + Table table, Integer id, String data, LocalDateTime ts) throws IOException { + GenericRecord record = GenericRecord.create(SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE); + record.setField("id", id); + record.setField("data", data); + record.setField("ts", ts); + long tsMicros = + TimeUnit.SECONDS.toMicros(ts.toEpochSecond(ZoneOffset.UTC)) + + TimeUnit.NANOSECONDS.toMicros(ts.getNano()); + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable(TestHelpers.Row.of(tsMicros), Lists.newArrayList(record)); + table.refresh(); + } + /** * For the same identifier column id this methods simulate the following row operations: *

  • add an equality delete on oldData diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java index 8a8a2fa194d4..7b8f638b7e2f 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java @@ -57,7 +57,7 @@ static List planDataFileRewrite( 11, 10_000_000L, rewriterOptions, - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open(); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java index 16d524f05cf7..8300df8c94eb 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java @@ -24,6 +24,9 @@ import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -107,7 +110,7 @@ void testError() throws Exception { 11, 1L, ImmutableMap.of(MIN_INPUT_FILES, "2"), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open(); @@ -174,7 +177,7 @@ void testMaxRewriteBytes() throws Exception { 11, maxRewriteBytes, ImmutableMap.of(MIN_INPUT_FILES, "2"), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open(); @@ -228,7 +231,7 @@ void testBranch() throws Exception { 11, 10_000_000L, ImmutableMap.of(MIN_INPUT_FILES, "2"), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, branchName))) { testHarness.open(); @@ -243,6 +246,46 @@ void testBranch() throws Exception { } } + @Test + void testFilterSupplierWithTimestamp() throws Exception { + Table table = createTableWithTimestampWithoutZone(); + + LocalDateTime oldTs = LocalDateTime.now().minusDays(10); + insertWithTimestampWithoutZone(table, 1, "old_a", oldTs); + insertWithTimestampWithoutZone(table, 2, "old_b", oldTs); + + LocalDateTime recentTs = LocalDateTime.now().minusHours(1); + insertWithTimestampWithoutZone(table, 3, "new_a", recentTs); + insertWithTimestampWithoutZone(table, 4, "new_b", recentTs); + + try (OneInputStreamOperatorTestHarness + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 10_000_000L, + ImmutableMap.of(MIN_INPUT_FILES, "2"), + () -> + Expressions.greaterThanOrEqual( + "ts", + LocalDateTime.now(ZoneOffset.UTC).minus(Duration.ofDays(3)).toString()), + SnapshotRef.MAIN_BRANCH))) { + testHarness.open(); + + trigger(testHarness); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + List planned = testHarness.extractOutputValues(); + + assertThat(planned).hasSize(1); + assertThat(planned.get(0).group().fileScanTasks()).hasSize(2); + } + } + void assertRewriteFileGroup( DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set files) { assertThat(plannedGroup.table().currentSnapshot().snapshotId()) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java index 9202a1df92af..62b29e7c017a 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java @@ -309,7 +309,7 @@ void testSplitSize() throws Exception { "2", TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSize)), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index 9aeee75b1464..f03f33a3fd81 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.SizeBasedFileRewritePlanner; @@ -59,7 +60,7 @@ public static class Builder extends MaintenanceTaskBuilder rewriteOptions = Maps.newHashMapWithExpectedSize(6); private long maxRewriteBytes = Long.MAX_VALUE; - private Expression filter = Expressions.alwaysTrue(); + private SerializableSupplier filterSupplier = Expressions::alwaysTrue; private String branch = SnapshotRef.MAIN_BRANCH; @Override @@ -214,9 +215,32 @@ public Builder maxFilesToRewrite(int maxFilesToRewrite) { * * @param newFilter the filter expression to apply * @return this for method chaining + * @deprecated will be removed in 1.12.0. Use {@link #filter(SerializableSupplier)} instead */ + @Deprecated public Builder filter(Expression newFilter) { - this.filter = newFilter; + this.filterSupplier = () -> newFilter; + return this; + } + + /** + * A user-provided supplier of a filter expression that determines which files are considered by + * the rewrite strategy. + * + *

    The supplier is evaluated by the planner on every compaction trigger, allowing a fresh + * filter to be produced for each compaction run. + * + *

    This is particularly useful for time-relative filters. For example, a supplier such as + * {@code () -> Expressions.greaterThanOrEqual("ts", + * LocalDateTime.now(ZoneOffset.UTC).minus(Duration.ofDays(3)).toString())} ensures that each + * compaction rewrites files from the last 3 days relative to the time the compaction is + * planned, rather than relative to when the job was started. + * + * @param newFilterSupplier the supplier providing the filter expression to apply + * @return this for method chaining + */ + public Builder filter(SerializableSupplier newFilterSupplier) { + this.filterSupplier = newFilterSupplier; return this; } @@ -276,7 +300,7 @@ DataStream append(DataStream trigger) { partialProgressEnabled ? partialProgressMaxCommits : 1, maxRewriteBytes, rewriteOptions, - filter, + filterSupplier, branch)) .name(operatorName(PLANNER_TASK_NAME)) .uid(PLANNER_TASK_NAME + uidSuffix()) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index a9360374df28..b78c602c647f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.SerializableTable; @@ -62,8 +63,8 @@ public class DataFileRewritePlanner private final long maxRewriteBytes; private final Map rewriterOptions; private transient Counter errorCounter; - private final Expression filter; private final String branch; + private final SerializableSupplier filterSupplier; public DataFileRewritePlanner( String tableName, @@ -73,7 +74,7 @@ public DataFileRewritePlanner( int newPartialProgressMaxCommits, long maxRewriteBytes, Map rewriterOptions, - Expression filter, + SerializableSupplier filterSupplier, String branch) { Preconditions.checkNotNull(tableName, "Table name should no be null"); @@ -89,8 +90,8 @@ public DataFileRewritePlanner( this.partialProgressMaxCommits = newPartialProgressMaxCommits; this.maxRewriteBytes = maxRewriteBytes; this.rewriterOptions = rewriterOptions; - this.filter = filter; this.branch = branch; + this.filterSupplier = filterSupplier; } @Override @@ -125,7 +126,7 @@ public void processElement(Trigger value, Context ctx, Collector o } BinPackRewriteFilePlanner planner = - new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(), false); + new BinPackRewriteFilePlanner(table, filterSupplier.get(), snapshot.snapshotId(), false); planner.init(rewriterOptions); FileRewritePlan diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java index bb53b5265655..97b8b6786545 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -29,6 +29,7 @@ import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Instant; import java.util.List; import java.util.stream.StreamSupport; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; @@ -529,6 +530,57 @@ void testRewriteWithFilter() throws Exception { createRecord(4, "d"))); } + /** + * By verifying that the creation time of the data content in the builder is later than the + * creation time of the filter condition — if the filter condition is actually created in the + * planner, then all files can be compacted; otherwise, not all files can be compacted — we can + * confirm whether the filter condition is actually created in the planner. + */ + @Test + void testRewriteWithFilterSupplier() throws Exception { + Table table = createTable(); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + // Rewrite data files where id is less than current timestamp in planner + .filter(() -> Expressions.lessThan("id", (int) Instant.now().getEpochSecond())) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + insert(table, 1, "a"); + insert(table, 2, "b"); + insert(table, 3, "c"); + + int epochSecond = (int) Instant.now().getEpochSecond(); + insert(table, epochSecond, "d"); + + assertFileNum(table, 4, 0); + + Thread.sleep(1_000L); + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + // There is four files, only id is less than current timestamp will be rewritten. so expect 2 + // files. + assertFileNum(table, 1, 0); + + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + createRecord(1, "a"), + createRecord(2, "b"), + createRecord(3, "c"), + createRecord(epochSecond, "d"))); + } + @Test void testBranch() throws Exception { Table table = createTable(); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index b9422a63d646..6dd6cda84f27 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -24,7 +24,10 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; @@ -79,6 +82,12 @@ public class OperatorTestBase { ImmutableMap.of(), ImmutableSet.of(SimpleDataUtil.SCHEMA.columns().get(0).fieldId())); + private static final Schema SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "ts", Types.TimestampType.withoutZone())); + protected static final String UID_SUFFIX = "UID-Dummy"; protected static final String SLOT_SHARING_GROUP = "SlotSharingGroup"; protected static final TriggerLockFactory LOCK_FACTORY = new MemoryLockFactory(); @@ -142,6 +151,21 @@ protected static Table createTable(int formatVersion) { "100000")); } + protected static Table createTableWithTimestampWithoutZone() { + return CATALOG_EXTENSION + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE, + PartitionSpec.builderFor(SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE).identity("ts").build(), + null, + ImmutableMap.of( + TableProperties.FORMAT_VERSION, + "2", + "flink.max-continuous-empty-commits", + "100000")); + } + protected static Table createTableWithDelete() { return createTableWithDelete(2); } @@ -194,6 +218,20 @@ protected void insert(Table table, Integer id, String data, String extra) throws table.refresh(); } + protected void insertWithTimestampWithoutZone( + Table table, Integer id, String data, LocalDateTime ts) throws IOException { + GenericRecord record = GenericRecord.create(SCHEMA_WITH_TIMESTAMP_WITHOUT_ZONE); + record.setField("id", id); + record.setField("data", data); + record.setField("ts", ts); + long tsMicros = + TimeUnit.SECONDS.toMicros(ts.toEpochSecond(ZoneOffset.UTC)) + + TimeUnit.NANOSECONDS.toMicros(ts.getNano()); + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable(TestHelpers.Row.of(tsMicros), Lists.newArrayList(record)); + table.refresh(); + } + /** * For the same identifier column id this methods simulate the following row operations: *

  • add an equality delete on oldData diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java index 8a8a2fa194d4..7b8f638b7e2f 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java @@ -57,7 +57,7 @@ static List planDataFileRewrite( 11, 10_000_000L, rewriterOptions, - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open(); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java index 16d524f05cf7..8300df8c94eb 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java @@ -24,6 +24,9 @@ import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -107,7 +110,7 @@ void testError() throws Exception { 11, 1L, ImmutableMap.of(MIN_INPUT_FILES, "2"), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open(); @@ -174,7 +177,7 @@ void testMaxRewriteBytes() throws Exception { 11, maxRewriteBytes, ImmutableMap.of(MIN_INPUT_FILES, "2"), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open(); @@ -228,7 +231,7 @@ void testBranch() throws Exception { 11, 10_000_000L, ImmutableMap.of(MIN_INPUT_FILES, "2"), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, branchName))) { testHarness.open(); @@ -243,6 +246,46 @@ void testBranch() throws Exception { } } + @Test + void testFilterSupplierWithTimestamp() throws Exception { + Table table = createTableWithTimestampWithoutZone(); + + LocalDateTime oldTs = LocalDateTime.now().minusDays(10); + insertWithTimestampWithoutZone(table, 1, "old_a", oldTs); + insertWithTimestampWithoutZone(table, 2, "old_b", oldTs); + + LocalDateTime recentTs = LocalDateTime.now().minusHours(1); + insertWithTimestampWithoutZone(table, 3, "new_a", recentTs); + insertWithTimestampWithoutZone(table, 4, "new_b", recentTs); + + try (OneInputStreamOperatorTestHarness + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 10_000_000L, + ImmutableMap.of(MIN_INPUT_FILES, "2"), + () -> + Expressions.greaterThanOrEqual( + "ts", + LocalDateTime.now(ZoneOffset.UTC).minus(Duration.ofDays(3)).toString()), + SnapshotRef.MAIN_BRANCH))) { + testHarness.open(); + + trigger(testHarness); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + List planned = testHarness.extractOutputValues(); + + assertThat(planned).hasSize(1); + assertThat(planned.get(0).group().fileScanTasks()).hasSize(2); + } + } + void assertRewriteFileGroup( DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set files) { assertThat(plannedGroup.table().currentSnapshot().snapshotId()) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java index 9202a1df92af..62b29e7c017a 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java @@ -309,7 +309,7 @@ void testSplitSize() throws Exception { "2", TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSize)), - Expressions.alwaysTrue(), + Expressions::alwaysTrue, SnapshotRef.MAIN_BRANCH))) { testHarness.open();