-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-7567] Add schema evolution to the filegroup reader #10957
[HUDI-7567] Add schema evolution to the filegroup reader #10957
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving subject to clarification on #10957 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd really encourage you to break down such large PR into smaller pieces; so each one can be independent on its own (e.g., adding new APIs, util methods) in terms of scope of changes and stacked properly. Then each set of changes can be reviewed closely.
...di-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
Outdated
Show resolved
Hide resolved
...di-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
Outdated
Show resolved
Hide resolved
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
Outdated
Show resolved
Hide resolved
...-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
Show resolved
Hide resolved
...-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
Outdated
Show resolved
Hide resolved
@@ -275,6 +311,9 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap, | |||
|
|||
if (mergedRecord.isPresent() | |||
&& !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), payloadProps)) { | |||
if (!mergedRecord.get().getRight().equals(readerSchema)) { | |||
return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do partial updates need schema evolution handling like this?
...ommon/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have any performance numbers based on manual benchmarking to make sure there is no regression?
...asource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
Outdated
Show resolved
Hide resolved
...k-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
Outdated
Show resolved
Hide resolved
...c/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
Show resolved
Hide resolved
...di-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good as a first cut. We should keep testing and improving the schema evolution logic in the new file group reader.
import org.junit.jupiter.api.{BeforeEach, Test} | ||
import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse} | ||
|
||
class TestSpark35RecordPositionMetadataColumn extends SparkClientFunctionalTestHarness { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll add this back in #11413?
Change Logs
Subtask of https://issues.apache.org/jira/browse/HUDI-7045
Extracts from #10278
This pr adds in schema evolution to the filegroup reader, including schema.on.read and schema.on.write.
Impact
schema evolution supported in fg reader
Risk level (write none, low medium or high below)
high
need to do perf testing
Documentation Update
N/A
Contributor's checklist