From 2f8e22123894ff370749d341b1a4d5059f0a5844 Mon Sep 17 00:00:00 2001
From: Yann
Date: Thu, 22 Sep 2022 11:52:40 +0800
Subject: [PATCH] Verify CDC op, record key, and before/after images in TestCDCDataFrameSuite

Extend the CDC tests to assert the op field, the record keys, and the
before/after images written for update and delete operations under each
supplemental logging mode. Also simplify
AvroSchemaUtils.createNullableSchema(Schema.Type) to delegate to its
Schema-based overload.
---
 .../org/apache/hudi/avro/AvroSchemaUtils.java |   3 +-
 .../cdc/TestCDCDataFrameSuite.scala           | 114 ++++++++++++++++--
 2 files changed, 102 insertions(+), 15 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index fb68507b12dbc..6a87cc3c29907 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -105,8 +105,7 @@ public static Schema resolveNullableSchema(Schema schema) {
    * wrapping around provided target non-null type
    */
   public static Schema createNullableSchema(Schema.Type avroType) {
-    checkState(avroType != Schema.Type.NULL);
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
+    return createNullableSchema(Schema.create(avroType));
   }
 
   public static Schema createNullableSchema(Schema schema) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index c10e9cd632683..77c729922ac97 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -19,18 +19,19 @@ package org.apache.hudi.functional.cdc
 
 import org.apache.avro.Schema
-import org.apache.avro.generic.IndexedRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieLogFile}
-import org.apache.hudi.common.table.cdc.{HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
 import org.apache.hudi.common.table.log.HoodieLogFormat
-import org.apache.hudi.common.table.log.block.{HoodieDataBlock, HoodieLogBlock}
+import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -39,7 +40,7 @@
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SaveMode
 
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNull, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
@@ -109,7 +110,8 @@ class TestCDCDataFrameSuite extends HoodieClientTestBase {
       HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode),
       dataSchema)
     // Upsert Operation
-    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("001", 50)).toList
+    val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50)
+    val records2 = recordsToStrings(hoodieRecords2).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
       .options(options)
@@ -120,11 +122,18 @@
 
     // part of data are updated, it will write out cdc log files
     assertTrue(hasCDCLogFile(instant2))
-    val cdcData2 = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    val cdcData2: Seq[IndexedRecord] = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the number of CDC records
     assertEquals(cdcData2.size, 50)
+    // check op
+    assert(cdcData2.forall(r => r.get(0).toString == "u"))
+    // check record key and before/after images according to the supplemental logging mode
+    checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+      cdcData2, hoodieRecords2, HoodieCDCOperation.UPDATE)
 
     // Delete Operation
-    val records3 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
+    val hoodieKeys3 = dataGen.generateUniqueDeletes(20)
+    val records3 = deleteRecordsToStrings(hoodieKeys3).toList
     val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
     inputDF3.write.format("org.apache.hudi")
       .options(options)
@@ -137,7 +146,12 @@
     // part of data are deleted, it will write out cdc log files
     assertTrue(hasCDCLogFile(instant3))
     val cdcData3 = getCDCLogFIle(instant3).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the number of CDC records
     assertEquals(cdcData3.size, 20)
+    // check op
+    assert(cdcData3.forall(r => r.get(0).toString == "d"))
+    // check record key and before/after images according to the supplemental logging mode
+    checkCDCDataForDelete(cdcSupplementalLoggingMode, cdcSchema, cdcData3, hoodieKeys3)
   }
 
   @ParameterizedTest
@@ -172,11 +186,11 @@
     assertFalse(hasCDCLogFile(instant1))
 
     // 2. Upsert Operation
-    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("001", 30)).toList
-    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
-    val records22 = recordsToStrings(dataGen.generateInserts("001", 20)).toList
-    val inputDF22 = spark.read.json(spark.sparkContext.parallelize(records22, 2))
-    inputDF2.union(inputDF22).write.format("org.apache.hudi")
+    val records2_1 = recordsToStrings(dataGen.generateUniqueUpdates("001", 30)).toList
+    val inputDF2_1 = spark.read.json(spark.sparkContext.parallelize(records2_1, 2))
+    val records2_2 = recordsToStrings(dataGen.generateInserts("001", 20)).toList
+    val inputDF2_2 = spark.read.json(spark.sparkContext.parallelize(records2_2, 2))
+    inputDF2_1.union(inputDF2_2).write.format("org.apache.hudi")
       .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -186,6 +200,9 @@
     assertTrue(hasCDCLogFile(instant2))
     val cdcData2 = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema))
     assertEquals(cdcData2.size, 50)
+    // check op
+    assertEquals(cdcData2.count(r => r.get(0).toString == "u"), 30)
+    assertEquals(cdcData2.count(r => r.get(0).toString == "i"), 20)
 
     // 3. Delete Operation
     val records3 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
@@ -235,4 +252,75 @@
     val block = reader.next().asInstanceOf[HoodieDataBlock];
     block.getRecordIterator.asScala.toList
   }
+
+  private def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String,
+                                            cdcSchema: Schema,
+                                            dataSchema: Schema,
+                                            cdcRecords: Seq[IndexedRecord],
+                                            newHoodieRecords: java.util.List[HoodieRecord[_]],
+                                            op: HoodieCDCOperation): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.asScala.map(_.getKey.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.asScala.map(_.getKey.getRecordKey).sorted)
+      // check before
+      if (op == HoodieCDCOperation.INSERT) {
+        assertNull(cdcRecord.get("before"))
+      } else {
+        val payload = newHoodieRecords.asScala.find(_.getKey.getRecordKey == cdcRecord.get("record_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+      }
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      if (op == HoodieCDCOperation.INSERT) {
+        // check before
+        assertNull(cdcBeforeValue)
+        // check after
+        val payload = newHoodieRecords.asScala.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      } else {
+        val payload = newHoodieRecords.asScala.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        // check before
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+        // check after
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      }
+    }
+  }
+
+  private def checkCDCDataForDelete(cdcSupplementalLoggingMode: String,
+                                    cdcSchema: Schema,
+                                    cdcRecords: Seq[IndexedRecord],
+                                    deletedKeys: java.util.List[HoodieKey]): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.asScala.map(_.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.asScala.map(_.getRecordKey).sorted)
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      // check before
+      assert(deletedKeys.asScala.exists(_.getRecordKey == cdcBeforeValue.get("_row_key").toString))
+      // check after
+      assertNull(cdcAfterValue)
+    }
+  }
 }
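
For readers skimming the AvroSchemaUtils change: below is a minimal sketch
(not part of the patch; the object name NullableSchemaSketch is illustrative)
of the behavior the simplified overload is expected to preserve, assuming the
Schema-based overload it now delegates to keeps the non-null guard that the
removed checkState enforced:

    import org.apache.avro.Schema
    import org.apache.hudi.avro.AvroSchemaUtils

    object NullableSchemaSketch {
      def main(args: Array[String]): Unit = {
        // The Schema.Type overload now just wraps the type and delegates.
        val nullable: Schema = AvroSchemaUtils.createNullableSchema(Schema.Type.STRING)
        // Expect a ["null", "string"] union with null as the first branch.
        assert(nullable.getType == Schema.Type.UNION)
        assert(nullable.getTypes.get(0).getType == Schema.Type.NULL)
        assert(nullable.getTypes.get(1).getType == Schema.Type.STRING)
      }
    }

Keeping "null" as the first branch of the union matters because Avro only
accepts a null default value for a union whose first type is null.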