Core: Enable column statistics filtering after planning #8803

Merged: 10 commits, merged on Nov 14, 2023
Changes from 7 commits
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -83,6 +83,11 @@ public BatchScan includeColumnStats() {
return new BatchScanAdapter(scan.includeColumnStats());
}

@Override
public BatchScan includeColumnStats(Collection<String> requestedColumns) {
return new BatchScanAdapter(scan.includeColumnStats(requestedColumns));
}

@Override
public BatchScan select(Collection<String> columns) {
return new BatchScanAdapter(scan.select(columns));
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
@@ -165,6 +166,20 @@ default Long fileSequenceNumber() {
*/
F copyWithoutStats();

/**
* Copies this file with only specific column stats. Manifest readers can reuse file instances;
* use this method to copy data and only copy specific stats when collecting files.
*
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
* @return a copy of this data file, with stats lower bounds, upper bounds, value counts, null
* value counts, and nan value counts for only specific columns.
*/
default F copyWithStats(Set<Integer> requestedColumnIds) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement copyWithStats");
}

/**
* Copies this file (potentially without file stats). Manifest readers can reuse file instances;
* use this method to copy data when collecting files from tasks.
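For illustration, here is a minimal usage sketch of the new copyWithStats API; the field ids below are assumptions made for the example, not values taken from this PR.

import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

class CopyWithStatsExample {
  // Copy a data file but retain column-level stats (bounds, value counts,
  // null/NaN value counts) only for the given field ids; stats for all other
  // columns are dropped from the copy.
  static DataFile copyKeepingStatsFor(DataFile file) {
    Set<Integer> requestedColumnIds = ImmutableSet.of(1, 2); // assumed field ids
    return file.copyWithStats(requestedColumnIds);
  }
}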
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/Scan.java
@@ -77,6 +77,21 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
*/
ThisT includeColumnStats();

/**
* Create a new scan from this that loads the column stats for the specific columns with each data
* file.
*
* <p>Column stats include: value count, null value count, lower bounds, and upper bounds.
*
* @param requestedColumns column names for which to keep the stats. If <code>null</code> then all
Contributor: I am not sure how I feel about supporting null here. Let me think. Also, won't the current implementation throw an exception if we pass null?

Contributor: I think we should not support null here as a valid value, independently of what we will do in ContentFile. I would drop the doc.

Contributor (Author): I would guess that the situation is similar to the other @Nullable TableScanContext attributes, like:

  • snapshotId
  • selectedColumns
  • projectedSchema
  • fromSnapshotId
  • toSnapshotId
  • branch

We have an undefined default behaviour which could be achieved either by not setting these values or by setting them to null. The only difference here is that we define this default behaviour. For consistency's sake we can remove the comment, but the behaviour will remain the same.

Am I missing something?

Thanks for the detailed review!

* column stats will be kept, when {@link #includeColumnStats()} is set.
* @return a new scan based on this that loads column stats for specific columns.
*/
default ThisT includeColumnStats(Collection<String> requestedColumns) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement includeColumnStats");
}

/**
* Create a new scan from this that will read the given data columns. This produces an expected
* schema that includes all fields that are either selected or used by this scan's filter
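As a usage illustration of the new scan-level API, here is a hedged sketch of requesting stats for only a subset of columns when planning a scan; the column names are assumptions for the example.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

class ScanWithColumnStatsExample {
  // Plan a scan whose data files carry column stats only for "id" and
  // "event_time" (assumed column names); stats for other columns are dropped
  // from the planned tasks.
  static TableScan scanWithSelectedStats(Table table) {
    return table
        .newScan()
        .includeColumnStats(ImmutableList.of("id", "event_time"));
  }
}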
6 changes: 6 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -38,6 +38,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundSetPredicate;
@@ -662,6 +663,11 @@ public DataFile copyWithoutStats() {
return this;
}

@Override
public DataFile copyWithStats(Set<Integer> requestedColumns) {
return this;
}

@Override
public List<Long> splitOffsets() {
return null;
@@ -38,6 +38,7 @@
import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.ThreadPools;
@@ -368,7 +369,9 @@ private CloseableIterable<ScanTask> toFileTasks(
ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);

return new BaseFileScanTask(
copyDataFiles ? dataFile.copy(shouldReturnColumnStats()) : dataFile,
copyDataFiles
? ContentFileUtil.copy(dataFile, shouldReturnColumnStats(), columnsToKeepStats())
: dataFile,
deleteFiles,
schemaString,
specString,
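The body of ContentFileUtil.copy is not shown in this excerpt; the following is only a sketch, under the assumption that the helper dispatches to the existing copy variants based on the two stats arguments.

import java.util.Set;
import org.apache.iceberg.ContentFile;

class ContentFileCopySketch {
  // Assumed dispatch logic (not part of this diff): keep all stats, keep stats
  // only for the requested column ids, or drop stats entirely.
  static <F extends ContentFile<F>> F copy(
      F file, boolean withStats, Set<Integer> requestedColumnIds) {
    if (withStats) {
      return requestedColumnIds != null && !requestedColumnIds.isEmpty()
          ? file.copyWithStats(requestedColumnIds)
          : file.copy();
    } else {
      return file.copyWithoutStats();
    }
  }
}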
53 changes: 44 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -25,6 +25,9 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
@@ -173,9 +176,11 @@ public PartitionData copy() {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param copyStats whether to copy all fields or to drop column-level stats
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
BaseFile(BaseFile<F> toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
@@ -185,13 +190,25 @@ public PartitionData copy() {
this.partitionType = toCopy.partitionType;
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
if (fullCopy) {
this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
this.upperBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
if (copyStats) {
this.columnSizes =
filterColumnsStats(toCopy.columnSizes, requestedColumnIds, SerializableMap::copyOf);
this.valueCounts =
filterColumnsStats(toCopy.valueCounts, requestedColumnIds, SerializableMap::copyOf);
this.nullValueCounts =
filterColumnsStats(toCopy.nullValueCounts, requestedColumnIds, SerializableMap::copyOf);
this.nanValueCounts =
filterColumnsStats(toCopy.nanValueCounts, requestedColumnIds, SerializableMap::copyOf);
this.lowerBounds =
filterColumnsStats(
toCopy.lowerBounds,
requestedColumnIds,
m -> SerializableByteBufferMap.wrap(SerializableMap.copyOf(m)));
this.upperBounds =
filterColumnsStats(
toCopy.upperBounds,
requestedColumnIds,
m -> SerializableByteBufferMap.wrap(SerializableMap.copyOf(m)));
} else {
this.columnSizes = null;
this.valueCounts = null;
@@ -504,6 +521,24 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt
}
}

private static <TypeT> Map<Integer, TypeT> filterColumnsStats(
Map<Integer, TypeT> map,
Set<Integer> columnIds,
Function<Map<Integer, TypeT>, Map<Integer, TypeT>> copyFunction) {
if (columnIds == null || columnIds.isEmpty()) {
return copyFunction.apply(map);
}

if (map == null) {
return null;
}

return copyFunction.apply(
columnIds.stream()
.filter(map::containsKey)
.collect(Collectors.toMap(Function.identity(), map::get)));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
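To make the filtering behaviour of filterColumnsStats concrete, here is a small self-contained illustration: a null or empty id set keeps every entry, otherwise only the requested ids survive. The maps and ids are made up for the example, and the copy function is omitted for brevity.

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

class FilterColumnsStatsBehaviour {
  public static void main(String[] args) {
    Map<Integer, Long> valueCounts = ImmutableMap.of(1, 100L, 2, 90L, 3, 80L);

    // Requesting ids {1, 3} keeps only those entries: prints {1=100, 3=80}
    System.out.println(filter(valueCounts, ImmutableSet.of(1, 3)));

    // A null id set keeps everything: prints {1=100, 2=90, 3=80}
    System.out.println(filter(valueCounts, null));
  }

  // Simplified stand-in for BaseFile#filterColumnsStats, without the copy function.
  static Map<Integer, Long> filter(Map<Integer, Long> map, Set<Integer> ids) {
    if (ids == null || ids.isEmpty() || map == null) {
      return map;
    }

    return map.entrySet().stream()
        .filter(entry -> ids.contains(entry.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}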
@@ -83,7 +83,8 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (context().ignoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
@@ -78,7 +78,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
.select(scanColumns())
.filterData(filter())
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
.ignoreExisting();
.ignoreExisting()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -121,6 +122,10 @@ protected boolean shouldReturnColumnStats() {
return context().returnColumnStats();
}

protected Set<Integer> columnsToKeepStats() {
return context().columnsToKeepStats();
}

protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}
@@ -165,6 +170,19 @@ public ThisT includeColumnStats() {
return newRefinedScan(table, schema, context.shouldReturnColumnStats(true));
}

@Override
public ThisT includeColumnStats(Collection<String> requestedColumns) {
return newRefinedScan(
table,
schema,
context
.shouldReturnColumnStats(true)
.columnsToKeepStats(
requestedColumns.stream()
.map(c -> schema.findField(c).fieldId())
.collect(Collectors.toSet())));
}

@Override
public ThisT select(Collection<String> columns) {
return newRefinedScan(table, schema, context.selectColumns(columns));
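To spell out the mapping in includeColumnStats above, here is a minimal sketch of how requested column names resolve to the field ids kept in the scan context; the schema and names are assumptions for the example.

import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;

class ColumnNameToFieldIds {
  // Mirrors the resolution in BaseScan#includeColumnStats(Collection): each requested
  // column name is looked up in the scan schema and replaced by its field id.
  // Note: an unknown column name would throw a NullPointerException here, just as
  // in the PR code, because Schema#findField returns null for unknown names.
  static Set<Integer> toFieldIds(Schema schema, Collection<String> requestedColumns) {
    return requestedColumns.stream()
        .map(name -> schema.findField(name).fieldId())
        .collect(Collectors.toSet());
  }
}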
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataScan.java
@@ -55,7 +55,8 @@ protected ManifestGroup newManifestGroup(
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -76,7 +76,8 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -66,23 +67,31 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param copyStats whether to copy all fields or to drop column-level stats.
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
*/
private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
super(toCopy, fullCopy);
private GenericDataFile(
GenericDataFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
super(toCopy, copyStats, requestedColumnIds);
}

/** Constructor for Java serialization. */
GenericDataFile() {}

@Override
public DataFile copyWithoutStats() {
return new GenericDataFile(this, false /* drop stats */);
return new GenericDataFile(this, false /* drop stats */, null);
}

@Override
public DataFile copyWithStats(Set<Integer> requestedColumnIds) {
return new GenericDataFile(this, true, requestedColumnIds);
}

@Override
public DataFile copy() {
return new GenericDataFile(this, true /* full copy */);
return new GenericDataFile(this, true /* full copy */, null);
Contributor: You may consider overloading the constructor so that you don't have to pass an extra null here, or adding a comment for the second argument (we have a comment for true but not for null).

Contributor (Author): I think this usage is straightforward, and adding a new constructor would not help much, so I did not apply this change.

}

@Override
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -67,23 +68,31 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param copyStats whether to copy all fields or to drop column-level stats.
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
*/
private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) {
super(toCopy, fullCopy);
private GenericDeleteFile(
GenericDeleteFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
super(toCopy, copyStats, requestedColumnIds);
}

/** Constructor for Java serialization. */
GenericDeleteFile() {}

@Override
public DeleteFile copyWithoutStats() {
return new GenericDeleteFile(this, false /* drop stats */);
return new GenericDeleteFile(this, false /* drop stats */, null);
}

@Override
public DeleteFile copyWithStats(Set<Integer> requestedColumnIds) {
return new GenericDeleteFile(this, true, requestedColumnIds);
}

@Override
public DeleteFile copy() {
return new GenericDeleteFile(this, true /* full copy */);
return new GenericDeleteFile(this, true /* full copy */, null);
}

@Override
@@ -102,7 +102,8 @@ public CloseableIterable<FileScanTask> planFiles() {
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();