Skip to content

Commit

Permalink
Api: Track partition statistics via TableMetadata
Browse files Browse the repository at this point in the history
Tracking `PartitionStatisticsFile` in a same way as how `StatisticsFile` is already tracked.
  • Loading branch information
ajantha-bhat committed Nov 3, 2023
1 parent 52e69fb commit a9741f9
Show file tree
Hide file tree
Showing 27 changed files with 1,091 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .palantir/revapi.yml
Expand Up @@ -781,6 +781,9 @@ acceptedBreaks:
- code: "java.class.removed"
old: "interface org.apache.iceberg.view.SQLViewRepresentation"
justification: "Moving from iceberg-api to iceberg-core"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.PartitionStatisticsFile> org.apache.iceberg.Table::partitionStatisticsFiles()"
justification: "Track partition stats from TableMetadata"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.catalog.Namespace org.apache.iceberg.view.ViewVersion::defaultNamespace()"
justification: "Acceptable break due to updating View APIs and the View Spec"
Expand Down
41 changes: 41 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.Serializable;

/**
* Represents a partition statistics file in the table default format, that can be used to read
* table data more efficiently.
*
* <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics
* support is not required to read the table correctly.
*/
public interface PartitionStatisticsFile extends Serializable {
/** ID of the Iceberg table's snapshot the partition statistics file is associated with. */
long snapshotId();

/**
* Returns fully qualified path to the file, suitable for constructing a Hadoop Path. Never null.
*/
String path();

/** Returns the size of the partition statistics file in bytes. */
long fileSizeInBytes();
}
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Expand Up @@ -286,6 +286,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove
* partition statistics files in this table.
*
* @return a new {@link UpdatePartitionStatistics}
*/
default UpdatePartitionStatistics updatePartitionStatistics() {
throw new UnsupportedOperationException(
"Updating partition statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table and commit.
*
Expand Down Expand Up @@ -327,6 +338,9 @@ default UpdateStatistics updateStatistics() {
*/
List<StatisticsFile> statisticsFiles();

/** Returns the current partition statistics files for the table. */
List<PartitionStatisticsFile> partitionStatisticsFiles();

/**
* Returns the current refs for the table
*
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Expand Up @@ -148,6 +148,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove
* partition statistics files in this table.
*
* @return a new {@link UpdatePartitionStatistics}
*/
default UpdatePartitionStatistics updatePartitionStatistics() {
throw new UnsupportedOperationException(
"Updating partition statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table.
*
Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;

/** API for updating partition statistics files in a table. */
public interface UpdatePartitionStatistics extends PendingUpdate<List<PartitionStatisticsFile>> {
/**
* Set the table's partition statistics file for given snapshot, replacing the previous partition
* statistics file for the snapshot if any exists.
*
* @return this for method chaining
*/
UpdatePartitionStatistics setPartitionStatistics(
long snapshotId, PartitionStatisticsFile partitionStatisticsFile);

/**
* Remove the table's partition statistics file for given snapshot.
*
* @return this for method chaining
*/
UpdatePartitionStatistics removePartitionStatistics(long snapshotId);
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Expand Up @@ -195,6 +195,11 @@ public List<StatisticsFile> statisticsFiles() {
return ImmutableList.of();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
return ImmutableList.of();
}

@Override
public Map<String, SnapshotRef> refs() {
return table().refs();
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
Expand Up @@ -100,6 +100,12 @@ public UpdateStatistics updateStatistics() {
"Cannot update statistics of a " + descriptor + " table");
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
throw new UnsupportedOperationException(
"Cannot update partition statistics of a " + descriptor + " table");
}

@Override
public ExpireSnapshots expireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Expand Up @@ -220,6 +220,11 @@ public UpdateStatistics updateStatistics() {
return new SetStatistics(ops);
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
return new SetPartitionStatistics(ops);
}

@Override
public ExpireSnapshots expireSnapshots() {
return new RemoveSnapshots(ops);
Expand Down Expand Up @@ -255,6 +260,11 @@ public List<StatisticsFile> statisticsFiles() {
return ops.current().statisticsFiles();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
return ops.current().partitionStatisticsFiles();
}

@Override
public Map<String, SnapshotRef> refs() {
return ops.current().refs();
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Expand Up @@ -247,6 +247,15 @@ public UpdateStatistics updateStatistics() {
return updateStatistics;
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
checkLastOperationCommitted("UpdatePartitionStatistics");
UpdatePartitionStatistics updatePartitionStatistics =
new SetPartitionStatistics(transactionOps);
updates.add(updatePartitionStatistics);
return updatePartitionStatistics;
}

@Override
public ExpireSnapshots expireSnapshots() {
checkLastOperationCommitted("ExpireSnapshots");
Expand Down Expand Up @@ -730,6 +739,11 @@ public UpdateStatistics updateStatistics() {
return BaseTransaction.this.updateStatistics();
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
return BaseTransaction.this.updatePartitionStatistics();
}

@Override
public ExpireSnapshots expireSnapshots() {
return BaseTransaction.this.expireSnapshots();
Expand Down Expand Up @@ -766,6 +780,11 @@ public List<StatisticsFile> statisticsFiles() {
return current.statisticsFiles();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
return current.partitionStatisticsFiles();
}

@Override
public Map<String, SnapshotRef> refs() {
return current.refs();
Expand Down
Expand Up @@ -106,6 +106,11 @@ public UpdateStatistics updateStatistics() {
return wrapped.updateStatistics();
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
return wrapped.updatePartitionStatistics();
}

@Override
public ExpireSnapshots expireSnapshots() {
return wrapped.expireSnapshots();
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
Expand Up @@ -21,7 +21,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -98,10 +97,15 @@ private Set<String> statsFileLocations(TableMetadata tableMetadata) {
Set<String> statsFileLocations = Sets.newHashSet();

if (tableMetadata.statisticsFiles() != null) {
statsFileLocations =
tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.collect(Collectors.toSet());
tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.forEach(statsFileLocations::add);
}

if (tableMetadata.partitionStatisticsFiles() != null) {
tableMetadata.partitionStatisticsFiles().stream()
.map(PartitionStatisticsFile::path)
.forEach(statsFileLocations::add);
}

return statsFileLocations;
Expand Down
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.immutables.value.Value;

@Value.Immutable
public interface GenericPartitionStatisticsFile extends PartitionStatisticsFile {}
Expand Up @@ -262,7 +262,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
deleteFiles(manifestsToDelete, "manifest");
deleteFiles(manifestListsToDelete, "manifest list");

if (!beforeExpiration.statisticsFiles().isEmpty()) {
if (!beforeExpiration.statisticsFiles().isEmpty()
|| !beforeExpiration.partitionStatisticsFiles().isEmpty()) {
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Expand Up @@ -243,6 +243,47 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class SetPartitionStatistics implements MetadataUpdate {
private final long snapshotId;
private final PartitionStatisticsFile partitionStatisticsFile;

public SetPartitionStatistics(
long snapshotId, PartitionStatisticsFile partitionStatisticsFile) {
this.snapshotId = snapshotId;
this.partitionStatisticsFile = partitionStatisticsFile;
}

public long snapshotId() {
return snapshotId;
}

public PartitionStatisticsFile partitionStatisticsFile() {
return partitionStatisticsFile;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.setPartitionStatistics(snapshotId, partitionStatisticsFile);
}
}

class RemovePartitionStatistics implements MetadataUpdate {
private final long snapshotId;

public RemovePartitionStatistics(long snapshotId) {
this.snapshotId = snapshotId;
}

public long snapshotId() {
return snapshotId;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removePartitionStatistics(snapshotId);
}
}

class AddSnapshot implements MetadataUpdate {
private final Snapshot snapshot;

Expand Down

0 comments on commit a9741f9

Please sign in to comment.