Skip to content

Commit

Permalink
[HUDI-2759] [MINOR] solve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed Nov 23, 2021
1 parent dcc13b8 commit be763a2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.sql.catalyst.catalog

import org.apache.hudi.HoodieWriterUtils.{convertMapToHoodieConfig, validateTableConfig}
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory

Expand All @@ -46,6 +48,11 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten

private val hadoopConf = spark.sessionState.newHadoopConf

/**
* database.table in catalog
*/
val catalogTableName = table.qualifiedName

/**
* properties defined in catalog.
*/
Expand Down Expand Up @@ -144,7 +151,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
/**
* init hoodie table for create table (as select)
*/
def initHoodieTableIfNeeded(force: Boolean = false): Unit = {
def initHoodieTable(): Unit = {
logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
val (finalSchema, tableConfigs) = parseSchemaAndConfigs()

Expand All @@ -164,38 +171,43 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
* @return schema, table parameters in which all parameters aren't sql-styled.
*/
private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
val sqlOptions = HoodieOptionConfig.defaultSqlOptions ++ catalogProperties
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
val globalTableConfigs = mappingSparkDatasourceConfigsToTableConfigs(globalProps)
val globalSqlOptions = HoodieOptionConfig.mappingTableConfigToSqlOption(globalTableConfigs)

val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlOptions ++ catalogProperties)

// get final schema and parameters
val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists) match {
case (CatalogTableType.EXTERNAL, true) =>
val existingTableConfig = tableConfig.getProps.asScala.toMap
val currentTableConfig = globalTableConfigs ++ existingTableConfig
val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))

val options = extraTableConfig(spark, hoodieTableExists, existingTableConfig) ++
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ existingTableConfig
val options = extraTableConfig(spark, hoodieTableExists, currentTableConfig) ++
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig

val userSpecifiedSchema = table.schema
ValidationUtils.checkArgument(tableSchema.nonEmpty || table.schema.nonEmpty,
s"Missing schema for Create Table: $catalogTableName")
val schema = if (tableSchema.nonEmpty) {
tableSchema
} else if (userSpecifiedSchema.nonEmpty) {
addMetaFields(userSpecifiedSchema)
} else {
throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
addMetaFields(table.schema)
}

(schema, options)

case (_, false) =>
assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
ValidationUtils.checkArgument(table.schema.nonEmpty,
s"Missing schema for Create Table: $catalogTableName")
val schema = table.schema
val options = extraTableConfig(spark, isTableExists = false) ++
val options = extraTableConfig(spark, isTableExists = false, globalTableConfigs) ++
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
(addMetaFields(schema), options)

case (CatalogTableType.MANAGED, true) =>
throw new AnalysisException(s"Can not create the managed table('$tableName')" +
throw new AnalysisException(s"Can not create the managed table('$catalogTableName')" +
s". The associated location('$tableLocation') already exists.")
}
HoodieOptionConfig.validateTable(spark, finalSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ case class CreateHoodieTableAsSelectCommand(
// Execute the insert query
try {
// init hoodie table
hoodieCatalogTable.initHoodieTableIfNeeded()
hoodieCatalogTable.initHoodieTable()

val tblProperties = hoodieCatalogTable.catalogProperties
val options = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
// init hoodie table
hoodieCatalogTable.initHoodieTableIfNeeded()
hoodieCatalogTable.initHoodieTable()

try {
// create catalog table for this hoodie table
Expand Down

0 comments on commit be763a2

Please sign in to comment.