
Anton's comments
Peter Vary committed Nov 10, 2023
1 parent 9872f1f commit f6279d0
Showing 3 changed files with 45 additions and 44 deletions.
api/src/main/java/org/apache/iceberg/ContentFile.java (12 changes: 6 additions & 6 deletions)
@@ -167,13 +167,13 @@ default Long fileSequenceNumber() {
   F copyWithoutStats();
 
   /**
-   * Copies this file with only specific column stats. Manifest readers can reuse file instances;
-   * use this method to copy data and only copy specific stats when collecting files.
+   * Copies this file with column stats only for specific columns. Manifest readers can reuse file
+   * instances; use this method to copy data with stats only for specific columns when collecting
+   * files.
    *
-   * @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
-   *     column stat is kept.
-   * @return a copy of this data file, with stats lower bounds, upper bounds, value counts, null
-   *     value counts, and nan value counts for only specific columns.
+   * @param requestedColumnIds column IDs for which to keep stats.
+   * @return a copy of data file, with lower bounds, upper bounds, value counts, null value counts,
+   *     and nan value counts for only specific columns.
    */
   default F copyWithStats(Set<Integer> requestedColumnIds) {
     throw new UnsupportedOperationException(
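To make the reworded contract concrete, here is a minimal usage sketch. It is not part of this commit: the PruneStatsExample class, the dataFile argument (assumed to come from a manifest read elsewhere), and the column IDs 1 and 2 are hypothetical placeholders.

import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

class PruneStatsExample {
  // Copies the file while keeping bounds, value counts, null counts and NaN counts only for
  // columns 1 and 2; per the BaseFile change below, passing null instead of a set keeps stats
  // for every column. Assumes the concrete DataFile implementation supports copyWithStats
  // (the interface default shown above throws UnsupportedOperationException).
  static DataFile pruneStats(DataFile dataFile) {
    Set<Integer> requestedColumnIds = ImmutableSet.of(1, 2);
    return dataFile.copyWithStats(requestedColumnIds);
  }
}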
core/src/main/java/org/apache/iceberg/BaseFile.java (61 changes: 23 additions & 38 deletions)
@@ -26,8 +26,6 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
@@ -191,24 +189,29 @@ public PartitionData copy() {
     this.recordCount = toCopy.recordCount;
     this.fileSizeInBytes = toCopy.fileSizeInBytes;
     if (copyStats) {
-      this.columnSizes =
-          filterColumnsStats(toCopy.columnSizes, requestedColumnIds, SerializableMap::copyOf);
-      this.valueCounts =
-          filterColumnsStats(toCopy.valueCounts, requestedColumnIds, SerializableMap::copyOf);
-      this.nullValueCounts =
-          filterColumnsStats(toCopy.nullValueCounts, requestedColumnIds, SerializableMap::copyOf);
-      this.nanValueCounts =
-          filterColumnsStats(toCopy.nanValueCounts, requestedColumnIds, SerializableMap::copyOf);
-      this.lowerBounds =
-          filterColumnsStats(
-              toCopy.lowerBounds,
-              requestedColumnIds,
-              m -> SerializableByteBufferMap.wrap(SerializableMap.copyOf(m)));
-      this.upperBounds =
-          filterColumnsStats(
-              toCopy.upperBounds,
-              requestedColumnIds,
-              m -> SerializableByteBufferMap.wrap(SerializableMap.copyOf(m)));
+      if (requestedColumnIds == null) {
+        this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
+        this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
+        this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
+        this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
+        this.lowerBounds =
+            SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
+        this.upperBounds =
+            SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
+      } else {
+        this.columnSizes = SerializableMap.filteredCopyOf(toCopy.columnSizes, requestedColumnIds);
+        this.valueCounts = SerializableMap.filteredCopyOf(toCopy.valueCounts, requestedColumnIds);
+        this.nullValueCounts =
+            SerializableMap.filteredCopyOf(toCopy.nullValueCounts, requestedColumnIds);
+        this.nanValueCounts =
+            SerializableMap.filteredCopyOf(toCopy.nanValueCounts, requestedColumnIds);
+        this.lowerBounds =
+            SerializableByteBufferMap.wrap(
+                SerializableMap.filteredCopyOf(toCopy.lowerBounds, requestedColumnIds));
+        this.upperBounds =
+            SerializableByteBufferMap.wrap(
+                SerializableMap.filteredCopyOf(toCopy.upperBounds, requestedColumnIds));
+      }
     } else {
       this.columnSizes = null;
       this.valueCounts = null;
@@ -521,24 +524,6 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt
     }
   }
 
-  private static <TypeT> Map<Integer, TypeT> filterColumnsStats(
-      Map<Integer, TypeT> map,
-      Set<Integer> columnIds,
-      Function<Map<Integer, TypeT>, Map<Integer, TypeT>> copyFunction) {
-    if (columnIds == null || columnIds.isEmpty()) {
-      return copyFunction.apply(map);
-    }
-
-    if (map == null) {
-      return null;
-    }
-
-    return copyFunction.apply(
-        columnIds.stream()
-            .filter(map::containsKey)
-            .collect(Collectors.toMap(Function.identity(), map::get)));
-  }
-
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
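For readers following the constructor change above, the branch on requestedColumnIds boils down to the helper sketched below. This is an illustration written for this summary, not code from the commit; it relies on SerializableMap being usable as a plain java.util.Map, as the assignments in the constructor already do.

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.util.SerializableMap;

class StatsCopySketch {
  // Mirrors the new if/else: a null request copies every stats entry,
  // otherwise only the entries for the requested column IDs are copied.
  static <V> Map<Integer, V> copyStats(Map<Integer, V> stats, Set<Integer> requestedColumnIds) {
    return requestedColumnIds == null
        ? SerializableMap.copyOf(stats)
        : SerializableMap.filteredCopyOf(stats, requestedColumnIds);
  }
}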
core/src/main/java/org/apache/iceberg/util/SerializableMap.java (16 changes: 16 additions & 0 deletions)
@@ -39,10 +39,26 @@ private SerializableMap(Map<K, V> map) {
     this.copiedMap.putAll(map);
   }
 
+  private SerializableMap(Map<K, V> map, Set<K> keys) {
+    Map<K, V> filteredMap = Maps.newHashMapWithExpectedSize(keys.size());
+
+    for (K key : keys) {
+      if (map.containsKey(key)) {
+        filteredMap.put(key, map.get(key));
+      }
+    }
+
+    this.copiedMap = filteredMap;
+  }
+
   public static <K, V> SerializableMap<K, V> copyOf(Map<K, V> map) {
     return map == null ? null : new SerializableMap<>(map);
   }
 
+  public static <K, V> SerializableMap<K, V> filteredCopyOf(Map<K, V> map, Set<K> keys) {
+    return map == null ? null : new SerializableMap<>(map, keys);
+  }
+
   public Map<K, V> immutableMap() {
     if (immutableMap == null) {
       synchronized (this) {
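A short, self-contained usage sketch of the new factory method (again not part of the commit; the example map, key set, and class name are made up). It shows that only requested keys are retained and that a null source map still yields null, matching copyOf:

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.util.SerializableMap;

class FilteredCopyOfExample {
  public static void main(String[] args) {
    Map<Integer, Long> valueCounts = Map.of(1, 100L, 2, 42L, 3, 7L);

    // Only keys 1 and 3 survive; the absent key 5 is simply skipped.
    Map<Integer, Long> filtered = SerializableMap.filteredCopyOf(valueCounts, Set.of(1, 3, 5));
    System.out.println(filtered.size());         // 2
    System.out.println(filtered.containsKey(2)); // false, column 2 was not requested

    // A null source map is passed through as null, just like copyOf.
    System.out.println(SerializableMap.filteredCopyOf(null, Set.of(1)) == null); // true
  }
}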
