Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SUPPORT] CoW: Hudi Upsert not working when there is a timestamp field in the composite key #10303

Closed
srinikandi opened this issue Dec 11, 2023 · 5 comments
Labels
on-call-triaged priority:critical production down; pipelines stalled; Need help asap. writer-core Issues relating to core transactions/write actions

Comments

@srinikandi
Copy link

Hi we have been facing this issue with Hudi Upserts that are converting a timestamp field which is part of the Composite primary key.
The bulk insert on the table works fine and storing the timestamp in a proper timestamp format. But when the same table has upsert operation (Type 2 SCD), The new row inserted is having Timestamp value is getting converting into EPOCH for the __hoodied_record_key. The actual attribute in the table is having the data in proper timestamp format. This is breaking the type 2 SCD that we are trying to achieve as the subsequent updates are all being treated as new records.

Steps to reproduce the behavior:

  1. Created A COW table using bulk_insert and using a timestamp filed as part of the complex primary key
  2. Performed Upserts on the same time and the primary record key value is having timestamp field value converted to INT

We are using Glue with Hudi 0.12.1

  • Hudi version : 0.12.1

  • Spark version : 3.3

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Additional context

There was a issue opened about 2 years back and there was no resolution mentioned and the ticket was closed.
#3313

@srinikandi
Copy link
Author

srinikandi commented Dec 11, 2023

{'encoding': 'utf-8', 'className': 'org.apache.hudi', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.support_timestamp': 'true', 'hoodie.table.name': 'account_master', 'hoodie.datasource.hive_sync.table': 'account_master', 'hoodie.datasource.write.recordkey.field': 'acct_id,eff_start_datetime', 'hoodie.datasource.write.precombine.field': 'updated_datetime', 'hoodie.datasource.hive_sync.database': 'account_core', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.write.partitionpath.field': '', 'hoodie.datasource.hive_sync_mode': 'hms', 'hoodie.datasource.write.operation': 'upsert'}

@srinikandi
Copy link
Author

srinikandi commented Dec 11, 2023

Screen of the data showing the different behavior between bulk_insert and upsert on the same set of records. If you see we should be only having curr_id = 1 for only 1 record at any point in time. The first upsert broke that. But the subsequent update fixed it as the timestamp value of INT is taken into consideration for the second update.

#	_hoodie_commit_time	_hoodie_commit_time	_hoodie_record_key	curr_ind
1	20231208222125423	20231208222125423	acct_id:4,eff_start_datetime:2023-12-08 16:19:52.982	1
2	20231211170313525	20231211170313525	acct_id:4,eff_start_datetime:1702052392982000	0
3	20231211170313525	20231211170313525	acct_id:4,eff_start_datetime:1702291970173000	1

@ad1happy2go
Copy link
Collaborator

@srinikandi I see a fix(#4201) was tried but then it was reverted due to another issue,. Will look into it. Thanks for raising this again.

@codope codope added priority:critical production down; pipelines stalled; Need help asap. writer-core Issues relating to core transactions/write actions labels Dec 19, 2023
@ad1happy2go
Copy link
Collaborator

@srinikandi Sorry for the delay on this.

I was able to reproduce the issue with Hudi version 0.12.1 and 0.14.1. We have introduced the config "hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled", you can set it to True.

  public static final ConfigProperty<String> KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED = ConfigProperty
      .key("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled")
      .defaultValue("false")
      .withDocumentation("When set to true, consistent value will be generated for a logical timestamp type column, "
          + "like timestamp-millis and timestamp-micros, irrespective of whether row-writer is enabled. Disabled by default so "
          + "as not to break the pipeline that deploy either fully row-writer path or non row-writer path. For example, "
          + "if it is kept disabled then record key of timestamp type with value `2016-12-29 09:54:00` will be written as timestamp "
          + "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. "
          + "If enabled, then the timestamp value will be written in both the cases.");

Reproducible Code which works when we set the config. -

from faker import Faker
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

#..........................   Fake Data Generation ...........................
fake = Faker()
data = [{"transactionId": fake.uuid4(), "EventTime": "2014-01-01 23:00:01","storeNbr" : "1",
         "FullName": fake.name(), "Address": fake.address(),
         "CompanyName": fake.company(), "JobTitle": fake.job(),
         "EmailAddress": fake.email(), "PhoneNumber": fake.phone_number(),
         "RandomText": fake.sentence(), "City": fake.city(),
         "State": "NYC", "Country": "US"} for _ in range(5)]
pandas_df = pd.DataFrame(data)

hoodi_configs = {
    "hoodie.insert.shuffle.parallelism": "1",
    "hoodie.upsert.shuffle.parallelism": "1",
    "hoodie.bulkinsert.shuffle.parallelism": "1",
    "hoodie.delete.shuffle.parallelism": "1",
    "hoodie.datasource.write.row.writer.enable": "true",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.recordkey.field": "transactionId,storeNbr,EventTime",
    "hoodie.datasource.write.precombine.field": "Country",
    "hoodie.datasource.write.partitionpath.field": "State",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.combine.before.upsert": "true",
    "hoodie.table.name": "huditransaction",
    "hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled": "false",
}
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(pandas_df).withColumn("EventTime", expr("cast(EventTime as timestamp)"))
df.write.format("hudi").options(**hoodi_configs).option("hoodie.datasource.write.operation","bulk_insert").mode("overwrite").save(PATH)
spark.read.options(**hoodi_configs).format("hudi").load(PATH).select("_hoodie_record_key").show(10,False)
df.withColumn("City",lit("updated_city")).write.format("hudi").options(**hoodi_configs).option("hoodie.datasource.write.operation","upsert").mode("append").save(PATH)
spark.read.options(**hoodi_configs).format("hudi").load(PATH).select("_hoodie_record_key").show(10,False)

Let me know in case you need any more help on this. Thanks.

@ad1happy2go
Copy link
Collaborator

@srinikandi Closing out this issue, Please reopen in case you still faces this issue after setting hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled

@codope codope closed this as completed Jan 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
on-call-triaged priority:critical production down; pipelines stalled; Need help asap. writer-core Issues relating to core transactions/write actions
Projects
Archived in project
Development

No branches or pull requests

3 participants