Skip to content

Commit

Permalink
Add comments and modify the order of payloadClassName
Browse files Browse the repository at this point in the history
  • Loading branch information
dongkelun committed Oct 21, 2022
1 parent 56c859d commit 2e3520f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,14 @@ trait ProvidesHoodieConfig extends Logging {
}

/**
* Create the config for hoodie writer.
* Build the default config for mergeInto.
*/
def buildMergeIntoConfig(hoodieCatalogTable: HoodieCatalogTable, extraOptions: Map[String, String]): Map[String, String] = {
def buildMergeIntoConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
val sparkSession: SparkSession = hoodieCatalogTable.spark
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
val path = hoodieCatalogTable.tableLocation

val catalogProperties = hoodieCatalogTable.catalogProperties ++ extraOptions
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig

// NOTE: Here we fallback to "" to make sure that null value is not overridden with
Expand Down Expand Up @@ -317,6 +317,7 @@ trait ProvidesHoodieConfig extends Logging {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig(props)
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_ENABLED.key, enableHive.toString)
hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key, enableHive.toString)
// The default value of HIVE_SYNC_MODE is HMS. Be careful when modifying
hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, props.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name()))
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_PATH, hoodieCatalogTable.tableLocation)
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, hoodieCatalogTable.baseFileFormat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
this.sparkSession = sparkSession

// force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in MergeIntoHoodieTableCommand
val extraOptions = Map(PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
val payloadClassName = Map(PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
// Create the write parameters
val parameters = buildMergeIntoConfig(hoodieCatalogTable, extraOptions)
// The payloadClassName in mergeInto cannot be overwritten, even if payloadClass has been set when table create
val parameters = payloadClassName ++ buildMergeIntoConfig(hoodieCatalogTable)

if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
executeUpsert(sourceDF, parameters)
Expand Down

0 comments on commit 2e3520f

Please sign in to comment.