[FLINK-15416][network] Add task manager netty client retry mechanism #11541
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community.
Automated Checks: last check on commit 3bc439b (Fri Mar 27 04:24:31 UTC 2020). Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: the @flinkbot bot supports the following commands:
Force-pushed from 3d6cba1 to b5ef192 (Compare).
Would you please review this PR?
Please do not start work next time before being assigned to the ticket, especially since there was my unresolved question in the Jira ticket about whether we want such a change or not. In the end I think we can go in the direction of supporting a retry mechanism in the network stack (including TCP connection loss in the middle of a transfer), so we can just as well start with the reconnection retries you are proposing. I will take a more detailed look at your PR later. For starters, could you try to provide unit tests for the
@pnowojski
Force-pushed from 95069b3 to 8b0f177 (Compare).
Thanks for the update. I've left a couple of comments.
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
...e/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
```java
try {
    client = ((ConnectingChannel) old).waitForChannel();
} catch (IOException e) {
    LOG.info("Continue to waitForChannel until the original thread has completed or failed");
}
```
What do you mean by this change/log message?
Multiple subpartitions will share the same channel. Let's say subpartitions A and B both need the channel. A goes to wait first; B will also wait here. If an exception happens, both of them will be notified. But we don't want the exception thrown in B, as A can still retry.
But shouldn't B implement the same retry logic? Otherwise we are swallowing exceptions.
The current logic seems to have some problems. If an exception occurs while waiting, then the client is still null, and that is what gets swapped into the map via clients.replace(connectionId, old, client) below. In the next loop iteration it would enter the connectChannelWithRetry logic above, which means B might also go through the same process as A did and retry some number of times.
I guess the expectation is that only A retries the connection, and B always waits while A retries, until the final success or failure.
@pnowojski
The logic is to always let the first requester retry. The following requesters will always wait until the first requester gets the client or finishes retrying. Otherwise, there would be more retries than expected.
@zhijiangW
I am not sure whether my understanding is correct or not. Once the ConnectingChannel is put into the clients ConcurrentMap by the first requester, the object is never null after that. Thus, other requesters will go to line 97. So the process is what you described: "let A retry connections and B is always waiting during A retries until final success or failure." Are you aligned?
Ok, I see, I missed the outer while (client == null) loop.
But as it is now, isn't it possible that B will loop indefinitely? Shouldn't it also follow the max retry count logic and, if it is exceeded, throw an exception?
Agree. Apply the same retry logic to the following channel requests, not just the first one.
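To illustrate the behavior agreed above, here is a minimal self-contained sketch. It is hypothetical: Client and Pending stand in for the PR's NettyPartitionRequestClient and ConnectingChannel, and the class is not the PR code. The first requester installs the pending connection and drives the retries, while followers wait on it but honor the same retry budget instead of looping forever.

```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Sketch only: Client/Pending are stand-ins for the PR's internal types.
final class ClientPoolSketch {
    interface Client {}
    interface Pending { Client waitForChannel() throws IOException; }

    private final ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();
    private final int maxRetries;

    ClientPoolSketch(int maxRetries) { this.maxRetries = maxRetries; }

    Client get(String connectionId, Function<String, Pending> connect) throws IOException {
        int attempts = 0;
        while (true) {
            // The first requester creates the pending connection; followers see the existing entry.
            Object entry = clients.computeIfAbsent(connectionId, connect::apply);
            if (entry instanceof Client) {
                return (Client) entry;
            }
            try {
                Client client = ((Pending) entry).waitForChannel();
                clients.replace(connectionId, entry, client); // publish the finished client
                return client;
            } catch (IOException e) {
                clients.remove(connectionId, entry); // clear the failed attempt
                if (++attempts >= maxRetries) {
                    throw e; // followers respect the same max retry count as the first requester
                }
            }
        }
    }
}
```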
...e/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
```
@@ -130,6 +130,23 @@ public void run() {
    }
}

@Test
public void testNettyClientConnectRetry() throws Exception {
    NettyClient nettyClient = mock(NettyClient.class);
```
We stopped using mockito for tests (for various reasons); could you try to provide a proper mock of the NettyClient class? Maybe if you used NettyTestUtil you could just implement a NettyClient#connect method that throws an exception on the first invocation but succeeds (returns a real/correct ChannelFuture) on the second attempt?
I think that will be better. But when I tried to implement the TestingNettyClient, I found that SucceededChannelFuture is only visible within the io.netty.channel package. Thus, I would need to create a new SucceededChannelFuture class in Flink to return a successful ChannelFuture. Do you think that is the right way to go?
Yes, I noticed that, and implementing ChannelFuture is probably not worth it, hence my suggestion to use NettyTestUtil. It looks like you could just init a server (NettyTestUtil#initServer()) and a NettyClient from #initClient() and use that directly, without needing to implement a custom ChannelFuture? You can check usages of NettyTestUtil, for example NettyClientServerSslTest#testClientUntrustedCertificate.
I might be missing something, but it looks like it could work quite easily.
Yes. I created an UnstableNettyClient to simulate failure on the first n-1 connection attempts and success on the last try.
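For reference, a sketch of the idea behind such a test double (a hypothetical shape, not the actual UnstableNettyClient from the PR): fail a fixed number of connect calls, then delegate to a real client so the retry path is exercised end to end.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the PR's UnstableNettyClient.
final class UnstableConnectorSketch {
    interface Connector {
        AutoCloseable connect(String address) throws IOException;
    }

    // Returns a connector that fails the first `failures` attempts, then succeeds.
    static Connector failingFirst(int failures, Connector real) {
        AtomicInteger calls = new AtomicInteger();
        return address -> {
            if (calls.incrementAndGet() <= failures) {
                throw new IOException("Simulated connect failure #" + calls.get());
            }
            return real.connect(address); // the n-th call delegates to the real client
        };
    }
}
```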
Force-pushed from 6cdd6e4 to 3c0e3fb (Compare).
Thanks for the suggestions. I updated the PR accordingly. My major concern is adding a TestingNettyClient: it is a bit of overhead to create a SucceededChannelFuture just to simulate the success path. Do you think this is the right way to proceed, or is there a simpler way?
Thanks for the update, I've posted a couple of responses.
docs/_includes/generated/netty_shuffle_environment_configuration.html
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
...e/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
Force-pushed from 9a02c82 to eeee4cd (Compare).
@zhijiangW Thanks for joining the discussion. I fixed the two obvious issues you identified and also replied to your concerns about the retry logic.
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
...e/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
Force-pushed from 91c10df to baf2c55 (Compare).
Thanks for the update @HuangZhenQiu.
The whole …
@pnowojski
Force-pushed from f64492a to 5149176 (Compare).
I haven't thought about that :) Yep, it's an obvious simple solution. The test failure is unrelated and already fixed on master: FLINK-18239
I think the …
Force-pushed from 5149176 to 2ff2e89 (Compare).
@pnowojski
Can you explain what the deadlock scenario was? I haven't had time to fully analyse your solution, but …
Thanks for this PR @HuangZhenQiu, I also took a look at it and left some comments.
My major concern, however, is that Flink already has a retry mechanism configured via taskmanager.network.request-backoff.xxx.
I guess it didn't work for you because it handles higher-level errors, when the connection is already established, right?
Did you consider modifying it?
I think it would be great for both users and developers to have a single retry mechanism for both kinds of errors (besides the complexity, it's not clear how they would interact with each other).
Besides that, there is a deadlock in Azure (I'm also able to reproduce it locally).
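For context, the existing backoff the reviewer refers to is set through the taskmanager.network.request-backoff.* options. A minimal sketch using the string keys (the values shown are the defaults as I recall them from the Flink docs, so treat them as assumptions):

```java
import org.apache.flink.configuration.Configuration;

// Existing partition-request backoff configuration (values are assumed defaults).
Configuration config = new Configuration();
config.setInteger("taskmanager.network.request-backoff.initial", 100);   // initial backoff in ms
config.setInteger("taskmanager.network.request-backoff.max", 10000);    // maximum backoff in ms
```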
```java
    clients.replace(connectionId, connectingChannel, client);
    return client;
} catch (IOException | ChannelException e) {
    LOG.error("Failed {} times to connect to {}", count, connectionId.getAddress(), e);
```
I think it will print "0 times" for the 1st failure and "1 times" for the 2nd. Should count + 1 be used?
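Assuming count starts at zero, the fix the reviewer suggests would simply be:

```java
// count is 0-based, so report a 1-based attempt number in the log message
LOG.error("Failed {} times to connect to {}", count + 1, connectionId.getAddress(), e);
```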
Good catch. That is why I synchronized on connectionId in line 74: it ensures only one thread can enter the connection logic for a particular connectionId, which either succeeds or fails after several retries.
```java
} catch (IOException | ChannelException e) {
    LOG.error("Failed {} times to connect to {}", count, connectionId.getAddress(), e);
    ConnectingChannel newConnectingChannel = new ConnectingChannel(connectionId, this);
    clients.replace(connectionId, connectingChannel, newConnectingChannel);
```
If the exception is thrown by connect and not by waitForChannel, then we use a new channel. But other threads are waiting on the old channel, which will never be completed, right?
I guess this is the reason for the deadlock I see in Azure and locally.
Yes. That was the reason for the deadlock before the synchronized-on-connectionId change was added.
@rkhachatryan Yes, you are right. The deadlock is because the UnstableNettyClient can't simulate the Netty bootstrap notifying the exception to the ConnectingChannel. Currently the exception is thrown in the connect function, so we need to notify it explicitly.
@pnowojski
Yes, I'm running a version with …
I referred to this failure of …
@flinkbot run azure
Force-pushed from 4d30a9d to dcfd168 (Compare).
I think I found the cause of the deadlock, please see the comments below.
Just a side note: I think we wouldn't have all these concurrency issues if we had used ConcurrentHashMap "idiomatically" from the beginning, something like this:

```java
ConcurrentMap<ConnectionID, Future<NettyPartitionRequestClient>> clients = new ConcurrentHashMap<>();

return clients.computeIfAbsent(connectionId, unused -> {
    int tried = 0;
    while (true) {
        try {
            ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
            nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
            return connectingChannel.waitForChannel();
        } catch (IOException e) {
            if (++tried > retryNumber) {
                throw new CompletionException(e);
            }
        }
    }
}).get();
```
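Building on that sketch, here is a self-contained variant of the same idea with hypothetical names (not Flink code). Keeping CompletableFutures as the map values means the mapping function only installs the future, so the map itself is never blocked while the connection is being established:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical sketch of the Future-as-map-value approach.
final class FutureClientPoolSketch<C> {
    private final ConcurrentMap<String, CompletableFuture<C>> clients = new ConcurrentHashMap<>();

    C get(String connectionId, Supplier<C> connectWithRetry) throws IOException {
        // The mapping function returns immediately; the (possibly slow) connect
        // with retries runs asynchronously, so other map operations are not blocked.
        CompletableFuture<C> future = clients.computeIfAbsent(
            connectionId, id -> CompletableFuture.supplyAsync(connectWithRetry));
        try {
            return future.join();
        } catch (CompletionException e) {
            clients.remove(connectionId, future); // allow a later caller to start a fresh attempt
            throw new IOException("Connecting to " + connectionId + " failed", e.getCause());
        }
    }
}
```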
```
@@ -60,49 +71,41 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
while (client == null) {
    entry = clients.get(connectionId);
```
I found this to be the cause of the deadlock:
- entry is placed by A
- entry is read by B
- A gets the connection and replaces the entry, but B still holds the old entry
- B acquires the lock and waits for the channel indefinitely
Yes, you are right. Usually the exception would be notified to the old connecting channel by the bootstrap. The test case actually triggers the exception within the connect function, so we need to notify the exception explicitly.
```java
nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

client = connectingChannel.waitForChannel();
synchronized (connectionId) {
```
I think it's not safe to synchronize on this because it's passed from the outside.
Yes, you are right. It isn't safe to synchronize on a variable passed in from outside.
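One common alternative, sketched here with hypothetical names: keep a private per-key lock object in a map, so the monitor is owned by the factory rather than being an object handed in from outside.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical sketch: a dedicated lock object per key, instead of
// synchronizing on the externally provided ConnectionID itself.
final class PerKeyLocks<K> {
    private final ConcurrentMap<K, Object> locks = new ConcurrentHashMap<>();

    <T> T withLock(K key, Supplier<T> action) {
        synchronized (locks.computeIfAbsent(key, k -> new Object())) {
            return action.get();
        }
    }
}
```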
@rkhachatryan
Force-pushed from f231abd to 3b7bc2e (Compare).
@pnowojski @rkhachatryan
@flinkbot run azure
@flinkbot run travis
Force-pushed from 3b7bc2e to a8e0437 (Compare).
@flinkbot run travis
@flinkbot run azure
```java
}
else {
    client = (NettyPartitionRequestClient) old;
    client = (NettyPartitionRequestClient) clients.computeIfAbsent(connectionId, unused -> {
```
I indeed meant to use Futures as map values, because otherwise all operations on the map (or part of it) are blocked for the duration of the connection.
I implemented this in #12746 based on your changes. Please take a look.
What is the purpose of the change
Add retry logic for netty client creation in PartitionRequestClientFactory. It is useful to make the Flink runtime tolerant of physical link issues in the switch.
Brief changelog
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation