-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-30998] Add optional exception handler to flink-connector-opensearch #11
Conversation
80ea673
to
ff041bd
Compare
@lilyevsky the relevant changes looks good, but your branch is out of sync with https://github.com/apache/flink-connector-opensearch, you need to sync + rebase |
@reta I see. Please disregard my previous message. |
I think I can do it in github: https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork |
79222a1
to
1c4eda0
Compare
Done |
this.hosts = checkNotNull(hosts); | ||
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); | ||
this.emitter = checkNotNull(emitter); | ||
this.deliveryGuarantee = checkNotNull(deliveryGuarantee); | ||
this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig); | ||
this.networkClientConfig = checkNotNull(networkClientConfig); | ||
this.failureHandler = failureHandler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.failureHandler = failureHandler; | |
this.failureHandler = checkNotNull(failureHandler); |
@@ -72,6 +72,7 @@ | |||
private Integer connectionRequestTimeout; | |||
private Integer socketTimeout; | |||
private Boolean allowInsecure; | |||
private FailureHandler failureHandler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private FailureHandler failureHandler; | |
private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER; |
@@ -343,7 +348,11 @@ private void extractFailures(BulkRequest request, BulkResponse response) { | |||
if (chainedFailures == null) { | |||
return; | |||
} | |||
throw new FlinkRuntimeException(chainedFailures); | |||
if (failureHandler == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This null
check is not needed anymore (the builder would supply the default handler), just failureHandler.onFailure(chainedFailures);
Thanks @lilyevsky just a few minor comments, I think the only part that is missed are tests, and probably you may need to run |
@reta Did all the changes you suggested, also ran the mvn spotless:apply . |
👍 thank you!
Hm .. could you share the error please?
Yes, sure, I think OpensearchWriterITCase could be a good candidate to add a test case with the simulated failure (fe we could pause / shutdown Opensearch container, just an idea). It may be a bit difficult, please let me know if help needed, thank you. |
The error is about this line: Apparently that parameter is no longer present. I actually can fix all of that if it is OK, then you can review. Also will look into the tests tomorrow. |
Thank you, cleanup would be much appreciated, I would suggest let's finish up with tests first so to have the scope finalized, and we could fix javadocs after. Thanks! |
Sure. |
@reta I fixed some warnings, pushed as a separate commit, so you can review. |
I shared the hints with you here #11 (comment) Here is sample test case:
You need to attach the failure handler and assert afterwards it has been called. |
@reta Trying to add the test. For some reason the tests from OpensearchWriterITCase class are not executed at all. mvn clean test in flink-connector-opensearch directory. I see that some tests from org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilderTest are executed, but not from other 4 files in the same directory. |
@lilyevsky OpensearchWriterITCase is integration test, |
Thanks @reta . One more step. It looks like I need to have a "valid Docker environment". I install the docker things: sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin What else do I need to do? |
@lilyevsky I assume you are on Ubuntu [1] |
@reta I installed docker, the daemon is running. I think I can fix it. Will let you know. |
@reta Added the test. I think now it is good. |
@@ -103,7 +107,8 @@ | |||
BulkProcessorConfig bulkProcessorConfig, | |||
NetworkClientConfig networkClientConfig, | |||
SinkWriterMetricGroup metricGroup, | |||
MailboxExecutor mailboxExecutor) { | |||
MailboxExecutor mailboxExecutor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update javadocs with failureHandler
arg
return failed; | ||
} | ||
|
||
@java.lang.Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nit, just @Override
@@ -238,19 +240,68 @@ void testCurrentSendTime() throws Exception { | |||
} | |||
} | |||
|
|||
private class TestHandler implements FailureHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably private static
is better
// Trigger an error by updating non-existing document | ||
writer.write(Tuple2.of(1, "u" + buildMessage(1)), null); | ||
context.assertThatIdsAreNotWritten(index, 1); | ||
assertThat(testHandler.isFailed()).isEqualTo(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception { | |||
new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); | |||
|
|||
try (final OpensearchWriter<Tuple2<Integer, String>> writer = | |||
createWriter(index, false, bulkProcessorConfig, metricGroup)) { | |||
createWriter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably don't need this change since createWriter
has an overloaded version without failure handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, it will be better not to touch that line. I did it because it needed to pass the metricGroup.
So I am fixing the situation by adding yet another constructor, with metricGroup but without failureHandler.
Also in the main constructor put the failureHandler at the end of the parameters list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for constructor, createWriter
is a method in this class
import java.io.Serializable; | ||
|
||
/** Handler to process failures. */ | ||
@Public |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Public
-> @PublicEvolving
would be probably better (to leave some space to evolve the API)
@lilyevsky a few minor comments, LGTM otherwise! |
@reta I committed the changes you suggested. |
} | ||
|
||
@Override | ||
public void write(IN element, Context context) throws IOException, InterruptedException { | ||
public void write(IN element, Context context) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert these changes, they violate the SinkWriter
interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lilyevsky could you please address this and this comment? thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@reta No problem, could you please clarify: you want me to put back the IOException to both write and flush methods, correct? Also, I am not sure what you mean by addressing this . Is it about removing the "@SuppressWarnings("All")"?
Please confirm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lilyevsky correct, the throws IOException, InterruptedException
is the right part of the signature: the first comment is about write
, the second is about flush
, thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@reta Done. Also removed the line with warnings suppressions that I added at some point.
@@ -134,7 +141,7 @@ public void write(IN element, Context context) throws IOException, InterruptedEx | |||
} | |||
|
|||
@Override | |||
public void flush(boolean endOfInput) throws IOException, InterruptedException { | |||
public void flush(boolean endOfInput) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, please revert
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder<IN> setBulkFlushMaxActions(int numMaxActions) { | |||
* @param maxSizeMb the maximum size of buffered actions, in mb. | |||
* @return this builder | |||
*/ | |||
@SuppressWarnings("UnusedReturnValue") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why these suppression are needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put it because compiler gave me a warning about it. I can remove that suppression if you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove, thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @reta , please let me know what are the plans for merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @reta , please let me know what are the plans for merging.
Thanks @lilyevsky , LGTM, asked Flink committers to take I look (I do not have commit rights)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @reta
@MartijnVisser @zentol could you please take a look guys? thank you! |
I see since I finalized this request there were some other changes in main branch, and so there are conflicts. |
I actually resolved the conflicts in the web editor, and updated my branch. |
@MartijnVisser @snuyanzin please help us here :-) |
Actually, hold on for now. I was not able to compile it in my work environment, now tried in my home desktop and found some errors. Let me fix them. Will let you know. |
I made it clean now, you can review and maybe merge already. |
So you can merge it now? |
Awesome work, congrats on your first merged pull request! |
No description provided.