Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private Filter<Integer> bucketFilter = null;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
protected ScanMode scanMode = ScanMode.ALL;
private Integer specifiedLevel = null;
private Filter<Integer> levelFilter = null;
private Filter<ManifestEntry> manifestEntryFilter = null;
private Filter<String> fileNameFilter = null;
Expand Down Expand Up @@ -191,6 +192,13 @@ public FileStoreScan withKind(ScanMode scanMode) {
return this;
}

@Override
public FileStoreScan withLevel(int level) {
manifestsReader.withLevel(level);
this.specifiedLevel = level;
return this;
}

@Override
public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
this.levelFilter = levelFilter;
Expand Down Expand Up @@ -524,7 +532,12 @@ private Filter<InternalRow> createEntryRowFilter() {
return false;
}

if (levelFilter != null && !levelFilter.test(levelGetter.apply(row))) {
int level = levelGetter.apply(row);
if (specifiedLevel != null && level != specifiedLevel) {
return false;
}

if (levelFilter != null && !levelFilter.test(level)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public interface FileStoreScan {

FileStoreScan withKind(ScanMode scanMode);

FileStoreScan withLevel(int level);

FileStoreScan withLevelFilter(Filter<Integer> levelFilter);

FileStoreScan enableValueFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ManifestsReader {

private boolean onlyReadRealBuckets = false;
@Nullable private Integer specifiedBucket = null;
@Nullable private Integer specifiedLevel = null;
@Nullable private PartitionPredicate partitionFilter = null;

public ManifestsReader(
Expand All @@ -73,6 +74,11 @@ public ManifestsReader withBucket(int bucket) {
return this;
}

public ManifestsReader withLevel(int level) {
this.specifiedLevel = level;
return this;
}

public ManifestsReader withPartitionFilter(Predicate predicate) {
this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate);
return this;
Expand Down Expand Up @@ -147,6 +153,15 @@ private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
}
}

Integer minLevel = manifest.minLevel();
Integer maxLevel = manifest.maxLevel();
if (minLevel != null && maxLevel != null) {
if (specifiedLevel != null
&& (specifiedLevel < minLevel || specifiedLevel > maxLevel)) {
return false;
}
}

if (partitionFilter == null) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public interface SnapshotReader {

SnapshotReader withMode(ScanMode scanMode);

SnapshotReader withLevel(int level);

SnapshotReader withLevelFilter(Filter<Integer> levelFilter);

SnapshotReader enableValueFilter();
Expand Down Expand Up @@ -131,6 +133,7 @@ interface Plan extends TableScan.Plan {
/** Result splits. */
List<Split> splits();

@SuppressWarnings({"unchecked", "rawtypes"})
default List<DataSplit> dataSplits() {
return (List) splits();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ public SnapshotReader withMode(ScanMode scanMode) {
return this;
}

@Override
public SnapshotReader withLevel(int level) {
scan.withLevel(level);
return this;
}

@Override
public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
scan.withLevelFilter(levelFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ public SnapshotReader withMode(ScanMode scanMode) {
return this;
}

@Override
public SnapshotReader withLevel(int level) {
wrapped.withLevel(level);
return this;
}

@Override
public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
wrapped.withLevelFilter(levelFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ public List<String> primaryKeys() {

@Override
public SnapshotReader newSnapshotReader() {
if (wrapped.schema().primaryKeys().size() > 0) {
if (!wrapped.schema().primaryKeys().isEmpty()) {
return wrapped.newSnapshotReader()
.withLevelFilter(level -> level == coreOptions().numLevels() - 1)
.withLevel(coreOptions().numLevels() - 1)
.enableValueFilter();
} else {
return wrapped.newSnapshotReader();
Expand All @@ -132,15 +132,15 @@ public SnapshotReader newSnapshotReader() {
@Override
public DataTableBatchScan newScan() {
return new DataTableBatchScan(
wrapped.schema().primaryKeys().size() > 0,
!wrapped.schema().primaryKeys().isEmpty(),
coreOptions(),
newSnapshotReader(),
DefaultValueAssigner.create(wrapped.schema()));
}

@Override
public StreamDataTableScan newStreamScan() {
if (wrapped.schema().primaryKeys().size() > 0) {
if (!wrapped.schema().primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
"Unsupported streaming scan for read optimized table");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testScan() throws Exception {

assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);

snapshotReader.withLevelFilter(level -> level == table.coreOptions().numLevels() - 1);
snapshotReader.withLevel(table.coreOptions().numLevels() - 1);
TableRead read = table.newRead();
ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();

Expand Down
Loading