Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last updated timestamp and snapshotId for partition table #7581

Merged
merged 10 commits into from
Jun 23, 2023
80 changes: 58 additions & 22 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,17 @@ public class PartitionsTable extends BaseMetadataTable {
8,
"equality_delete_file_count",
Types.IntegerType.get(),
"Count of equality delete files"));
"Count of equality delete files"),
Types.NestedField.optional(
9,
"last_updated_ms",
Types.TimestampType.withZone(),
"Partition last updated timestamp"),
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
Types.NestedField.optional(
10,
"last_updated_snapshot_id",
Types.LongType.get(),
"Id of snapshot that last updated this partition"));
this.unpartitionedTable = Partitioning.partitionType(table).fields().isEmpty();
}

Expand All @@ -90,7 +100,9 @@ public Schema schema() {
"position_delete_record_count",
"position_delete_file_count",
"equality_delete_record_count",
"equality_delete_file_count");
"equality_delete_file_count",
"last_updated_ms",
"last_updated_snapshot_id");
}
return schema;
}
Expand All @@ -116,7 +128,9 @@ private DataTask task(StaticTableScan scan) {
root.posDeleteRecordCount,
root.posDeleteFileCount,
root.eqDeleteRecordCount,
root.eqDeleteFileCount));
root.eqDeleteFileCount,
root.lastUpdatedMs,
root.lastUpdatedSnapshotId));
} else {
return StaticDataTask.of(
io().newInputFile(table().operations().current().metadataFileLocation()),
Expand All @@ -136,19 +150,22 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
partition.posDeleteRecordCount,
partition.posDeleteFileCount,
partition.eqDeleteRecordCount,
partition.eqDeleteFileCount);
partition.eqDeleteFileCount,
partition.lastUpdatedMs,
partition.lastUpdatedSnapshotId);
}

private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
Types.StructType partitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap(partitionType);

try (CloseableIterable<ContentFile<?>> files = planFiles(scan)) {
for (ContentFile<?> file : files) {
try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries = planEntries(scan)) {
for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
Snapshot snapshot = table.snapshot(entry.snapshotId());
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
ContentFile<?> file = entry.file();
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
StructLike partition =
PartitionUtil.coercePartition(
partitionType, table.specs().get(file.specId()), file.partition());
partitions.get(partition).update(file);
partitions.get(partition).update(file, snapshot);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -158,25 +175,32 @@ private static Iterable<Partition> partitions(Table table, StaticTableScan scan)
}

@VisibleForTesting
static CloseableIterable<ContentFile<?>> planFiles(StaticTableScan scan) {
static CloseableIterable<ManifestEntry<?>> planEntries(StaticTableScan scan) {
Table table = scan.table();

CloseableIterable<ManifestFile> filteredManifests =
filteredManifests(scan, table, scan.snapshot().allManifests(table.io()));

Iterable<CloseableIterable<ContentFile<?>>> tasks =
CloseableIterable.transform(
filteredManifests,
manifest ->
CloseableIterable.transform(
ManifestFiles.open(manifest, table.io(), table.specs())
.caseSensitive(scan.isCaseSensitive())
.select(scanColumns(manifest.content())), // don't select stats columns
t -> (ContentFile<?>) t));
Iterable<CloseableIterable<ManifestEntry<?>>> tasks =
CloseableIterable.transform(filteredManifests, manifest -> readEntries(manifest, scan));

return new ParallelIterable<>(tasks, scan.planExecutor());
}

private static CloseableIterable<ManifestEntry<?>> readEntries(
ManifestFile manifest, StaticTableScan scan) {
Table table = scan.table();
return CloseableIterable.transform(
ManifestFiles.open(manifest, table.io(), table.specs())
.caseSensitive(scan.isCaseSensitive())
.select(scanColumns(manifest.content())) // don't select stats columns
.entries(),
t ->
(ManifestEntry<? extends ContentFile<?>>)
// defensive copy of manifest entry without stats columns
t.copyWithoutStats());
}

private static List<String> scanColumns(ManifestContent content) {
switch (content) {
case DATA:
Expand Down Expand Up @@ -249,19 +273,31 @@ static class Partition {
private int posDeleteFileCount;
private long eqDeleteRecordCount;
private int eqDeleteFileCount;
private Long lastUpdatedMs;
private Long lastUpdatedSnapshotId;

Partition(StructLike key, Types.StructType keyType) {
this.partitionData = toPartitionData(key, keyType);
this.specId = 0;
this.dataRecordCount = 0;
this.dataRecordCount = 0L;
this.dataFileCount = 0;
this.posDeleteRecordCount = 0;
this.posDeleteRecordCount = 0L;
this.posDeleteFileCount = 0;
this.eqDeleteRecordCount = 0;
this.eqDeleteRecordCount = 0L;
this.eqDeleteFileCount = 0;
this.lastUpdatedMs = null;
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
this.lastUpdatedSnapshotId = null;
}

void update(ContentFile<?> file) {
void update(ContentFile<?> file, Snapshot snapshot) {
if (snapshot != null) {
long snapshotCommitTime = snapshot.timestampMillis() * 1000;
if (this.lastUpdatedMs == null || snapshotCommitTime > this.lastUpdatedMs) {
this.lastUpdatedMs = snapshotCommitTime;
this.lastUpdatedSnapshotId = snapshot.snapshotId();
}
}

switch (file.content()) {
case DATA:
this.dataRecordCount += file.recordCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,20 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
}

protected void validateSingleFieldPartition(
CloseableIterable<ContentFile<?>> files, int partitionValue) {
CloseableIterable<ManifestEntry<?>> files, int partitionValue) {
validatePartition(files, 0, partitionValue);
}

protected void validatePartition(
CloseableIterable<ContentFile<?>> files, int position, int partitionValue) {
CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries,
int position,
int partitionValue) {
Assert.assertTrue(
"File scan tasks do not include correct file",
StreamSupport.stream(files.spliterator(), false)
StreamSupport.stream(entries.spliterator(), false)
.anyMatch(
file -> {
StructLike partition = file.partition();
entry -> {
StructLike partition = entry.file().partition();
if (position >= partition.size()) {
return false;
}
Expand Down