
[WIP]HUDI-644 Implement checkpoint generator helper tool #1362

Closed
wants to merge 1 commit into from

Conversation

garyli1019
Member

What is the purpose of the pull request

This PR is to resolve the following problem:

The user is using a homebrew Spark data source to read new data and write to a Hudi table.

The user would like to migrate to the Delta Streamer.

But the Delta Streamer only checks the last commit's metadata; if there is no checkpoint info, the Delta Streamer falls back to the default, which for the Kafka source is LATEST.

The user would like to run the homebrew Spark data source reader and the Delta Streamer in parallel to prevent data loss, but the Spark data source writer will make commits without checkpoint info, which resets the Delta Streamer.

So an option that allows the user to retrieve the checkpoint from previous commits, instead of only the latest commit, would be helpful for the migration.
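
For context, a minimal sketch (not the actual diff in this PR) of what retrieving the checkpoint from older commits could look like, assuming Hudi timeline/commit-metadata APIs of roughly this era and the deltastreamer.checkpoint.key metadata key; exact class and method names may differ in your version:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

public class CheckpointLookup {

  // The metadata key DeltaStreamer writes the checkpoint under in the commit file.
  private static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

  // Walk completed commits newest-first and return the first checkpoint found,
  // skipping commits written by other writers that carry no checkpoint metadata.
  public static Option<String> findLatestCheckpoint(Configuration hadoopConf, String basePath) throws Exception {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
    for (HoodieInstant instant : instants) {
      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
          timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
      String checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
      if (checkpoint != null && !checkpoint.isEmpty()) {
        return Option.of(checkpoint);
      }
    }
    return Option.empty();
  }
}
```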

Brief change log

  • Add an option to retrieve the checkpoint from older commits in DeltaSync

Verify this pull request

This pull request is a trivial rework / code cleanup without any test coverage.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@leesf leesf self-assigned this Feb 28, 2020
@codecov-io

codecov-io commented Feb 28, 2020

Codecov Report

❗ No coverage uploaded for pull request base (master@cacd9a3). Click here to learn what that means.
The diff coverage is 59.09%.

Impacted file tree graph

@@            Coverage Diff            @@
##             master    #1362   +/-   ##
=========================================
  Coverage          ?   67.09%           
  Complexity        ?      224           
=========================================
  Files             ?      333           
  Lines             ?    16217           
  Branches          ?     1659           
=========================================
  Hits              ?    10880           
  Misses            ?     4600           
  Partials          ?      737
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...i/utilities/deltastreamer/HoodieDeltaStreamer.java | 81% <100%> (ø) | 8 <0> (?) |
| ...apache/hudi/utilities/deltastreamer/DeltaSync.java | 72.03% <55%> (ø) | 39 <2> (?) |

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cacd9a3...cc087db. Read the comment docs.

@pratyakshsharma
Contributor

@garyli1019 If I understand it correctly, you are talking about a use case where you are using HoodieDeltaStreamer along with the Spark data source as a backup. Why do you want to have two different pipelines writing to the same destination path?

If you really want to have a backup to prevent any data loss, you can write to a separate path using the Spark data source and continue using DeltaStreamer to write to the Hudi dataset. In case of any issues, you can always use CHECKPOINT_RESET_KEY to ingest the data from your backup path into your Hudi dataset path. We have support for Kafka as well as DFS sources for this purpose.

Also, what is the source for your homebrew Spark job? If it is also consuming from Kafka, then I do not see any case where using DeltaStreamer can result in data loss. Can you please explain why you want to use two pipelines to write to the same destination path?

@garyli1019
Member Author

@pratyakshsharma Thanks for reviewing this PR.
I can say more about my use case:

  • I am using Kafka Connect to sink Kafka to HDFS every 30 minutes, partitioned by the arrival time (year=xx/month=xx/day=xx/hour=xx).
  • The homebrew Spark data source I built uses (current time - last Hudi commit timestamp) to compute a time window and loads the data generated by Kafka Connect within that window.
  • It would be easy to switch to the Delta Streamer with a DFS source, but it is a little bit tricky to switch to a Kafka source, because Kafka Connect introduces a delay.

So right now, if I switch to the Delta Streamer ingesting directly from Kafka, I will start from the LATEST checkpoint, and EARLIEST is not possible because my Kafka cluster's retention time is pretty long.
The data loss I mentioned is the few-hours gap around the commit where I switch from my homebrew Spark data source reader to the Delta Streamer. All I need is, for that one commit, to run the Delta Streamer first to store the LATEST checkpoint, then run my data source reader to backfill the data in that few-hours gap. Only one parallel run is needed here, and then I am good to go with the Delta Streamer.

@garyli1019
Member Author

I think running the parallel jobs once sounds a little bit hacky. The best way would be to generate the checkpoint string and pass it to the Delta Streamer on the first run. That way, I would need to write a checkpoint generator that scans all the files generated by Kafka Connect. This is definitely doable but needs some effort.
So I think we can do the following to help users migrate to the Delta Streamer:

  • A checkpointGenerator helper that generates the checkpoint from popular sink connectors (Kafka Connect, Spark Streaming, etc.)
  • Allow the user to commit without using the Delta Streamer to fix the gap if the checkpoint is difficult to generate.

Any thoughts?

@pratyakshsharma
Contributor

@garyli1019 I still feel all these challenges are arising because you are trying to ingest data into the same dataset using 2 different Spark jobs. A few questions -

  1. If the Kafka cluster retention time is too long, have you tried using the BULK_INSERT mode of Hudi? If not, you can tune parameters around Spark and Hudi to increase the source limit and then ingest the data. Otherwise, you can also try using DeltaStreamer in continuous mode.
  2. Also, I would like to know the reason behind switching every time from the homebrew Spark job to Hudi. Are you doing some POC on Hudi? Why don't you simply use DeltaStreamer and never switch to the other data source? Data loss will not happen if you simply rely on one of the data sources :)

I am a bit skeptical of trying to use 2 pipelines to write to the same destination path. Additionally, we have options available for taking a backup of your Hudi dataset or for migrating an existing dataset to Hudi. Anyway, if you strongly feel the need to write this checkPointGenerator, let us hear the opinion of @leesf and @vinothchandar as well before proceeding.

@garyli1019
Member Author

@pratyakshsharma So let's forget about my homebrew Spark data source reader. Let's assume I am using the Delta Streamer consuming a DFS source, and now I'd like to switch to the Delta Streamer consuming a Kafka source. Data arrives at DFS and Kafka asynchronously; the DFS source has a 30-minute delay relative to Kafka.
So basically I'd like to switch from Kafka -> HDFS raw parquet -> Hudi table to Kafka -> Hudi table. If you have a good solution for this case please let me know.

  • The problem I have here is that the Kafka retention time is long, but not long enough to cover all the data. All the raw data I have is on DFS and it keeps coming in. If I simply do a BULK_INSERT from the EARLIEST Kafka checkpoint, I will lose data. If I do an HDFS import first, then UPSERT from:
    • the EARLIEST checkpoint, it could eat up the resources of both my Spark cluster and my Kafka cluster because the data volume is huge;
    • the LATEST checkpoint, I will lose data (the 30-minute gap).
  • I believe there are some Hudi users who are not using the Delta Streamer in the first place and would like to switch to it later, and I am one of them. Because, from a user's perspective, I won't fully trust a framework until I fully understand it and gain enough experience with it.

Currently, I couldn't find a perfect way to switch to the Delta Streamer because:
I need to make a non-DeltaStreamer commit to append the gap data into the Hudi dataset, but this commit will make me lose the checkpoint. Let's not call this a parallel pipeline, because that's confusing. This is a one-time thing to fix the data gap from two different sources, and the Delta Streamer will be the only one doing the sink afterwards.

@pratyakshsharma
Contributor

pratyakshsharma commented Mar 4, 2020

Let me put forward my viewpoint on this. When I was in the phase of adopting Hudi, I kept my already-running pipeline writing to one path and started DeltaStreamer writing to another path. Then I did validation every day for some period of time to gain enough confidence in the framework before completely switching to Hudi.

Coming to your point of switching from Kafka -> HDFS raw parquet -> Hudi table to Kafka -> Hudi table, I was thinking about a similar use case some time back, and the simplest thing I could think of was to support source-wise checkpoints for a Hudi dataset. Currently we store the checkpoint under "deltastreamer.checkpoint.key" in the .commit file, and this variable stores the checkpoint in a source-specific format, which creates problems when you try to switch the source for the same dataset. So I think if we simply introduce more variables like this, each storing the checkpoint for its corresponding source, this use case can be solved with minimal effort. And yes, this needs a development cycle, since what I am proposing is not supported as of now. WDYT?
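
Purely to illustrate the idea (the per-source key names below are made up for illustration, not existing Hudi config), the commit's extra metadata could then carry one checkpoint entry per source type:

```java
import java.util.HashMap;
import java.util.Map;

public class SourceWiseCheckpointExample {
  public static void main(String[] args) {
    // Hypothetical: one checkpoint entry per source type instead of the single
    // "deltastreamer.checkpoint.key", so switching sources keeps both checkpoints.
    Map<String, String> extraMetadata = new HashMap<>();
    extraMetadata.put("deltastreamer.checkpoint.key.kafka", "topic,0:1500,1:1498"); // illustrative Kafka offsets
    extraMetadata.put("deltastreamer.checkpoint.key.dfs", "20200304150000");        // illustrative DFS checkpoint
    extraMetadata.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}
```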

Currently, to handle such scenarios, we have "deltastreamer.checkpoint.reset_key" configurable for every DeltaStreamer run, and you can do hacks around these two variables ("deltastreamer.checkpoint.key" and "deltastreamer.checkpoint.reset_key") to get your use case solved, but a clean solution would be what I proposed above. That solution also works well in cases where you want to switch sources quite frequently.

Also would like to hear from @leesf and @vinothchandar on this.

@garyli1019
Member Author

Yeah, I definitely agree there is some work to do to improve the migration process to the Delta Streamer. In order to use deltastreamer.checkpoint.reset_key I would need something like the checkpointGenerator mentioned above; otherwise it would be difficult to find the correct checkpoint for each table. I have a few hundred tables to manage, so I do need a robust and trustworthy solution for the migration.
Also, I think it makes sense to give users more options to play around with the Delta Streamer for their own use cases,
e.g.

  • Allow the user to get the checkpoint from commits older than the last commit (this PR)
  • Allow the user to get the checkpoint from a specific commit
  • Allow the user to store checkpoint info in the commit metadata even if they are not using the Delta Streamer, for example when they use the HDFS importer or the Spark data source writer to do the initial bulk_insert.
  • Maybe more ...

With that flexibility, I believe users will be able to use the Delta Streamer in a more programmatic way.

@vinothchandar
Member

Let me catch up on this discussion and circle back.. :)

Just one high-level question (apologies if it's already answered above).

Why can't we use the checkpoint reset flag if one-time manual restarts are needed for the Delta Streamer? Is it because it's hard to compute that?

@vinothchandar vinothchandar self-assigned this Mar 6, 2020
@garyli1019
Member Author

Let me catch up on this discussion and circle back.. :)

Just one high-level question (apologies if it's already answered above).

Why can't we use the checkpoint reset flag if one-time manual restarts are needed for the Delta Streamer? Is it because it's hard to compute that?

Right. I need a robust way to generate the checkpoint from kafka-connect-hdfs managed files, and Kafka Connect itself sometimes has issues retrieving the checkpoint when the number of Kafka partitions is large. The mechanism would be to scan every single file and get the latest offset for each Kafka partition.

@vinothchandar
Member

Okay. caught up now..

Firstly, writing in parallel using two jobs is a dangerous thing, as Hudi does not support such multi-writer access. I would advise against it (although you could hack it to work per se if you tried enough)..

@garyli1019 we can definitely add tooling to generate checkpoints in the format that DeltaStreamer expects.. But I would like to decouple that from the Delta Streamer itself.. I favor keeping it simple, with just a single knob for the user wanting to override the checkpoint.. There is already an option to override the checkpoint, I believe..

    /**
     * Resume Delta Streamer from this checkpoint.
     */
    @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
    public String checkpoint = null;

I need a robust way to generate the checkpoint from kafka-connect-hdfs managed files, and Kafka Connect itself sometimes has issues retrieving the checkpoint when the number of Kafka partitions is large

Would like to understand this more in general.. For DFS sources, all you need is a timestamp, right? And for Kafka, you need to call consumer.offsetsForTimes() and get a set of offsets to override with.
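
Something like this rough sketch; the topic,partition:offset,... checkpoint string format below is an assumption and should be verified against the Kafka source's checkpoint parsing:

```java
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class KafkaCheckpointFromTimestamp {

  // consumerProps must contain bootstrap.servers plus key/value deserializers
  // (e.g. ByteArrayDeserializer to match the generic types below).
  public static String checkpointAt(Properties consumerProps, String topic, long timestampMs) {
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
      // For every partition, ask for the earliest offset whose record timestamp is >= timestampMs.
      Map<TopicPartition, Long> query = consumer.partitionsFor(topic).stream()
          .collect(Collectors.toMap(p -> new TopicPartition(topic, p.partition()), p -> timestampMs));
      Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);

      StringBuilder checkpoint = new StringBuilder(topic);
      offsets.forEach((tp, oat) -> {
        // oat is null when no record at/after the timestamp exists for that partition;
        // falling back to 0 here is a simplification, not a recommendation.
        long offset = (oat != null) ? oat.offset() : 0L;
        checkpoint.append(',').append(tp.partition()).append(':').append(offset);
      });
      return checkpoint.toString();
    }
  }
}
```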

@garyli1019
Member Author

@vinothchandar @pratyakshsharma Agree that running a non-DeltaStreamer commit to fix the data gap sounds a bit hacky. I think getting the checkpoint from a previous commit myself and passing it to the Delta Streamer as the checkpoint would serve the same purpose as this PR.

The ideal migration process for my use case would be:

  • Do the initial bulk insert by reading the DFS parquet source managed by Kafka Connect; the naming convention of the parquet files is topicname+partition1+offsetLowerbound+offsetUpperbound.parquet
  • Generate the Delta Streamer checkpoint from those parquet files: {partition1: offsetUpperbound, ...} (a rough sketch of this step is below)
  • Start the Delta Streamer from that checkpoint

I think a tool that generates the checkpoint for the user would make the migration seamless. Will look into how to implement this.
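
A hedged sketch of what that generator could look like, assuming the kafka-connect-hdfs file naming described above (topic+partition+startOffset+endOffset.parquet) and the topic,partition:offset,... checkpoint string format; both formats are assumptions to verify:

```java
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ConnectHdfsCheckpointGenerator {

  // Scan all files under rootDir, parse <topic>+<partition>+<startOffset>+<endOffset>.parquet,
  // and keep the maximum end offset seen per partition.
  public static String generate(Configuration conf, String rootDir, String topic) throws IOException {
    Map<Integer, Long> maxEndOffset = new TreeMap<>();
    FileSystem fs = FileSystem.get(conf);
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(rootDir), true);
    while (files.hasNext()) {
      String name = files.next().getPath().getName();
      if (!name.startsWith(topic + "+") || !name.endsWith(".parquet")) {
        continue; // not a data file for this topic
      }
      // Assumes the topic name itself contains no '+'.
      String[] parts = name.substring(0, name.length() - ".parquet".length()).split("\\+");
      int partition = Integer.parseInt(parts[1]);
      long endOffset = Long.parseLong(parts[3]);
      maxEndOffset.merge(partition, endOffset, Math::max);
    }
    // Format as "topic,partition:offset,partition:offset,..." (format assumed, as above).
    StringBuilder checkpoint = new StringBuilder(topic);
    maxEndOffset.forEach((p, o) -> checkpoint.append(',').append(p).append(':').append(o));
    return checkpoint.toString();
  }
}
```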

Thanks for the hint on the Kafka API :) One edge case that would stop me from using it is when a Kafka connector is stuck on one partition while the other partitions are fine; if I pick a timestamp later than the stuck partition's progress, I might lose some messages there.

Once I figure out the offsetGenerator tool I will update this PR.

@garyli1019 garyli1019 changed the title HUDI-644 Enable user to get checkpoint from previous commits in DeltaStreamer [WIP]HUDI-644 Implement checkpoint generator helper tool Mar 12, 2020
@vinothchandar
Member

@garyli1019 I understand what you are getting at.. We had a similar issue cutting over pipelines, and we handled it by having the ability to force a checkpoint for a single run of the Delta Streamer..

So, my guess is, we will explore a way to generate checkpoints from other mechanisms like connect-hdfs?

@garyli1019
Member Author

So, my guess is, we will explore a way to generate checkpoints from other mechanisms like connect-hdfs?

@vinothchandar right. Step 1: implement the tool. Step 2: find a way to integrate it with the initial bulk insert or the HDFS importer. That way we can provide a Delta Streamer migration guide to the users.

@vinothchandar
Member

Given that, do we still need the ability to search for the checkpoints in reverse time order? TBH I don't see the value in it, since there cannot be multiple writers to a Hudi table anyway.

Maybe we can think about a CheckpointProvider abstraction where, if DeltaStreamer cannot find a checkpoint in the last delta commit/commit, it invokes checkpointProvider.getCheckpoint(). We could actually introduce that in this PR and have two implementations (a rough sketch is below the list):

  1. (default) NoOpCheckpointProvider (throws an error if it cannot find a checkpoint)
  2. ScanOlderCommitsCheckpointProvider (what you have now)
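
A rough sketch of what that could look like; class names follow the suggestion above, and the wiring into DeltaSync is left out:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.util.Option;

// Invoked by DeltaSync only when the latest commit carries no checkpoint metadata.
interface CheckpointProvider {
  Option<String> getCheckpoint();
}

// Default: fail fast instead of silently resetting to the source's default (e.g. LATEST for Kafka).
class NoOpCheckpointProvider implements CheckpointProvider {
  @Override
  public Option<String> getCheckpoint() {
    throw new IllegalStateException("No checkpoint in the latest commit and no provider configured");
  }
}

// What this PR does today: walk older commits in reverse order for the last recorded
// checkpoint (reusing the CheckpointLookup sketch from the PR description).
class ScanOlderCommitsCheckpointProvider implements CheckpointProvider {
  private final Configuration hadoopConf;
  private final String basePath;

  ScanOlderCommitsCheckpointProvider(Configuration hadoopConf, String basePath) {
    this.hadoopConf = hadoopConf;
    this.basePath = basePath;
  }

  @Override
  public Option<String> getCheckpoint() {
    try {
      return CheckpointLookup.findLatestCheckpoint(hadoopConf, basePath);
    } catch (Exception e) {
      throw new RuntimeException("Failed to scan older commits for a checkpoint", e);
    }
  }
}
```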

@garyli1019
Member Author

Given that, do we still need the ability to search for the checkpoints in reverse time order?

Maybe not anymore? If I have a tool to tell me where the checkpoint is, I can use the --checkpoint field directly. Do you see any other use case where the reverse search would be useful?

@vinothchandar
Member

Do you see any other use case where the reverse search would be useful?

No, not at the moment.. We can close this PR out if you agree.

@garyli1019
Member Author

OK, I will make a separate PR for the tool. Thanks to everyone who participated in this long discussion...

@garyli1019 garyli1019 closed this Mar 19, 2020
@vinothchandar
Member

No, thank you.. This kind of stuff gives me energy to keep pushing more :)
