Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2536][streaming]add a re-connect for socket sink #1030

Closed
wants to merge 2 commits into from

Conversation

HuangWHWHW
Copy link
Contributor

add a re-connect in function invoke() when it throws exception.

@HuangWHWHW
Copy link
Contributor Author

Add tests for retry 10 times and 0.

dataOutputStream.write(msg);
success = true;

}catch(Exception ee){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually add whitespaces between keywords.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to catch all exceptions here? Maybe IOException is enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it depends on you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it actually depends on the exceptions for which we want to restart. If I'm not mistaken, then new Socket() and dataOutputStream.write only throw IOExceptions. Thus, we should change it.

@tillrohrmann
Copy link
Contributor

Hi @HuangWHWHW, I had some minor comments. Could we also add a test case where we test that the SocketClientSink can reconnect against a newly opened socket after it has been closed? This would be great.

@HuangWHWHW
Copy link
Contributor Author

" Could we also add a test case where we test that the SocketClientSink can reconnect against a newly opened socket after it has been closed? This would be great."

Good idea!
I will try to do it.

@HuangWHWHW
Copy link
Contributor Author

@tillrohrmann
Hi,
I take a new fix and add a test for retry success.
Would you please to take a look?
Thank you.

BTW:Why does not the CI rerun??

server.close();

if (error.get() != null) {
Throwable t = error.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good to set the error to null again. Otherwise all subsequent tests will fail with the same error message which is, however, completely unrelated.

@tillrohrmann
Copy link
Contributor

Hi @HuangWHWHW, I had some comments concerning the test cases.

@HuangWHWHW
Copy link
Contributor Author

Hi,
Thank you.
I`ll update a new fix.

} catch(IOException ee) {
Log.error("Reconnect to socket server and send message failed. Caused by " +
ee.toString() + ". Retry time(s):" + retries);
synchronized (this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locking on this is usually not recommended because outside code can influence your behaviour here. Thus, use a private lock field here.

@HuangWHWHW
Copy link
Contributor Author

@tillrohrmann
Hi, I take a new fix.
But this:
image
I have no good idea to care both the sink need to retry and should not finished retry when I reopen the socket server.
So, I change the test "testSocketSinkRetryAccess” from retry ten times to retry forever since this will never finished retry until the method closeConnection() is called or the reconnect is success.

@HuangWHWHW
Copy link
Contributor Author

@tillrohrmann
BTW:How to make the CI rerun?

@HuangWHWHW
Copy link
Contributor Author

@tillrohrmann
Hi,very sorry for disturbing.
I have changed the PR as your comments and passed the CI.
It will be thankful if you can take a look.

byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
throw new RuntimeException("Cannot send message " + value.toString() +
" to socket server at " + hostName + ":" + port, e);
LOG.error("Cannot send message " + value.toString() +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can log the exception simpler like this:

LOG.error("Cannot send message " + value + " to socket server at " + hostName + ":" + port + ". Trying to reconnect.", e);
`´`

@HuangWHWHW
Copy link
Contributor Author

@StephanEwen
Hi, I take changes for your comments but the synchronized (lock) in SocketClientSink.java.
Need I change this to Thread.sleep()?

@HuangWHWHW
Copy link
Contributor Author

@StephanEwen
@tillrohrmann
Hi,
I get the CI to rerun.
Any new comment?

@HuangWHWHW
Copy link
Contributor Author

@StephanEwen
@tillrohrmann
Hallo?

@tillrohrmann
Copy link
Contributor

Sorry, @HuangWHWHW, currently we're really busy. I'll try to review your PR once I find a free minute.

@StephanEwen
Copy link
Contributor

I think this looks good, except for the comment with the final variable for the lock.

One more comment: When concatenating strings, avoid constructs like " value=" + value.toString(). Rather do "value=" + value. That is safe against null pointers.

@HuangWHWHW
Copy link
Contributor Author

@tillrohrmann
Ah,sorry for bothering.
It doesn't matter.
Just I thought I did something wrong in the community.
:-D

@StephanEwen
Copy link
Contributor

No worries. We are simply overloaded right now. Many hard features under development, and many pull requests being opened.

@HuangWHWHW
Copy link
Contributor Author

I removed the toString() method and change the lock to private final SerializableObject lock.

@HuangWHWHW
Copy link
Contributor Author

Sorry for misunderstading.
I have changed the LOG.error(e) to LOG.error(e.getMessage()) now.

@HuangWHWHW
Copy link
Contributor Author

Hi, very sorry for bothering again.
Since two weeks passed, do you have some time to review this PR recently?
Will greatly appreciate it:)

@StephanEwen
Copy link
Contributor

Looks good now, will merge this...

@asfgit asfgit closed this in fd354ba Sep 21, 2015
@StephanEwen
Copy link
Contributor

I reworked this quite heavily during merging. There were a lot of issues that were against good Java style:

  • Variables in the classes, rather than in methods
  • The way references to threads were obtained
  • Defining clear parameter checks and exceptions
  • Handling InterruptedExceptions
  • polling versus clear conditions when state can be checked

You can have a look at the code after my fixes, to see these issues in context.

I would suggest to get a Java book (like "Effective Java", that is a good one) and take this as a guideline for future work. This pull request took more than 70 comments and still needed quite some rework (not for Flink-specific issues, but all of it general Java style/efficiency/correctness). I am afraid we cannot do that for every pull request, it would be completely overwhelming...

@HuangWHWHW
Copy link
Contributor Author

@StephanEwen
Hi, I'm very sorry for the poor Java style of mine.
And many thanks for your rework.I did a full review about your new fixes and get the points.
I'll be more careful next time!
And also thanks for the book.I'm doing more studies from now on.
Generally, thanks for the time very much!

@StephanEwen
Copy link
Contributor

@HuangWHWHW No problem, we all learn all the time.
It would only help to review and merge pull requests if the style follows more the Java best practices. It is something that you will learn fast, I am sure.

nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
lofifnc pushed a commit to lofifnc/flink that referenced this pull request Oct 8, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants