From 3a8d64e584295dd3979a2d893497d90070d15621 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Fri, 26 Nov 2021 19:59:20 -0800 Subject: [PATCH] [HUDI-2868] Fix skipped HoodieSparkSqlWriterSuite (#4125) - Co-authored-by: Yann Byron --- .../hudi/HoodieSparkSqlWriterSuite2.scala | 47 ------------- .../TestGenericRecordAndRowConsistency.scala | 26 ++----- ...e.scala => TestHoodieSparkSqlWriter.scala} | 67 ++++++++++++------- style/scalastyle.xml | 2 +- 4 files changed, 52 insertions(+), 90 deletions(-) delete mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/{HoodieSparkSqlWriterSuite.scala => TestHoodieSparkSqlWriter.scala} (94%) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala deleted file mode 100644 index e64f96ff8d0f..000000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi - -import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator} - -import org.apache.spark.sql.hudi.command.SqlKeyGenerator - -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Test - -class HoodieSparkSqlWriterSuite2 { - - @Test - def testGetOriginKeyGenerator(): Unit = { - // for dataframe write - val m1 = Map( - HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName - ) - val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1) - assertTrue(kg1 == classOf[ComplexKeyGenerator].getName) - - // for sql write - val m2 = Map( - HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName, - SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName - ) - val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2) - assertTrue(kg2 == classOf[SimpleKeyGenerator].getName) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala index 2caf4cc20eaa..985bf2e9408b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala @@ -18,17 +18,15 @@ package org.apache.hudi -import java.sql.{Date, Timestamp} - import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.spark.sql.DataFrame +import org.junit.jupiter.api.Test -import org.junit.jupiter.api.{BeforeEach, Test} +import java.sql.{Date, Timestamp} -class TestGenericRecordAndRowConsistency extends HoodieClientTestBase { +class TestGenericRecordAndRowConsistency extends SparkClientFunctionalTestHarness { - var spark: SparkSession = _ val commonOpts = Map( HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl", "hoodie.insert.shuffle.parallelism" -> "1", @@ -40,16 +38,6 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase { DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator" ) - /** - * Setup method running before each test. - */ - @BeforeEach override def setUp() { - setTableName("hoodie_type_consistency_tbl") - initPath() - initSparkContexts() - spark = sqlContext.sparkSession - } - @Test def testTimestampTypeConsistency(): Unit = { val _spark = spark @@ -60,7 +48,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase { (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"), (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"), (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def") - ).toDF("typeId","eventTime", "str") + ).toDF("typeId", "eventTime", "str") testConsistencyBetweenGenericRecordAndRow(df) } @@ -75,7 +63,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase { (1, Date.valueOf("2014-11-30"), "abc"), (2, Date.valueOf("2016-12-29"), "def"), (2, Date.valueOf("2016-05-09"), "def") - ).toDF("typeId","eventTime", "str") + ).toDF("typeId", "eventTime", "str") testConsistencyBetweenGenericRecordAndRow(df) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala similarity index 94% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index ad372c462064..fbdfb699bb89 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -19,10 +19,10 @@ package org.apache.hudi import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path -import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, INSERT_OPERATION_OPT_VAL, KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, OPERATION, PAYLOAD_CLASS_NAME, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE} +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.config.HoodieConfig -import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.model._ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig} @@ -35,10 +35,12 @@ import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.functions.{expr, lit} +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.sql.hudi.command.SqlKeyGenerator import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail} -import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource} import org.mockito.ArgumentMatchers.any @@ -47,15 +49,13 @@ import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, inte import java.time.Instant import java.util.{Collections, Date, UUID} - import scala.collection.JavaConversions._ import scala.collection.JavaConverters -import scala.util.control.NonFatal /** * Test suite for SparkSqlWriter class. */ -class HoodieSparkSqlWriterSuite { +class TestHoodieSparkSqlWriter { var spark: SparkSession = _ var sqlContext: SQLContext = _ var sc: SparkContext = _ @@ -70,7 +70,7 @@ class HoodieSparkSqlWriterSuite { * Setup method running before each test. */ @BeforeEach - def setUp() { + def setUp(): Unit = { initSparkContext() tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path") tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap") @@ -95,6 +95,7 @@ class HoodieSparkSqlWriterSuite { spark = SparkSession.builder() .appName(hoodieFooTableName) .master("local[2]") + .withExtensions(new HoodieSparkSessionExtension) .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .getOrCreate() sc = spark.sparkContext @@ -250,12 +251,14 @@ class HoodieSparkSqlWriterSuite { "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4") val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2)) - assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist")) + assert(tableAlreadyExistException.getMessage.contains("Config conflict")) + assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl")) //on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete") val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2)) - assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist")) + assert(tableAlreadyExistException.getMessage.contains("Config conflict")) + assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl")) } /** @@ -266,7 +269,7 @@ class HoodieSparkSqlWriterSuite { @ParameterizedTest @EnumSource(value = classOf[BulkInsertSortMode]) def testBulkInsertForSortMode(sortMode: BulkInsertSortMode): Unit = { - testBulkInsertWithSortMode(sortMode, true) + testBulkInsertWithSortMode(sortMode, populateMetaFields = true) } /** @@ -287,12 +290,13 @@ class HoodieSparkSqlWriterSuite { @Test def testDisableAndEnableMetaFields(): Unit = { try { - testBulkInsertWithSortMode(BulkInsertSortMode.NONE, false) + testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false) //create a new table val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4") .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name()) + .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true") // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema @@ -302,9 +306,10 @@ class HoodieSparkSqlWriterSuite { try { // write to Hudi HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df) - Assertions.fail("Should have thrown exception") + fail("Should have thrown exception") } catch { - case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back")) + case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict")) + case e: Exception => fail(e); } } } @@ -439,7 +444,7 @@ class HoodieSparkSqlWriterSuite { val records = DataSourceTestUtils.generateRandomRows(100) val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) - initializeMetaClientForBootstrap(fooTableParams, tableType, false) + initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false) val client = spy(DataSourceUtils.createHoodieClient( new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName, mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]) @@ -496,7 +501,7 @@ class HoodieSparkSqlWriterSuite { DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) - initializeMetaClientForBootstrap(fooTableParams, tableType, true) + initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true) val client = spy(DataSourceUtils.createHoodieClient( new JavaSparkContext(sc), @@ -526,7 +531,8 @@ class HoodieSparkSqlWriterSuite { .setTableType(tableType) .setTableName(hoodieFooTableName) .setRecordKeyFields(fooTableParams(DataSourceWriteOptions.RECORDKEY_FIELD.key)) - .setBaseFileFormat(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name()) + .setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key, + HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name)) .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(fooTableParams(PAYLOAD_CLASS_NAME.key)) .setPreCombineField(fooTableParams(PRECOMBINE_FIELD.key)) @@ -873,18 +879,15 @@ class HoodieSparkSqlWriterSuite { val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt") // raise exception when use params which is not same with HoodieTableConfig - try { + val configConflictException = intercept[HoodieException] { df2.write.format("hudi") .options(options) .option(HoodieWriteConfig.TBL_NAME.key, tableName2) .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) .mode(SaveMode.Append).save(tablePath2) - } catch { - case NonFatal(e) => - assert(e.getMessage.contains("Config conflict")) - assert(e.getMessage.contains( - s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}")) } + assert(configConflictException.getMessage.contains("Config conflict")) + assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}")) // do not need to specify hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params df2.write.format("hudi") @@ -893,6 +896,24 @@ class HoodieSparkSqlWriterSuite { .mode(SaveMode.Append).save(tablePath2) val data = spark.read.format("hudi").load(tablePath2 + "/*") assert(data.count() == 2) - assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16") + assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16") + } + + @Test + def testGetOriginKeyGenerator(): Unit = { + // for dataframe write + val m1 = Map( + HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName + ) + val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1) + assertTrue(kg1 == classOf[ComplexKeyGenerator].getName) + + // for sql write + val m2 = Map( + HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName, + SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName + ) + val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2) + assertTrue(kg2 == classOf[SimpleKeyGenerator].getName) } } diff --git a/style/scalastyle.xml b/style/scalastyle.xml index 89306f36e1c3..2ba4042be0ca 100644 --- a/style/scalastyle.xml +++ b/style/scalastyle.xml @@ -27,7 +27,7 @@ - +