Skip to content

Commit

Permalink
[HUDI-4346] Fix params not update BULKINSERT_ARE_PARTITIONER_RECORDS_…
Browse files Browse the repository at this point in the history
…SORTED (#5999)
  • Loading branch information
boneanxs committed Jun 30, 2022
1 parent 6a01f70 commit cdaaa3c
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ object HoodieSparkSqlWriter {
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
}
val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
Expand All @@ -535,7 +536,7 @@ object HoodieSparkSqlWriter {
new NonSortPartitionerWithRows()
}
val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
params(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED) = arePartitionRecordsSorted.toString
val isGlobalIndex = if (populateMetaFields) {
SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
} else {
Expand Down

0 comments on commit cdaaa3c

Please sign in to comment.