
[SUPPORT] Insert results differ from bulk_insert #6531

Description

@parisni

Hudi 0.11 / 0.12 at least.

The docs state that bulk_insert provides the same semantics as insert:
https://hudi.apache.org/docs/write_operations#bulk_insert

From my tests this is not true. The script below produces duplicated rows with bulk_insert, while insert replaces rows (what we would expect from upsert).

Shouldn't bulk_insert and insert produce the same results?

sc.setLogLevel("WARN")
from pyspark.sql.types import StructType, StructField, StringType

env = "qa"
datalaketable = "nested_table"
tableName = "test_hudi_{datalaketable}".format(datalaketable=datalaketable)
basePath = "/tmp/{tableName}".format(tableName=tableName)
from pyspark.sql.functions import expr

##
# Create a nested table
##
data = [ (("James", None, "Smith"), "OH", "M"),
    (("Anna", "Rose", ""), "NY", "F"),
    (("Julia", "", "Williams"), "OH", "F"),
    (("Maria", "Anne", "Jones"), "NY", "M"),
    (("Jen", "Mary", "Brown"), "NY", "M"),
    (("Mike", "Mary", "Williams"), "OH", "M"),
]

schema = StructType(
    [
        StructField(
            "name",
            StructType(
                [
                    StructField("firstname", StringType(), True),
                    StructField("middlename", StringType(), True),
                    StructField("lastname", StringType(), True),
                ]
            ),
        ),
        StructField("state", StringType(), True),
        StructField("gender", StringType(), True),
    ]
)
df = (
    spark.createDataFrame(data=data, schema=schema)
    .withColumn("event_id", expr("row_number() over(partition by 1 order by 1)"))
    .withColumn("event_date", expr("current_date()"))
    .withColumn("version", expr("current_date()"))
)

df.printSchema()
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "insert", # change with bulk_insert
    "hoodie.datasource.write.precombine.field": "version",
    "hoodie.upsert.shuffle.parallelism": 80,
    "hoodie.insert.shuffle.parallelism": 80,
    "hoodie.delete.shuffle.parallelism": 80,
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "false",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.metadata.enable": "true",
}
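# Note (assumption on my side, not verified): "hoodie.combine.before.insert"
# de-duplicates only within a single incoming batch, so it should not change
# the cross-commit behavior tested below for either operation.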
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
spark.read.format("hudi").load(basePath).count()

(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))

spark.read.format("hudi").load(basePath).count()

# 12 with bulk_insert
# 6 with insert
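
To make the difference concrete, here is a minimal check, a sketch assuming the table written by the script above: it groups the read-back rows by the record key field event_id and lists any key that appears more than once.

from pyspark.sql import functions as F

# Count rows per record key; any count > 1 means the key was physically
# duplicated instead of being replaced.
dup_counts = (
    spark.read.format("hudi")
    .load(basePath)
    .groupBy("event_id")
    .agg(F.count("*").alias("cnt"))
    .filter("cnt > 1")
)
dup_counts.show()

# With insert this shows no rows; with bulk_insert every event_id shows
# cnt = 2, matching the 6 vs. 12 counts above.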


Labels

priority:medium (Moderate impact; usability gaps), status:triaged (Issue has been reviewed and categorized)
