Description
When using GLOBAL_BLOOM with `hoodie.bloom.index.update.partition.path=true` on a Hudi MERGE_ON_READ table, a record whose partition value changes in an upsert ends up duplicated across partitions, and a record updated within the same partition does not get its new values.
Steps to reproduce the behavior
Step 1
I created a Hudi table with the following input data and properties:
hudi_options = {
    'hoodie.table.name': 'my_hudi_table',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'creation_date',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.bloom.index.update.partition.path': 'true',
    'hoodie.index.type': 'GLOBAL_BLOOM',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.assume_date_partitioning': 'false',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'pfg_silver_fantasy',
    'hoodie.datasource.hive_sync.table': 'hudi_test1',
    'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
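For reference, with `hoodie.index.type=GLOBAL_BLOOM` and `hoodie.bloom.index.update.partition.path=true`, an incoming record that carries a new partition value is supposed to be deleted from its old partition and written to the new one, so each record key exists in exactly one partition. A minimal pure-Python sketch of that intended semantics (the `global_upsert` helper is hypothetical, not Hudi code):

```python
def global_upsert(table, records):
    """Simulate GLOBAL_BLOOM upsert semantics with
    hoodie.bloom.index.update.partition.path=true.

    `table` maps record key -> (partition, payload). The global index
    matches on key alone, so an incoming record with a changed partition
    replaces the old row entirely; no key can occupy two partitions.
    """
    for key, partition, payload in records:
        table[key] = (partition, payload)
    return table

table = {}
# Step 1: initial insert, both records in partition 2015-01-01.
global_upsert(table, [("100", "2015-01-01", "a"), ("101", "2015-01-01", "a")])
# Step 3: record 100 moves partition, record 101 updates in place.
global_upsert(table, [("100", "2015-01-02", "b"), ("101", "2015-01-01", "b")])
# Each key now resolves to exactly one (partition, payload) pair.
```

This is the behavior the configuration above is expected to produce; the outputs below show it is not what happens.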
# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "1", "a"),
        ("101", "2015-01-01", "1", "a"),
    ],
    ["id", "creation_date", "last_update_time", "new_col"],
)

# Write the DataFrame as a Hudi dataset
inputDF.write \
    .format('org.apache.hudi') \
    .options(**hudi_options) \
    .mode('overwrite') \
    .save('s3://<loc>/hudi_test1')
Output after step 1 in the _rt table:
"_hoodie_commit_time" "_hoodie_commit_seqno" "_hoodie_record_key" "_hoodie_partition_path" "_hoodie_file_name" id last_update_time new_col creation_date
20220615024525 20220615024525_0_1 id:101 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 101 1 a 2015-01-01
20220615024525 20220615024525_0_2 id:100 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 100 1 a 2015-01-01
Step 3: Upserting (record 100 changes partition; record 101 updates within the same partition)
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-02", "2", "b"),
        ("101", "2015-01-01", "2", "b"),
    ],
    ["id", "creation_date", "last_update_time", "new_col"],
)

inputDF.write \
    .format('org.apache.hudi') \
    .options(**hudi_options) \
    .mode('append') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .save('s3:///hudi_test2')
Output after step 3 in the _rt table:
"_hoodie_commit_time" "_hoodie_commit_seqno" "_hoodie_record_key" "_hoodie_partition_path" "_hoodie_file_name" id last_update_time new_col creation_date
20220615024525 20220615024525_0_1 id:101 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 101 1 a 2015-01-01
20220615024525 20220615024525_0_2 id:100 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 100 1 a 2015-01-01
20220615024626 20220615024626_1_3 id:100 creation_date=2015-01-02 6c1dbd2d-5db5-4c65-b180-f1d9561cf637-0_1-92-39217_20220615024626.parquet 100 2 b 2015-01-02
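The problem is visible by counting record keys in the snapshot above: `id:100` now exists in both the 2015-01-01 and 2015-01-02 partitions, while `id:101` still shows its pre-upsert values. A small stdlib check over the observed rows:

```python
from collections import Counter

# (record key, partition path) pairs observed in the _rt table after step 3.
rows = [
    ("id:101", "creation_date=2015-01-01"),
    ("id:100", "creation_date=2015-01-01"),
    ("id:100", "creation_date=2015-01-02"),
]

key_counts = Counter(key for key, _ in rows)
duplicates = [key for key, n in key_counts.items() if n > 1]
print(duplicates)  # → ['id:100']
```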
Expected behavior
The upsert should not produce duplicate rows: record 100 should move to the 2015-01-02 partition (with its copy in 2015-01-01 deleted), and record 101 should be updated in place within the 2015-01-01 partition.
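Stated as data, given the two writes above, the expected _rt snapshot would hold exactly one row per record key, both carrying the step-3 values:

```python
# Expected final state keyed by record id: record 100 moved to the
# 2015-01-02 partition, record 101 updated in place.
expected = {
    "100": {"creation_date": "2015-01-02", "last_update_time": "2", "new_col": "b"},
    "101": {"creation_date": "2015-01-01", "last_update_time": "2", "new_col": "b"},
}
assert len(expected) == 2  # one row per record key, no duplicates
```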
Environment Description
- Hudi version: hudi-spark-bundle_2.11-0.7.0-amzn-1.jar
- Spark version: 2.4.7-amzn-1
- Storage (HDFS/S3/GCS..): S3
- Running on Docker? (yes/no): no