Commit 3a8d64e

[HUDI-2868] Fix skipped HoodieSparkSqlWriterSuite (#4125)
Co-authored-by: Yann Byron <biyan900116@gmail.com>
xushiyan committed Nov 27, 2021
1 parent 9c059ef commit 3a8d64e
Showing 4 changed files with 52 additions and 90 deletions.
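Note: the substantive change is that the suite class HoodieSparkSqlWriterSuite is renamed to TestHoodieSparkSqlWriter (second file below), so it matches the Test* naming pattern the build uses to discover tests; the old name is the likely reason the suite was silently skipped (an inference from the commit title, not stated in the diff itself). While the suite was dormant its assertions drifted, so they are updated below to Hudi's newer "Config conflict" validation messages.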

This file was deleted.

@@ -18,17 +18,15 @@

 package org.apache.hudi

-import java.sql.{Date, Timestamp}
-
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.DataFrame
+import org.junit.jupiter.api.Test

-import org.junit.jupiter.api.{BeforeEach, Test}
+import java.sql.{Date, Timestamp}

-class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
+class TestGenericRecordAndRowConsistency extends SparkClientFunctionalTestHarness {

-  var spark: SparkSession = _
   val commonOpts = Map(
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl",
     "hoodie.insert.shuffle.parallelism" -> "1",
@@ -40,16 +38,6 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
     DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator"
   )

-  /**
-   * Setup method running before each test.
-   */
-  @BeforeEach override def setUp() {
-    setTableName("hoodie_type_consistency_tbl")
-    initPath()
-    initSparkContexts()
-    spark = sqlContext.sparkSession
-  }
-
   @Test
   def testTimestampTypeConsistency(): Unit = {
     val _spark = spark
@@ -60,7 +48,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
       (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
       (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
       (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
-    ).toDF("typeId","eventTime", "str")
+    ).toDF("typeId", "eventTime", "str")

     testConsistencyBetweenGenericRecordAndRow(df)
   }
@@ -75,7 +63,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
       (1, Date.valueOf("2014-11-30"), "abc"),
       (2, Date.valueOf("2016-12-29"), "def"),
       (2, Date.valueOf("2016-05-09"), "def")
-    ).toDF("typeId","eventTime", "str")
+    ).toDF("typeId", "eventTime", "str")

     testConsistencyBetweenGenericRecordAndRow(df)
   }
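Note: the rewritten test extends SparkClientFunctionalTestHarness, which supplies a ready SparkSession, so the @BeforeEach setup is gone. A minimal sketch of the resulting pattern, assuming the harness exposes the session as `spark` (the diff's `val _spark = spark` relies on exactly that); the class name here is illustrative:

    import java.sql.Timestamp
    import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
    import org.junit.jupiter.api.Test

    class TestHarnessSketch extends SparkClientFunctionalTestHarness {
      @Test
      def testToDF(): Unit = {
        val _spark = spark          // harness-provided session; no manual setUp()
        import _spark.implicits._   // enables Seq(...).toDF(...)
        val df = Seq((1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"))
          .toDF("typeId", "eventTime", "str")
        assert(df.count() == 1)
      }
    }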
@@ -19,10 +19,10 @@ package org.apache.hudi

 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, INSERT_OPERATION_OPT_VAL, KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, OPERATION, PAYLOAD_CLASS_NAME, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
@@ -35,10 +35,12 @@ import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{expr, lit}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.hudi.command.SqlKeyGenerator
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
@@ -47,15 +49,13 @@ import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept}

 import java.time.Instant
 import java.util.{Collections, Date, UUID}
-
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters
-import scala.util.control.NonFatal

 /**
  * Test suite for SparkSqlWriter class.
  */
-class HoodieSparkSqlWriterSuite {
+class TestHoodieSparkSqlWriter {
   var spark: SparkSession = _
   var sqlContext: SQLContext = _
   var sc: SparkContext = _
@@ -70,7 +70,7 @@ class HoodieSparkSqlWriterSuite {
    * Setup method running before each test.
    */
   @BeforeEach
-  def setUp() {
+  def setUp(): Unit = {
     initSparkContext()
     tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
@@ -95,6 +95,7 @@ class HoodieSparkSqlWriterSuite {
     spark = SparkSession.builder()
       .appName(hoodieFooTableName)
       .master("local[2]")
+      .withExtensions(new HoodieSparkSessionExtension)
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .getOrCreate()
     sc = spark.sparkContext
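Note: the new .withExtensions(new HoodieSparkSessionExtension) line wires Hudi's SQL extensions into the test session, presumably for the SQL-write paths this suite now exercises (see the SqlKeyGenerator test below). The same registration can be done with standard Spark config; a sketch:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()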
@@ -250,12 +251,14 @@
       "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
     val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
-    assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+    assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
+    assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))

     //on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
     val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
     val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
-    assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+    assert(deleteCmdException.getMessage.contains("Config conflict"))
+    assert(deleteCmdException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
   }

   /**
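Note: both assertions now target Hudi's table-config validation rather than the old "hoodie table with name ... already exist" message. The asserts imply a tab-separated layout of key, newly supplied value, and value persisted in the table config. A hypothetical helper reproducing just the message shape the asserts check (inferred from the asserts, not from the writer code):

    def conflictMessage(key: String, current: String, existing: String): String =
      s"Config conflict(key\tcurrent value\texisting value):\n$key:\t$current\t$existing"

    // conflictMessage("hoodie.table.name", "hoodie_bar_tbl", "hoodie_foo_tbl")
    // contains "hoodie.table.name:\thoodie_bar_tbl\thoodie_foo_tbl", matching the assert.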
@@ -266,7 +269,7 @@
   @ParameterizedTest
   @EnumSource(value = classOf[BulkInsertSortMode])
   def testBulkInsertForSortMode(sortMode: BulkInsertSortMode): Unit = {
-    testBulkInsertWithSortMode(sortMode, true)
+    testBulkInsertWithSortMode(sortMode, populateMetaFields = true)
   }

   /**
@@ -287,12 +290,13 @@
   @Test
   def testDisableAndEnableMetaFields(): Unit = {
     try {
-      testBulkInsertWithSortMode(BulkInsertSortMode.NONE, false)
+      testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
       //create a new table
       val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
         .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
         .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
         .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
+        .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")

       // generate the inserts
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -302,9 +306,10 @@
       try {
         // write to Hudi
         HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
-        Assertions.fail("Should have thrown exception")
+        fail("Should have thrown exception")
       } catch {
-        case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
+        case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict"))
+        case e: Exception => fail(e);
       }
     }
   }
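Note: the added `case e: Exception => fail(e)` turns any unexpected exception type into an explicit assertion failure instead of letting it propagate raw. The same check could be written with the intercept idiom used elsewhere in this suite; a sketch:

    val e = intercept[HoodieException] {
      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
    }
    assertTrue(e.getMessage.startsWith("Config conflict"))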
@@ -439,7 +444,7 @@
     val records = DataSourceTestUtils.generateRandomRows(100)
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, false)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -496,7 +501,7 @@
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
       HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
     val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, true)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)

     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc),
@@ -526,7 +531,8 @@
       .setTableType(tableType)
       .setTableName(hoodieFooTableName)
       .setRecordKeyFields(fooTableParams(DataSourceWriteOptions.RECORDKEY_FIELD.key))
-      .setBaseFileFormat(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name())
+      .setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
+        HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
       .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
       .setPayloadClassName(fooTableParams(PAYLOAD_CLASS_NAME.key))
       .setPreCombineField(fooTableParams(PRECOMBINE_FIELD.key))
@@ -873,18 +879,15 @@

     val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
     // raise exception when use params which is not same with HoodieTableConfig
-    try {
+    val configConflictException = intercept[HoodieException] {
       df2.write.format("hudi")
         .options(options)
         .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
         .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
         .mode(SaveMode.Append).save(tablePath2)
-    } catch {
-      case NonFatal(e) =>
-        assert(e.getMessage.contains("Config conflict"))
-        assert(e.getMessage.contains(
-          s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
     }
+    assert(configConflictException.getMessage.contains("Config conflict"))
+    assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))

     // do not need to specify hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params
     df2.write.format("hudi")
@@ -893,6 +896,24 @@
       .mode(SaveMode.Append).save(tablePath2)
     val data = spark.read.format("hudi").load(tablePath2 + "/*")
     assert(data.count() == 2)
-    assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16")
+    assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
   }
+
+  @Test
+  def testGetOriginKeyGenerator(): Unit = {
+    // for dataframe write
+    val m1 = Map(
+      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName
+    )
+    val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1)
+    assertTrue(kg1 == classOf[ComplexKeyGenerator].getName)
+
+    // for sql write
+    val m2 = Map(
+      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
+      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
+    )
+    val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
+    assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
+  }
 }
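Two notes on the updated expectations above. First, the expected partition path drops the "dt=" prefix, consistent with hive-style partitioning being disabled in this table's persisted config; the writer option that controls the prefix:

    // hive-style partitioning toggles the "<field>=" prefix on partition paths:
    //   "hoodie.datasource.write.hive_style_partitioning" = "true"  -> "dt=2021-10-16"
    //   default here ("false")                                      -> "2021-10-16"

Second, the new testGetOriginKeyGenerator covers the SQL path, where the user's key generator is wrapped in SqlKeyGenerator and the original class is stashed under SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME. A minimal sketch of what the helper must do to satisfy both asserts (the real HoodieWriterUtils implementation may differ):

    def getOriginKeyGenerator(params: Map[String, String]): String = {
      val kg = params.getOrElse(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, null)
      if (kg == classOf[SqlKeyGenerator].getName) {
        params.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, kg)
      } else kg
    }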
style/scalastyle.xml: 2 changes (1 addition, 1 deletion)
@@ -27,7 +27,7 @@
   <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"/>
   <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true">
     <parameters>
-      <parameter name="maxFileLength"><![CDATA[900]]></parameter>
+      <parameter name="maxFileLength"><![CDATA[1000]]></parameter>
     </parameters>
   </check>
   <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
