Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last updated timestamp and snapshotId for partition table #7581

Merged
merged 10 commits into from
Jun 23, 2023
69 changes: 51 additions & 18 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public class PartitionsTable extends BaseMetadataTable {
new Schema(
Types.NestedField.required(1, "partition", Partitioning.partitionType(table)),
Types.NestedField.required(4, "spec_id", Types.IntegerType.get()),
Types.NestedField.required(
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
9,
"last_updated",
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
Types.TimestampType.withZone(),
"Partition last updated timestamp"),
Types.NestedField.required(
10,
"last_updated_snapshot_id",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a) Is it a good idea to keep the snapshot id? Because regularly running expire_snapshots can clean up the snapshots and we may not be able to map what operation these files were created from, even with the snapshot id.

b) There was also an ask for "latest sequence number" associated with that partition from the community users during partition stats discussion.

Do you think modified time is enough and no need for the sequence number?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also cc: @szehon-ho

Copy link
Collaborator

@szehon-ho szehon-ho May 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, what do we think to have both? (and null if its expired, as per discussion here #7581 (comment)). If we have snapshot_id, I feel its more useful than the last_update_time, but agree we don't always have it.

I think sequence number will be good too, but do you mean fileSequenceNumber or dataSequenceNumber? Maybe worth another pr if there's more discussion there?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just worried that most of the snapshots will be expired and we end up not using that field much.
The main purpose of storing the snapshot id is for finding what operation has last updated this partition id? In that case, we can store the operation type itself directly maybe.

I think sequence number will be good too, but do you mean fileSequenceNumber or dataSequenceNumber? Maybe worth another pr if there's more discussion there?

I guess it is fileSequenceNumber.

Yeah, we can have a separate discussion. I think data_file_size_in_bytes per partition can also be one more good candidate for storing here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a) Is it a good idea to keep the snapshot id? Because regularly running expire_snapshots can clean up the snapshots and we may not be able to map what operation these files were created from, even with the snapshot id.

b) There was also an ask for "latest sequence number" associated with that partition from the community users during partition stats discussion.

Do you think modified time is enough and no need for the sequence number?

My initial thought process is like the last updated timestamp is helpful by itself but if there's doubt around the timestamp, it's better to provide a reference to allow for further investigation. Here we derived last updated timestamp from snapshot, so providing snapshotId enable a way to look up further information about snapshot (if it's a rewrite data operation or is it an append from late arrival data).

With respect to the periodic snapshot expiration, I think partition can have null snapshot based on referenced snapshotId if it was already expired, but it seems only applicable to your data outlive your snapshot. i.e if you run data compaction along side your snapshot expiration, or if you also periodically delete your partition (like if it's daily partitioned and your dataset have a retention period) together with your snapshot expiration, it seem to be fine.

As for file sequence number, I think it might be helpful but by itself it seem to be hard to use compare to timestamp and snapshotId.

Copy link
Collaborator

@szehon-ho szehon-ho May 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RussellSpitzer @aokolnychyi @flyrain any thoughts here, what would make more sense on Partition table?

Copy link
Collaborator

@szehon-ho szehon-ho May 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some thoughts, I think both will be null if the snapshot is expired. But I think we are no worse off than the alternative, which is to join the entries + snapshots table for the user to find the last update time. This also will be null if snapshot is expired.

Perhaps we can have some more persistent storage of snapshot metadata , even after expire?

I think for this case (lastUpdateTime, lastSnapshotId), it is ok to proceed, typically snapshots live several days at least, and I would imagine the user of the tool is interested in the last updated partition, and can run it before the snapshot is expired.

Types.LongType.get(),
"Partition last updated snapshot id"),
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
Types.NestedField.required(
2, "record_count", Types.LongType.get(), "Count of records in data files"),
Types.NestedField.required(
Expand Down Expand Up @@ -85,6 +95,8 @@ public TableScan newScan() {
public Schema schema() {
if (unpartitionedTable) {
return schema.select(
"last_updated",
"last_updated_snapshot_id",
"record_count",
"file_count",
"position_delete_record_count",
Expand All @@ -111,6 +123,8 @@ private DataTask task(StaticTableScan scan) {
partitions,
root ->
StaticDataTask.Row.of(
root.lastUpdatedAt,
root.lastUpdatedSnapshotId,
root.dataRecordCount,
root.dataFileCount,
root.posDeleteRecordCount,
Expand All @@ -131,6 +145,8 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
return StaticDataTask.Row.of(
partition.partitionData,
partition.specId,
partition.lastUpdatedAt,
partition.lastUpdatedSnapshotId,
partition.dataRecordCount,
partition.dataFileCount,
partition.posDeleteRecordCount,
Expand All @@ -142,13 +158,14 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
Types.StructType partitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap(partitionType);

try (CloseableIterable<ContentFile<?>> files = planFiles(scan)) {
for (ContentFile<?> file : files) {
try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries = planEntries(scan)) {
for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
Snapshot snapshot = table.snapshot(entry.snapshotId());
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
ContentFile<?> file = entry.file();
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
StructLike partition =
PartitionUtil.coercePartition(
partitionType, table.specs().get(file.specId()), file.partition());
partitions.get(partition).update(file);
partitions.get(partition).update(file, snapshot);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -158,25 +175,32 @@ private static Iterable<Partition> partitions(Table table, StaticTableScan scan)
}

@VisibleForTesting
static CloseableIterable<ContentFile<?>> planFiles(StaticTableScan scan) {
static CloseableIterable<ManifestEntry<?>> planEntries(StaticTableScan scan) {
Table table = scan.table();

CloseableIterable<ManifestFile> filteredManifests =
filteredManifests(scan, table, scan.snapshot().allManifests(table.io()));

Iterable<CloseableIterable<ContentFile<?>>> tasks =
CloseableIterable.transform(
filteredManifests,
manifest ->
CloseableIterable.transform(
ManifestFiles.open(manifest, table.io(), table.specs())
.caseSensitive(scan.isCaseSensitive())
.select(scanColumns(manifest.content())), // don't select stats columns
t -> (ContentFile<?>) t));
Iterable<CloseableIterable<ManifestEntry<?>>> tasks =
CloseableIterable.transform(filteredManifests, manifest -> readEntries(manifest, scan));

return new ParallelIterable<>(tasks, scan.planExecutor());
}

private static CloseableIterable<ManifestEntry<?>> readEntries(
ManifestFile manifest, StaticTableScan scan) {
Table table = scan.table();
return CloseableIterable.transform(
ManifestFiles.open(manifest, table.io(), table.specs())
.caseSensitive(scan.isCaseSensitive())
.select(scanColumns(manifest.content())) // don't select stats columns
.entries(),
t ->
(ManifestEntry<? extends ContentFile<?>>)
// defensive copy of manifest entry without stats columns
t.copyWithoutStats());
}

private static List<String> scanColumns(ManifestContent content) {
switch (content) {
case DATA:
Expand Down Expand Up @@ -249,19 +273,28 @@ static class Partition {
private int posDeleteFileCount;
private long eqDeleteRecordCount;
private int eqDeleteFileCount;
private long lastUpdatedAt;
private long lastUpdatedSnapshotId;

Partition(StructLike key, Types.StructType keyType) {
this.partitionData = toPartitionData(key, keyType);
this.specId = 0;
this.dataRecordCount = 0;
this.dataRecordCount = 0L;
this.dataFileCount = 0;
this.posDeleteRecordCount = 0;
this.posDeleteRecordCount = 0L;
this.posDeleteFileCount = 0;
this.eqDeleteRecordCount = 0;
this.eqDeleteRecordCount = 0L;
this.eqDeleteFileCount = 0;
this.lastUpdatedAt = 0L;
this.lastUpdatedSnapshotId = 0L;
}

void update(ContentFile<?> file) {
void update(ContentFile<?> file, Snapshot snapshot) {
long snapshotCommitTime = snapshot == null ? 0 : snapshot.timestampMillis() * 1000;
if (snapshotCommitTime > this.lastUpdatedAt) {
this.lastUpdatedAt = snapshotCommitTime;
this.lastUpdatedSnapshotId = snapshot.snapshotId();
}
switch (file.content()) {
case DATA:
this.dataRecordCount += file.recordCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,20 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
}

protected void validateSingleFieldPartition(
CloseableIterable<ContentFile<?>> files, int partitionValue) {
CloseableIterable<ManifestEntry<? extends ContentFile<?>>> files, int partitionValue) {
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
validatePartition(files, 0, partitionValue);
}

protected void validatePartition(
CloseableIterable<ContentFile<?>> files, int position, int partitionValue) {
CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries,
int position,
int partitionValue) {
Assert.assertTrue(
"File scan tasks do not include correct file",
StreamSupport.stream(files.spliterator(), false)
StreamSupport.stream(entries.spliterator(), false)
.anyMatch(
file -> {
StructLike partition = file.partition();
entry -> {
StructLike partition = entry.file().partition();
if (position >= partition.size()) {
return false;
}
Expand Down