Skip to content

Commit

Permalink
Stevent's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Oct 19, 2023
1 parent 5bb425d commit 8c183a1
Show file tree
Hide file tree
Showing 21 changed files with 128 additions and 121 deletions.
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -84,7 +85,7 @@ public BatchScan includeColumnStats() {
}

@Override
public BatchScan includeColumnStats(Collection<Integer> columns) {
public BatchScan includeColumnStats(Set<Integer> columns) {
// Delegate the refinement to the wrapped scan, then re-wrap the refined scan so the
// caller keeps working against the BatchScan adapter interface.
// NOTE(review): per the Scan#includeColumnStats contract, an empty or null set
// presumably keeps all column stats — confirm against the delegate implementation.
return new BatchScanAdapter(scan.includeColumnStats(columns));
}

Expand Down
31 changes: 5 additions & 26 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Expand Up @@ -19,9 +19,9 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
Expand Down Expand Up @@ -168,13 +168,14 @@ default Long fileSequenceNumber() {

/**
* Copies this file with only specific column stats. Manifest readers can reuse file instances;
* use this method to copy data and only copy specific stats when collecting files.
* use this method to copy data and only copy specific stats when collecting files. If the
* columnsToKeepStats set is empty or <code>null</code>, then all column stats will be kept.
*
* @param statsToKeep the collection of the column ids for the columns which stats are kept
* @param columnsToKeepStats the set of column ids for the columns whose stats are kept.
* @return a copy of this data file, with stats lower bounds, upper bounds, value counts, null
* value counts, and nan value counts for only specific columns.
*/
default F copyWithSpecificStats(Collection<Integer> statsToKeep) {
default F copyWithStats(Set<Integer> columnsToKeepStats) {
// Optional operation: implementations that cannot copy with column-scoped stats reject
// the call. The message must name THIS method (copyWithStats); the previous text still
// referenced the pre-rename name copyWithSpecificStats, which would mislead callers.
throw new UnsupportedOperationException(
    this.getClass().getName() + " doesn't implement copyWithStats");
}
Expand All @@ -191,26 +192,4 @@ default F copyWithSpecificStats(Collection<Integer> statsToKeep) {
/**
 * Copies this file, optionally dropping all column-level stats.
 *
 * @param withStats set to <code>false</code> to strip column stats from the copy
 * @return a copy of this file; without stats when withStats is <code>false</code>
 */
default F copy(boolean withStats) {
  if (withStats) {
    return copy();
  }
  return copyWithoutStats();
}

/**
* Copies this file (potentially with or without specific column stats). Manifest readers can
* reuse file instances; use this method to copy data when collecting files from tasks.
*
* @param withStats Will copy this file without file stats if set to <code>false</code>.
* @param statsToKeep Will keep stats only for these columns. Not used if <code>withStats</code>
* is set to <code>false</code>.
* @return a copy of this data file. If "withStats" is set to <code>false</code> the file will not
* contain lower bounds, upper bounds, value counts, null value counts, or nan value counts.
* If "withStats" is set to <code>true</code> and the "statsToKeep" is not empty then only
* specific column stats will be kept.
*/
default F copy(boolean withStats, Collection<Integer> statsToKeep) {
  // Guard clause: a stats-free copy ignores statsToKeep entirely.
  if (!withStats) {
    return copyWithoutStats();
  }

  // With stats requested, an empty or null filter means "keep everything".
  boolean filterStats = statsToKeep != null && !statsToKeep.isEmpty();
  return filterStats ? copyWithSpecificStats(statsToKeep) : copy();
}
}
6 changes: 4 additions & 2 deletions api/src/main/java/org/apache/iceberg/Scan.java
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -79,14 +80,15 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {

/**
* Create a new scan from this that loads the column stats for the specific columns with each data
* file.
* file. If the columns set is empty or <code>null</code> then all column stats will be kept, if
* {@link #includeColumnStats()} is set.
*
* <p>Column stats include: value count, null value count, lower bounds, and upper bounds.
*
* @param columns column ids from the table's schema
* @return a new scan based on this that loads column stats for specific columns.
*/
default ThisT includeColumnStats(Collection<Integer> columns) {
default ThisT includeColumnStats(Set<Integer> columns) {
  // Optional operation: scans that cannot scope stat loading to specific columns reject it.
  String receiver = this.getClass().getName();
  throw new UnsupportedOperationException(receiver + " doesn't implement includeColumnStats");
}
Expand Down
46 changes: 46 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.util.Set;
import org.apache.iceberg.ContentFile;

/** Helpers for copying {@link ContentFile} instances with control over which stats survive. */
public class ContentFileUtil {

  // Static-utility class; never instantiated.
  private ContentFileUtil() {}

  /**
   * Copies the {@link ContentFile} with the requested stat settings.
   *
   * @param file a generic data file to copy
   * @param withStats whether to keep any column stats at all
   * @param statsToKeep the column ids whose stats should survive the copy; when empty or
   *     <code>null</code>, every column stat is kept
   * @return the copied file
   */
  public static <F extends ContentFile<K>, K> K copy(
      F file, boolean withStats, Set<Integer> statsToKeep) {
    // Guard clause: a stats-free copy ignores the filter set entirely.
    if (!withStats) {
      return file.copyWithoutStats();
    }

    boolean keepAll = statsToKeep == null || statsToKeep.isEmpty();
    return keepAll ? file.copy() : file.copyWithStats(statsToKeep);
  }
}
4 changes: 2 additions & 2 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Expand Up @@ -36,9 +36,9 @@
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundSetPredicate;
Expand Down Expand Up @@ -664,7 +664,7 @@ public DataFile copyWithoutStats() {
}

@Override
public DataFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
public DataFile copyWithStats(Set<Integer> statsToKeep) {
// Test stub: copying is a no-op here — the same instance is returned and statsToKeep is
// ignored, since this fixture carries no column stats worth filtering.
return this;
}

Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.ThreadPools;
Expand Down Expand Up @@ -369,7 +370,8 @@ private CloseableIterable<ScanTask> toFileTasks(

return new BaseFileScanTask(
copyDataFiles
? dataFile.copy(shouldReturnColumnStats(), columnStatsToInclude())
? ContentFileUtil.copy(
dataFile, shouldReturnColumnStats(), columnsToIncludeStats())
: dataFile,
deleteFiles,
schemaString,
Expand Down
55 changes: 16 additions & 39 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Expand Up @@ -21,11 +21,11 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
Expand Down Expand Up @@ -176,9 +176,10 @@ public PartitionData copy() {
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param statsToKeep the collection of the column ids for the columns which stats are kept
* @param statsToKeep a set of column ids to keep stats. If empty or <code>null</code> then every
* column stat is kept.
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
BaseFile(BaseFile<F> toCopy, boolean fullCopy, Set<Integer> statsToKeep) {
this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
Expand All @@ -189,23 +190,12 @@ public PartitionData copy() {
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
if (fullCopy) {
if (statsToKeep == null || statsToKeep.isEmpty()) {
this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds =
SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
this.upperBounds =
SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
} else {
this.columnSizes = filteredLongMap(toCopy.columnSizes, statsToKeep);
this.valueCounts = filteredLongMap(toCopy.valueCounts, statsToKeep);
this.nullValueCounts = filteredLongMap(toCopy.nullValueCounts, statsToKeep);
this.nanValueCounts = filteredLongMap(toCopy.nanValueCounts, statsToKeep);
this.lowerBounds = filteredByteBufferMap(toCopy.lowerBounds, statsToKeep);
this.upperBounds = filteredByteBufferMap(toCopy.upperBounds, statsToKeep);
}
this.columnSizes = filterColumnsStats(toCopy.columnSizes, statsToKeep);
this.valueCounts = filterColumnsStats(toCopy.valueCounts, statsToKeep);
this.nullValueCounts = filterColumnsStats(toCopy.nullValueCounts, statsToKeep);
this.nanValueCounts = filterColumnsStats(toCopy.nanValueCounts, statsToKeep);
this.lowerBounds = filterColumnsStats(toCopy.lowerBounds, statsToKeep);
this.upperBounds = filterColumnsStats(toCopy.upperBounds, statsToKeep);
} else {
this.columnSizes = null;
this.valueCounts = null;
Expand Down Expand Up @@ -518,32 +508,19 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt
}
}

private static Map<Integer, Long> filteredLongMap(
Map<Integer, Long> map, Collection<Integer> columnIds) {
if (map == null) {
return null;
}

Map<Integer, Long> filtered = Maps.newHashMapWithExpectedSize(columnIds.size());
for (Integer columnId : columnIds) {
Long value = map.get(columnId);
if (value != null) {
filtered.put(columnId, value);
}
private static <TypeT> Map<Integer, TypeT> filterColumnsStats(
Map<Integer, TypeT> map, Set<Integer> columnIds) {
if (columnIds == null || columnIds.isEmpty()) {
return SerializableMap.copyOf(map);
}

return SerializableMap.copyOf(filtered);
}

private static Map<Integer, ByteBuffer> filteredByteBufferMap(
Map<Integer, ByteBuffer> map, Collection<Integer> columnIds) {
if (map == null) {
return null;
}

Map<Integer, ByteBuffer> filtered = Maps.newHashMapWithExpectedSize(columnIds.size());
Map<Integer, TypeT> filtered = Maps.newHashMapWithExpectedSize(columnIds.size());
for (Integer columnId : columnIds) {
ByteBuffer value = map.get(columnId);
TypeT value = map.get(columnId);
if (value != null) {
filtered.put(columnId, value);
}
Expand Down
Expand Up @@ -84,7 +84,7 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted()
.columnStatsToKeep(context().returnColumnStatsToInclude());
.columnStatsToKeep(context().columnsToIncludeStats());

if (context().ignoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
Expand Up @@ -79,7 +79,7 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
.filterData(filter())
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
.ignoreExisting()
.columnStatsToKeep(context().returnColumnStatsToInclude());
.columnStatsToKeep(context().columnsToIncludeStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Expand Up @@ -121,8 +121,8 @@ protected boolean shouldReturnColumnStats() {
return context().returnColumnStats();
}

protected Collection<Integer> columnStatsToInclude() {
return context().returnColumnStatsToInclude();
protected Set<Integer> columnsToIncludeStats() {
return context().columnsToIncludeStats();
}

protected boolean shouldIgnoreResiduals() {
Expand Down Expand Up @@ -170,11 +170,9 @@ public ThisT includeColumnStats() {
}

@Override
public ThisT includeColumnStats(Collection<Integer> statsNeeded) {
public ThisT includeColumnStats(Set<Integer> statsNeeded) {
return newRefinedScan(
table,
schema,
context.shouldReturnColumnStats(true).shouldReturnSpecificColumnStats(statsNeeded));
table, schema, context.shouldReturnColumnStats(true).columnsToIncludeStats(statsNeeded));
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/DataScan.java
Expand Up @@ -56,7 +56,7 @@ protected ManifestGroup newManifestGroup(
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted()
.columnStatsToKeep(context().returnColumnStatsToInclude());
.columnStatsToKeep(context().columnsToIncludeStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
Expand Up @@ -77,7 +77,7 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted()
.columnStatsToKeep(context().returnColumnStatsToInclude());
.columnStatsToKeep(context().columnsToIncludeStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Expand Up @@ -19,8 +19,8 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -68,11 +68,11 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param statsToKeep column ids columns where we need to keep the stats
* @param fullCopy whether to copy all fields or to drop column-level stats.
* @param statsToKeep a set of column ids to keep stats. If empty or <code>null</code> then every
* column stat is kept.
*/
private GenericDataFile(
GenericDataFile toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
private GenericDataFile(GenericDataFile toCopy, boolean fullCopy, Set<Integer> statsToKeep) {
// All copy logic lives in BaseFile's copy constructor; statsToKeep narrows which column
// stats are retained (an empty or null set keeps every stat, per BaseFile's contract).
super(toCopy, fullCopy, statsToKeep);
}

Expand All @@ -85,7 +85,7 @@ public DataFile copyWithoutStats() {
}

@Override
public DataFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
public DataFile copyWithStats(Set<Integer> statsToKeep) {
// Full copy (fullCopy = true) that keeps stats only for the given column ids; the
// filtering itself happens in BaseFile's copy constructor.
return new GenericDataFile(this, true, statsToKeep);
}

Expand Down

0 comments on commit 8c183a1

Please sign in to comment.