-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host #1962
Conversation
Hello @fhueske , The tests do not fail because of the changes made in the PR. I tested the Junits for elasticsearch connector & all of them runs fine. |
@@ -86,6 +88,7 @@ | |||
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; | |||
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; | |||
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; | |||
public static final String CONFIG_NO_OF_CONN_RETRIES = "conn.retries"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually don't abbreviate configuration keys. Can you rename it to "CONFIG_KEY_CONNECTION_RETRIES" ?
Thank you for opening the pull request. I made some inline comments. |
The change is missing documentation updates & test cases. |
d24aa6e
to
9f5df39
Compare
Hello @rmetzger , Thanks a lot for reviewing the PR. |
Hello @rmetzger , Looking at the test case |
How did you test/try out the code you've implemented in this pull request? |
9f5df39
to
93aba33
Compare
Hello @rmetzger , I added a testcase now to the |
I took a quick look at this. I am wondering if this actually needs an extra timer service for retries. Can this be solved without a timer? The failures could be detected in the Triggering asynchronous timers is very complex and easily creates leaks, races, or leftover work / tasks at shutdown. |
93aba33
to
ee23516
Compare
Hello @StephanEwen , I have removed the asynchronous timer & doing the retry logic directly now. The backoff is 3s. Please have a look. |
|
||
if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) { | ||
this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to have a default value set if not specified by user? Otherwise null exception in invoke().
Hi @sbcd90, Gave the changes a quick review and commented. please let me know your opinion on them. Hope they'll be helpful to get you going. |
ee23516
to
b835402
Compare
Hello @tzulitai , I think default value for int in Java is 0. |
Hi @sbcd90, |
LOG.info("Created Elasticsearch TransportClient {}", client); | ||
if (LOG.isInfoEnabled()) { | ||
LOG.info("Created Elasticsearch TransportClient {}", client); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log message here doesn't match the purpose of this method
Hi @sbcd90, On the other hand, another problem that raises if we are to add reconnect attempt for the ES sink is that failing records due to connection errors also need to be caught in the |
Using the sniffing feature of transport client can achieve this with the current ES2 connector in master branch. Explanation:
Source: |
Thanks @HungUnicorn, thats useful info. I wonder though if this config should be set by the user, instead of letting the connector internally set this. |
Hi @sbcd90, will you like to continue working on this PR? There's going to be a restructuring of the ES connectors (#3112) perhaps soon after the 1.2 release, and this PR will very likely need a rebase. I'd like to include this fix after the restructuring, so please let me know on how you'd like to proceed with this contribution :-) Thank you! |
Hi @tzulitai , thanks for the updates. I'll refactor the code & will rebase the PR. |
b835402
to
85c45ce
Compare
85c45ce
to
0c8bbc7
Compare
Hello @tzulitai , I have rebased the changes. Can you please review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for the rebase, and really sorry for the long delay on the review here @sbcd90.
I finally have some time to revisit the Elasticsearch connectors :) I have some comments, could you please let me know what you think?
elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer); | ||
} catch (Exception ex) { | ||
if (client instanceof TransportClient && !callBridge.isConnected(((TransportClient) client))) { | ||
TimeUnit.SECONDS.sleep(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be configurable?
Also, could you explain a bit on why you've chosen 3 seconds?
|
||
// if there is a connectivity failure, then retry | ||
if (failureThrowable.get() != null && | ||
client instanceof TransportClient && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why exactly does the client need to be a TransportClient
?
@@ -208,6 +222,13 @@ public void invoke(T value) throws Exception { | |||
checkErrorAndRethrow(); | |||
|
|||
elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer); | |||
|
|||
// if there is a connectivity failure, then retry | |||
if (failureThrowable.get() != null && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether or not checking the exception type would be enough for verifying the connectivity failure.
} | ||
|
||
try { | ||
open(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This open() call seems a bit odd to me. I don't think its a good practice to call that here, since essentially its a life cycle method used by the system.
I'm closing this as "Abandoned", since there is no more activity and the code base has moved on quite a bit. Please re-open this if you feel otherwise and work should continue. |
mvn clean verify
has been executed successfully locally or a Travis build has passed