Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4949] optimize cdc read to avoid the problem of reusing buffer underlying the Row #6805

Merged
merged 4 commits on Oct 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder, IndexedRecord}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -310,16 +310,16 @@ class HoodieCDCRDD(
case HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER =>
recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0))))
val before = record.get(2).asInstanceOf[GenericRecord]
recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
recordToLoad.update(2, recordToJsonAsUTF8String(before))
val after = record.get(3).asInstanceOf[GenericRecord]
recordToLoad.update(3, convertToUTF8String(HoodieCDCUtils.recordToJson(after)))
recordToLoad.update(3, recordToJsonAsUTF8String(after))
case HoodieCDCSupplementalLoggingMode.WITH_BEFORE =>
val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
val op = row.getString(0)
val recordKey = row.getString(1)
recordToLoad.update(0, convertToUTF8String(op))
val before = record.get(2).asInstanceOf[GenericRecord]
recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
recordToLoad.update(2, recordToJsonAsUTF8String(before))
parse(op) match {
case INSERT =>
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
Expand All @@ -338,10 +338,10 @@ class HoodieCDCRDD(
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
case UPDATE =>
recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
case _ =>
recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, null)
}
}
Expand All @@ -355,7 +355,7 @@ class HoodieCDCRDD(
}

/**
* Load the next log record, and judege how to convert it to cdc format.
* Load the next log record, and judge how to convert it to cdc format.
*/
private def loadNextLogRecord(): Boolean = {
var loaded = false
Expand All @@ -370,33 +370,32 @@ class HoodieCDCRDD(
} else {
// there is a real record deleted.
recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
recordToLoad.update(2, convertRowToJsonString(deserialize(existingRecordOpt.get)))
recordToLoad.update(2, recordToJsonAsUTF8String(existingRecordOpt.get))
recordToLoad.update(3, null)
loaded = true
}
} else {
val existingRecordOpt = beforeImageRecords.get(key)
if (existingRecordOpt.isEmpty) {
// a new record is inserted.
val insertedRecord = convertIndexedRecordToRow(indexedRecord.get)
val insertedRecord = projectAvroUnsafe(indexedRecord.get)
recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(insertedRecord))
recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
// insert into beforeImageRecords
beforeImageRecords(key) = serialize(insertedRecord)
beforeImageRecords(key) = insertedRecord
loaded = true
} else {
// a existed record is updated.
val existingRecord = existingRecordOpt.get
val merged = merge(existingRecord, logRecord)
val mergeRow = convertIndexedRecordToRow(merged)
val existingRow = deserialize(existingRecord)
if (mergeRow != existingRow) {
val mergeRecord = projectAvroUnsafe(merged)
if (existingRecord != mergeRecord) {
recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
recordToLoad.update(2, convertRowToJsonString(existingRow))
recordToLoad.update(3, convertRowToJsonString(mergeRow))
recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
// update into beforeImageRecords
beforeImageRecords(key) = serialize(mergeRow)
beforeImageRecords(key) = mergeRecord
loaded = true
}
}
Expand Down Expand Up @@ -512,7 +511,9 @@ class HoodieCDCRDD(
val iter = loadFileSlice(fileSlice)
iter.foreach { row =>
val key = getRecordKey(row)
beforeImageRecords.put(key, serialize(row))
// Due to the reuse buffer mechanism of Spark serialization,
// we have to copy the serialized result if we need to retain its reference
beforeImageRecords.put(key, serialize(row, copy = true))
YannByron marked this conversation as resolved.
Show resolved Hide resolved
}
// reset beforeImageFiles
beforeImageFiles.clear()
Expand Down Expand Up @@ -577,8 +578,17 @@ class HoodieCDCRDD(
p.toUri.toString
}

private def serialize(curRowRecord: InternalRow): GenericRecord = {
serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
/**
 * Serializes a Catalyst `InternalRow` into an Avro `GenericRecord`.
 *
 * @param copy when true, the serialized record is deep-copied via
 *             `GenericData.get().deepCopy` before being returned. This matters
 *             because the underlying serializer may reuse its output buffer
 *             (see the caller comment at the `beforeImageRecords.put` site), so
 *             any caller that retains the returned reference must request a copy.
 *             Defaults to false for callers that consume the record immediately.
 * @return the Avro record produced by `serializer` (deep-copied when `copy` is true)
 */
private def serialize(curRowRecord: InternalRow, copy: Boolean = false): GenericRecord = {
val record = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
if (copy) {
// Detach the result from the serializer's reusable buffer.
GenericData.get().deepCopy(record.getSchema, record)
} else {
record
}
}

/**
 * Renders an Avro record as a JSON string and wraps it in a Spark `UTF8String`,
 * ready to be stored into a cell of the CDC output `InternalRow`.
 * Centralizes the `recordToJson` + `convertToUTF8String` composition that the
 * CDC read path performs for before/after images.
 */
private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
convertToUTF8String(HoodieCDCUtils.recordToJson(record))
}

private def getRecordKey(row: InternalRow): String = {
Expand All @@ -595,11 +605,9 @@ class HoodieCDCRDD(
toScalaOption(record.getData.getInsertValue(avroSchema, payloadProps))
}

private def convertIndexedRecordToRow(record: IndexedRecord): InternalRow = {
deserialize(
LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
avroSchema, reusableRecordBuilder)
)
/**
 * Projects the given indexed record onto `avroSchema` via
 * `LogFileIterator.projectAvroUnsafe`, using the shared `reusableRecordBuilder`.
 * NOTE(review): the builder is reused across calls, so the returned record may
 * alias state from a previous invocation — confirm that callers which retain
 * the result (e.g. into `beforeImageRecords`) copy it first.
 */
private def projectAvroUnsafe(record: IndexedRecord): GenericRecord = {
LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
avroSchema, reusableRecordBuilder)
}

private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): IndexedRecord = {
Expand Down