Commit 2f8e221: update
YannByron committed Sep 22, 2022
1 parent 3d9071b commit 2f8e221
Showing 2 changed files with 102 additions and 15 deletions.
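(The two changed files, as visible from the hunks below: a Java Avro schema utility containing the createNullableSchema overloads, and the Scala test class TestCDCDataFrameSuite in org.apache.hudi.functional.cdc.)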
@@ -105,8 +105,7 @@ public static Schema resolveNullableSchema(Schema schema) {
   * wrapping around provided target non-null type
   */
  public static Schema createNullableSchema(Schema.Type avroType) {
-    checkState(avroType != Schema.Type.NULL);
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
+    return createNullableSchema(Schema.create(avroType));
  }
 
  public static Schema createNullableSchema(Schema schema) {
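A minimal sketch (not part of this diff) of how the two overloads presumably read after the change: the body of the Schema overload is collapsed above, and the assumption here is that the NULL check and union construction now live there, so the Schema.Type variant simply delegates.

  // Sketch only -- assumes the NULL check and union construction moved into the
  // Schema overload whose body is not shown in this diff.
  public static Schema createNullableSchema(Schema.Type avroType) {
    return createNullableSchema(Schema.create(avroType));
  }

  public static Schema createNullableSchema(Schema schema) {
    checkState(schema.getType() != Schema.Type.NULL);
    return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
  }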
@@ -19,18 +19,19 @@
 package org.apache.hudi.functional.cdc
 
 import org.apache.avro.Schema
-import org.apache.avro.generic.IndexedRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieLogFile}
-import org.apache.hudi.common.table.cdc.{HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
 import org.apache.hudi.common.table.log.HoodieLogFormat
-import org.apache.hudi.common.table.log.block.{HoodieDataBlock, HoodieLogBlock}
+import org.apache.hudi.common.table.log.block.{HoodieDataBlock}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -39,7 +40,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SaveMode
 
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNull, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource

@@ -109,7 +110,8 @@ class TestCDCDataFrameSuite extends HoodieClientTestBase {
       HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
 
     // Upsert Operation
-    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("001", 50)).toList
+    val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50)
+    val records2 = recordsToStrings(hoodieRecords2).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
       .options(options)
@@ -120,11 +122,18 @@ class TestCDCDataFrameSuite extends HoodieClientTestBase {

     // part of data are updated, it will write out cdc log files
     assertTrue(hasCDCLogFile(instant2))
-    val cdcData2 = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    val cdcData2: Seq[IndexedRecord] = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the num of cdc data
     assertEquals(cdcData2.size, 50)
+    // check op
+    assert(cdcData2.forall( r => r.get(0).toString == "u"))
+    // check record key, before, after according to the supplemental logging mode
+    checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+      cdcData2, hoodieRecords2, HoodieCDCOperation.UPDATE)
 
     // Delete Operation
-    val records3 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
+    val hoodieKey3 = dataGen.generateUniqueDeletes(20)
+    val records3 = deleteRecordsToStrings(hoodieKey3).toList
     val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
     inputDF3.write.format("org.apache.hudi")
       .options(options)
@@ -137,7 +146,12 @@ class TestCDCDataFrameSuite extends HoodieClientTestBase {
     // part of data are deleted, it will write out cdc log files
     assertTrue(hasCDCLogFile(instant3))
     val cdcData3 = getCDCLogFIle(instant3).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the num of cdc data
     assertEquals(cdcData3.size, 20)
+    // check op
+    assert(cdcData3.forall( r => r.get(0).toString == "d"))
+    // check record key, before, after according to the supplemental logging mode
+    checkCDCDataForDelete(cdcSupplementalLoggingMode, cdcSchema, cdcData3, hoodieKey3)
   }
 
   @ParameterizedTest
@@ -172,11 +186,11 @@ class TestCDCDataFrameSuite extends HoodieClientTestBase {
     assertFalse(hasCDCLogFile(instant1))
 
     // 2. Upsert Operation
-    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("001", 30)).toList
-    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
-    val records22 = recordsToStrings(dataGen.generateInserts("001", 20)).toList
-    val inputDF22 = spark.read.json(spark.sparkContext.parallelize(records22, 2))
-    inputDF2.union(inputDF22).write.format("org.apache.hudi")
+    val records2_1 = recordsToStrings(dataGen.generateUniqueUpdates("001", 30)).toList
+    val inputDF2_1 = spark.read.json(spark.sparkContext.parallelize(records2_1, 2))
+    val records2_2 = recordsToStrings(dataGen.generateInserts("001", 20)).toList
+    val inputDF2_2 = spark.read.json(spark.sparkContext.parallelize(records2_2, 2))
+    inputDF2_1.union(inputDF2_2).write.format("org.apache.hudi")
       .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -186,6 +200,9 @@ class TestCDCDataFrameSuite extends HoodieClientTestBase {
     assertTrue(hasCDCLogFile(instant2))
     val cdcData2 = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema))
     assertEquals(cdcData2.size, 50)
+    // check op
+    assertEquals(cdcData2.count(r => r.get(0).toString == "u"), 30)
+    assertEquals(cdcData2.count(r => r.get(0).toString == "i"), 20)
 
     // 3. Delete Operation
     val records3 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
@@ -235,4 +252,75 @@ class TestCDCDataFrameSuite extends HoodieClientTestBase {
     val block = reader.next().asInstanceOf[HoodieDataBlock];
     block.getRecordIterator.asScala.toList
   }
+
+  private def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String,
+                                            cdcSchema: Schema,
+                                            dataSchema: Schema,
+                                            cdcRecords: Seq[IndexedRecord],
+                                            newHoodieRecords: java.util.List[HoodieRecord[_]],
+                                            op: HoodieCDCOperation): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
+      // check before
+      if (op == HoodieCDCOperation.INSERT) {
+        assertNull(cdcRecord.get("before"))
+      } else {
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcRecord.get("record_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+      }
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      if (op == HoodieCDCOperation.INSERT) {
+        // check before
+        assertNull(cdcBeforeValue)
+        // check after
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      } else {
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        // check before
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+        // check after
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      }
+    }
+  }
+
+  private def checkCDCDataForDelete(cdcSupplementalLoggingMode: String,
+                                    cdcSchema: Schema,
+                                    cdcRecords: Seq[IndexedRecord],
+                                    deletedKeys: java.util.List[HoodieKey]): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      // check before
+      assert(deletedKeys.exists(_.getRecordKey == cdcBeforeValue.get("_row_key").toString))
+      // check after
+      assertNull(cdcAfterValue)
+    }
+  }
 }
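Taken together, the new helpers encode the expected CDC record shape per supplemental logging mode: cdc_op_key keeps only the operation and record key, cdc_data_before additionally keeps the before image, and the remaining mode (presumably the full before/after mode) keeps both images, with before null for inserts and after null for deletes.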
