Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-349]: Added new cleaning policy based on number of hours #3646

Merged
merged 15 commits into from
Feb 21, 2022

Conversation

pratyakshsharma
Copy link
Contributor

@pratyakshsharma pratyakshsharma commented Sep 12, 2021

Tips

What is the purpose of the pull request

Add a new cleaning policy based on the number of hours. This gives more flexibility for users compared to the one based on the number of commits

Brief change log

  • Added new cleaning policy
  • Added supporting methods

Verify this pull request

  • Added 2 test cases in TestCleaner class

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
} else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LAST_X_HOURS) {
Instant instant = Instant.now();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any precedence in hudi code base for doing time based calculations. Can you explore and let me know. Wanna maintain some uniformity if we have any.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should sync this to how the instant times are generated.

Another way to think about this could be:
latestInstantTime = HoodieActiveTimeline.createNewInstantTime()
earliestCommitToRetain = HoodieActiveTimeline.findInstantInPast(latestInstantTime, N); // N = number of hours in the past.

Keeping the instant timestamp code together will prevent any surprises from a wrong time calculation cleaning data unintentionally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan I checked. There is no uniformity as of now. We use joda dateTime library in TimestampBasedKeyGenerator, java.time.ZonedDateTime in SlashEncodedHourPartitionValueExtractor, and java.util.Date in HoodieActiveTimeline.

@prashantwason nthInstant is what we are using throughout the codebase. I guess there is no need to introduce a new method findInstantInPast here.

@nsivabalan
Copy link
Contributor

@prashantwason : can you please address the feedback when you get a chance.

@pratyakshsharma
Copy link
Contributor Author

@nsivabalan ack.

@pratyakshsharma
Copy link
Contributor Author

@nsivabalan Please take a look.

@veenaypatil
Copy link
Contributor

any update on this , I faced an issue today where the ETL failed in Prod because the file was deleted by Cleaner service

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://bucket/cdcv2/data/a02b9653-d715-43a7-8faf-950cbdafebc4-0_123-95450-42475479_20211111205930.parquet
[2021-11-12 00:40:15,491] {logging_mixin.py:112} INFO - [2021-11-12 00:40:15,491] {batch.py:321} INFO - 	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2269)

Having config based on time will help instead of relying on Number of commits
cc @vinothchandar

@nsivabalan
Copy link
Contributor

@pratyakshsharma : Can you update the patch when you get a chance.

@pratyakshsharma
Copy link
Contributor Author

@nsivabalan Please take a pass.

@vinothchandar
Copy link
Member

Let me review this as well

} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
// This block corresponds to KEEP_LATEST_BY_HOURS policy
// Do not delete the latest commit.
if (fileCommitTime.equals(lastVersion)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know why do have (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain) in L318. I would prefer to keep the same logic unless we have strong reasons to remove. As I mentioned, cleaning based on num hours is just a diff way to conveying retain N commits. So, lets try to keep the logic similar across both. Easier to maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleaner policies are designed to enable the longest running query to succeed. Let us take a small example where ingestion is happening every 30 minutes and hoodie.cleaner.commits.retained = 2. This implies the longest running query is going to take 1 hour to complete. Now if I maintain a list of commits, when commit at index=2 in this list (i.e 3rd commit from the beginning) is completed, the query which started when commit at index=0 completed might still be running.

list of commits -> 0. 1. 2
time (in mins). -> 0. 30. 60

Hence to allow this query to finish, we take into account lastVersionBeforeEarliestCommitToRetain in L318. lastVersion in this case is going to be commit at index=1.

With the new policy KEEP_LATEST_BY_HOURS, when 3rd commit is completed, 1st commit will be retained automatically by the logic. Hence the code in its current form works fine.

PR Tracker Board automation moved this from Ready for Review to Nearing Landing Jan 24, 2022
@nsivabalan
Copy link
Contributor

@prashantwason : ping

@prashantwason
Copy link
Member

@prashantwason : ping

@nsivabalan Did you intend to ping @pratyakshsharma so he can respond to the comments? For me, this is on my review list but am a bit behind due to metadata index related PRs.

@nsivabalan nsivabalan added the priority:critical production down; pipelines stalled; Need help asap. label Feb 8, 2022
@pratyakshsharma
Copy link
Contributor Author

pratyakshsharma commented Feb 8, 2022

@nsivabalan @prashantwason

I had been waiting for @vinothchandar 's review on this. Let me address the existing comments so we can land this.

@pratyakshsharma
Copy link
Contributor Author

@hudi-bot run azure

@pratyakshsharma
Copy link
Contributor Author

@nsivabalan Please take a pass.

@nsivabalan
Copy link
Contributor

@pratyakshsharma : Can you check for CI failures. also, is it already rebased w/ latest master.

Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As suggested in other patch, can we try to keep the tests to TestCleanActionExecutor if possible.

@nsivabalan
Copy link
Contributor

thanks for fixing all refactoring feedback. now the source code changes looks a lot less.

@nsivabalan
Copy link
Contributor

@pratyakshsharma : once you address the feedback, let me know.

Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. wrt tests as suggested, can we move the tests to CleanActionExecutor tests rather than at write client layer. Is there any challenges with that?

*/
@ParameterizedTest
@MethodSource("argumentsForTestKeepLatestCommits")
public void testKeepXHoursWithCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved only one test in this PR since the rest are already moved in #4385. Once that is merged, changes will automatically reflect here as well. @nsivabalan

@hudi-bot
Copy link

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@nsivabalan nsivabalan merged commit bf16bc1 into apache:master Feb 21, 2022
PR Tracker Board automation moved this from Nearing Landing to Done Feb 21, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
on-call-triaged priority:critical production down; pipelines stalled; Need help asap.
Projects
Development

Successfully merging this pull request may close these issues.

None yet

6 participants