Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/main/java/org/apache/iceberg/TrackedFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ interface TrackedFile {
Types.NestedField.optional(
141, "spec_id", Types.IntegerType.get(), "Spec ID used to partition the file");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";

int CONTENT_STATS_ID = 146;
String CONTENT_STATS_NAME = "content_stats";
String CONTENT_STATS_DOC = "Content statistics for this entry";
Expand Down Expand Up @@ -88,7 +92,8 @@ interface TrackedFile {
Types.ListType.ofRequired(136, Types.IntegerType.get()),
"Field ids used to determine row equality in equality delete files");

static Types.StructType schemaWithContentStats(Types.StructType contentStatsType) {
static Types.StructType schemaWithContentStats(
Types.StructType partitionType, Types.StructType contentStatsType) {
return Types.StructType.of(
TRACKING,
CONTENT_TYPE,
Expand All @@ -97,6 +102,7 @@ static Types.StructType schemaWithContentStats(Types.StructType contentStatsType
RECORD_COUNT,
FILE_SIZE_IN_BYTES,
SPEC_ID,
Types.NestedField.required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
Types.NestedField.optional(
CONTENT_STATS_ID, CONTENT_STATS_NAME, contentStatsType, CONTENT_STATS_DOC),
SORT_ORDER_ID,
Expand Down Expand Up @@ -128,6 +134,9 @@ static Types.StructType schemaWithContentStats(Types.StructType contentStatsType
/** Returns the ID of the partition spec used to partition this file, or null. */
Integer specId();

/** Returns the partition data tuple for this file as a {@link StructLike}. */
StructLike partition();

/** Returns the content stats for this entry. */
ContentStats contentStats();

Expand Down
61 changes: 47 additions & 14 deletions core/src/main/java/org/apache/iceberg/TrackedFileStruct.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,21 @@
import java.util.Set;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;

/** Mutable {@link StructLike} implementation of {@link TrackedFile}. */
class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile, Serializable {
private static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
// Shared PartitionData over the zero-field EMPTY_STRUCT_TYPE. It is the initial value of the
// partitionData field and stays in place unless a projected or caller-supplied partition
// replaces it.
private static final PartitionData EMPTY_PARTITION_DATA =
new PartitionData(EMPTY_STRUCT_TYPE) {
@Override
public PartitionData copy() {
// return the shared instance rather than a new object; the struct has no fields, so
// presumably there is nothing that could be mutated through it
return this;
}
};

private static final Types.StructType BASE_TYPE =
Types.StructType.of(
Expand All @@ -42,6 +50,11 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
TrackedFile.RECORD_COUNT,
TrackedFile.FILE_SIZE_IN_BYTES,
TrackedFile.SPEC_ID,
Types.NestedField.required(
TrackedFile.PARTITION_ID,
TrackedFile.PARTITION_NAME,
EMPTY_STRUCT_TYPE,
TrackedFile.PARTITION_DOC),
Types.NestedField.optional(
TrackedFile.CONTENT_STATS_ID,
TrackedFile.CONTENT_STATS_NAME,
Expand All @@ -60,6 +73,7 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
private long recordCount = -1L;
private long fileSizeInBytes = -1L;
private Integer specId = null;
private PartitionData partitionData = EMPTY_PARTITION_DATA;

// optional fields
private Tracking tracking = null;
Expand All @@ -74,6 +88,11 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
/**
 * Used by internal readers to instantiate this class with a projection schema.
 *
 * <p>When the projection contains the partition field, the partition tuple is sized to the
 * projected partition struct type; otherwise the shared empty partition data is kept.
 *
 * @param projection the struct type requested by the reader
 */
TrackedFileStruct(Types.StructType projection) {
  super(BASE_TYPE, projection);
  // look the field up through the shared constant so this stays in sync with the schema
  // definition; the type is null if the partition field was not projected
  Type partType = projection.fieldType(TrackedFile.PARTITION_NAME);
  if (partType != null) {
    this.partitionData = new PartitionData(partType.asNestedType().asStructType());
  }
}

/** No-projection constructor for direct construction. */
Expand All @@ -87,6 +106,7 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
FileContent contentType,
String location,
FileFormat fileFormat,
PartitionData partition,
long recordCount,
long fileSizeInBytes) {
super(BASE_TYPE.fields().size());
Expand All @@ -96,6 +116,9 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
this.fileFormat = fileFormat;
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
if (partition != null) {
this.partitionData = partition;
}
}

/** Copy constructor. */
Expand All @@ -107,9 +130,8 @@ private TrackedFileStruct(TrackedFileStruct toCopy, boolean withStats, Set<Integ
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
this.specId = toCopy.specId;

this.partitionData = toCopy.partitionData.copy();
this.tracking = toCopy.tracking != null ? toCopy.tracking.copy() : null;

this.sortOrderId = toCopy.sortOrderId;
this.deletionVector = toCopy.deletionVector != null ? toCopy.deletionVector.copy() : null;

Expand Down Expand Up @@ -170,6 +192,11 @@ public Integer specId() {
return specId;
}

/** Returns the partition tuple held by this struct; the backing field is returned, not a copy. */
@Override
public StructLike partition() {
return partitionData;
}

@Override
public ContentStats contentStats() {
return contentStats;
Expand Down Expand Up @@ -237,18 +264,20 @@ private Object getByPos(int pos) {
case 6:
return specId;
case 7:
return contentStats;
return partitionData;
case 8:
return sortOrderId;
return contentStats;
case 9:
return deletionVector;
return sortOrderId;
case 10:
return manifestInfo;
return deletionVector;
case 11:
return keyMetadata();
return manifestInfo;
case 12:
return splitOffsets();
return keyMetadata();
case 13:
return splitOffsets();
case 14:
return equalityIds();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
Expand Down Expand Up @@ -281,24 +310,27 @@ protected <T> void internalSet(int pos, T value) {
this.specId = (Integer) value;
break;
case 7:
this.contentStats = (ContentStats) value;
this.partitionData = (PartitionData) value;
break;
case 8:
this.sortOrderId = (Integer) value;
this.contentStats = (ContentStats) value;
break;
case 9:
this.deletionVector = (DeletionVector) value;
this.sortOrderId = (Integer) value;
break;
case 10:
this.manifestInfo = (ManifestInfo) value;
this.deletionVector = (DeletionVector) value;
break;
case 11:
this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
this.manifestInfo = (ManifestInfo) value;
break;
case 12:
this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
break;
case 13:
this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
break;
case 14:
this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
break;
default:
Expand All @@ -315,6 +347,7 @@ public String toString() {
.add("record_count", recordCount)
.add("file_size_in_bytes", fileSizeInBytes)
.add("spec_id", specId())
.add("partition", partitionData)
.add("tracking", tracking)
.add("content_stats", contentStats)
.add("sort_order_id", sortOrderId)
Expand Down
28 changes: 22 additions & 6 deletions core/src/test/java/org/apache/iceberg/TestTrackedFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class TestTrackedFile {
private static final Types.StructType CONTENT_STATS_TYPE =
StatsUtil.contentStatsFor(TABLE_SCHEMA).type().asStructType();

private static final Types.StructType PARTITION_TYPE =
PartitionSpec.builderFor(TABLE_SCHEMA).identity("id").build().partitionType();

@Test
public void schemaWithContentStatsFieldOrder() {
Types.StructType type = TrackedFile.schemaWithContentStats(CONTENT_STATS_TYPE);
Types.StructType type = TrackedFile.schemaWithContentStats(PARTITION_TYPE, CONTENT_STATS_TYPE);
List<Types.NestedField> fields = type.fields();

assertThat(fields)
Expand All @@ -49,6 +52,7 @@ public void schemaWithContentStatsFieldOrder() {
"record_count",
"file_size_in_bytes",
"spec_id",
"partition",
"content_stats",
"sort_order_id",
"deletion_vector",
Expand All @@ -60,20 +64,22 @@ public void schemaWithContentStatsFieldOrder() {

@Test
public void schemaWithContentStatsFieldIds() {
Types.StructType type = TrackedFile.schemaWithContentStats(CONTENT_STATS_TYPE);
Types.StructType type = TrackedFile.schemaWithContentStats(PARTITION_TYPE, CONTENT_STATS_TYPE);
List<Types.NestedField> fields = type.fields();

assertThat(fields)
.extracting(Types.NestedField::fieldId)
.containsExactly(147, 134, 100, 101, 103, 104, 141, 146, 140, 148, 150, 131, 132, 135);
.containsExactly(147, 134, 100, 101, 103, 104, 141, 102, 146, 140, 148, 150, 131, 132, 135);
}

@Test
public void schemaWithContentStatsUsesProvidedType() {
Types.StructType type = TrackedFile.schemaWithContentStats(CONTENT_STATS_TYPE);
Types.StructType type = TrackedFile.schemaWithContentStats(PARTITION_TYPE, CONTENT_STATS_TYPE);
Types.NestedField contentStatsField = type.field(TrackedFile.CONTENT_STATS_ID);
Types.NestedField partitionField = type.field(TrackedFile.PARTITION_ID);

assertThat(contentStatsField.type().asStructType()).isEqualTo(CONTENT_STATS_TYPE);
assertThat(partitionField.type().asStructType()).isEqualTo(PARTITION_TYPE);
}

@Test
Expand All @@ -88,8 +94,8 @@ public void schemaWithContentStatsReflectsInput() {
Types.StructType smallStats = StatsUtil.contentStatsFor(smallSchema).type().asStructType();
Types.StructType largeStats = StatsUtil.contentStatsFor(largeSchema).type().asStructType();

Types.StructType smallType = TrackedFile.schemaWithContentStats(smallStats);
Types.StructType largeType = TrackedFile.schemaWithContentStats(largeStats);
Types.StructType smallType = TrackedFile.schemaWithContentStats(PARTITION_TYPE, smallStats);
Types.StructType largeType = TrackedFile.schemaWithContentStats(PARTITION_TYPE, largeStats);

Types.StructType smallResult =
smallType.field(TrackedFile.CONTENT_STATS_ID).type().asStructType();
Expand All @@ -99,4 +105,14 @@ public void schemaWithContentStatsReflectsInput() {
assertThat(smallResult.fields()).hasSize(1);
assertThat(largeResult.fields()).hasSize(3);
}

@Test
public void schemaWithContentStatsPartitionIsRequired() {
  // the partition field must be required and carry the name and doc from TrackedFile's constants
  Types.NestedField partition =
      TrackedFile.schemaWithContentStats(PARTITION_TYPE, CONTENT_STATS_TYPE)
          .field(TrackedFile.PARTITION_ID);

  assertThat(partition.isRequired()).isTrue();
  assertThat(partition.name()).isEqualTo(TrackedFile.PARTITION_NAME);
  assertThat(partition.doc()).isEqualTo(TrackedFile.PARTITION_DOC);
}
}
Loading