Core: Enable column statistics filtering after planning #8803

Merged
merged 10 commits into from Nov 14, 2023
Changes from 9 commits
5 changes: 5 additions & 0 deletions .palantir/revapi.yml
@@ -866,6 +866,11 @@ acceptedBreaks:
old: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-core:
- code: "java.field.serialVersionUIDChanged"
Contributor

While I think it should be fine, here is an idea. Java comes with the serialver utility, which lets us generate the version UID the class had prior to the change in this PR. We can use that value instead of 1L to be fully compatible. We aren't modifying the serialization of this class; we just forgot to assign a serialVersionUID. If we can recover the default value, we shouldn't have to worry about compatibility.

Here is the value I got locally:

cd core/build/classes/java/main
serialver org.apache.iceberg.util.SerializableMap
org.apache.iceberg.util.SerializableMap:    private static final long serialVersionUID = -3377238354349859240L;

Could you double check, @pvary? If not, we can keep it as is.

Contributor Author

Checked, but even when setting the serialVersionUID to -3377238354349859240L we still get a revapi failure.
I also double-checked that serialver generates the same ID for the old and the new code on my Mac.

So I resorted to the revapi change.

Contributor

Yeah, I am not sure what revapi actually does. I doubt they compare actual values. I think we should be fine.
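
For reference, the value that `serialver` prints can also be read programmatically through `java.io.ObjectStreamClass`. The sketch below uses two hypothetical classes (not Iceberg's `SerializableMap`): one without an explicit `serialVersionUID`, whose default is derived from the class structure, and one that pins the value explicitly.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical classes for illustration only; they are not part of Iceberg.
final class SerialVersionDemo {

  // No explicit serialVersionUID: Java serialization derives one from the class structure.
  static class Implicit implements Serializable {
    int value;
  }

  // Explicit serialVersionUID: the value stays fixed even if the class changes later.
  static class Explicit implements Serializable {
    private static final long serialVersionUID = 1L;
    int value;
  }

  // Returns the UID that Java serialization uses for the given class.
  static long uidOf(Class<? extends Serializable> type) {
    return ObjectStreamClass.lookup(type).getSerialVersionUID();
  }

  public static void main(String[] args) {
    System.out.println("Implicit: " + uidOf(Implicit.class));
    System.out.println("Explicit: " + uidOf(Explicit.class));
  }
}
```

Pinning the previously computed default, as suggested above, keeps streams written by the old class readable as long as the serialized form itself has not changed.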

new: "field org.apache.iceberg.util.SerializableMap<K, V>.serialVersionUID"
justification: "Serialization is not be used"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -83,6 +83,11 @@ public BatchScan includeColumnStats() {
return new BatchScanAdapter(scan.includeColumnStats());
}

@Override
public BatchScan includeColumnStats(Collection<String> requestedColumns) {
return new BatchScanAdapter(scan.includeColumnStats(requestedColumns));
}

@Override
public BatchScan select(Collection<String> columns) {
return new BatchScanAdapter(scan.select(columns));
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
@@ -165,6 +166,20 @@ default Long fileSequenceNumber() {
*/
F copyWithoutStats();

/**
* Copies this file with column stats only for specific columns. Manifest readers can reuse file
* instances; use this method to copy data with stats only for specific columns when collecting
* files.
*
* @param requestedColumnIds column IDs for which to keep stats.
* @return a copy of data file, with lower bounds, upper bounds, value counts, null value counts,
* and nan value counts for only specific columns.
*/
default F copyWithStats(Set<Integer> requestedColumnIds) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement copyWithStats");
}

/**
* Copies this file (potentially without file stats). Manifest readers can reuse file instances;
* use this method to copy data when collecting files from tasks.
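
The reuse note in the Javadoc above can be illustrated with a small self-contained sketch. `ReusedEntry` and `read` are hypothetical stand-ins for a reader that recycles one mutable instance; they are not Iceberg classes. Collecting the yielded object without copying leaves every list element pointing at the same, last-written state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins, not Iceberg classes.
final class ReuseDemo {

  // A mutable entry that the reader overwrites for every row it yields.
  static final class ReusedEntry {
    String path;

    ReusedEntry copy() {
      ReusedEntry copied = new ReusedEntry();
      copied.path = this.path;
      return copied;
    }
  }

  // Feeds every path to the consumer through one shared, reused instance.
  static void read(List<String> paths, Consumer<ReusedEntry> consumer) {
    ReusedEntry shared = new ReusedEntry();
    for (String path : paths) {
      shared.path = path;
      consumer.accept(shared);
    }
  }

  // Collects entries, optionally copying each one before storing it.
  static List<String> collect(List<String> paths, boolean copyEntries) {
    List<ReusedEntry> collected = new ArrayList<>();
    read(paths, entry -> collected.add(copyEntries ? entry.copy() : entry));
    List<String> result = new ArrayList<>();
    for (ReusedEntry entry : collected) {
      result.add(entry.path);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("a.parquet", "b.parquet");
    System.out.println(collect(paths, false)); // every element reflects the last path written
    System.out.println(collect(paths, true)); // each element keeps its own path
  }
}
```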
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Scan.java
@@ -77,6 +77,20 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
*/
ThisT includeColumnStats();

/**
* Create a new scan from this that loads the column stats for the specific columns with each data
* file.
*
* <p>Column stats include: value count, null value count, lower bounds, and upper bounds.
*
* @param requestedColumns column names for which to keep the stats.
* @return a new scan based on this that loads column stats for specific columns.
*/
default ThisT includeColumnStats(Collection<String> requestedColumns) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement includeColumnStats");
}

/**
* Create a new scan from this that will read the given data columns. This produces an expected
* schema that includes all fields that are either selected or used by this scan's filter
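
The new interface methods in this diff are added as `default` methods that throw `UnsupportedOperationException`. A minimal sketch of why that keeps existing implementations compiling follows; `SimpleScan` and `LegacyScan` are hypothetical types that only mirror the shape of the API above.

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical types for illustration; they only mirror the shape of the API above.
final class DefaultMethodDemo {

  interface SimpleScan {
    SimpleScan includeColumnStats();

    // Added later as a default method, so older implementations still compile.
    default SimpleScan includeColumnStats(Collection<String> requestedColumns) {
      throw new UnsupportedOperationException(
          this.getClass().getName() + " doesn't implement includeColumnStats");
    }
  }

  // Written before the new method existed; it inherits the throwing default.
  static final class LegacyScan implements SimpleScan {
    @Override
    public SimpleScan includeColumnStats() {
      return this;
    }
  }

  // Returns true if the scan accepts a per-column stats selection.
  static boolean supportsColumnSelection(SimpleScan scan) {
    try {
      scan.includeColumnStats(Arrays.asList("id"));
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(supportsColumnSelection(new LegacyScan()));
  }
}
```

The trade-off is that a missing override surfaces at call time rather than at compile time, so callers of third-party implementations may want to handle the exception.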
6 changes: 6 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -38,6 +38,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundSetPredicate;
@@ -662,6 +663,11 @@ public DataFile copyWithoutStats() {
return this;
}

@Override
public DataFile copyWithStats(Set<Integer> requestedColumns) {
return this;
}

@Override
public List<Long> splitOffsets() {
return null;
@@ -38,6 +38,7 @@
import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.ThreadPools;
@@ -368,7 +369,9 @@ private CloseableIterable<ScanTask> toFileTasks(
ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);

return new BaseFileScanTask(
copyDataFiles ? dataFile.copy(shouldReturnColumnStats()) : dataFile,
copyDataFiles
? ContentFileUtil.copy(dataFile, shouldReturnColumnStats(), columnsToKeepStats())
: dataFile,
deleteFiles,
schemaString,
specString,
38 changes: 29 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
@@ -173,9 +174,11 @@ public PartitionData copy() {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param copyStats whether to copy all fields or to drop column-level stats
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
BaseFile(BaseFile<F> toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
@@ -185,13 +188,30 @@ public PartitionData copy() {
this.partitionType = toCopy.partitionType;
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
if (fullCopy) {
this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
this.upperBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
if (copyStats) {
if (requestedColumnIds == null) {
this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds =
SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
this.upperBounds =
SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
} else {
this.columnSizes = SerializableMap.filteredCopyOf(toCopy.columnSizes, requestedColumnIds);
this.valueCounts = SerializableMap.filteredCopyOf(toCopy.valueCounts, requestedColumnIds);
this.nullValueCounts =
SerializableMap.filteredCopyOf(toCopy.nullValueCounts, requestedColumnIds);
this.nanValueCounts =
SerializableMap.filteredCopyOf(toCopy.nanValueCounts, requestedColumnIds);
this.lowerBounds =
SerializableByteBufferMap.wrap(
SerializableMap.filteredCopyOf(toCopy.lowerBounds, requestedColumnIds));
this.upperBounds =
SerializableByteBufferMap.wrap(
SerializableMap.filteredCopyOf(toCopy.upperBounds, requestedColumnIds));
}
} else {
this.columnSizes = null;
this.valueCounts = null;
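
The branching in the constructor above boils down to three cases per stats map: drop it, copy it whole, or copy only the requested column IDs. A self-contained sketch of that decision follows. `copyStats` here is a hypothetical helper over plain `java.util` maps; it is not the `SerializableMap.filteredCopyOf` implementation, which this diff does not show.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not Iceberg's SerializableMap.
final class StatsCopyDemo {

  // Returns null when stats are dropped, a full copy when no column IDs are
  // requested (null set), and a copy restricted to the requested IDs otherwise.
  static <V> Map<Integer, V> copyStats(
      Map<Integer, V> stats, boolean copyStats, Set<Integer> requestedColumnIds) {
    if (!copyStats || stats == null) {
      return null;
    }

    if (requestedColumnIds == null) {
      return new HashMap<>(stats);
    }

    Map<Integer, V> filtered = new HashMap<>();
    for (Map.Entry<Integer, V> entry : stats.entrySet()) {
      if (requestedColumnIds.contains(entry.getKey())) {
        filtered.put(entry.getKey(), entry.getValue());
      }
    }
    return filtered;
  }

  public static void main(String[] args) {
    Map<Integer, Long> valueCounts = new HashMap<>();
    valueCounts.put(1, 10L);
    valueCounts.put(2, 20L);
    valueCounts.put(3, 30L);

    Set<Integer> requested = new HashSet<>(Arrays.asList(1, 3));
    System.out.println(copyStats(valueCounts, false, null));
    System.out.println(copyStats(valueCounts, true, null));
    System.out.println(copyStats(valueCounts, true, requested));
  }
}
```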
@@ -83,7 +83,8 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (context().ignoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
@@ -78,7 +78,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
.select(scanColumns())
.filterData(filter())
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
.ignoreExisting();
.ignoreExisting()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -121,6 +122,10 @@ protected boolean shouldReturnColumnStats() {
return context().returnColumnStats();
}

protected Set<Integer> columnsToKeepStats() {
return context().columnsToKeepStats();
}

protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}
@@ -165,6 +170,19 @@ public ThisT includeColumnStats() {
return newRefinedScan(table, schema, context.shouldReturnColumnStats(true));
}

@Override
public ThisT includeColumnStats(Collection<String> requestedColumns) {
return newRefinedScan(
table,
schema,
context
.shouldReturnColumnStats(true)
.columnsToKeepStats(
requestedColumns.stream()
.map(c -> schema.findField(c).fieldId())
.collect(Collectors.toSet())));
}

@Override
public ThisT select(Collection<String> columns) {
return newRefinedScan(table, schema, context.selectColumns(columns));
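
`includeColumnStats(Collection<String>)` above translates column names into field IDs before storing them in the scan context. The sketch below mirrors that translation over a plain name-to-ID map, a hypothetical stand-in for the schema lookup. It also rejects unknown names with an explicit message; that check is an assumption added for illustration and is not part of the diff.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical resolver for illustration; the map stands in for a schema lookup.
final class ColumnIdResolver {

  // Maps requested column names to field IDs, failing fast on unknown names.
  static Set<Integer> resolve(
      Map<String, Integer> fieldIdsByName, Collection<String> requestedColumns) {
    return requestedColumns.stream()
        .map(
            name -> {
              Integer id = fieldIdsByName.get(name);
              if (id == null) {
                throw new IllegalArgumentException("Cannot find column: " + name);
              }
              return id;
            })
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<String, Integer> fieldIdsByName = new HashMap<>();
    fieldIdsByName.put("id", 1);
    fieldIdsByName.put("data", 2);

    // Duplicate names collapse into a single ID because the result is a set.
    System.out.println(resolve(fieldIdsByName, Arrays.asList("id", "data", "id")));
  }
}
```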
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataScan.java
@@ -55,7 +55,8 @@ protected ManifestGroup newManifestGroup(
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -76,7 +76,8 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -66,23 +67,31 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param copyStats whether to copy all fields or to drop column-level stats.
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
*/
private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
super(toCopy, fullCopy);
private GenericDataFile(
GenericDataFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
super(toCopy, copyStats, requestedColumnIds);
}

/** Constructor for Java serialization. */
GenericDataFile() {}

@Override
public DataFile copyWithoutStats() {
return new GenericDataFile(this, false /* drop stats */);
return new GenericDataFile(this, false /* drop stats */, null);
}

@Override
public DataFile copyWithStats(Set<Integer> requestedColumnIds) {
return new GenericDataFile(this, true, requestedColumnIds);
}

@Override
public DataFile copy() {
return new GenericDataFile(this, true /* full copy */);
return new GenericDataFile(this, true /* full copy */, null);
Contributor

You may consider overloading the constructor so that you don't have to pass an extra null here, or adding a comment for the second argument (we have a comment for true but not for null).

Contributor Author

I think this usage is straightforward, and adding a new constructor would not help much, so I did not apply this change.
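
The overload suggested above could look roughly like the following. `FileCopy` is a hypothetical class used only to show the delegation; the PR kept the explicit `null` argument instead.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical class for illustration; not GenericDataFile.
final class FileCopy {
  private final boolean statsCopied;
  private final Set<Integer> requestedColumnIds;

  // Full constructor: a null requestedColumnIds means "keep stats for every column".
  FileCopy(boolean copyStats, Set<Integer> requestedColumnIds) {
    this.statsCopied = copyStats;
    this.requestedColumnIds = requestedColumnIds;
  }

  // Convenience overload so all-or-nothing callers do not have to pass null.
  FileCopy(boolean copyStats) {
    this(copyStats, null);
  }

  // Returns true if this copy retains stats for the given column ID.
  boolean keepsStatsFor(int columnId) {
    return statsCopied && (requestedColumnIds == null || requestedColumnIds.contains(columnId));
  }

  public static void main(String[] args) {
    Set<Integer> onlyFirst = new HashSet<>(Arrays.asList(1));
    System.out.println(new FileCopy(true).keepsStatsFor(5));
    System.out.println(new FileCopy(false).keepsStatsFor(5));
    System.out.println(new FileCopy(true, onlyFirst).keepsStatsFor(5));
  }
}
```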

}

@Override
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -67,23 +68,31 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param copyStats whether to copy all fields or to drop column-level stats.
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
*/
private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) {
super(toCopy, fullCopy);
private GenericDeleteFile(
GenericDeleteFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
super(toCopy, copyStats, requestedColumnIds);
}

/** Constructor for Java serialization. */
GenericDeleteFile() {}

@Override
public DeleteFile copyWithoutStats() {
return new GenericDeleteFile(this, false /* drop stats */);
return new GenericDeleteFile(this, false /* drop stats */, null);
}

@Override
public DeleteFile copyWithStats(Set<Integer> requestedColumnIds) {
return new GenericDeleteFile(this, true, requestedColumnIds);
}

@Override
public DeleteFile copy() {
return new GenericDeleteFile(this, true /* full copy */);
return new GenericDeleteFile(this, true /* full copy */, null);
}

@Override
@@ -102,7 +102,8 @@ public CloseableIterable<FileScanTask> planFiles() {
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(context().columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();