Description
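If a COW table has the record (id 1, ts 100) and we run a MERGE INTO (MIT) delete with the incoming record (id 1, ts 99), the existing record should not be deleted, because the incoming record has a lower ordering value (ts).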
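The expected outcome can be stated as a small rule: an incoming delete removes the stored record only if its ordering value (ts here) is not lower than the stored one. The sketch below is a hypothetical, self-contained model, not Hudi code; `Row`, `applyDelete`, and the choice that the delete wins on equal ts are assumptions for illustration.

{code:java}
import java.util.Optional;

// Hypothetical model of the expected MERGE INTO delete semantics; not Hudi code.
final class ExpectedDeleteSemantics {

  // A stored or incoming row: record key plus ordering value (ts).
  record Row(int id, long ts) {}

  // Returns the row that should remain after applying an incoming delete.
  // An empty result means the stored row is deleted.
  static Optional<Row> applyDelete(Row stored, Row incomingDelete) {
    if (stored.id() != incomingDelete.id()) {
      throw new IllegalArgumentException("record keys must match");
    }
    // Assumption: the delete wins on a tie (incoming ts >= stored ts).
    if (incomingDelete.ts() >= stored.ts()) {
      return Optional.empty();
    }
    // The delete is older than the stored row, so it is ignored.
    return Optional.of(stored);
  }

  public static void main(String[] args) {
    Row stored = new Row(1, 100);
    System.out.println(applyDelete(stored, new Row(1, 99)));  // Optional[Row[id=1, ts=100]]
    System.out.println(applyDelete(stored, new Row(1, 101))); // Optional.empty
  }
}
{code}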
JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-8915
- Type: Sub-task
- Parent: https://issues.apache.org/jira/browse/HUDI-8722
- Fix version(s):
- 1.1.0
Comments
06/Mar/25 21:59;yihua;This should already be fixed. I'll check the test coverage.;;;
26/Mar/25 02:25;shivnarayan;Nope, we still have the issue.
Here is the root cause.
With HoodieMergeHandle, we use HoodieAvroRecordMerger as the merger class, and the payload is ExpressionPayload.
Here is what the call stack looks like:
{code:java}
// oldRecord: the valid previous record with ts 100
// newRecord: the incoming delete record, but with the lower ts of 99
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);{code}
{code:java}
// -> HoodieAvroRecordMerger#merge
@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
  return combineAndGetUpdateValue(older, newer, newSchema, props)
      .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
}{code}
{code:java}
// -> HoodieAvroRecordMerger#combineAndGetUpdateValue
private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
  Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
  if (!previousAvroData.isPresent()) {
    return Option.empty();
  }
  // Delegates to the newer record's payload (ExpressionPayload here)
  return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props);
}{code}
The last line above invokes ExpressionPayload#combineAndGetUpdateValue:
{code:scala}
override def combineAndGetUpdateValue(targetRecord: IndexedRecord,
                                      schema: Schema,
                                      properties: Properties): HOption[IndexedRecord] = {
  val recordSchema = getRecordSchema(properties)
  val sourceRecord = bytesToAvro(recordBytes, recordSchema)
  val joinedRecord = joinRecord(sourceRecord, targetRecord, properties)
  processMatchedRecord(ConvertibleRecord(joinedRecord), Some(targetRecord), properties)
}{code}
So ExpressionPayload never accounts for the ordering value: at the end of combineAndGetUpdateValue we always get a merged (joined) record, regardless of whether the incoming ts is higher or lower than the existing one.
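To make the gap concrete, here is a simplified, hypothetical Java model of the call chain above: the merger hands the previous record to the newer record's payload and returns whatever the payload produces, and nothing along the way compares ts. `Payload`, `IncomingRecord`, and `merge` are illustrative stand-ins, not the real Hudi signatures.

{code:java}
import java.util.Optional;

// Simplified, hypothetical model of the merger -> payload call chain.
// Names only loosely mirror the Hudi classes; these are not the real APIs.
final class DelegationModel {

  record Row(int id, long ts, String value) {}

  // Stand-in for a payload's combineAndGetUpdateValue(previous, ...).
  interface Payload {
    Optional<Row> combineAndGetUpdateValue(Row previous);
  }

  // Stand-in for an incoming HoodieRecord that carries its payload.
  record IncomingRecord(Row data, Payload payload) {}

  // Same shape as the merger helper above: no previous value -> empty,
  // otherwise delegate to the newer record's payload. ts is never compared.
  static Optional<Row> merge(Optional<Row> older, IncomingRecord newer) {
    if (older.isEmpty()) {
      return Optional.empty();
    }
    return newer.payload().combineAndGetUpdateValue(older.get());
  }

  public static void main(String[] args) {
    Row stored = new Row(1, 100, "a");
    // A payload that always deletes, like a matched delete whose condition is true.
    IncomingRecord staleDelete = new IncomingRecord(new Row(1, 99, "b"), prev -> Optional.empty());
    // Prints Optional.empty: the stale delete (ts 99) removes the row with ts 100.
    System.out.println(merge(Optional.of(stored), staleDelete));
  }
}
{code}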
After merging, we call processMatchedRecord:
{code:scala}
if (resultRecordOpt == null) {
  // Process delete
  val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
  if (deleteConditionText != null) {
    val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, inputRecord.asAvro.getSchema).head
    val deleteConditionEvalResult = deleteConditionEvaluator.apply(inputRecord.asRow)
      .get(0, BooleanType)
      .asInstanceOf[Boolean]
    if (deleteConditionEvalResult) {
      resultRecordOpt = HOption.empty()
    }
  }
}{code}
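In the snippet above, deleteConditionEvalResult evaluates to true, so resultRecordOpt becomes empty and the existing record (ts 100) is deleted, even though the incoming record has the lower ts (99).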
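The delete branch can be modeled as below, together with a hypothetical ordering check that skips the delete condition when the source record is older than the target. This is a sketch under stated assumptions, not a proposed patch: `JoinedRow` stands in for the joined source/target record, an unconditional `WHEN MATCHED THEN DELETE` is modeled as a condition that is always true, and `processMatchedDeleteWithOrdering` is an invented name.

{code:java}
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical model of the delete branch of processMatchedRecord; not Hudi code.
final class DeleteConditionModel {

  record Row(int id, long ts) {}

  // Stand-in for the joined record that carries both source and target.
  record JoinedRow(Row source, Row target) {}

  // Mirrors the branch above: if the delete condition is true on the joined
  // row, the result is empty, meaning the target row is deleted.
  static Optional<Row> processMatchedDelete(JoinedRow joined, Predicate<JoinedRow> deleteCondition) {
    if (deleteCondition.test(joined)) {
      return Optional.empty();
    }
    return Optional.of(joined.target());
  }

  // Hypothetical ordering-aware variant: a source row older than the target
  // is ignored before the delete condition is even evaluated.
  static Optional<Row> processMatchedDeleteWithOrdering(JoinedRow joined, Predicate<JoinedRow> deleteCondition) {
    if (joined.source().ts() < joined.target().ts()) {
      return Optional.of(joined.target());
    }
    return processMatchedDelete(joined, deleteCondition);
  }

  public static void main(String[] args) {
    JoinedRow joined = new JoinedRow(new Row(1, 99), new Row(1, 100));
    // Assumption: an unconditional matched delete behaves like an always-true condition.
    Predicate<JoinedRow> alwaysDelete = j -> true;
    System.out.println(processMatchedDelete(joined, alwaysDelete));             // Optional.empty
    System.out.println(processMatchedDeleteWithOrdering(joined, alwaysDelete)); // Optional[Row[id=1, ts=100]]
  }
}
{code}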
;;;
26/Mar/25 02:26;shivnarayan;Essentially, ExpressionPayload#combineAndGetUpdateValue does not honor the ordering value. I guess it's a known limitation. It's just that, for MOR tables, our file group reader on the read path does take the ordering value into account when merging two versions of a record. ;;;
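For comparison, ordering-aware merging of record versions, as the comment describes for the MOR read path, can be sketched as a fold that keeps the version with the higher ordering value and treats a winning delete as removal. This is not the actual file group reader implementation; `Version`, `merge`, and the tie-breaking rule (the later version wins on equal ts) are assumptions.

{code:java}
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of ordering-aware merging of record versions, in the
// spirit of what the comment attributes to the MOR file group reader.
final class OrderingAwareMerge {

  // One version of a record: key, ordering value, and whether it is a delete.
  record Version(int id, long ts, boolean isDelete) {}

  // Assumption: on equal ts, the later version (in commit order) wins.
  static Version pick(Version current, Version incoming) {
    return incoming.ts() >= current.ts() ? incoming : current;
  }

  // Folds versions listed in commit order; an empty result means deleted.
  static Optional<Version> merge(List<Version> versionsInCommitOrder) {
    Version winner = null;
    for (Version v : versionsInCommitOrder) {
      winner = (winner == null) ? v : pick(winner, v);
    }
    if (winner == null || winner.isDelete()) {
      return Optional.empty();
    }
    return Optional.of(winner);
  }

  public static void main(String[] args) {
    // Base record with ts 100, followed by a delete with the lower ts 99.
    List<Version> versions = List.of(new Version(1, 100, false), new Version(1, 99, true));
    System.out.println(merge(versions)); // Optional[Version[id=1, ts=100, isDelete=false]]
  }
}
{code}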