Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed Sep 28, 2022
1 parent 5536178 commit dee56f5
Showing 1 changed file with 28 additions and 27 deletions.
Expand Up @@ -314,16 +314,16 @@ class HoodieCDCRDD(
case HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER =>
recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0))))
val before = record.get(2).asInstanceOf[GenericRecord]
recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
recordToLoad.update(2, recordToJsonAsUTF8String(before))
val after = record.get(3).asInstanceOf[GenericRecord]
recordToLoad.update(3, convertToUTF8String(HoodieCDCUtils.recordToJson(after)))
recordToLoad.update(3, recordToJsonAsUTF8String(after))
case HoodieCDCSupplementalLoggingMode.WITH_BEFORE =>
val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
val op = row.getString(0)
val recordKey = row.getString(1)
recordToLoad.update(0, convertToUTF8String(op))
val before = record.get(2).asInstanceOf[GenericRecord]
recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
recordToLoad.update(2, recordToJsonAsUTF8String(before))
parse(op) match {
case INSERT =>
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
Expand All @@ -342,10 +342,10 @@ class HoodieCDCRDD(
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
case UPDATE =>
recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
case _ =>
recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, null)
}
}
Expand All @@ -359,7 +359,7 @@ class HoodieCDCRDD(
}

/**
* Load the next log record, and judege how to convert it to cdc format.
* Load the next log record, and judge how to convert it to cdc format.
*/
private def loadNextLogRecord(): Boolean = {
var loaded = false
Expand All @@ -374,33 +374,32 @@ class HoodieCDCRDD(
} else {
// there is a real record deleted.
recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
recordToLoad.update(2, convertRowToJsonString(deserialize(existingRecordOpt.get)))
recordToLoad.update(2, recordToJsonAsUTF8String(existingRecordOpt.get))
recordToLoad.update(3, null)
loaded = true
}
} else {
val existingRecordOpt = beforeImageRecords.get(key)
if (existingRecordOpt.isEmpty) {
// a new record is inserted.
val insertedRecord = convertIndexedRecordToRow(indexedRecord.get)
val insertedRecord = projectAvroUnsafe(indexedRecord.get)
recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(insertedRecord))
recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
// insert into beforeImageRecords
beforeImageRecords(key) = serialize(insertedRecord)
beforeImageRecords(key) = insertedRecord
loaded = true
} else {
// a existed record is updated.
val existingRecord = existingRecordOpt.get
val merged = merge(existingRecord, logRecord)
val mergeRow = convertIndexedRecordToRow(merged)
val existingRow = deserialize(existingRecord)
if (mergeRow != existingRow) {
val mergeRecord = projectAvroUnsafe(merged)
if (existingRecord != mergeRecord) {
recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
recordToLoad.update(2, convertRowToJsonString(existingRow))
recordToLoad.update(3, convertRowToJsonString(mergeRow))
recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
// update into beforeImageRecords
beforeImageRecords(key) = serialize(mergeRow)
beforeImageRecords(key) = mergeRecord
loaded = true
}
}
Expand Down Expand Up @@ -516,7 +515,7 @@ class HoodieCDCRDD(
val iter = loadFileSlice(fileSlice)
iter.foreach { row =>
val key = getRecordKey(row)
beforeImageRecords.put(key, serialize(row))
beforeImageRecords.put(key, serialize(row, copy = true))
}
// reset beforeImageFiles
beforeImageFiles.clear()
Expand Down Expand Up @@ -581,13 +580,17 @@ class HoodieCDCRDD(
p.toUri.toString
}

private def serialize(curRowRecord: InternalRow): GenericRecord = {
val genericRecord = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
GenericData.get().deepCopy(genericRecord.getSchema, genericRecord)
private def serialize(curRowRecord: InternalRow, copy: Boolean = false): GenericRecord = {
val record = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
if (copy) {
GenericData.get().deepCopy(record.getSchema, record)
} else {
record
}
}

override def deserialize(avroRecord: GenericRecord): InternalRow = {
super.deserialize(avroRecord).copy()
private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
convertToUTF8String(HoodieCDCUtils.recordToJson(record))
}

private def getRecordKey(row: InternalRow): String = {
Expand All @@ -604,11 +607,9 @@ class HoodieCDCRDD(
toScalaOption(record.getData.getInsertValue(avroSchema, payloadProps))
}

private def convertIndexedRecordToRow(record: IndexedRecord): InternalRow = {
deserialize(
LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
avroSchema, reusableRecordBuilder)
)
private def projectAvroUnsafe(record: IndexedRecord): GenericRecord = {
LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
avroSchema, reusableRecordBuilder)
}

private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): IndexedRecord = {
Expand Down

0 comments on commit dee56f5

Please sign in to comment.