diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index ffa88bb259bb..ec856e57b55d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -83,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Filter bucketFilter = null; private BiFilter totalAwareBucketFilter = null; protected ScanMode scanMode = ScanMode.ALL; + private Integer specifiedLevel = null; private Filter levelFilter = null; private Filter manifestEntryFilter = null; private Filter fileNameFilter = null; @@ -191,6 +192,13 @@ public FileStoreScan withKind(ScanMode scanMode) { return this; } + @Override + public FileStoreScan withLevel(int level) { + manifestsReader.withLevel(level); + this.specifiedLevel = level; + return this; + } + @Override public FileStoreScan withLevelFilter(Filter levelFilter) { this.levelFilter = levelFilter; @@ -524,7 +532,12 @@ private Filter createEntryRowFilter() { return false; } - if (levelFilter != null && !levelFilter.test(levelGetter.apply(row))) { + int level = levelGetter.apply(row); + if (specifiedLevel != null && level != specifiedLevel) { + return false; + } + + if (levelFilter != null && !levelFilter.test(level)) { return false; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 3b77c0a1a141..e3759960f8ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -72,6 +72,8 @@ public interface FileStoreScan { FileStoreScan withKind(ScanMode scanMode); + FileStoreScan withLevel(int level); + FileStoreScan withLevelFilter(Filter levelFilter); FileStoreScan enableValueFilter(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java index afcbb82b732d..d58bb797e340 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java @@ -50,6 +50,7 @@ public class ManifestsReader { private boolean onlyReadRealBuckets = false; @Nullable private Integer specifiedBucket = null; + @Nullable private Integer specifiedLevel = null; @Nullable private PartitionPredicate partitionFilter = null; public ManifestsReader( @@ -73,6 +74,11 @@ public ManifestsReader withBucket(int bucket) { return this; } + public ManifestsReader withLevel(int level) { + this.specifiedLevel = level; + return this; + } + public ManifestsReader withPartitionFilter(Predicate predicate) { this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate); return this; @@ -147,6 +153,15 @@ private boolean filterManifestFileMeta(ManifestFileMeta manifest) { } } + Integer minLevel = manifest.minLevel(); + Integer maxLevel = manifest.maxLevel(); + if (minLevel != null && maxLevel != null) { + if (specifiedLevel != null + && (specifiedLevel < minLevel || specifiedLevel > maxLevel)) { + return false; + } + } + if (partitionFilter == null) { return true; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 0cc42088304a..a8504f3db3ee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -80,6 +80,8 @@ public interface SnapshotReader { SnapshotReader withMode(ScanMode scanMode); + SnapshotReader withLevel(int level); + SnapshotReader withLevelFilter(Filter levelFilter); SnapshotReader enableValueFilter(); @@ -131,6 +133,7 @@ interface Plan extends TableScan.Plan { /** Result splits. */ List splits(); + @SuppressWarnings({"unchecked", "rawtypes"}) default List dataSplits() { return (List) splits(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 931e9d4eba3a..93002027e31d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -245,6 +245,12 @@ public SnapshotReader withMode(ScanMode scanMode) { return this; } + @Override + public SnapshotReader withLevel(int level) { + scan.withLevel(level); + return this; + } + @Override public SnapshotReader withLevelFilter(Filter levelFilter) { scan.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 57e18fe46237..28b10bddd28e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -335,6 +335,12 @@ public SnapshotReader withMode(ScanMode scanMode) { return this; } + @Override + public SnapshotReader withLevel(int level) { + wrapped.withLevel(level); + return this; + } + @Override public SnapshotReader withLevelFilter(Filter levelFilter) { wrapped.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index fc9527c19568..1e252fc08819 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -120,9 +120,9 @@ public List primaryKeys() { @Override public SnapshotReader newSnapshotReader() { - if (wrapped.schema().primaryKeys().size() > 0) { + if (!wrapped.schema().primaryKeys().isEmpty()) { return wrapped.newSnapshotReader() - .withLevelFilter(level -> level == coreOptions().numLevels() - 1) + .withLevel(coreOptions().numLevels() - 1) .enableValueFilter(); } else { return wrapped.newSnapshotReader(); @@ -132,7 +132,7 @@ public SnapshotReader newSnapshotReader() { @Override public DataTableBatchScan newScan() { return new DataTableBatchScan( - wrapped.schema().primaryKeys().size() > 0, + !wrapped.schema().primaryKeys().isEmpty(), coreOptions(), newSnapshotReader(), DefaultValueAssigner.create(wrapped.schema())); @@ -140,7 +140,7 @@ public DataTableBatchScan newScan() { @Override public StreamDataTableScan newStreamScan() { - if (wrapped.schema().primaryKeys().size() > 0) { + if (!wrapped.schema().primaryKeys().isEmpty()) { throw new UnsupportedOperationException( "Unsupported streaming scan for read optimized table"); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java index a51d24eaf45f..6a825c9a00bb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java @@ -67,7 +67,7 @@ public void testScan() throws Exception { assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5); - snapshotReader.withLevelFilter(level -> level == table.coreOptions().numLevels() - 1); + snapshotReader.withLevel(table.coreOptions().numLevels() - 1); TableRead read = table.newRead(); ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();