-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-1160] Support update partial fields for CoW table #1929
Conversation
@vinothchandar @bvaradar please take a look when free |
@satishkotha Can you please help review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leesf Added some questions to understand this change. My high level opinion is it'd be great to find a way to place this logic to MergeHandle or few places instead of repeating the same check multiple places. Please take a look.
@@ -94,6 +95,11 @@ | |||
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; | |||
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT | |||
.toString(); | |||
public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed for this change? what is this used for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, not needed. will revert
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
String schema = config.getSchema(); | ||
if (config.updatePartialFields()) { | ||
try { | ||
TableSchemaResolver resolver = new TableSchemaResolver(table.getMetaClient()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to create resolver again? Does config.getLastSchema() work here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to create resolver again? Does config.getLastSchema() work here?
I tested it E2E and found that would not get the lastSchema from config since the config object are different
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can get the last Schema from the commit metadata, right?
schema = resolver.getTableAvroSchemaWithoutMetadataFields().toString(); | ||
} catch (Exception e) { | ||
// ignore exception. | ||
schema = config.getSchema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are potentially reducing schema here, so I think this can lead to issues. Can we throw error? At the least, can you add a LOG here to make sure this gets noticed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are potentially reducing schema here, so I think this can lead to issues. Can we throw error? At the least, can you add a LOG here to make sure this gets noticed?
it handles the case that users config update partial fields in the first time, my original idea is not to throw error in this case, and LOG here sounds reasonable to me.
boolean updatePartialFields = config.updatePartialFields(); | ||
if (updatePartialFields) { | ||
try { | ||
TableSchemaResolver resolver = new TableSchemaResolver(hoodieTable.getMetaClient()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only applicable for MergeHandle if i understand correctly. Do you think its better to override this in MergeHandle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds reasonable
@@ -192,6 +208,10 @@ protected long getAttemptId() { | |||
|
|||
protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable, | |||
HoodieWriteConfig config, Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { | |||
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier); | |||
if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) { | |||
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, new Schema.Parser().parse(config.getLastSchema()), sparkTaskContextSupplier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same question as above, is it better to override this only in MergeHandle?
@@ -237,6 +238,9 @@ protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, Ho | |||
* By default, return the writer schema in Write Config for storing in commit. | |||
*/ | |||
protected String getSchemaToStoreInCommit() { | |||
if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is repeated in multiple places. I usually find this pattern error-prone. Is it possible to reorganize? For example, we always expect config.getSchema() to represent full table schema. We add new config.getUpdateSchema() that tracks partial fields that are being updated.
With that approach, I think we can use getUpdateSchema only in MergeHandle/helpers. Storing schema would work as before as we store full table schema i.e., config.getSchema()
@@ -73,7 +74,11 @@ | |||
} else { | |||
gReader = null; | |||
gWriter = null; | |||
readSchema = upsertHandle.getWriterSchemaWithMetafields(); | |||
if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) { | |||
readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar comment as before. if we make config.getSchema() to always track full table schema, this can be simplified.
if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) { | ||
readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema()); | ||
} else { | ||
readSchema = upsertHandle.getWriterSchemaWithMetafields(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are also calling getWriterSchemaWithMetafields in other places in this class (example: line 163). Dont we need to read getLastSchema() there?
@leesf Any update? Let me know if you need any help here |
ack, will update the PR ASAP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do have some concerns about introducing these new configs. adding support for partial merges themselves, a big +1
@@ -117,7 +118,17 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti | |||
if (extraMetadata.isPresent()) { | |||
extraMetadata.get().forEach(metadata::addMetadata); | |||
} | |||
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema()); | |||
String schema = config.getSchema(); | |||
if (config.updatePartialFields()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have this so that whether or not we can update partial fields is simply controlled by the payload impl.
I am concerned that a config like config.updatePartialFields
is not inline with how we treat payloads. for e.g for MOR, there is no writeConfig when we merge the fields via record reader
String schema = config.getSchema(); | ||
if (config.updatePartialFields()) { | ||
try { | ||
TableSchemaResolver resolver = new TableSchemaResolver(table.getMetaClient()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can get the last Schema from the commit metadata, right?
@@ -94,6 +95,11 @@ | |||
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; | |||
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT | |||
.toString(); | |||
public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field"; | ||
public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted"; | ||
|
||
public static final String UPDATE_PARTIAL_FIELDS = "hoodie.update.partial.fields"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadocs on what these mean please.
@leesf are you still pursuing this? |
yes, will update the PR when free |
@leesf : do you think we can get this in by next week for upcoming release cut? |
@vinothchandar : do you think we can make this a blocker? |
@liujinhui1994 takes over this work, would you please share the progress |
Can be completed today, and PR |
@liujinhui1994 : please let me know once you have addressed all comments and rebased w/ latest master. |
Option<IndexedRecord> recordOption = getInsertValue(schema); | ||
if (recordOption.isPresent()) { | ||
IndexedRecord record = recordOption.get(); | ||
GenericRecord current = (GenericRecord) record; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not current, this is new incoming record. Can we fix the naming please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@liujinhui1994 @leesf
Since this has been open for sometime, let me try to summarize the requirement. And then I can review and help take it to closure.
Requirement:
Add ability to update partial fields w/ COW table. In other words, if existing hudi dataset has 5 fields, new incoming batch could have only 2 fields set. In this case, for those fields which are null in incoming record, we would like to fetch values from previous version of this record (based on hoodie key) from hudi dataset and use it for the new record.
If my understanding is right, can you help me clarify few questions:
- How is the schema passed for new incoming batch?
- is it inferred or via schema provider?
- if its via schema provider, is the schema also trimmed? If yes, we might run into schema compatibility issues.
- If schema is intact, there are chances that default value will be rendered for fields which doesn't have a value set. So, is the requirement to ignore these fields or even choose values from previous version of the record and use it?
FYI: we have another PR open to support similar feature just for preCombine. |
@nsivabalan I’m sorry for the late reply. Recently I cannot access git due to network reasons. I will deal with this PR as suggested in the comments. I hope you will have time to review later. |
@nsivabalan This PR has been modified based on the comments, |
Closing in favor of #2666 |
Tips
What is the purpose of the pull request
Support update partial fields for Cow table
Brief change log
(for example:)
Support update partial fields for Cow table, currently, when updating hudi table, users have to provide all fields, however in some cases, users would not provide all fields, and hudi needs to support update partial fields.
Introduce PartialUpdatePayload
Verify this pull request
(Please pick either of the following options)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.