-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-5968] Fix global index duplicate and handle custom payload when update partition #8490
[HUDI-5968] Fix global index duplicate and handle custom payload when update partition #8490
Conversation
More code clean up needed |
dbe76d5
to
df20633
Compare
991afee
to
f5ebd44
Compare
so, #8344 is not valid anymore ? |
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
Outdated
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
Show resolved
Hide resolved
do you think we should do the snapshot read only when updatePartitionPath is set to true and avoid when its set to false. |
f5ebd44
to
85dd094
Compare
hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
Show resolved
Hide resolved
hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
Show resolved
Hide resolved
...ent/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
Outdated
Show resolved
Hide resolved
HoodieData<HoodieRecord<R>> newRecords = taggedHoodieRecords.filter(p -> !p.getRight().isPresent()).map(Pair::getLeft); | ||
// the records tagged to existing base files | ||
HoodieData<HoodieRecord<R>> updatingRecords = taggedHoodieRecords.filter(p -> p.getRight().isPresent()).map(Pair::getLeft) | ||
.distinctWithKey(HoodieRecord::getRecordKey, config.getGlobalIndexReconcileParallelism()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see we are doing distinctWithKey here. So, we assume that records may not be duplicated at all?
what happens if there are duplicates already. for eg, some one ingested same batch of data w/ bulk_insert. may be we need to revisit overall end to end flow for this scenario of how our global index will work.
but trying to think through how it might surface after this fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we may not require to fix anything as such. but wanted to see what will be outcome.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the tagged records at this point will contain dups in case of last write updated partition and inserted a new record to new partition, and compaction has not happened yet. The first look up will still get 2 records due to join only with base files.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
Outdated
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
Show resolved
Hide resolved
|
||
private Option<FileSlice> getLatestFileSlice() { | ||
if (nonEmpty(instantTime) | ||
&& hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant().isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move these checks to constructor.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
Outdated
Show resolved
Hide resolved
...tasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
Show resolved
Hide resolved
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
Outdated
Show resolved
Hide resolved
hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
Outdated
Show resolved
Hide resolved
hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
Outdated
Show resolved
Hide resolved
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
Show resolved
Hide resolved
} | ||
|
||
/** | ||
* Utility method to convert bytes to HoodieRecord using schema and payload class. | ||
*/ | ||
private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, HoodieSparkRecord record, Pair<String, String> recordKeyPartitionPathFieldPair, | ||
boolean withOperationField, Option<String> partitionName) { | ||
boolean withOperationField, Option<String> partitionName, Option<StructType> structTypeWithoutMetaFields) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you enhance the java doc on when and how to use this method. for eg, when should the last arg be set?
or should we introduce overloaded method.
MergeHandle needs some thorough testing. can you file a follow up ticket for that |
...t/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once CI is green, we can go ahead!
… update partition (apache#8490)
… update partition (apache#8490)
String key = record.getRecordKey(); | ||
if (deltaRecordMap.containsKey(key)) { | ||
deltaRecordKeys.remove(key); | ||
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this merge result needs to be wrapped back to the original payload so that caller won't have to do it. fixed in #8736
if (incoming.getData() instanceof EmptyHoodieRecordPayload) { | ||
// incoming is a delete: force tag the incoming to the old partition | ||
return Collections.singletonList(getTaggedRecord(incoming, Option.of(existing.getCurrentLocation()))).iterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this needs to use isDelete()
api to check and incoming
's key need to be overwritten to the existing's key. fixed in #8736
HoodieRecord incomingWithMetaFields = incomingPrepended | ||
.wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty()); | ||
Option<Pair<HoodieRecord, Schema>> mergeResult = config.getRecordMerger() | ||
.merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The record merger is instantiated for each time, will cause unnecessary onverhead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch! saw it's been fixed now
Change Logs
When using global index (bloom or simple), and update partition is set to true. There is a chance where record is in p1 at the beginning, and later updated to p2, when updating to p3 and compaction not yet happened, global index joined both old versions of the record in p1 and p2, and tagged 2 records to insert to p3. This sort of duplicates will reside in the dataset and won't be reconciled unless manually dedup the table.
When records are inserted into new partitions, existing logic does not honor custom payload, which should be handled by record merger API.
Impact
Global index will load fileslice to perform merge and tagging, which slows down the whole process if a lot partition updates happen.
Risk level
High.
Documentation Update
hoodie.global.index.reconcile.parallelism
Contributor's checklist