[FLINK-28910][Connectors/hbase] Fix potential data deletion while updating HBase rows #20542
Conversation
@flinkbot run azure
@flinkbot run azure
Hi @luoyuxia, could you please help me review the changes? Thank you.
@wuchong @dannycranmer could you please help me review the changes? Thank you. 😄
@MartijnVisser could you please help me review the changes? Thank you. 😄
For clarity, the title could be changed to something like 'Fix potential data deletion while updating HBase rows'. Just my suggestion : )
Thank you for your suggestion, I think it is much clearer.
@ganlute I have no experience with HBase, so unfortunately I can't review it. To be honest, I think the Flink community is lacking HBase maintainers in general.
@leonardBang Do you think you could have a look? Since you have experience with CDC, I thought you might be able to help out here :)
I also have no experience with HBase; however, your thread-safe logic looks OK, aside from the following callouts.
@@ -76,6 +79,7 @@
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private static Map<byte[], Mutation> mutationMap = new HashMap<>();
Why is this static? This means subtasks of the same job would all have access to, and try to flush the same data.
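For illustration (a hypothetical demo class, not the connector code): a static field is shared by every instance in the same JVM, so two parallel sink instances would each see, and try to flush, the other's buffered rows.

import java.util.HashMap;
import java.util.Map;

// Hypothetical demo: two sink "subtasks" in the same JVM share one static map.
class StaticBufferDemo {
    private static Map<String, String> buffer = new HashMap<>();

    void add(String rowKey, String value) {
        buffer.put(rowKey, value);
    }

    public static void main(String[] args) {
        StaticBufferDemo subtask0 = new StaticBufferDemo();
        StaticBufferDemo subtask1 = new StaticBufferDemo();
        subtask0.add("rk1", "a");
        subtask1.add("rk2", "b");
        // Both instances observe both entries, so each would also flush the other's data.
        System.out.println(buffer.size()); // 2
    }
}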
Thank you for your review. I agree with you. I originally tried to make it a global queue to deduplicate the rowkeys, but in this case there is in fact no need to declare it as static.
I will fix it later.
@@ -213,6 +225,7 @@ public void close() throws Exception {
    if (mutator != null) {
        try {
            flush();
While your thread safety looks ok (besides the static Map), generally speaking flush() on close() can cause issues. If the destination is down, the job might fail to stop. A better solution is to checkpoint the internal buffer. Longer term we can consider migrating to the Async Sink base.
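For reference, a rough sketch of the checkpointing idea, assuming a CheckpointedFunction-based sink (the class, field, and descriptor names here are hypothetical, not the connector's actual code): pending mutations are written to operator state in snapshotState and restored in initializeState, so close() does not have to force a flush.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.hadoop.hbase.client.Mutation;

// Hypothetical sketch: keep the unflushed buffer in operator state across failures.
// In practice the mutations would need a proper serializer.
class CheckpointedBufferSketch implements CheckpointedFunction {

    private final Map<ByteBuffer, Mutation> mutationMap = new HashMap<>();
    private transient ListState<Mutation> pendingMutations;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist the buffered mutations instead of forcing a flush on close().
        pendingMutations.clear();
        for (Mutation mutation : mutationMap.values()) {
            pendingMutations.add(mutation);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        pendingMutations = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-mutations", Mutation.class));
        if (context.isRestored()) {
            // Rebuild the in-memory buffer from the last successful checkpoint.
            for (Mutation mutation : pendingMutations.get()) {
                mutationMap.put(ByteBuffer.wrap(mutation.getRow()), mutation);
            }
        }
    }
}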
Hi @dannycranmer, personally I reckon this might not be an issue because, according to the documentation, the close() method of the mutator inherently flushes buffered data to the HBase server before closing the connection, so the flush logic was already there before this PR. Also, as it is already inside the try ... catch block, when an IOException is thrown by the client during the flush, the job-stopping process will not be interrupted either.
master:
close() {
    ...
    mutator.close();
    ...
}
this PR:
close() {
    ...
    flush();
    mutator.close();
    ...
}
On the one hand, mutator.close will call mutator.flush as well, so I think the mentioned problem would happen on master as well. On the other hand, if the destination is down, mutator.close/mutator.flush will throw an IOException.
Mutation mutation = mutationConverter.convertToMutation(value);
synchronized (mutationMap) {
    mutationMap.put(mutation.getRow(), mutation);
In this way, only the last mutation for each rowkey value is retained. But the mutationConverter behavior is not controlled, which means a sequence of mutations like the following will cause a data quality problem:
-D (rk1, f1:v1)
+I (rk1, f2:v2)
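A small illustration of that sequence (the column family and qualifier names are made up for the example, and a content-based ByteBuffer key is used for the demo): deduplicating purely by rowkey means the Put on f2 silently replaces the Delete on f1, so the delete never reaches HBase.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

class LastMutationWinsDemo {
    public static void main(String[] args) {
        byte[] rowKey = Bytes.toBytes("rk1");

        // -D (rk1, f1:v1): delete the cell in column family f1
        Mutation delete = new Delete(rowKey).addColumns(Bytes.toBytes("f1"), Bytes.toBytes("c"));
        // +I (rk1, f2:v2): insert a cell into column family f2
        Mutation put = new Put(rowKey)
                .addColumn(Bytes.toBytes("f2"), Bytes.toBytes("c"), Bytes.toBytes("v2"));

        Map<ByteBuffer, Mutation> mutationMap = new HashMap<>();
        mutationMap.put(ByteBuffer.wrap(delete.getRow()), delete);
        mutationMap.put(ByteBuffer.wrap(put.getRow()), put);

        // Only the Put survives; the Delete on f1 is dropped before flushing.
        System.out.println(mutationMap.size());                                    // 1
        System.out.println(mutationMap.values().iterator().next() instanceof Put); // true
    }
}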
For now, all mutationConverter implementations are OK in this case.
308b10a to 0b7ce93 (Compare)
The CI failure seems to have nothing to do with this PR.
Mutation mutation = mutationConverter.convertToMutation(value);
synchronized (mutationMap) {
    mutationMap.put(mutation.getRow(), mutation);
One potential bug might arise because mutation.getRow() returns an array. As we know, the hashCode and equals of two different array instances are different regardless of whether their contents are identical.
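A quick plain-JDK demonstration of the point: two byte[] instances with identical contents compare and hash by identity, so a HashMap keyed by byte[] fails to deduplicate rowkeys.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class ByteArrayKeyDemo {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};

        System.out.println(Arrays.equals(a, b)); // true: identical contents
        System.out.println(a.equals(b));         // false: arrays use identity equality

        Map<byte[], String> map = new HashMap<>();
        map.put(a, "first");
        map.put(b, "second");
        System.out.println(map.size()); // 2: the "same" rowkey is stored twice
    }
}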
To overcome this, I suggest converting it to a Base64 string, e.g.:
String key = Base64.getEncoder().encodeToString(mutation.getRow());
Or creating a simple wrapper class where equals and hashCode are overridden properly for arrays.
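For the wrapper-class option, a minimal sketch (the class name RowKey is hypothetical) might look like this:

import java.util.Arrays;

// Hypothetical wrapper that makes a byte[] rowkey safe to use as a HashMap key.
final class RowKey {
    private final byte[] bytes;

    RowKey(byte[] bytes) {
        this.bytes = bytes;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof RowKey)) {
            return false;
        }
        return Arrays.equals(bytes, ((RowKey) other).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

The PR ultimately went with ByteBuffer.wrap(mutation.getRow()) as the key, which provides content-based equals and hashCode out of the box.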
Thank you for your suggestion, I will improve it.
Thank you for the ByteBuffer improvement, I added another comment, but the logic itself LGTM.
@@ -76,6 +80,7 @@
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private Map<ByteBuffer, Mutation> mutationMap = new HashMap<>();
This field could be final as well. Can we move it under mutationConverter and init it inside the ctor to be consistent with the field initialization?
@ganlute I tested ingesting about 2 million records into HBase.
When sink.buffer-flush.max-rows=1000, the data eventually becomes consistent.
When sink.buffer-flush.max-rows=1, consistency cannot be guaranteed.
@@ -201,6 +208,12 @@ public void invoke(T value, Context context) throws Exception {
    }

    private void flush() throws IOException {
        synchronized (mutationMap) {
Adding mutationMap to drop duplicated data on the client side cannot avoid the data consistency issue. For example, with sink.buffer-flush.max-rows=1:
+I(1,...)
-U(1,...)
+U(1,...)
These three rows are put into HBase with the same timestamp version. In the end, HBase cannot find the data with rowkey=1.
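To illustrate the effect (this sketch assumes a reachable HBase cluster and a hypothetical table demo with column family f): when a Delete and a later Put share the same cell timestamp, the delete marker masks the re-inserted cell regardless of arrival order, so a Get finds nothing until the marker is removed by a major compaction.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class SameTimestampDemo {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection();
                Table table = connection.getTable(TableName.valueOf("demo"))) {
            byte[] row = Bytes.toBytes("1");
            long ts = 1_000L; // identical timestamp for both operations

            // -U(1,...): delete at timestamp ts
            table.delete(new Delete(row, ts));
            // +U(1,...): re-insert at the SAME timestamp ts
            table.put(new Put(row, ts)
                    .addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes("v")));

            // The delete marker masks cells with timestamp <= ts, so the row appears
            // to be gone even though the Put arrived afterwards.
            Result result = table.get(new Get(row));
            System.out.println(result.isEmpty()); // true
        }
    }
}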
Hi @YesOrNo828, this issue has already been addressed in FLINK-32139, and that PR is merged into master. Therefore, this PR may no longer be needed.
So is this superseded by #22612 or not?
Yes, the two issues have the same root cause: an insert and a delete operation are passed to HBase with the same millisecond-precision timestamp, and in that case the order of the HBase execution is not guaranteed. The changes made in #22612 explicitly set nanosecond-precision timestamps for the HBase operations, which eliminates the possibility of having multiple operations "at the same time", so deletes and inserts will be executed in the correct order.
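As a rough sketch of that idea (not the actual #22612 implementation; it uses a simple monotonic counter rather than real nanosecond timestamps, and all names are hypothetical): giving each mutation a strictly increasing timestamp ensures a delete and a following insert never land on the same cell version.

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch: hand out strictly increasing timestamps so no two
// mutations on the same row ever share a cell version.
class MonotonicTimestampSketch {
    private final AtomicLong lastTimestamp = new AtomicLong(Long.MIN_VALUE);

    long nextTimestamp() {
        // Start from the wall clock but never repeat or go backwards.
        return lastTimestamp.updateAndGet(prev -> Math.max(prev + 1, System.currentTimeMillis()));
    }

    public static void main(String[] args) {
        MonotonicTimestampSketch sketch = new MonotonicTimestampSketch();
        byte[] row = Bytes.toBytes("1");

        long deleteTs = sketch.nextTimestamp();
        long putTs = sketch.nextTimestamp();

        Mutation delete = new Delete(row, deleteTs);
        Mutation put = new Put(row, putTs)
                .addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes("v"));

        // The Put's timestamp is strictly greater than the Delete's, so the
        // re-inserted cell is not masked by the delete marker.
        System.out.println(putTs > deleteTs); // true
    }
}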
What is the purpose of the change
https://issues.apache.org/jira/browse/FLINK-28910
Brief change log
Verifying this change
CI passed
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation