
[SUPPORT] insert operation does not consistently insert duplicate records #3709

@helanto

Description


Tips before filing an issue

  • Have you gone through our FAQs? Yes

Describe the problem you faced

Hi,
when using INSERT mode and the input dataset includes duplicate keys, an unwanted de-duplication step is performed as part of the merge process.

To Reproduce

Steps to reproduce the behavior:

  1. First I create a Hudi table (which contains duplicate keys) using INSERT mode.
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.hive.NonPartitionedExtractor

val tableName: String = "language"

val cfg: Map[String, String] = Map[String, String](
    DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "score",
    DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "lang",
    DataSourceWriteOptions.TABLE_NAME.key() -> tableName,
    HoodieWriteConfig.TBL_NAME.key() -> tableName,
    HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> classOf[NonpartitionedKeyGenerator].getCanonicalName,
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[NonPartitionedExtractor].getCanonicalName,
    DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true",
    DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
    DataSourceWriteOptions.INSERT_DROP_DUPS.key() -> "false",
    HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "false",
    HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true",
    HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "104857600")
    
val initialInput = (("python", 1) :: ("scala", 5) :: ("scala", 4) :: ("haskell", 5) :: Nil).toDF("lang", "score")

initialInput.write.format("hudi").options(cfg).mode(Append).save(basePath)
  2. The Hudi table contains duplicate data, which is what I expect.
spark.read.format("hudi").load(basePath).show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   lang|score|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
|     20210923165656|  20210923165656_0_1|            python|                      |6be75119-47a0-4ad...| python|    1|
|     20210923165656|  20210923165656_0_2|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
|     20210923165656|  20210923165656_0_3|             scala|                      |6be75119-47a0-4ad...|  scala|    4|
|     20210923165656|  20210923165656_0_4|           haskell|                      |6be75119-47a0-4ad...|haskell|    5|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
  3. We insert a second dataset (which again contains duplicate keys).
val newInput = (("scala", 5) ::  ("java", 3) :: ("java", 4) :: Nil).toDF("lang", "score")

newInput.write.format("hudi").options(cfg).mode(Append).save(basePath)
  4. Some entries from the second dataset are not stored in the Hudi table:
spark.read.format("hudi").load(basePath).show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   lang|score|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
|     20210923165656|  20210923165656_0_1|            python|                      |6be75119-47a0-4ad...| python|    1|
|     20210923165656|  20210923165656_0_2|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
|     20210923165656|  20210923165656_0_3|             scala|                      |6be75119-47a0-4ad...|  scala|    4|
|     20210923165656|  20210923165656_0_4|           haskell|                      |6be75119-47a0-4ad...|haskell|    5|
|     20210923165821|  20210923165821_0_5|              java|                      |6be75119-47a0-4ad...|   java|    4|
|     20210923165821|  20210923165821_0_6|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
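The observed behavior is consistent with a precombine-style merge being applied to the incoming batch: records sharing a record key appear to be combined, keeping the row with the highest precombine value ("score" here). A minimal plain-Scala sketch of that apparent semantics (this is my reading of the output, not confirmed Hudi internals):

```scala
// Sketch of the apparent per-batch de-duplication during the merge:
// for each record key, keep only the row with the highest precombine value.
val batch = List(("scala", 5), ("java", 3), ("java", 4))

val combined: List[(String, Int)] =
  batch.groupBy(_._1)            // group rows by record key ("lang")
       .values
       .map(_.maxBy(_._2))       // keep the row with the max "score"
       .toList
// combined keeps ("scala", 5) and ("java", 4); ("java", 3) is dropped,
// matching the rows that actually landed in the table.
```

If that reading is right, ("java", 3) is lost precisely because it shares a key with ("java", 4) inside the same incoming batch, even though COMBINE_BEFORE_INSERT is set to "false".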

Expected behavior

I would expect both entries with key java to be stored in the table, since I have deactivated de-duplication on INSERT. I am not sure whether de-duplication of the input is desirable, but I would expect the second insert to behave the same as the first insert under the same configuration.

I managed to achieve the expected behavior by setting HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT to "0" (HoodieMergeHandle seems to de-duplicate the input dataset even when precombine is deactivated). Having to deactivate merging is something I want to avoid.
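For reference, the workaround amounts to a one-option change to the configuration above; a sketch, assuming the same `cfg`, `newInput`, and `basePath` from the reproduction steps:

```scala
// Workaround (not a fix): disable small-file handling so the insert writes
// new base files instead of being merged into existing ones.
val workaroundCfg: Map[String, String] =
  cfg + (HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0")

newInput.write.format("hudi").options(workaroundCfg).mode(Append).save(basePath)
```

With this setting every insert lands in a fresh base file, so duplicates survive, but at the cost of losing small-file handling entirely.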

Environment Description

  • Hudi version : 0.9.0

  • Spark version : 3.0.1

Metadata

Labels

area:writer (Write client and core write operations), engine:spark (Spark integration), priority:critical (Production degraded; pipelines stalled)
