[SUPPORT] Upsert data with an identical record key and pre-combine field #3266

Closed
mtami opened this issue Jul 13, 2021 · 14 comments
@mtami

mtami commented Jul 13, 2021

I am using the AWS DMS change data capture (CDC) service to pull change data from my database, and then Apache Hudi in an AWS Glue ETL job to process the change data and create a table in Hive. My pre-combine field is the timestamp AWS DMS attaches for when the data was committed (update_ts_dms).

I have a few use cases where inserts/updates and deletes for the same primary key carry the same DMS timestamp, and after change-data processing Apache Hudi does not keep the latest updated row for that primary key; it keeps an arbitrary insert or update instead, possibly because both the primary key and the pre-combine field are identical.

Is there any suggested solution for such a case?

Sample Data:

{ "Op": "I", "update_ts_dms": "2021-07-08 10:47:53", "id": 10125412, "brand_id": 9722520, "type": "EXPLICIT", "created": "2021-07-08 10:47:53", "updated": "2021-07-08 10:47:53" }
{ "Op": "D", "update_ts_dms": "2021-07-08 10:47:53", "id": 10125412, "brand_id": 9722520, "type": "EXPLICIT", "created": "2021-07-08 10:47:53", "updated": "2021-07-08 10:47:53" }

Environment Description

  • Hudi version (jar): hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar

  • Spark version : 2.4

  • Hadoop version : 2.8

  • Storage : S3

  • Running on Docker?: no

@mtami mtami changed the title [SUPPORT] Upsert data with the identical record key and pre-combine field [SUPPORT] Upsert data with an identical record key and pre-combine field Jul 13, 2021
@danny0405
Contributor

Thanks for the feedback. I guess this is because our payload does not choose the latest data when the values of the preCombine field are the same. See OverwriteWithLatestAvroPayload.preCombine.
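For illustration, a rough sketch of the selection rule being discussed (illustrative Scala, not the actual OverwriteWithLatestAvroPayload source): in a reduce-style deduplication the left/right order of the two records is not guaranteed, so a tie on the ordering value can keep either record.

```scala
// Illustrative sketch of pick-by-ordering-value deduplication; `orderingVal`
// plays the role of the pre-combine field (update_ts_dms here).
case class Record(orderingVal: Long, payload: String)

def preCombine(a: Record, b: Record): Record =
  if (a.orderingVal > b.orderingVal) a
  else b // on a tie, whichever record happens to be `b` wins, not necessarily the latest
```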

@danny0405
Contributor

See this PR: #3267

@danny0405 danny0405 added the pr:wip Work in Progress/PRs label Jul 13, 2021
@mtami
Author

mtami commented Jul 13, 2021

Thank you @danny0405 for the help.

If I understand correctly, OverwriteWithLatestAvroPayload is the default payload, and it will use whatever value is set for PRECOMBINE_FIELD_OPT_VAL.

But what happens if I upsert a DataFrame that contains two records with the same recordKey and preCombine value? Will this end up upserting the latest record in the DataFrame?

@danny0405
Contributor

danny0405 commented Jul 13, 2021

It will, but only after this patch to the #preCombine method.
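A hypothetical repro of that scenario (column names follow the sample data above; `spark` is an active SparkSession):

```scala
import spark.implicits._ // assumes an active SparkSession named `spark`

// Two records in a single upsert batch sharing the record key and the
// pre-combine value; after the patch, the later record in the batch should win.
val batch = Seq(
  (10125412, "2021-07-08 10:47:53", "first"),
  (10125412, "2021-07-08 10:47:53", "second") // expected survivor post-patch
).toDF("id", "update_ts_dms", "note")
```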

@mtami
Author

mtami commented Jul 14, 2021

Hi @danny0405,
Do you have an estimated time for merging PR #3267 (weeks, days)?

Thanks,

@danny0405
Contributor

Yes, the PR will be merged soon, once the CI tests pass.

@mtami
Author

mtami commented Aug 7, 2021

Related to PR #3401.

@danny0405
Contributor

The PR is merged; please verify whether your issue is resolved.

@Rap70r

Rap70r commented Aug 23, 2021

Hello @danny0405,

I'm facing a similar issue when extracting Kafka events. When a delete is followed by an insert, usually inside a transaction, the timestamp field that indicates when the events were processed is sometimes the same for both events. When I run my Spark job to extract the events, the delete event takes precedence over the insert.
Would this change fix the described issue? Also, do you have an estimate of when version 0.9.0 will be released for production?

Thank you

@danny0405
Contributor

Deletes are still problematic for MOR tables, because Hudi always writes the DELETE block after the DATA block, the DELETE block does not record the event time, and the reader always applies the DELETE block last. So even when a DELETE message has a smaller event time than an INSERT message, the INSERT message still gets deleted.

We have fixed the sequence for COW tables in the 0.9 release, though.
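A conceptual sketch of the reader behavior described above (illustrative Scala, not Hudi's actual log format): blocks are applied in write order, and since a DELETE block carries no event time, the delete wins even when its event time is older.

```scala
sealed trait Block
case class DataBlock(records: Map[String, (Long, String)]) extends Block // key -> (eventTime, value)
case class DeleteBlock(keys: Set[String]) extends Block                  // no event time recorded

// Apply blocks in write order: a later DELETE removes the key regardless of
// the event time carried by the earlier DATA record.
def merge(blocks: Seq[Block]): Map[String, (Long, String)] =
  blocks.foldLeft(Map.empty[String, (Long, String)]) {
    case (acc, DataBlock(recs))   => acc ++ recs
    case (acc, DeleteBlock(keys)) => acc -- keys
  }
```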

@mtami
Author

mtami commented Aug 24, 2021

Hi @danny0405
Apologies for the late response; the PR has resolved my issue.
Thanks!

@Rap70r

Rap70r commented Aug 24, 2021

Hi @danny0405,

Thank you for your reply.
I'm using COW for the Kafka/Hudi pipeline described above; I did not clarify that in my first comment. With that in mind, I assume this change should fix my issue as well, correct?
Also, do you have an estimate for the production release date?

Thank you

@nsivabalan nsivabalan added awaiting-triage and removed pr:wip Work in Progress/PRs labels Aug 25, 2021
@nsivabalan nsivabalan added this to Awaiting Triage in GI Tracker Board Aug 25, 2021
@danny0405
Contributor

Hi @Rap70r, 0.9.0 has been released, so I will close this issue as resolved. If you have other problems, feel free to reopen it.

GI Tracker Board automation moved this from Awaiting Triage to Done Aug 28, 2021
@Rap70r

Rap70r commented Aug 28, 2021

Thank you @danny0405, I have tested and confirmed that the issue I described above has been resolved for me.
