[FLINK-2490][FIX]Remove the retryForever check in function streamFrom… #992

Closed
wants to merge 2 commits into from

HuangWHWHW
Contributor

In the class SocketTextStreamFunction, the variable retryForever is only set in the line "this.retryForever = maxRetry < 0;" (SocketTextStreamFunction.java:54).
When the program executes "while (retry < maxRetry && !success)", maxRetry must be > 0, so retryForever is always false inside the loop.
So it's unnecessary to check whether retryForever is false.

@zentol
Contributor

zentol commented Aug 6, 2015

If you remove that check, retryForever is unused and can be removed completely.

@HuangWHWHW
Contributor Author

Yes, I understand.
But I think retryForever is necessary.
Maybe there is a bug that keeps retryForever from working.
I'll push another fix after the CI run.

@zentol
Contributor

zentol commented Aug 6, 2015

If you think it was necessary, why was your first step to remove its usage...

@HuangWHWHW
Contributor Author

Hah....
Sorry, this thought only came to me after I opened this PR.

@StephanEwen
Contributor

This fix looks valid.

Can it be included in an extended test for the socket function? Something that validates that the function properly tries to reconnect?

@HuangWHWHW
Contributor Author

@StephanEwen
Hi, yes, I plan to add a test for it.
However, the test may fail, since retryForever currently does not work in flink-master either.
Will the test I add run together with this PR?

@mxm
Contributor

mxm commented Aug 11, 2015

@HuangWHWHW retryForever is just a convenience variable for maxRetry < 0. Your fix is correct because the loop will only execute if maxRetry > 0 and thus not execute at all if it should retry "forever". It would be great if you added a test that checks for the correct number of retries. In case of infinite retries, just check up to a certain number (e.g. 100 retries).
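
A minimal sketch of the retry semantics discussed here (0 retries, N retries, or "forever" when maxRetry < 0), with the infinite case capped as suggested above. The class and method names are hypothetical and not part of Flink; the loop only counts attempts and never opens a socket.

```java
/** Hypothetical helper, not Flink code: counts how many reconnect attempts the loop allows. */
final class RetryLoopSketch {

    /**
     * Simulates a server that never comes back, so every attempt fails.
     * The cap exists only so that the "retry forever" case terminates in a test.
     */
    static long countAttempts(long maxRetry, long cap) {
        boolean retryForever = maxRetry < 0; // convenience flag for maxRetry < 0
        long retry = 0;
        long attempts = 0;
        while ((retry < maxRetry || retryForever) && attempts < cap) {
            if (!retryForever) {
                retry++;
            }
            attempts++; // a real implementation would try to connect here
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(countAttempts(0, 100));  // prints 0
        System.out.println(countAttempts(10, 100)); // prints 10
        System.out.println(countAttempts(-1, 100)); // prints 100 (capped)
    }
}
```

A test along these lines would assert the three counts directly, which covers the corner cases (0 and "forever") as well as a regular bounded retry.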

@HuangWHWHW
Contributor Author

@mxm
Ok, I'll add a test. There is a small difficulty: I can't get the number of retries in the test, since retry is a local variable.
So can I add a method to get the number of retries?

@HuangWHWHW
Contributor Author

@mxm
@StephanEwen
Hi, I wrote a test for this today and ran into another problem.
SocketTextStreamFunction uses BufferedReader.read() to read the data sent by the socket server.
Is it possible that BufferedReader.read() never returns -1 at the end of the sent message?
If so, there is another bug: the following code would never be reachable:

if (data == -1) {
    socket.close();
    long retry = 0;
    boolean success = false;
    while ((retry < maxRetry || retryForever) && !success) {
        if (!retryForever) {
            retry++;
        }
        LOG.warn("Lost connection to server socket. Retrying in "
                + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
        try {
            socket = new Socket();
            socket.connect(new InetSocketAddress(hostname, port),
                    CONNECTION_TIMEOUT_TIME);
            success = true;
        } catch (ConnectException ce) {
            Thread.sleep(CONNECTION_RETRY_SLEEP);
            socket.close();
        }
    }

    if (success) {
        LOG.info("Server socket is reconnected.");
    } else {
        LOG.error("Could not reconnect to server socket.");
        break;
    }
    reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    continue;
}

@mxm
Contributor

mxm commented Aug 12, 2015

@HuangWHWHW The read() method of the BufferedReader object returns -1 when the end of the stream has been reached.

A couple of things I noticed apart from the retryForever issue. I wonder if we can fix these with this pull request as well:

  1. The control flow of the streamFromSocket function is hard to predict because there are many while loops with break, continue, or throw statements.
  2. We could use StringBuilder instead of StringBuffer in this class. StringBuilder is faster in the case of single-threaded access.
  3. The function reads a single character at a time from the socket. It is more efficient to use a buffer and read several characters at once. Edit: Not a concern, that's what the BufferedReader does.

@HuangWHWHW You asked how you could count the number of retries in a unit test. Typically, you would insert a Mock or a Spy into your test method. Unfortunately, this does not work here because the socket variable is overwritten in case of a retry. So for this test, I would recommend creating a local ServerSocket and letting the function connect to this socket. You can then control the failures from your test socket.
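
The local ServerSocket idea can be sketched roughly as follows. This is only an illustration under stated assumptions: `clientLoop` is a hypothetical stand-in for the function under test (it reconnects immediately on EOF, up to maxRetry times), and the one-second idle timeout used to stop counting is an arbitrary choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical test helper, not part of Flink. */
final class ReconnectCountSketch {

    /** Stand-in for the source under test: connect, read until EOF, reconnect up to maxRetry times. */
    static void clientLoop(int port, int maxRetry) throws IOException {
        for (int attempt = 0; attempt <= maxRetry; attempt++) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress("127.0.0.1", port), 1000);
                while (socket.getInputStream().read() != -1) {
                    // discard data until the server side closes the connection
                }
            }
        }
    }

    /** Accepts and immediately closes connections; returns how many the client opened. */
    static int countConnections(int maxRetry) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0: the OS picks a free port
            server.setSoTimeout(1000); // stop counting after 1 s without a new connection
            int port = server.getLocalPort();
            Thread client = new Thread(() -> {
                try {
                    clientLoop(port, maxRetry);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            client.start();
            int accepted = 0;
            while (true) {
                try {
                    server.accept().close(); // closing the accepted socket sends EOF to the client
                    accepted++;
                } catch (SocketTimeoutException idle) {
                    break; // no further reconnect arrived within the timeout
                }
            }
            client.join();
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With this stand-in, the server sees one initial connection plus one per retry, so the test controls failures purely from its side of the socket and needs no access to the function's internals.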

@HuangWHWHW
Contributor Author

@mxm
Hi, thank you for the suggestions.
I will try to follow them and improve the test.
I understand most of your points, and I also read the class documentation for BufferedReader.read().
While testing, I found that BufferedReader.read() blocks until it reads the next char from the socket server, or throws an exception when the socket is closed.
Returning -1 from BufferedReader.read() seems to work only for text files, not for socket messages.
I looked for help online, and some people suggested calling Socket.setSoTimeout() so that BufferedReader.read() stops blocking.
But this is not satisfactory either, since it throws an exception.

@mxm
Contributor

mxm commented Aug 12, 2015

Actually point 3 is not so bad because we're using a buffered reader that fills the buffer and does not read a character from the socket on every call to read().

The read() method may throw an Exception or return -1. So we need to handle both of these cases. If closed properly, the socket should send the EOF event and the read() method returns -1.
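
To make the two cases concrete, here is a small sketch that classifies a single read() call. It uses a plain Reader instead of a socket so that it runs anywhere; the class, enum, and method names are hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

/** Hypothetical illustration of the two ways a read() can signal a lost connection. */
final class ReadOutcomeSketch {

    enum Outcome { DATA, END_OF_STREAM, FAILED }

    /** Performs one read() and reports whether it produced data, hit EOF (-1), or threw. */
    static Outcome readOnce(Reader reader) {
        try {
            int data = reader.read();
            return data == -1 ? Outcome.END_OF_STREAM : Outcome.DATA;
        } catch (IOException e) {
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        System.out.println(readOnce(new StringReader("x"))); // a character is available
        System.out.println(readOnce(new StringReader("")));  // end of stream
        StringReader closed = new StringReader("x");
        closed.close();
        System.out.println(readOnce(closed));                // read() on a closed reader throws
    }
}
```

A reconnect loop would typically treat both END_OF_STREAM and FAILED as reasons to retry, while keeping them distinguishable for logging.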

@HuangWHWHW
Contributor Author

Hi, I have two more questions:
1. Regarding StringBuilder, does it mean that we should use BufferedReader.readLine() instead of BufferedReader.read()?
2. Could you tell me how to make BufferedReader.read() return -1? I tried many ways, and all failed.

@mxm
Contributor

mxm commented Aug 12, 2015

> 1. In using StringBuilder, does it mean that we should use BufferedReader.readLine() instead of BufferedReader.read()?

Reading by character is the way to go if we use a custom delimiter. If our delimiter was \n then it would be ok to read entire lines.
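
As a rough sketch of reading by character with a custom delimiter: the helper below collects characters in a StringBuilder and emits a record whenever the delimiter appears. The names are hypothetical, and dropping trailing characters that are not followed by a delimiter is an assumption made for this example, not a statement about what SocketTextStreamFunction does.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of delimiter-based reading, one character at a time. */
final class DelimiterReadSketch {

    /** Returns every record that was terminated by the delimiter before the stream ended. */
    static List<String> split(Reader reader, char delimiter) {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        try {
            int data;
            while ((data = reader.read()) != -1) {
                if (data == delimiter) {
                    records.add(buffer.toString());
                    buffer.setLength(0); // reuse the builder for the next record
                } else {
                    buffer.append((char) data);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumption: an unterminated tail (here "c") is not emitted.
        return records;
    }

    public static void main(String[] args) {
        System.out.println(split(new StringReader("a;b;c"), ';')); // prints [a, b]
    }
}
```

Wrapping the socket's input stream in a BufferedReader keeps this efficient, because each read() is served from the buffer rather than from the socket.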

> 2. Could you tell me how to make BufferedReader.read() return -1? I tried many ways, and all failed.

Ok :) Here is a minimal working example where read() returns -1:

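import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;

public class ReadEofExample {

    public static void main(String[] args) throws IOException {

        ServerSocket serverSocket = new ServerSocket(12345);

        final SocketAddress socketAddress = serverSocket.getLocalSocketAddress();

        // Client thread: connects and blocks in read() until the server closes the connection.
        new Thread(new Runnable() {
            @Override
            public void run() {
                Socket socket = new Socket();
                try {
                    socket.connect(socketAddress);
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    System.out.println(bufferedReader.read());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // Server side: accept the connection and close it right away, which signals EOF to the client.
        Socket channel = serverSocket.accept();
        channel.close();
        serverSocket.close();
    }
}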

Output:

-1

@HuangWHWHW
Contributor Author

Thank you!
I'll try again.

@HuangWHWHW
Contributor Author

@mxm
Hi, I replaced the StringBuffer and added the test.
Please take a look and check whether it's correct.
Thank you!

@mxm
Contributor

mxm commented Aug 13, 2015

Thanks for your changes. I think we should use read() instead of readLine() because we are using a custom delimiter and not necessarily "\n" (newline symbol). The danger of reading an entire line is that the newline symbol might never arrive. So it might continue to read forever. And even if it manages to find a newline symbol, you have to truncate your input to find the custom delimiter. That's not very efficient. Can you change the code back to using the read() method? I think we had a misunderstanding.

For your test case: It's not considered good practice to mix production and test code. You're doing that by introducing the isRetrying flag and exposing it. Alternatively, you have two options:

  1. Create a ServerSocket and pass its address to the SocketTextStreamFunction. Then control the connection to this socket and count how often the function reconnects (e.g. use the accept() method).
  2. Create your test in the same package as the SocketTextStreamFunction function (package is org.apache.flink.streaming.api.functions.source). Then you can access all field variables which are protected. So make your retries variable a protected field variable of the SocketTextStreamFunction class.

I hope that this helps you. If not, feel free to ask more questions.

private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;

private volatile boolean isExit = false;
Contributor

Is this flag necessary? We have isRunning already.

@HuangWHWHW
Contributor Author

Hi Max,
I fixed everything according to your review.
And I kept the change from StringBuffer to StringBuilder.
One question: as far as I can see, StringBuilder currently does the same thing as StringBuffer.
So what is the real difference between the two types in the SocketTextStreamFunction?

@mxm
Contributor

mxm commented Aug 13, 2015

StringBuilder is only for single-threaded while StringBuffer enables multi-thread access. If you use StringBuffer in a single-threaded scenario it has worse performance than StringBuilder.

Thanks for your changes. In addition to the "infinity" test, can you add a test that checks for a certain number of retries (e.g. 10)? Also please add a check for 1 and 0 retries. It's always good to test corner cases :)

@HuangWHWHW
Contributor Author

Ok, I added two cases (retry 10 and 0), since I thought retrying 1 time is the same as 10.
And would you please take a look at two other tests (#991 and #977)?

@HuangWHWHW
Contributor Author

@mxm
Also, I found that the SocketClientSink doesn't have a "retry".
Is it necessary to add one?

channel = serverSo.accept();
channel.close();
serverSo.close();
while(source.socketSource.retrys < 10);
Contributor

You're "busy waiting" here which consumes a lot of CPU. You could check in regular intervals and sleep in between.
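
One way to replace the busy-wait is to poll at a fixed interval with an overall timeout. The sketch below is only illustrative: `waitUntil` and its parameters are hypothetical names, and the interval and timeout values are up to the test.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test utility: polls a condition at intervals instead of spinning. */
final class PollingWaitSketch {

    /** Returns true once the condition holds, or false on timeout or interruption. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: condition did not hold in time
            }
            try {
                Thread.sleep(intervalMillis); // yield the CPU between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        boolean reached = waitUntil(() -> System.currentTimeMillis() - start >= 100, 2000, 10);
        System.out.println(reached);
    }
}
```

In the test above this might look like `waitUntil(() -> source.socketSource.retrys >= 10, 5000, 50)`. The counter being read from another thread should be volatile (or an atomic type), otherwise the polling thread is not guaranteed to see updates.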

@mxm
Contributor

mxm commented Aug 25, 2015

Looks good to merge if we further adjust the waiting time of the tests.

@HuangWHWHW
Contributor Author

Hi, I decreased both the waiting time and the number of retries, since the test would still take over 10 seconds if only the waiting time were decreased, due to the "Thread.sleep(CONNECTION_RETRY_SLEEP);".

@@ -42,11 +42,13 @@
 private boolean retryForever;
 private Socket socket;
 private static final int CONNECTION_TIMEOUT_TIME = 0;
-private static final int CONNECTION_RETRY_SLEEP = 1000;
+public static int CONNECTION_RETRY_SLEEP = 1000;
Contributor

This shouldn't be modifiable by everyone. Please make it just package-visible by removing the public modifier. Also, please keep the final modifier because the current implementation just lets the number of retries be configurable with a fixed 1 second retry rate. This is also documented in the user-facing API methods on DataStream.

Contributor Author

But if I add final, I get an error in my test:
Cannot assign a value to final variable "CONNECTION_RETRY_SLEEP".

Contributor

Ok no problem. Then make the variable non-final but don't expose it. So just have it static int CONNECTION_RETRY_SLEEP = 1000.

@HuangWHWHW
Contributor Author

@mxm
Hi, I changed CONNECTION_RETRY_SLEEP to static final int CONNECTION_RETRY_SLEEP = 1000;
But then I cannot change CONNECTION_RETRY_SLEEP directly in my test using:
SocketTextStreamFunction.CONNECTION_RETRY_SLEEP = 200.
So I added reflection to work around this.
Now CONNECTION_RETRY_SLEEP is set to 200 in my test.
Could you please take a look and check whether it is correct?

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(null, 200);
Contributor

That's quite a hack. I think it is ok to remove the final modifier and make the field variable package-local: static int CONNECTION_RETRY_SLEEP = 1000. Then you can set it directly here.
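
A minimal sketch of the package-local approach, assuming the test lives in the same package as the class holding the field. `SocketSourceStandIn` is a hypothetical stand-in for SocketTextStreamFunction; restoring the original value in a finally block is an extra precaution added here so that other tests are not affected.

```java
/** Hypothetical illustration: overriding a package-visible, non-final static field in a test. */
final class RetrySleepOverrideSketch {

    /** Stand-in for the source class; only the field matters for this example. */
    static final class SocketSourceStandIn {
        static int CONNECTION_RETRY_SLEEP = 1000; // package-visible, non-final
    }

    /** Runs a (placeholder) test body with a shorter sleep, then restores the original value. */
    static int runWithShortSleep(int testSleepMillis) {
        int original = SocketSourceStandIn.CONNECTION_RETRY_SLEEP;
        SocketSourceStandIn.CONNECTION_RETRY_SLEEP = testSleepMillis;
        try {
            // The actual test body would run here; this sketch just reports the value in effect.
            return SocketSourceStandIn.CONNECTION_RETRY_SLEEP;
        } finally {
            SocketSourceStandIn.CONNECTION_RETRY_SLEEP = original;
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithShortSleep(200));                     // prints 200
        System.out.println(SocketSourceStandIn.CONNECTION_RETRY_SLEEP); // prints 1000
    }
}
```

Compared with stripping the final modifier via reflection, a direct assignment is checked by the compiler and does not depend on JVM internals.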

Contributor Author

...
Sorry for that.
I changed this in my new one. :-D

@mxm
Contributor

mxm commented Aug 26, 2015

@HuangWHWHW Thanks for your changes. Adding reflection calls to test code is not good practice and makes the code hard to maintain.

@HuangWHWHW
Contributor Author

Hi @mxm,
any new comments?

@mxm
Contributor

mxm commented Aug 27, 2015

While merging your pull request I noticed that the SocketTextStreamFunction actually does not wait the time specified in CONNECTION_RETRY_SLEEP but immediately tries to reconnect in case of an EOF. It only waits in case of a ConnectionError. I'm not sure whether this behavior is desired but this should also be reflected in your test cases. Could you add a test case where you first pass a Socket with an EOF and then let a ConnectionError occur? Thank you.

@HuangWHWHW
Contributor Author

@mxm
Hi,
my test already does this: it first passes the Socket
when it calls SocketTextStreamFunction.open.
Then it closes the Socket, which sends nothing but EOF to the SocketTextStreamFunction.
After this, the function retries.
The first retry happens immediately, since SocketTextStreamFunction has received the end of the stream.
The second, third, ... retries follow, since I do not reopen the socket, which means a ConnectionError occurs.
So can you describe what you need in more detail?
Thank you.

@HuangWHWHW
Contributor Author

@mxm
Hi,
I added a test that first sends a message to the SocketTextStreamFunction, which succeeds.
Then I close the server to make the SocketTextStreamFunction retry.
Is this what you need?

@HuangWHWHW
Contributor Author

@mxm
BTW: Could you help me take a look at this CI: #1030?
I still cannot view the CI details currently.
Thank you very much!

@HuangWHWHW
Contributor Author

@mxm
Hi, Max.
Any comments on my new changes?


private volatile boolean isRunning;

public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
this.retries = 0;
Contributor

Initialization to 0 is not needed.

Contributor Author

Hi,
I removed this in my new commit.
By the way, why doesn't the CI run?
This happens for all of my new commits.

@StephanEwen
Contributor

I am not sure why the CI is not retesting this.

Can you try to squash your commits into one commit and force-push this branch? This always triggers CI for me...

@mxm
Contributor

mxm commented Sep 1, 2015

@HuangWHWHW Thank you for addressing my comments. Could you please squash your commits and force push to this branch again?

@HuangWHWHW
Contributor Author

@mxm @StephanEwen
Hi, I just pushed one commit yesterday.
And I found that a few PRs had the same trouble yesterday.
Is there an issue with the CI?

@mxm
Contributor

mxm commented Sep 3, 2015

I think there is an issue with Travis at the moment. Could you force push to this branch again?

@HuangWHWHW
Contributor Author

@mxm
Ok, I made the CI rerun.
Any new comments?

@HuangWHWHW
Contributor Author

@mxm
@StephanEwen
Hi, very sorry to bother you.
The CI has passed.
Are there any new comments, or can this be merged?

final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
private final String host = "127.0.0.1";

SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
Contributor

Could you replace this with a simple Mock using Mockito?

Contributor Author

The ctx is used to get the message received from the socket server.
So I overrode the toString() method in ctx.
I don't think we can verify that the received message is correct if we use Mockito.
Or is it unnecessary to check the message, since this test is about the number of retries?

Contributor

You're right, you need to capture the input to the collect method. How about using an ArgumentCaptor? http://mockito.googlecode.com/svn/tags/1.8.0/javadoc/org/mockito/ArgumentCaptor.html

@HuangWHWHW
Contributor Author

@mxm
Hi, I added the ArgumentCaptor to the test and removed the unwanted code.

@mxm
Contributor

mxm commented Sep 8, 2015

Nice :) Merging...

@asfgit asfgit closed this in 1800434 Sep 8, 2015
nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015