diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index dd63ddff333f..3b64a91b311d 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
@@ -84,7 +85,7 @@ public BatchScan includeColumnStats() {
   }
 
   @Override
-  public BatchScan includeColumnStats(Collection<Integer> columns) {
+  public BatchScan includeColumnStats(Set<Integer> columns) {
     return new BatchScanAdapter(scan.includeColumnStats(columns));
   }
 
diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
index 2c2225878c8b..23d43c0ba3d9 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -19,9 +19,9 @@
 package org.apache.iceberg;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
@@ -168,13 +168,14 @@ default Long fileSequenceNumber() {
 
   /**
    * Copies this file with only specific column stats. Manifest readers can reuse file instances;
-   * use this method to copy data and only copy specific stats when collecting files.
+   * use this method to copy data and only copy specific stats when collecting files. If the
+   * columnsToKeepStats set is empty or null, then all column stats will be kept.
    *
-   * @param statsToKeep the collection of the column ids for the columns which stats are kept
+   * @param columnsToKeepStats the set of column ids for the columns whose stats are kept
    * @return a copy of this data file, with stats lower bounds, upper bounds, value counts, null
    *     value counts, and nan value counts for only specific columns.
    */
-  default F copyWithSpecificStats(Collection<Integer> statsToKeep) {
+  default F copyWithStats(Set<Integer> columnsToKeepStats) {
     throw new UnsupportedOperationException(
-        this.getClass().getName() + " doesn't implement copyWithSpecificStats");
+        this.getClass().getName() + " doesn't implement copyWithStats");
   }
@@ -191,26 +192,4 @@ default F copyWithSpecificStats(Collection<Integer> statsToKeep) {
   default F copy(boolean withStats) {
     return withStats ? copy() : copyWithoutStats();
   }
-
-  /**
-   * Copies this file (potentially with or without specific column stats). Manifest readers can
-   * reuse file instances; use this method to copy data when collecting files from tasks.
-   *
-   * @param withStats Will copy this file without file stats if set to false.
-   * @param statsToKeep Will keep stats only for these columns. Not used if withStats
-   *     is set to false.
-   * @return a copy of this data file. If "withStats" is set to false the file will not
-   *     contain lower bounds, upper bounds, value counts, null value counts, or nan value counts.
-   *     If "withStats" is set to true and the "statsToKeep" is not empty then only
-   *     specific column stats will be kept.
-   */
-  default F copy(boolean withStats, Collection<Integer> statsToKeep) {
-    if (withStats) {
-      return statsToKeep != null && !statsToKeep.isEmpty()
-          ? copyWithSpecificStats(statsToKeep)
-          : copy();
-    } else {
-      return copyWithoutStats();
-    }
-  }
 }
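To make the renamed API concrete, here is a minimal sketch of the copyWithStats contract described in the Javadoc above; the helper name and the column ids are hypothetical, and dataFile is assumed to come from a planned scan task:

import java.util.Set;
import org.apache.iceberg.DataFile;

class CopyWithStatsExample {
  static DataFile pruneStats(DataFile dataFile) {
    // Keeps lower/upper bounds, value counts, null value counts, and NaN value
    // counts only for column ids 1 and 2; stats for other columns are dropped.
    // Per the Javadoc, an empty or null set would keep every column stat.
    return dataFile.copyWithStats(Set.of(1, 2));
  }
}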
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index d1f9f66b67f0..b279223957e2 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
@@ -79,14 +80,15 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
 
   /**
    * Create a new scan from this that loads the column stats for the specific columns with each data
-   * file.
+   * file. If the columns set is empty or null, then all column stats are loaded when
+   * {@link #includeColumnStats()} is set.
    *
    * <p>Column stats include: value count, null value count, lower bounds, and upper bounds.
    *
    * @param columns column ids from the table's schema
    * @return a new scan based on this that loads column stats for specific columns.
    */
-  default ThisT includeColumnStats(Collection<Integer> columns) {
+  default ThisT includeColumnStats(Set<Integer> columns) {
     throw new UnsupportedOperationException(
         this.getClass().getName() + " doesn't implement includeColumnStats");
   }
diff --git a/api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
new file mode 100644
index 000000000000..41d74609afa0
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+
+public class ContentFileUtil {
+  private ContentFileUtil() {}
+
+  /**
+   * Copies the {@link ContentFile} with the specific stat settings.
+   *
+   * @param file a generic data file to copy.
+   * @param withStats whether to keep any stats
+   * @param statsToKeep a set of column ids for which stats are kept. If empty or null, then every
+   *     column stat is kept.
+   * @return the copied file
+   */
+  public static <F extends ContentFile<K>, K> K copy(
+      F file, boolean withStats, Set<Integer> statsToKeep) {
+    if (withStats) {
+      return statsToKeep != null && !statsToKeep.isEmpty()
+          ? file.copyWithStats(statsToKeep)
+          : file.copy();
+    } else {
+      return file.copyWithoutStats();
+    }
+  }
+}
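ContentFileUtil.copy now centralizes the copy decision that previously lived in ContentFile.copy(boolean, Collection). A caller-side sketch, with a hypothetical helper name and an arbitrarily chosen column id:

import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.util.ContentFileUtil;

class ContentFileUtilExample {
  static DataFile copyForTask(DataFile file, boolean withStats) {
    // withStats=false wins over any column selection: the copy carries no stats.
    // withStats=true with a null or empty set falls back to a full copy(),
    // matching the Javadoc above.
    return ContentFileUtil.copy(file, withStats, Set.of(3));
  }
}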
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index f655b01e9037..19cf8b6f7262 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -36,9 +36,9 @@
 import java.lang.invoke.SerializedLambda;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.IntStream;
 import org.apache.iceberg.expressions.BoundPredicate;
 import org.apache.iceberg.expressions.BoundSetPredicate;
@@ -664,7 +664,7 @@ public DataFile copyWithoutStats() {
     }
 
     @Override
-    public DataFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
+    public DataFile copyWithStats(Set<Integer> statsToKeep) {
       return this;
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
index 5f5d87298547..5bf040d61e02 100644
--- a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.metrics.ScanMetricsUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.iceberg.util.ThreadPools;
@@ -369,7 +370,8 @@ private CloseableIterable<ScanTask> toFileTasks(
 
     return new BaseFileScanTask(
         copyDataFiles
-            ? dataFile.copy(shouldReturnColumnStats(), columnStatsToInclude())
+            ? ContentFileUtil.copy(
+                dataFile, shouldReturnColumnStats(), columnsToIncludeStats())
            : dataFile,
         deleteFiles,
         schemaString,
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 8ed1c5236d7c..c7c9ef89c700 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -21,11 +21,11 @@
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
@@ -176,9 +176,10 @@ public PartitionData copy() {
    *
    * @param toCopy a generic data file to copy.
    * @param fullCopy whether to copy all fields or to drop column-level stats
-   * @param statsToKeep the collection of the column ids for the columns which stats are kept
+   * @param statsToKeep a set of column ids for which stats are kept. If empty or null, then every
+   *     column stat is kept.
    */
-  BaseFile(BaseFile<F> toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
+  BaseFile(BaseFile<F> toCopy, boolean fullCopy, Set<Integer> statsToKeep) {
     this.fileOrdinal = toCopy.fileOrdinal;
     this.partitionSpecId = toCopy.partitionSpecId;
     this.content = toCopy.content;
@@ -189,23 +190,12 @@ public PartitionData copy() {
     this.recordCount = toCopy.recordCount;
     this.fileSizeInBytes = toCopy.fileSizeInBytes;
     if (fullCopy) {
-      if (statsToKeep == null || statsToKeep.isEmpty()) {
-        this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
-        this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
-        this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
-        this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
-        this.lowerBounds =
-            SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
-        this.upperBounds =
-            SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
-      } else {
-        this.columnSizes = filteredLongMap(toCopy.columnSizes, statsToKeep);
-        this.valueCounts = filteredLongMap(toCopy.valueCounts, statsToKeep);
-        this.nullValueCounts = filteredLongMap(toCopy.nullValueCounts, statsToKeep);
-        this.nanValueCounts = filteredLongMap(toCopy.nanValueCounts, statsToKeep);
-        this.lowerBounds = filteredByteBufferMap(toCopy.lowerBounds, statsToKeep);
-        this.upperBounds = filteredByteBufferMap(toCopy.upperBounds, statsToKeep);
-      }
+      this.columnSizes = filterColumnsStats(toCopy.columnSizes, statsToKeep);
+      this.valueCounts = filterColumnsStats(toCopy.valueCounts, statsToKeep);
+      this.nullValueCounts = filterColumnsStats(toCopy.nullValueCounts, statsToKeep);
+      this.nanValueCounts = filterColumnsStats(toCopy.nanValueCounts, statsToKeep);
+      this.lowerBounds = filterColumnsStats(toCopy.lowerBounds, statsToKeep);
+      this.upperBounds = filterColumnsStats(toCopy.upperBounds, statsToKeep);
     } else {
       this.columnSizes = null;
       this.valueCounts = null;
@@ -518,32 +508,19 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(
 
-  private static Map<Integer, Long> filteredLongMap(
-      Map<Integer, Long> map, Collection<Integer> columnIds) {
-    if (map == null) {
-      return null;
-    }
-
-    Map<Integer, Long> filtered = Maps.newHashMapWithExpectedSize(columnIds.size());
-    for (Integer columnId : columnIds) {
-      Long value = map.get(columnId);
-      if (value != null) {
-        filtered.put(columnId, value);
-      }
+  private static <TypeT> Map<Integer, TypeT> filterColumnsStats(
+      Map<Integer, TypeT> map, Set<Integer> columnIds) {
+    if (columnIds == null || columnIds.isEmpty()) {
+      return SerializableMap.copyOf(map);
     }
-    return SerializableMap.copyOf(filtered);
-  }
-
-  private static Map<Integer, ByteBuffer> filteredByteBufferMap(
-      Map<Integer, ByteBuffer> map, Collection<Integer> columnIds) {
     if (map == null) {
       return null;
     }
 
-    Map<Integer, ByteBuffer> filtered = Maps.newHashMapWithExpectedSize(columnIds.size());
+    Map<Integer, TypeT> filtered = Maps.newHashMapWithExpectedSize(columnIds.size());
     for (Integer columnId : columnIds) {
-      ByteBuffer value = map.get(columnId);
+      TypeT value = map.get(columnId);
       if (value != null) {
        filtered.put(columnId, value);
       }
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
index ffb74ba46937..621ddc634305 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
@@ -84,7 +84,7 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot> snapshots) {
                     && manifestEntry.status() == ManifestEntry.Status.ADDED)
             .specsById(table().specs())
             .ignoreDeleted()
-            .columnStatsToKeep(context().returnColumnStatsToInclude());
+            .columnStatsToKeep(context().columnsToIncludeStats());
 
     if (context().ignoreResiduals()) {
       manifestGroup = manifestGroup.ignoreResiduals();
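The two type-specific helpers in BaseFile collapse into the single generic filterColumnsStats above. A standalone sketch of the same filtering logic, using a plain HashMap in place of Iceberg's SerializableMap wrapper (class and method names are hypothetical):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class FilterColumnsStatsSketch {
  static <T> Map<Integer, T> filter(Map<Integer, T> stats, Set<Integer> columnIds) {
    if (columnIds == null || columnIds.isEmpty()) {
      // No selection: keep every column stat (null stays null).
      return stats == null ? null : new HashMap<>(stats);
    }
    if (stats == null) {
      return null;
    }
    // Keep only entries whose column id was requested.
    Map<Integer, T> filtered = new HashMap<>();
    for (Integer columnId : columnIds) {
      T value = stats.get(columnId);
      if (value != null) {
        filtered.put(columnId, value);
      }
    }
    return filtered;
  }
}

Iterating over the requested column ids rather than the stats map keeps the copy proportional to the selection size, which is the point of the optimization.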
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
index d9f32537b5f1..cca04fa1ff8d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
@@ -79,7 +79,7 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
             .filterData(filter())
             .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
             .ignoreExisting()
-            .columnStatsToKeep(context().returnColumnStatsToInclude());
+            .columnStatsToKeep(context().columnsToIncludeStats());
 
     if (shouldIgnoreResiduals()) {
       manifestGroup = manifestGroup.ignoreResiduals();
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 9d94ebe8e0fa..14115605506c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -121,8 +121,8 @@ protected boolean shouldReturnColumnStats() {
     return context().returnColumnStats();
   }
 
-  protected Collection<Integer> columnStatsToInclude() {
-    return context().returnColumnStatsToInclude();
+  protected Set<Integer> columnsToIncludeStats() {
+    return context().columnsToIncludeStats();
   }
 
   protected boolean shouldIgnoreResiduals() {
@@ -170,11 +170,9 @@ public ThisT includeColumnStats() {
   }
 
   @Override
-  public ThisT includeColumnStats(Collection<Integer> statsNeeded) {
+  public ThisT includeColumnStats(Set<Integer> statsNeeded) {
     return newRefinedScan(
-        table,
-        schema,
-        context.shouldReturnColumnStats(true).shouldReturnSpecificColumnStats(statsNeeded));
+        table, schema, context.shouldReturnColumnStats(true).columnsToIncludeStats(statsNeeded));
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/DataScan.java b/core/src/main/java/org/apache/iceberg/DataScan.java
index e0001bb31cda..7a7acebee91b 100644
--- a/core/src/main/java/org/apache/iceberg/DataScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataScan.java
@@ -56,7 +56,7 @@ protected ManifestGroup newManifestGroup(
             .specsById(table().specs())
             .scanMetrics(scanMetrics())
             .ignoreDeleted()
-            .columnStatsToKeep(context().returnColumnStatsToInclude());
+            .columnStatsToKeep(context().columnsToIncludeStats());
 
     if (shouldIgnoreResiduals()) {
       manifestGroup = manifestGroup.ignoreResiduals();
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index dad057ea25d7..9f22d1387827 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -77,7 +77,7 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
             .specsById(table().specs())
             .scanMetrics(scanMetrics())
             .ignoreDeleted()
-            .columnStatsToKeep(context().returnColumnStatsToInclude());
+            .columnStatsToKeep(context().columnsToIncludeStats());
 
     if (shouldIgnoreResiduals()) {
       manifestGroup = manifestGroup.ignoreResiduals();
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index a509e320b9a1..490d67c7c348 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -19,8 +19,8 @@
 package org.apache.iceberg;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -68,11 +68,11 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
    * Copy constructor.
    *
    * @param toCopy a generic data file to copy.
-   * @param fullCopy whether to copy all fields or to drop column-level stats
-   * @param statsToKeep column ids columns where we need to keep the stats
+   * @param fullCopy whether to copy all fields or to drop column-level stats.
+   * @param statsToKeep a set of column ids for which stats are kept. If empty or null, then every
+   *     column stat is kept.
    */
-  private GenericDataFile(
-      GenericDataFile toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
+  private GenericDataFile(GenericDataFile toCopy, boolean fullCopy, Set<Integer> statsToKeep) {
     super(toCopy, fullCopy, statsToKeep);
   }
 
@@ -85,7 +85,7 @@ public DataFile copyWithoutStats() {
   }
 
   @Override
-  public DataFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
+  public DataFile copyWithStats(Set<Integer> statsToKeep) {
     return new GenericDataFile(this, true, statsToKeep);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
index bc16b677fee6..b78cf8b96b09 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -19,8 +19,8 @@
 package org.apache.iceberg;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -69,11 +69,11 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
    * Copy constructor.
    *
    * @param toCopy a generic data file to copy.
-   * @param fullCopy whether to copy all fields or to drop column-level stats
-   * @param statsToKeep column ids columns where we need to keep the stats
+   * @param fullCopy whether to copy all fields or to drop column-level stats.
+   * @param statsToKeep a set of column ids for which stats are kept. If empty or null, then every
+   *     column stat is kept.
   */
-  private GenericDeleteFile(
-      GenericDeleteFile toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
+  private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy, Set<Integer> statsToKeep) {
     super(toCopy, fullCopy, statsToKeep);
   }
 
@@ -86,7 +86,7 @@ public DeleteFile copyWithoutStats() {
   }
 
   @Override
-  public DeleteFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
+  public DeleteFile copyWithStats(Set<Integer> statsToKeep) {
     return new GenericDeleteFile(this, true, statsToKeep);
   }
 
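Putting the scan-side changes together, a usage sketch against the public API (loading of the Table is elided; the class name and column ids are hypothetical):

import java.util.Set;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class ScanWithColumnStatsExample {
  static CloseableIterable<FileScanTask> planWithPrunedStats(Table table) {
    // includeColumnStats(Set) implies includeColumnStats(): per BaseScan above,
    // the refined context sets shouldReturnColumnStats(true) before narrowing
    // the loaded stats to column ids 1 and 4.
    return table.newScan().includeColumnStats(Set.of(1, 4)).planFiles();
  }
}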
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 1cec93e4c4c5..b4633de13d41 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -103,7 +103,7 @@ public CloseableIterable<FileScanTask> planFiles() {
                     && manifestEntry.status() == ManifestEntry.Status.ADDED)
             .specsById(table().specs())
             .ignoreDeleted()
-            .columnStatsToKeep(context().returnColumnStatsToInclude());
+            .columnStatsToKeep(context().columnsToIncludeStats());
 
     if (shouldIgnoreResiduals()) {
       manifestGroup = manifestGroup.ignoreResiduals();
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 0a6d9041368f..7a6667e0066c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -21,7 +21,6 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,6 +42,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.ParallelIterable;
 
 class ManifestGroup {
@@ -62,7 +62,7 @@ class ManifestGroup {
   private boolean ignoreResiduals;
   private List<String> columns;
   private boolean caseSensitive;
-  private Collection<Integer> columnStatsToKeep;
+  private Set<Integer> columnStatsToKeep;
   private ExecutorService executorService;
   private ScanMetrics scanMetrics;
 
@@ -156,7 +156,7 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) {
     return this;
   }
 
-  ManifestGroup columnStatsToKeep(Collection<Integer> newColumnStatsToKeep) {
+  ManifestGroup columnStatsToKeep(Set<Integer> newColumnStatsToKeep) {
     this.columnStatsToKeep = newColumnStatsToKeep;
     return this;
   }
@@ -369,7 +369,8 @@ private static CloseableIterable<FileScanTask> createFileScanTasks(
     return CloseableIterable.transform(
         entries,
         entry -> {
-          DataFile dataFile = entry.file().copy(ctx.shouldKeepStats(), ctx.statsToKeep());
+          DataFile dataFile =
+              ContentFileUtil.copy(entry.file(), ctx.shouldKeepStats(), ctx.statsToKeep());
           DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
           ScanMetricsUtil.fileTask(ctx.scanMetrics(), dataFile, deleteFiles);
           return new BaseFileScanTask(
@@ -389,7 +390,7 @@ static class TaskContext {
     private final DeleteFileIndex deletes;
     private final ResidualEvaluator residuals;
     private final boolean dropStats;
-    private final Collection<Integer> statsToKeep;
+    private final Set<Integer> statsToKeep;
     private final ScanMetrics scanMetrics;
 
     TaskContext(
@@ -397,7 +398,7 @@ static class TaskContext {
         DeleteFileIndex deletes,
         ResidualEvaluator residuals,
         boolean dropStats,
-        Collection<Integer> statsToKeep,
+        Set<Integer> statsToKeep,
         ScanMetrics scanMetrics) {
       this.schemaAsString = SchemaParser.toJson(spec.schema());
       this.specAsString = PartitionSpecParser.toJson(spec);
@@ -428,7 +429,7 @@ boolean shouldKeepStats() {
       return !dropStats;
     }
 
-    Collection<Integer> statsToKeep() {
+    Set<Integer> statsToKeep() {
       return statsToKeep;
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index e558436db221..1fa7436715bf 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -20,6 +20,7 @@
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import org.apache.iceberg.expressions.Expression;
@@ -28,8 +29,8 @@
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.metrics.MetricsReporters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.util.ThreadPools;
 import org.immutables.value.Value;
 
@@ -61,8 +62,8 @@ public boolean returnColumnStats() {
   }
 
   @Value.Default
-  public Collection<Integer> returnColumnStatsToInclude() {
-    return ImmutableList.of();
+  public Set<Integer> columnsToIncludeStats() {
+    return ImmutableSet.of();
   }
 
   @Nullable
@@ -131,13 +132,13 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
         .build();
   }
 
-  TableScanContext shouldReturnSpecificColumnStats(Collection<Integer> columStatsToInclude) {
+  TableScanContext columnsToIncludeStats(Set<Integer> columnStatsToInclude) {
     Preconditions.checkState(
         returnColumnStats(),
         "Cannot select column stats to include when column stats are not returned");
     return ImmutableTableScanContext.builder()
         .from(this)
-        .returnColumnStatsToInclude(columStatsToInclude)
+        .columnsToIncludeStats(columnStatsToInclude)
         .build();
   }
 
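The Preconditions check in TableScanContext makes the refinement order matter: columnsToIncludeStats may only be applied once returnColumnStats is true. A sketch of that internal call order (these members are package-private, and TableScanContext.empty() is assumed from the existing class, so this only compiles inside org.apache.iceberg):

package org.apache.iceberg;

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

class ColumnStatsContextSketch {
  static TableScanContext prunedStatsContext() {
    return TableScanContext.empty()
        .shouldReturnColumnStats(true) // must come first
        .columnsToIncludeStats(ImmutableSet.of(1, 2)); // otherwise checkState fails
  }
}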
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index f17c523fdeac..71b06b90c9de 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -21,9 +21,9 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.types.Types;
@@ -487,8 +487,8 @@ public DataFile copy() {
     }
 
     @Override
-    public DataFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
-      return wrapped.copyWithSpecificStats(statsToKeep);
+    public DataFile copyWithStats(Set<Integer> statsToKeep) {
+      return wrapped.copyWithStats(statsToKeep);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index f7b7d1331aa5..8e5cc7e5b6e4 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -21,9 +21,9 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -562,8 +562,8 @@ public F copy() {
     }
 
     @Override
-    public F copyWithSpecificStats(Collection<Integer> statsToKeep) {
-      return wrapped.copyWithSpecificStats(statsToKeep);
+    public F copyWithStats(Set<Integer> statsToKeep) {
+      return wrapped.copyWithStats(statsToKeep);
     }
 
     @Override
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index cf1852510874..312bd8a3b91c 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -20,8 +20,8 @@
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.TimeUtils;
@@ -155,7 +155,7 @@ public boolean includeColumnStats() {
         .parse();
   }
 
-  public Collection<Integer> columnStatsToKeep() {
+  public Set<Integer> columnStatsToKeep() {
     return split(
         confParser
             .stringConf()
@@ -204,7 +204,7 @@ public int maxAllowedPlanningFailures() {
         .parse();
   }
 
-  public static Collection<Integer> split(String text) {
+  public static Set<Integer> split(String text) {
     return Arrays.stream(text.split("\\s*,\\s*"))
         .filter(s -> !s.isEmpty())
         .map(Integer::parseInt)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 10bfbe7beaca..e443ef8104d7 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -20,9 +20,9 @@
 
 import java.io.Serializable;
 import java.time.Duration;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
@@ -63,7 +63,7 @@ public class ScanContext implements Serializable {
   private final List<Expression> filters;
   private final long limit;
   private final boolean includeColumnStats;
-  private final Collection<Integer> columnStatsToKeep;
+  private final Set<Integer> columnStatsToKeep;
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
   private final int maxAllowedPlanningFailures;
@@ -86,7 +86,7 @@ private ScanContext(
       List<Expression> filters,
       long limit,
       boolean includeColumnStats,
-      Collection<Integer> columnStatsToKeep,
+      Set<Integer> columnStatsToKeep,
       boolean exposeLocality,
       Integer planParallelism,
       int maxPlanningSnapshotCount,
@@ -252,7 +252,7 @@ public boolean includeColumnStats() {
     return includeColumnStats;
   }
 
-  public Collection<Integer> columnStatsToKeep() {
+  public Set<Integer> columnStatsToKeep() {
     return columnStatsToKeep;
   }
 
@@ -359,7 +359,7 @@ public static class Builder {
     private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue();
     private boolean includeColumnStats =
         FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue();
-    private Collection<Integer> columnStatsToKeep =
+    private Set<Integer> columnStatsToKeep =
         FlinkReadConf.split(FlinkReadOptions.COLUMN_STATS_TO_KEEP_OPTION.defaultValue());
     private boolean exposeLocality;
     private Integer planParallelism =
@@ -476,7 +476,7 @@ public Builder includeColumnStats(boolean newIncludeColumnStats) {
       return this;
     }
 
-    public Builder columnStatsToKeep(Collection<Integer> newColumnStatsToKeep) {
+    public Builder columnStatsToKeep(Set<Integer> newColumnStatsToKeep) {
      this.columnStatsToKeep = newColumnStatsToKeep;
       return this;
     }
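On the Flink side, split now returns a Set, which matches the new scan API directly. A small sketch of its parsing behavior, based on the regex and filter shown above (class name is hypothetical):

import java.util.Set;
import org.apache.iceberg.flink.FlinkReadConf;

class SplitExample {
  public static void main(String[] args) {
    // The "\\s*,\\s*" separator tolerates whitespace around commas, and empty
    // segments are filtered out, so this yields the column id set {1, 2, 3}.
    Set<Integer> columnIds = FlinkReadConf.split("1, 2,,3");
    System.out.println(columnIds);
  }
}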