Skip to content

Commit

Permalink
[HUDI-3304] review
Browse files Browse the repository at this point in the history
  • Loading branch information
jian.feng committed Sep 16, 2022
1 parent c6aadba commit 256b995
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Expand Up @@ -35,8 +35,8 @@ public class SerializableSchema implements Serializable {
public SerializableSchema() {
}

public SerializableSchema(String schema) {
this.schema = new Schema.Parser().parse(schema);
public SerializableSchema(String schemaStr) {
this.schema = new Schema.Parser().parse(schemaStr);
}

public SerializableSchema(Schema schema) {
Expand Down
Expand Up @@ -102,13 +102,13 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal
return this;
}
// pick the payload with greater ordering value as insert record
final boolean isOldRecordNewer = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
try {
GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get();
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, isOldRecordNewer);
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord);
if (mergedRecord.isPresent()) {
return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
isOldRecordNewer ? oldValue.orderingVal : this.orderingVal);
shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
}
} catch (Exception ex) {
return this;
Expand All @@ -123,7 +123,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
return mergeOldRecord(currentValue, schema, isRecordNewer(currentValue, prop));
return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop));
}

/**
Expand All @@ -149,20 +149,21 @@ private Option<IndexedRecord> mergeOldRecord(
}

GenericRecord baseRecord = isOldRecordNewer ? (GenericRecord) oldRecord : (GenericRecord) recordOption.get();
GenericRecord mergedRecord = isOldRecordNewer ? (GenericRecord) recordOption.get() : (GenericRecord) oldRecord;
GenericRecord updatingRecord = isOldRecordNewer ? (GenericRecord) recordOption.get() : (GenericRecord) oldRecord;

return mergeRecords(schema, baseRecord, mergedRecord);
return mergeRecords(schema, baseRecord, updatingRecord);
}

/**
* Returns whether the given record is newer than the record of this payload.
*
* @param orderingVal
* @param record The record
* @param prop The payload properties
*
* @return true if the given record is newer
*/
private boolean isRecordNewer(IndexedRecord record, Properties prop) {
private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) {
String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
if (!StringUtils.isNullOrEmpty(orderingField)) {
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty(
Expand Down

0 comments on commit 256b995

Please sign in to comment.