Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Dec 12, 2023
1 parent 8ed6cc1 commit 3124544
Show file tree
Hide file tree
Showing 17 changed files with 56 additions and 142 deletions.
4 changes: 0 additions & 4 deletions .palantir/revapi.yml
Expand Up @@ -873,10 +873,6 @@ acceptedBreaks:
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.PartitionStatisticsFile> org.apache.iceberg.Table::partitionStatisticsFiles()"
justification: "Track partition stats from TableMetadata"
org.apache.iceberg:iceberg-core:
- code: "java.field.serialVersionUIDChanged"
new: "field org.apache.iceberg.util.SerializableMap<K, V>.serialVersionUID"
Expand Down
Expand Up @@ -21,8 +21,7 @@
import java.io.Serializable;

/**
* Represents a partition statistics file in the table default format, that can be used to read
* table data more efficiently.
* Represents a partition statistics file that can be used to read table data more efficiently.
*
* <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics
* support is not required to read the table correctly.
Expand All @@ -31,9 +30,7 @@ public interface PartitionStatisticsFile extends Serializable {
/** ID of the Iceberg table's snapshot the partition statistics file is associated with. */
long snapshotId();

/**
* Returns fully qualified path to the file, suitable for constructing a Hadoop Path. Never null.
*/
/** Returns fully qualified path to the file. Never null. */
String path();

/** Returns the size of the partition statistics file in bytes. */
Expand Down
5 changes: 4 additions & 1 deletion api/src/main/java/org/apache/iceberg/Table.java
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

/** Represents a table. */
public interface Table {
Expand Down Expand Up @@ -339,7 +340,9 @@ default UpdatePartitionStatistics updatePartitionStatistics() {
List<StatisticsFile> statisticsFiles();

/** Returns the current partition statistics files for the table. */
List<PartitionStatisticsFile> partitionStatisticsFiles();
default List<PartitionStatisticsFile> partitionStatisticsFiles() {
  // Default to an empty, immutable list for implementations that do not override this method.
  return ImmutableList.<PartitionStatisticsFile>of();
}

/**
* Returns the current refs for the table
Expand Down
Expand Up @@ -28,8 +28,7 @@ public interface UpdatePartitionStatistics extends PendingUpdate<List<PartitionS
*
* @return this for method chaining
*/
UpdatePartitionStatistics setPartitionStatistics(
long snapshotId, PartitionStatisticsFile partitionStatisticsFile);
UpdatePartitionStatistics setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile);

/**
* Remove the table's partition statistics file for given snapshot.
Expand Down
Expand Up @@ -85,6 +85,11 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
.run(deleteFunc::accept);
}

protected boolean hasAnyStatisticsFiles(TableMetadata tableMetadata) {
  // Table-level statistics are checked first; partition statistics are only consulted
  // when no table-level statistics files exist.
  if (!tableMetadata.statisticsFiles().isEmpty()) {
    return true;
  }

  return !tableMetadata.partitionStatisticsFiles().isEmpty();
}

protected Set<String> expiredStatisticsFilesLocations(
TableMetadata beforeExpiration, TableMetadata afterExpiration) {
Set<String> statsFileLocationsBeforeExpiration = statsFileLocations(beforeExpiration);
Expand Down
Expand Up @@ -262,8 +262,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
deleteFiles(manifestsToDelete, "manifest");
deleteFiles(manifestListsToDelete, "manifest list");

if (!beforeExpiration.statisticsFiles().isEmpty()
|| !beforeExpiration.partitionStatisticsFiles().isEmpty()) {
if (hasAnyStatisticsFiles(beforeExpiration)) {
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
Expand Down
9 changes: 3 additions & 6 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Expand Up @@ -244,17 +244,14 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}

class SetPartitionStatistics implements MetadataUpdate {
private final long snapshotId;
private final PartitionStatisticsFile partitionStatisticsFile;

public SetPartitionStatistics(
long snapshotId, PartitionStatisticsFile partitionStatisticsFile) {
this.snapshotId = snapshotId;
public SetPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) {
this.partitionStatisticsFile = partitionStatisticsFile;
}

public long snapshotId() {
return snapshotId;
return partitionStatisticsFile.snapshotId();
}

public PartitionStatisticsFile partitionStatisticsFile() {
Expand All @@ -263,7 +260,7 @@ public PartitionStatisticsFile partitionStatisticsFile() {

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.setPartitionStatistics(snapshotId, partitionStatisticsFile);
metadataBuilder.setPartitionStatistics(partitionStatisticsFile);
}
}

Expand Down
Expand Up @@ -376,7 +376,6 @@ private static void writeRemoveStatistics(

private static void writeSetPartitionStatistics(
MetadataUpdate.SetPartitionStatistics update, JsonGenerator gen) throws IOException {
gen.writeNumberField(SNAPSHOT_ID, update.snapshotId());
gen.writeFieldName(PARTITION_STATISTICS);
PartitionStatisticsFileParser.toJson(update.partitionStatisticsFile(), gen);
}
Expand Down Expand Up @@ -510,11 +509,10 @@ private static MetadataUpdate readRemoveStatistics(JsonNode node) {
}

private static MetadataUpdate readSetPartitionStatistics(JsonNode node) {
long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
JsonNode partitionStatisticsFileNode = JsonUtil.get(PARTITION_STATISTICS, node);
PartitionStatisticsFile partitionStatisticsFile =
PartitionStatisticsFileParser.fromJson(partitionStatisticsFileNode);
return new MetadataUpdate.SetPartitionStatistics(snapshotId, partitionStatisticsFile);
return new MetadataUpdate.SetPartitionStatistics(partitionStatisticsFile);
}

private static MetadataUpdate readRemovePartitionStatistics(JsonNode node) {
Expand Down
Expand Up @@ -82,8 +82,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira

deleteFiles(manifestListsToDelete, "manifest list");

if (!beforeExpiration.statisticsFiles().isEmpty()
|| !beforeExpiration.partitionStatisticsFiles().isEmpty()) {
if (hasAnyStatisticsFiles(beforeExpiration)) {
deleteFiles(
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files");
}
Expand Down
52 changes: 0 additions & 52 deletions core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
Expand Up @@ -137,9 +137,7 @@ public static List<String> manifestListLocations(Table table, Set<Long> snapshot
*
* @param table table for which statistics files need to be listed
* @return the locations of statistics files
* @deprecated use the {@code allStatisticsFilesLocations(table)} instead.
*/
@Deprecated
public static List<String> statisticsFilesLocations(Table table) {
  // No filtering requested: accept every statistics file of the table.
  Predicate<StatisticsFile> acceptAll = file -> true;
  return statisticsFilesLocations(table, acceptAll);
}
Expand All @@ -150,62 +148,12 @@ public static List<String> statisticsFilesLocations(Table table) {
* @param table table for which statistics files need to be listed
* @param predicate predicate for filtering the statistics files
* @return the locations of statistics files
* @deprecated use the {@code allStatisticsFilesLocations(table, snapshotIds)} instead.
*/
@Deprecated
public static List<String> statisticsFilesLocations(
    Table table, Predicate<StatisticsFile> predicate) {
  List<StatisticsFile> candidates = table.statisticsFiles();

  // Keep the files accepted by the predicate and project each one to its path.
  return candidates.stream()
      .filter(predicate)
      .map(file -> file.path())
      .collect(Collectors.toList());
}

/**
 * Lists the locations of all statistics files of a table, without restricting the result to
 * particular snapshots.
 *
 * @param table table whose statistics files should be listed
 * @return the locations of the statistics files
 */
public static List<String> allStatisticsFilesLocations(Table table) {
  // A null snapshot ID set makes the two-argument overload skip snapshot filtering.
  Set<Long> noSnapshotFilter = null;
  return allStatisticsFilesLocations(table, noSnapshotFilter);
}

/**
 * Collects the locations of a table's statistics files and partition statistics files,
 * optionally restricted to a set of snapshots.
 *
 * @param table table whose statistics files should be listed
 * @param snapshotIds snapshot IDs whose files are kept; {@code null} keeps the files of every
 *     snapshot
 * @return the locations of the matching files, statistics files first, followed by partition
 *     statistics files
 */
public static List<String> allStatisticsFilesLocations(Table table, Set<Long> snapshotIds) {
  // With no snapshot ID set given, every file passes the filter.
  boolean keepAll = snapshotIds == null;
  List<String> locations = Lists.newArrayList();

  List<StatisticsFile> statisticsFiles = table.statisticsFiles();
  if (statisticsFiles != null) {
    for (StatisticsFile file : statisticsFiles) {
      if (keepAll || snapshotIds.contains(file.snapshotId())) {
        locations.add(file.path());
      }
    }
  }

  List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles();
  if (partitionStatisticsFiles != null) {
    for (PartitionStatisticsFile file : partitionStatisticsFiles) {
      if (keepAll || snapshotIds.contains(file.snapshotId())) {
        locations.add(file.path());
      }
    }
  }

  return locations;
}
}
28 changes: 11 additions & 17 deletions core/src/main/java/org/apache/iceberg/SetPartitionStatistics.java
Expand Up @@ -19,31 +19,31 @@
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class SetPartitionStatistics implements UpdatePartitionStatistics {
private final TableOperations ops;
private final Map<Long, Optional<PartitionStatisticsFile>> partitionStatisticsToSet =
Maps.newHashMap();
private final Set<PartitionStatisticsFile> partitionStatisticsToSet = Sets.newHashSet();
private final Set<Long> partitionStatisticsToRemove = Sets.newHashSet();

public SetPartitionStatistics(TableOperations ops) {
this.ops = ops;
}

@Override
public UpdatePartitionStatistics setPartitionStatistics(
long snapshotId, PartitionStatisticsFile partitionStatisticsFile) {
Preconditions.checkArgument(snapshotId == partitionStatisticsFile.snapshotId());
partitionStatisticsToSet.put(snapshotId, Optional.of(partitionStatisticsFile));
PartitionStatisticsFile partitionStatisticsFile) {
Preconditions.checkArgument(
null != partitionStatisticsFile, "partition statistics file must not be null");
partitionStatisticsToSet.add(partitionStatisticsFile);
return this;
}

@Override
public UpdatePartitionStatistics removePartitionStatistics(long snapshotId) {
partitionStatisticsToSet.put(snapshotId, Optional.empty());
partitionStatisticsToRemove.add(snapshotId);
return this;
}

Expand All @@ -61,14 +61,8 @@ public void commit() {

private TableMetadata internalApply(TableMetadata base) {
TableMetadata.Builder builder = TableMetadata.buildFrom(base);
partitionStatisticsToSet.forEach(
(snapshotId, statistics) -> {
if (statistics.isPresent()) {
builder.setPartitionStatistics(snapshotId, statistics.get());
} else {
builder.removePartitionStatistics(snapshotId);
}
});
partitionStatisticsToSet.forEach(builder::setPartitionStatistics);
partitionStatisticsToRemove.forEach(builder::removePartitionStatistics);
return builder.build();
}
}
32 changes: 17 additions & 15 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Expand Up @@ -955,18 +955,25 @@ private Builder(TableMetadata base) {
this.previousFileLocation = base.metadataFileLocation;
this.previousFiles = base.previousFiles;
this.refs = Maps.newHashMap(base.refs);
this.statisticsFiles =
base.statisticsFiles.stream().collect(Collectors.groupingBy(StatisticsFile::snapshotId));
this.partitionStatisticsFiles =
base.partitionStatisticsFiles.stream()
.collect(Collectors.groupingBy(PartitionStatisticsFile::snapshotId));

this.statisticsFiles = statsFileBySnapshotID(base);
this.partitionStatisticsFiles = partitionStatsFileBySnapshotID(base);
this.snapshotsById = Maps.newHashMap(base.snapshotsById);
this.schemasById = Maps.newHashMap(base.schemasById);
this.specsById = Maps.newHashMap(base.specsById);
this.sortOrdersById = Maps.newHashMap(base.sortOrdersById);
}

private static Map<Long, List<StatisticsFile>> statsFileBySnapshotID(TableMetadata base) {
  // Index the base metadata's statistics files by the snapshot each one belongs to.
  Map<Long, List<StatisticsFile>> filesBySnapshot =
      base.statisticsFiles.stream().collect(Collectors.groupingBy(file -> file.snapshotId()));
  return filesBySnapshot;
}

private static Map<Long, List<PartitionStatisticsFile>> partitionStatsFileBySnapshotID(
    TableMetadata base) {
  // Index the base metadata's partition statistics files by the snapshot each one belongs to.
  Map<Long, List<PartitionStatisticsFile>> filesBySnapshot =
      base.partitionStatisticsFiles.stream()
          .collect(Collectors.groupingBy(file -> file.snapshotId()));
  return filesBySnapshot;
}

public Builder withMetadataLocation(String newMetadataLocation) {
this.metadataLocation = newMetadataLocation;
return this;
Expand Down Expand Up @@ -1300,16 +1307,11 @@ public Builder suppressHistoricalSnapshots() {
return this;
}

public Builder setPartitionStatistics(
long snapshotId, PartitionStatisticsFile partitionStatisticsFile) {
public Builder setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) {
Preconditions.checkNotNull(partitionStatisticsFile, "partition statistics file is null");
Preconditions.checkArgument(
snapshotId == partitionStatisticsFile.snapshotId(),
"snapshotId does not match: %s vs %s",
snapshotId,
partitionStatisticsFile.snapshotId());
partitionStatisticsFiles.put(snapshotId, ImmutableList.of(partitionStatisticsFile));
changes.add(new MetadataUpdate.SetPartitionStatistics(snapshotId, partitionStatisticsFile));
partitionStatisticsFiles.put(
partitionStatisticsFile.snapshotId(), ImmutableList.of(partitionStatisticsFile));
changes.add(new MetadataUpdate.SetPartitionStatistics(partitionStatisticsFile));
return this;
}

Expand Down
Expand Up @@ -887,15 +887,14 @@ public void testSetCurrentViewVersionToJson() {
@Test
public void testSetPartitionStatistics() {
String json =
"{\"action\":\"set-partition-statistics\",\"snapshot-id\":1940541653261589030,"
"{\"action\":\"set-partition-statistics\","
+ "\"partition-statistics\":{\"snapshot-id\":1940541653261589030,"
+ "\"statistics-path\":\"s3://bucket/warehouse/stats1.parquet\","
+ "\"file-size-in-bytes\":43}}";

long snapshotId = 1940541653261589030L;
MetadataUpdate expected =
new MetadataUpdate.SetPartitionStatistics(
snapshotId,
ImmutableGenericPartitionStatisticsFile.builder()
.snapshotId(snapshotId)
.path("s3://bucket/warehouse/stats1" + ".parquet")
Expand Down

0 comments on commit 3124544

Please sign in to comment.