
[FLINK-32139][connector/hbase] Use strongly increasing nanosecond timestamps and the DeleteColumn type to fix accidental data deletion and non-deletion issues when sinking to HBase. #22612

Merged

Conversation

@lzshlzsh lzshlzsh commented May 20, 2023

What is the purpose of the change

Use strongly increasing nanosecond timestamps and the DeleteColumn key type to fix accidental data deletion and non-deletion issues when sinking upsert data to HBase.

https://issues.apache.org/jira/browse/FLINK-32139

Brief change log

  • Add a class HBaseTimestampGenerator that generates strongly increasing timestamps in nanoseconds to set on HBase mutations.
  • HBaseSerde's createPutMutation and createDeleteMutation now create mutations with an explicit timestamp.
  • Use delete.addColumns instead of delete.addColumn: the former writes a DeleteColumn key type (masking all versions of the column up to the timestamp), while the latter writes a Delete key type (masking only a single version).
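To make the motivation concrete, here is a toy, self-contained model of one HBase cell's version resolution (this is an illustrative sketch, not the PR's code; the assumed masking rule is that a DeleteColumn marker masks every Put with a timestamp less than or equal to the marker's). It shows how an upsert (retract + re-insert) within the same millisecond loses the new value, and how strictly increasing timestamps avoid that.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one HBase cell, to illustrate why millisecond timestamps can
// lose an upsert. Assumption (hedged, for illustration only): a DeleteColumn
// marker masks every Put with timestamp <= the marker's timestamp.
class ToyColumn {
    record Op(long ts, boolean isDelete, String value) {}

    private final List<Op> ops = new ArrayList<>();

    void put(long ts, String value) { ops.add(new Op(ts, false, value)); }

    void delete(long ts) { ops.add(new Op(ts, true, null)); }

    // Latest visible value: the newest Put not masked by any delete marker.
    String read() {
        long deleteWatermark = Long.MIN_VALUE;
        for (Op op : ops) {
            if (op.isDelete()) deleteWatermark = Math.max(deleteWatermark, op.ts());
        }
        Op best = null;
        for (Op op : ops) {
            if (!op.isDelete() && op.ts() > deleteWatermark
                    && (best == null || op.ts() > best.ts())) {
                best = op;
            }
        }
        return best == null ? null : best.value();
    }
}

public class UpsertLossDemo {
    public static void main(String[] args) {
        // Millisecond timestamps: retract + insert collide in one millisecond,
        // so the new value is masked by the delete marker and is lost.
        ToyColumn ms = new ToyColumn();
        ms.put(1000, "v1");
        ms.delete(1000);    // retract old row
        ms.put(1000, "v2"); // new row, same millisecond
        System.out.println("millisecond ts: " + ms.read());

        // Strictly increasing timestamps preserve the operation order.
        ToyColumn nano = new ToyColumn();
        nano.put(1_000_000_001L, "v1");
        nano.delete(1_000_000_002L);
        nano.put(1_000_000_003L, "v2");
        System.out.println("strictly increasing ts: " + nano.read());
    }
}
```

In the first case the read returns null (the upsert's new value was accidentally deleted); in the second it returns v2, which is the behavior this PR is after.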

Verifying this change

  • Added a test that validates that HBaseTimestampGenerator generates strongly increasing timestamps.
  • Online test described in #32139

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot (Collaborator) commented May 20, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

for (int i = 0; i < 100_000_000; i++) {
    long now = timestampGenerator.get();
    if (lastTimestamp > 0) {
        assertTrue(lastTimestamp < now);
Contributor:

Better to use 'now > lastTimestamp' as assertion condition to explicitly show that timestamp is increasing

Author:

good advice

}
final long realNow = timestampGenerator.getCurrentSystemTimeNano();
final long diff = lastTimestamp - realNow;
assertTrue("Should no timestamp diff in 100 billion tests", diff <= 0);
Contributor:

billion → million

Author:

Thanks

final long diff = lastTimestamp - realNow;
assertTrue("Should no timestamp diff in 100 billion tests", diff <= 0);
LOG.info(
"timestamp diff of 100 billion tests in nanosecond: {} (use {}, real {})",
Contributor:

billion → million

Author:

done

lastTimestamp = now;
}
final long realNow = timestampGenerator.getCurrentSystemTimeNano();
final long diff = lastTimestamp - realNow;
Contributor:

Better to use 'realNow - lastTimestamp' to explicitly show that the timestamp is increasing

Author:

good idea

…DeleteColumn type to fix data accidental deletion and non deletion issues when sink hbase.
@lzshlzsh force-pushed the dev-fix-hbase-sink-delete-problem branch from a234181 to fe2ef22 on May 22, 2023 04:45
@kylemeow (Contributor):

Also, it is recommended that you add test cases to verify that this HBase data loss issue is completely fixed by this solution.
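One shape such a test could take, sketched here with hypothetical names (TsGenerator and its next() method are illustrative assumptions, not the PR's code): make the generator's clock injectable, then drive it through the worst case deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: with an injectable clock, a unit test can drive the
// generator through a clock that stalls and then steps backwards, and assert
// strict monotonicity deterministically. Names are assumptions, not PR code.
class TsGenerator {
    private final LongSupplier nanoClock;
    private long last = 0L;

    TsGenerator(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    synchronized long next() {
        // Strictly increasing: at least last + 1, even if the clock regressed.
        last = Math.max(last + 1, nanoClock.getAsLong());
        return last;
    }
}

public class TsGeneratorTest {
    public static void main(String[] args) {
        // Fake clock: stalls at 100, steps back to 50, then jumps to 200.
        long[] fakeTimes = {100, 100, 100, 50, 200};
        int[] idx = {0};
        TsGenerator gen = new TsGenerator(() -> fakeTimes[idx[0]++]);

        long prev = 0;
        for (int i = 0; i < fakeTimes.length; i++) {
            long t = gen.next();
            if (t <= prev) {
                throw new AssertionError("not strictly increasing at step " + i);
            }
            prev = t;
        }
        System.out.println("strictly increasing under a stalled and backwards-stepping clock");
    }
}
```

This checks the same property as the large loop in the PR's test, but covers the clock-regression edge case explicitly and runs in microseconds.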

public void testStronglyIncreasingTimestampGenerator() {
    HBaseTimestampGenerator timestampGenerator = HBaseTimestampGenerator.stronglyIncreasing();
    long lastTimestamp = 0;
    for (int i = 0; i < 100_000_000; i++) {
@MartijnVisser (Contributor) commented May 22, 2023:

Please have a look at https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing - Having a test run 100 million times isn't an effective test

@lzshlzsh (Author) commented May 23, 2023:

> Please have a look at https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing - Having a test run 100 million times isn't an effective test

@MartijnVisser
Thank you for your review and valuable feedback. I think I may need to use JUnit 5 to write the test cases, and change the 100-million-iteration test to a single iteration. Can you give more suggestions on how to write this test?

Actually, I have another question: how should we write test cases for this data loss scenario, which only occurs probabilistically, given that we cannot assert a definite state?
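One answer to the question above is to remove the probabilistic element entirely: the monotonicity property itself can be verified with a tiny lock-free implementation sketch like the following (class and method names here are illustrative assumptions, not the PR's actual code), which guarantees strict increase by construction rather than by sampling the clock millions of times.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only -- names and details are assumptions, not the
// PR's actual code. Each call returns max(lastTimestamp + 1, now), so the
// sequence is strictly increasing even when the wall clock stalls.
public class StronglyIncreasingNanoTimestamps {

    private final AtomicLong lastTimestamp = new AtomicLong(0L);

    // Wall-clock time widened from milliseconds to nanoseconds.
    // (System.nanoTime() is monotonic but not wall-clock based, so the
    // millisecond clock is used as the time source here.)
    private static long currentSystemTimeNano() {
        return System.currentTimeMillis() * 1_000_000L;
    }

    // Returns the current time if it moved forward, otherwise last + 1.
    public long get() {
        return lastTimestamp.updateAndGet(
                last -> Math.max(last + 1, currentSystemTimeNano()));
    }

    public static void main(String[] args) {
        StronglyIncreasingNanoTimestamps gen = new StronglyIncreasingNanoTimestamps();
        long prev = gen.get();
        for (int i = 0; i < 1_000; i++) {
            long now = gen.get();
            if (now <= prev) {
                throw new AssertionError("not strictly increasing");
            }
            prev = now;
        }
        System.out.println("strictly increasing over 1000 calls");
    }
}
```

Since updateAndGet retries on contention, a test only needs a handful of calls (or a short multi-threaded burst) to exercise the invariant; the 100-million-iteration loop adds no additional coverage.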

@ferenc-csaky (Contributor) left a comment:

The solution logic itself LGTM. I agree with what the other reviewers mentioned about the tests. Added one more comment.

package org.apache.flink.connector.hbase.util;

/** Generate timestamp for HBase mutation. */
public abstract class HBaseTimestampGenerator {
Contributor:

Is there a chance we will have another implementation of this class in the future? Even if so, I think we can introduce the extra abstraction layer when it becomes necessary. For now, a simple class, e.g. StronglyIncreasingTsGenerator with one public method that returns the timestamp, would be enough, resulting in a simpler and cleaner solution.

@lzshlzsh (Author) commented May 24, 2023:

> Is there a chance we will have another implementation for this class in the future? Even if yes, I think we can introduce the extra abstraction layer when it will be necessary. For now, a simple class, e.g. StronglyIncreasingTsGenerator with 1 public method that returns the ts would be enough, resulting a simpler and cleaner solution.

Thank you for your feedback. I will make some modifications according to your suggestion.

@lzshlzsh (Author):

@flinkbot run azure

@ferenc-csaky (Contributor) left a comment:

LGTM, thank you for incorporating the requested changes!

@MartijnVisser MartijnVisser merged commit ebfeb32 into apache:master Jun 5, 2023
@1996fanrui (Member):

Hi @MartijnVisser , thanks for the reviewing and merging.

A friendly reminder: please squash the commits before merging, and the commit message should start with [FLINK-JIRA_ID], thanks~

@MartijnVisser (Contributor):

> A friendly reminder: Please note the squish commit before merging and the commit message should start with [FLINK-JIRA_ID], thanks~

Hi @1996fanrui - Yeah I made a mistake, there's unfortunately nothing I can do to repair that situation :(
