Core: Fix field ids of partition stats file #13329
Conversation
-            record.get(PARTITION_FIELD_ID, StructLike.class),
-            record.get(SPEC_ID.fieldId(), Integer.class));
+            record.get(PARTITION_POSITION, StructLike.class),
+            record.get(SPEC_ID_POSITION, Integer.class));
-    stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
+    stats.set(DATA_RECORD_COUNT_POSITION, record.get(DATA_RECORD_COUNT_POSITION, Long.class));
+    stats.set(DATA_FILE_COUNT_POSITION, record.get(DATA_FILE_COUNT_POSITION, Integer.class));
     stats.set(
-        TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
-        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
+        TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, Long.class));
     stats.set(
-        POSITION_DELETE_RECORD_COUNT.fieldId(),
-        record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
+        POSITION_DELETE_RECORD_COUNT_POSITION,
+        record.get(POSITION_DELETE_RECORD_COUNT_POSITION, Long.class));
     stats.set(
-        POSITION_DELETE_FILE_COUNT.fieldId(),
-        record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
+        POSITION_DELETE_FILE_COUNT_POSITION,
+        record.get(POSITION_DELETE_FILE_COUNT_POSITION, Integer.class));
     stats.set(
-        EQUALITY_DELETE_RECORD_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
+        EQUALITY_DELETE_RECORD_COUNT_POSITION,
+        record.get(EQUALITY_DELETE_RECORD_COUNT_POSITION, Long.class));
     stats.set(
-        EQUALITY_DELETE_FILE_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
-    stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
+        EQUALITY_DELETE_FILE_COUNT_POSITION,
+        record.get(EQUALITY_DELETE_FILE_COUNT_POSITION, Integer.class));
+    stats.set(TOTAL_RECORD_COUNT_POSITION, record.get(TOTAL_RECORD_COUNT_POSITION, Long.class));
+    stats.set(LAST_UPDATED_AT_POSITION, record.get(LAST_UPDATED_AT_POSITION, Long.class));
     stats.set(
-        LAST_UPDATED_SNAPSHOT_ID.fieldId(),
-        record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
+        LAST_UPDATED_SNAPSHOT_ID_POSITION,
+        record.get(LAST_UPDATED_SNAPSHOT_ID_POSITION, Long.class));
Could we just do this?
PartitionStats stats =
    new PartitionStats(
        record.get(PARTITION_POSITION, StructLike.class),
        record.get(SPEC_ID_POSITION, Integer.class));
for (int i = 0; i < record.size(); ++i) {
  stats.set(i, record.get(i, Object.class));
}
Since both are the same datatype, it should be possible. Let me check.
i should start from 2?
Yes, for sure. But I got what he meant and updated it locally.
SPEC_ID_POSITION + 1
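Putting the thread together, a minimal sketch of the agreed-upon shape (my reconstruction of the discussion, not necessarily the exact committed code): the partition struct and spec id go through the constructor, and the generic copy starts right after SPEC_ID_POSITION.

// Sketch of the resolution discussed above: positions 0 and 1 are already
// consumed by the constructor, so the generic copy starts at position 2.
PartitionStats stats =
    new PartitionStats(
        record.get(PARTITION_POSITION, StructLike.class),
        record.get(SPEC_ID_POSITION, Integer.class));
for (int i = SPEC_ID_POSITION + 1; i < record.size(); i++) {
  stats.set(i, record.get(i, Object.class));
}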
static final int PARTITION_POSITION = 0;
static final int SPEC_ID_POSITION = 1;
static final int DATA_RECORD_COUNT_POSITION = 2;
static final int DATA_FILE_COUNT_POSITION = 3;
static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
static final int TOTAL_RECORD_COUNT_POSITION = 9;
static final int LAST_UPDATED_AT_POSITION = 10;
static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
Do we really need these?
Are you suggesting to use hardcoded values like 0, 1, 2 instead?
-public static final int PARTITION_FIELD_ID = 0;
+// schema of the partition stats file as per spec
+public static final int PARTITION_FIELD_ID = 1;
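For context, the two numbering schemes differ by a fixed offset: spec field ids are 1-based, while StructLike positions are 0-based. An illustration (my annotation, not code from the PR):

// Spec field ids for the partition stats schema are 1-based, while
// StructLike positions are 0-based, so position = fieldId - 1 throughout.
public static final int PARTITION_FIELD_ID = 1; // spec field id
static final int PARTITION_POSITION = 0;        // PARTITION_FIELD_ID - 1
static final int SPEC_ID_POSITION = 1;          // spec field id 2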
These are binary breaking changes in 1.9.2 compared to 1.9.0.
What is our policy here?
Do we announce that the 1.9.0 partition stats implementation is broken, and we don't support it?
In this case we might be ok to do incompatible changes (making it compatible with the spec)
@RussellSpitzer: thoughts? Should be part of only 1.10.0?
String invalidSchema =
    getClass()
        .getClassLoader()
        .getResource("org/apache/iceberg/PartitionStatsInvalidSchema.parquet")
Could we just generate the file on the fly instead of checking in the binary?
We don't have the old schema in the code anymore, so creating a Parquet file as per the old schema, adding rows, etc. would be a big chunk of code.
I observed that we already maintain Puffin files, delete files, and metadata files in the resource folder:
https://github.com/apache/iceberg/tree/main/core/src/test/resources/org/apache/iceberg
Hence, I kept the binary to reduce the PR code size.
Found a way to generate stats with the old schema in a few lines of code and added that instead of the binaries.
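As a rough sketch of what generating such a file on the fly could look like (the field names follow the partition statistics spec, but the exact construction here is my assumption, not the PR's actual test code), one can rebuild the old 0-based schema directly:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hypothetical reconstruction of the old (pre-fix) stats schema whose field
// ids start at 0 instead of 1; writing records with this schema reproduces
// the broken files produced by 1.9.x.
Types.StructType partitionType =
    Types.StructType.of(
        Types.NestedField.optional(1000, "c2", Types.StringType.get()),
        Types.NestedField.optional(1001, "c3", Types.StringType.get()));

Schema oldSchema =
    new Schema(
        Types.NestedField.required(0, "partition", partitionType),
        Types.NestedField.required(1, "spec_id", Types.IntegerType.get()),
        Types.NestedField.required(2, "data_record_count", Types.LongType.get()),
        Types.NestedField.required(3, "data_file_count", Types.IntegerType.get()),
        // ... remaining fields continue the same 0-based numbering ...
        Types.NestedField.optional(11, "last_updated_snapshot_id", Types.LongType.get()));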
String invalidSchema =
    getClass()
        .getClassLoader()
        .getResource("org/apache/iceberg/PartitionStatsInvalidSchema.avro")
Could we just generate the file on the fly instead of checking in the binary?
  oldStats.forEach(
      partitionStats ->
          statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
} catch (IllegalArgumentException | IllegalStateException exception) {
Call stack for Parquet:
java.lang.IllegalArgumentException: Not a primitive type: struct<1000: c2: optional string, 1001: c3: optional string>
at org.apache.iceberg.types.Type.asPrimitiveType(Type.java:73)
at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:53)
at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:192)
at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:207)
at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:49)
at org.apache.iceberg.parquet.ParquetSchemaUtil.pruneColumns(ParquetSchemaUtil.java:134)
at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:82)
at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:74)
at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:94)
at org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:196)
at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:139)
at org.apache.iceberg.TestParquetPartitionStatsHandler.testStatsWithInvalidSchema(TestParquetPartitionStatsHandler.java:45)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
Call stack for Avro:
java.lang.IllegalStateException: Not an instance of org.apache.iceberg.StructLike: 0
at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:138)
at org.apache.iceberg.PartitionStatsHandler.recordToPartitionStats(PartitionStatsHandler.java:266)
at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:219)
...(99 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)
at org.apache.iceberg.avro.TestAvroPartitionStatsHandler.testReadingStatsWithInvalidSchema(TestAvroPartitionStatsHandler.java:57)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
Or should I just catch RuntimeException here?
What happens if we have a temporary file access issue here and we are not able to read the old stats file?
Also, what happens if the file is corrupted? Not a Parquet file, or something like a missing footer?
Can I just catch IOException and RuntimeException here, as all the file handling errors fall into these categories? I saw that in many places Iceberg just catches the whole exception with catch (Exception ...). I will go with that.
@lirui-apache, @deniskuzZ: Just a heads up, as you guys are using this feature to my knowledge. And sorry for this oversight during the refactoring!
Thanks @ajantha-bhat, we are also looking at the issue internally. I guess I can incorporate your fix when it's done.
Thanks @ajantha-bhat for the heads up; we've just upgraded Hive to Iceberg 1.9.1.
Off-topic: the partition stats file format is highly coupled with the table
Yes. In the initial proposal document, jack-ye brought this up. The spec originally hardcoded that it should be in the table's default format. Then we updated it so that it can be any of the formats (https://iceberg.apache.org/spec/#partition-statistics-file). So, we can have a table property to configure a stats format different from the data format. But I recommend supporting InternalData for ORC instead, and I can help on it too if needed (I did it for Parquet and Avro), so that ORC can be used for writing table metadata in v4, rather than changing the format of the partition stats here.
Flink new flaky test: #13338
@ajantha-bhat, that would be awesome if we add an InternalData reader/writer for ORC. However, I am not sure how big that effort would be. If I understand correctly, we'll need to implement support for a few missing features, such as default values, timestamp(9), and variant.
-    stats.set(
-        LAST_UPDATED_SNAPSHOT_ID_POSITION,
-        record.get(LAST_UPDATED_SNAPSHOT_ID_POSITION, Long.class));
+            record.get(0, StructLike.class), // partition
Using raw numbers directly in code is considered bad practice. Static analysis tools flag them as magic numbers and recommend replacing them with named constants. This enhances code readability and makes maintenance easier.
Alternatively, to adapt easily if the schema changes, we can introduce an index variable.
int pos = 0;
PartitionStats stats =
new PartitionStats(
record.get(pos++, StructLike.class),
record.get(pos++, Integer.class)
);
for (; pos < record.size(); pos++) {
stats.set(pos, record.get(pos, Object.class));
}
done
  oldStats.forEach(
      partitionStats ->
          statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
} catch (IllegalArgumentException | IllegalStateException exception) {
Does this mean that being able to read an old stats file is not a goal here?
The intention is to handle this as a bug, and we don't provide backward compatibility.
The goal is to make the stats recalculation resilient, but we don't want to read the old stats and keep code around "indefinitely" just to fix this issue.
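In other words, the resilience boils down to a fallback, roughly like the sketch below (the helper names and signatures are illustrative assumptions standing in for the real logic, not Iceberg's actual API):

// Sketch of the fallback being described; computeIncrementalStats and
// computeFullStats are hypothetical helpers, and the catch types mirror
// the ones in the diff above.
PartitionMap<PartitionStats> statsMap;
try {
  // reuse the previous stats file and only merge the new snapshots
  statsMap = computeIncrementalStats(table, fromSnapshot, toSnapshot);
} catch (IllegalArgumentException | IllegalStateException e) {
  // the old file was written with the broken 0-based schema (or is otherwise
  // unreadable as partition stats), so recompute everything from scratch
  statsMap = computeFullStats(table, toSnapshot);
}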
In that case, is it possible to wrap this logic in the iterable returned by readPartitionStatsFile, so that callers don't have to check exceptions themselves? Besides, IllegalArgumentException and IllegalStateException seem too common to me. Perhaps we should further check the exception messages, or even check the field IDs in the file, to make sure exceptions are really caused by the old schema?
readPartitionStatsFile returns an Iterable. The exception happens only when the first element is accessed. Collecting the elements in readPartitionStatsFile just for the sake of catching the exception can be an expensive operation.
Are you not using the exposed API, computeAndWriteStatsFile, for your integration?
I think readPartitionStatsFile is also a public API, and it's not friendly to ask users to check low-level exceptions to determine invalid stats files. The returned Iterable can stay lazy and only be a wrapper of an underlying Iterable reading the file. When the underlying iterable/iterator throws exceptions, the wrapper catches them, determines if this is due to an invalid stats file, and throws InvalidStatsFileException accordingly. What do you think?
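A minimal sketch of such a lazy wrapper (my illustration of the proposal, assuming InvalidStatsFileException is a RuntimeException subclass; none of this is the PR's actual code):

import java.io.IOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;

// Wraps a lazy iterable and translates the low-level exceptions thrown while
// decoding an old/invalid stats file into a dedicated exception type, so the
// file is still read lazily but callers see a single, meaningful exception.
static <T> CloseableIterable<T> wrapInvalid(CloseableIterable<T> underlying, String location) {
  return new CloseableIterable<T>() {
    @Override
    public CloseableIterator<T> iterator() {
      CloseableIterator<T> iter = underlying.iterator();
      return new CloseableIterator<T>() {
        @Override
        public boolean hasNext() {
          try {
            return iter.hasNext();
          } catch (IllegalArgumentException | IllegalStateException e) {
            throw new InvalidStatsFileException("Invalid stats file: " + location, e);
          }
        }

        @Override
        public T next() {
          try {
            return iter.next();
          } catch (IllegalArgumentException | IllegalStateException e) {
            throw new InvalidStatsFileException("Invalid stats file: " + location, e);
          }
        }

        @Override
        public void close() throws IOException {
          iter.close();
        }
      };
    }

    @Override
    public void close() throws IOException {
      underlying.close();
    }
  };
}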
I would love to hear from @lirui-apache: Seems like he has users on this feature already
@lirui-apache: Let me know your final thoughts on this. If you still think wrapping the IllegalArgumentException and IllegalStateException into InvalidStatsFileException is useful for the users, I can wrap it. But I feel users can figure out that it is invalid based on the call stack.
@pvary @ajantha-bhat We haven't released the refactor causing the issue in our internal production, so this won't affect our users.
But I wonder what our message will be to users of the readPartitionStatsFile API in 1.10.0. Should they also treat any exception as an invalid file and re-generate the stats? If that's the case, then I think we don't even need the InvalidStatsFileException class, because it's essentially the same as Exception.
> But I wonder what our message will be to users of the readPartitionStatsFile API in 1.10.0. Should they also treat any exception as an invalid file and re-generate the stats? If that's the case, then I think we don't even need the InvalidStatsFileException class, because it's essentially the same as Exception.
InvalidStatsFileException is for the callers of computeAndMergeStatsIncremental to know that they need to fall back when the old stats cannot be read. I didn't fall back on just any exception in computeAndMergeStatsIncremental because, if the manifest reading failed due to network issues, it can be retried, and the incremental compute can succeed after the network is repaired.
I see, so it helps ignore all exceptions when reading the old stats, but not when doing the compute. +1 to the current change.
Merged to main. This is an important change which we have to highlight in the release notes. @ajantha-bhat: Please create those.
While working on DV support for partition stats, I observed that the schema field ids of the partition stats file are not as per the spec. In the spec, field ids start from 1, while in the Java implementation the field ids started from 0.
This happened because of this refactoring PR. We missed that the field ids are tracked in the spec. We wanted to avoid the ugly code of index - 1 when we reused the field id for StructLike.
Updated the schema ids as per the spec (starting from 1). Uses separate position variables for StructLike (starting from 0). Handled compatibility of reading the old corrupted stats by falling back on a full compute.