
Ensure pending transport handlers are invoked for all channel failures #25150

Merged
merged 11 commits into from Jun 13, 2017

Conversation

4 participants
@s1monw
Contributor

commented Jun 9, 2017

Today, if a channel gets closed due to a disconnect, we notify the response
handler that the connection is closed and the node is disconnected. Unfortunately,
this is not a complete solution since it only works for published connections.
Connections that are unpublished, i.e. for discovery, can hang indefinitely since we
never invoke their handlers when we get a failure while a user is waiting for
the response. This change adds connection tracking to TcpTransport that ensures
we notify the corresponding connection if there is a failure on a channel.
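As a rough illustration of the tracking described above (a sketch only — `ConnectionTracker`, `Connection`, and `onChannelClosed` here are hypothetical names, not the actual TcpTransport API), the idea is to keep every open connection, published or not, in a set, and on a channel failure to find and close the owning connection so its pending handlers are notified:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the tracking this PR adds to TcpTransport.
class ConnectionTracker {
    static class Connection {
        final Set<Integer> channels;   // stand-in for the low-level channels
        boolean closed = false;

        Connection(Set<Integer> channels) {
            this.channels = channels;
        }

        boolean hasChannel(int channel) {
            return channels.contains(channel);
        }

        void close() {
            closed = true;             // the real code would fail pending handlers here
        }
    }

    private final Set<Connection> openConnections = ConcurrentHashMap.newKeySet();

    void register(Connection connection) {
        openConnections.add(connection);
    }

    // Called from the channel close listener: find the (possibly unpublished)
    // connection owning this channel and close it so waiting handlers are notified.
    boolean onChannelClosed(int channel) {
        for (Connection connection : openConnections) {
            if (connection.hasChannel(channel)) {
                openConnections.remove(connection);
                connection.close();
                return true;
            }
        }
        return false;
    }
}
```

The point of the scan is exactly the bug described: without a registry of all open connections, a failure on an unpublished channel has no path back to the handler waiting on it.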

@s1monw

Contributor Author

commented Jun 9, 2017

@elasticmachine test this please

@tbrooks8
Contributor

left a comment

I made some comments.

}

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

Extra line?

@@ -345,7 +345,7 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", future.cause());
}
channels[i] = future.channel();
channels[i].closeFuture().addListener(new ChannelCloseListener(node));
channels[i].closeFuture().addListener(new ChannelCloseListener());

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

As you are no longer passing any references here, you could just use a single ChannelCloseListener instance.

@s1monw

s1monw Jun 10, 2017

Author Contributor

++
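The reviewer's suggestion can be sketched as follows (illustrative only — the `CloseListener` interface below is a stand-in for Netty's `ChannelFutureListener`, not the real type): once the listener no longer captures per-node state, one stateless instance can be registered on every channel instead of allocating one per channel.

```java
// Sketch: a stateless close listener shared across all channels.
class SharedListenerSketch {
    interface CloseListener {                 // stand-in for Netty's ChannelFutureListener
        void onClose(int channelId);
    }

    static int closeEvents = 0;

    // One shared instance is safe precisely because it holds no per-node state.
    static final CloseListener CHANNEL_CLOSE_LISTENER = channelId -> closeEvents++;
}
```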

break;
// Removing it here from open connections is paranoia since the close handler should deal with this.
openConnections.remove(entry.getValue());
return;

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

Are you sure you want this? Won't this cause any exception thrown by closeChannels to be lost?

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

And there is still a catch block there to log that exception, so I assume we still want it?

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

Also, there are two try blocks here that I'm not sure need to be nested.

@s1monw

s1monw Jun 10, 2017

Author Contributor

I simplified this and added a comment. The double try is likely a leftover.

for (NodeChannels channels : openConnections) {
if (channels.hasChannel(channel)) {
IOUtils.closeWhileHandlingException(channels);
return;

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

Same comment

IOUtils.closeWhileHandlingException(nodeChannels);
}
IOUtils.closeWhileHandlingException(() -> IOUtils.closeWhileHandlingException(connectedNodes.values()),
() -> IOUtils.closeWhileHandlingException(openConnections), openConnections::clear, connectedNodes::clear);

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

It is reasonably challenging to figure out what is going on here. Maybe add a comment about the order of operations?

I think it's closing nodes. Followed by closing any "light" channels. And then clearing the maps?

@s1monw

s1monw Jun 10, 2017

Author Contributor

I simplified it in a new commit
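The ordering tbrooks8 spells out (close connected nodes, then the unpublished "light" connections, then clear the maps) can be sketched with a small stand-in for `IOUtils.closeWhileHandlingException` — everything below is hypothetical, including the injected failure, which shows why each step is wrapped so one broken step cannot prevent later cleanup:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; closeQuietly mimics the behavior of
// Elasticsearch's IOUtils.closeWhileHandlingException.
class TeardownSketch {
    static final List<String> order = new ArrayList<>();

    // Run each cleanup step in order, swallowing failures so one broken
    // step cannot prevent the remaining resources from being released.
    static void closeQuietly(Runnable... steps) {
        for (Runnable step : steps) {
            try {
                step.run();
            } catch (RuntimeException e) {
                // logged-and-ignored in the real code
            }
        }
    }

    static void shutdown() {
        closeQuietly(
            () -> order.add("close connected nodes"),
            () -> {
                order.add("close open (unpublished) connections");
                throw new RuntimeException("simulated close failure");
            },
            () -> order.add("clear tracking maps")
        );
    }
}
```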

if (nodeChannels != null && nodeChannels.hasChannel(future.channel())) {
threadPool.generic().execute(() -> disconnectFromNode(node, future.channel(), "channel closed event"));
try {
onChannelClosed(future.channel());

@tbrooks8

tbrooks8 Jun 9, 2017

Contributor

I know there are a bunch of checks at various points so that we can call these close methods multiple times safely. But I just want to point out that in this case:

  1. We call onChannelClosed here
  2. In the finally block below we call disconnectFromNodeChannel which dispatches a Runnable to a generic thread pool.
  3. At the end of disconnectFromNodeChannel we call onChannelClosed again with the same channel.

That is all probably fine. But this all seems to be pretty complicated to me right now. Maybe we can find some ways to simplify in the future?

@s1monw

s1monw Jun 10, 2017

Author Contributor

You are right, I simplified this by simply calling disconnectFromNodeChannel from onChannelClosed only. It's much simpler now, but I think we can improve it further down the road.
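The reason calling the close path more than once is safe, as discussed above, is that the close is idempotent. A minimal sketch of that pattern (hypothetical names, not the ES code) uses a compare-and-set gate so the expensive teardown runs exactly once no matter how many paths reach it:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of an idempotent close: safe to invoke from multiple paths.
class IdempotentClose {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    int closeCount = 0;

    void close() {
        // Only the first caller wins the CAS and performs the teardown;
        // every later call is a cheap no-op.
        if (closed.compareAndSet(false, true)) {
            closeCount++;   // real code would release channels / notify handlers here
        }
    }
}
```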

@tbrooks8

Contributor

commented Jun 9, 2017

I do want to clarify, for my understanding, the core of this PR is here correct?

for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
                        if (disconnectFromNode(entry.getKey(), channel, reason)) {
                            // if we managed to find this channel and disconnect from it, then break, no need to check on
                            // the rest of the nodes
                            // Removing it here from open connections is paranoia since the close handler should deal with this.
                            openConnections.remove(entry.getValue());
                            return;
                        }
                    }
                    // now if we haven't found the right connection in the connected nodes we have to go through the open connections
                    // it might be that the channel belongs to a connection that is not published
                    for (NodeChannels channels : openConnections) {
                        if (channels.hasChannel(channel)) {
                            IOUtils.closeWhileHandlingException(channels);
                            return;
                        }
                    }

This is where you're checking, if this is not a fully connected node, check ALL the connections and close if it is found?

@s1monw

Contributor Author

commented Jun 10, 2017

This is where you're checking, if this is not a fully connected node, check ALL the connections and close if it is found?

correct!

@s1monw

Contributor Author

commented Jun 10, 2017

Great feedback @tbrooks8, I pushed a new commit to address all your comments.

@bleskes
Member

left a comment

This is a great catch and the solution is nice too. I left a few small questions/comments on concurrency aspects.

threadPool.generic().execute(() -> {
try {
try {
if (isOpen(channel)) {

@bleskes

bleskes Jun 12, 2017

Member

question - why did you add this? It seems all channels have double-closing protection?

@s1monw

s1monw Jun 12, 2017

Author Contributor

I left a comment in the code for this

// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
// Removing it here from open connections is paranoia since the close handler should deal with this.
openConnections.remove(entry.getValue());

@bleskes

bleskes Jun 12, 2017

Member

given the comment, should we make this an assertion?

it.remove();
IOUtils.closeWhileHandlingException(nodeChannels);
}
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close

}
// now if we haven't found the right connection in the connected nodes we have to go through the open connections
// it might be that the channel belongs to a connection that is not published
for (NodeChannels channels : openConnections) {

@bleskes

bleskes Jun 12, 2017

Member

I think we need some kind of locking coordination here with the opening connections line:

                nodeChannels = connectToChannels(node, connectionProfile); <-- from here we might get exceptions
                final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
                final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
                    defaultConnectionProfile.getConnectTimeout() :
                    connectionProfile.getConnectTimeout();
                final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
                    connectTimeout : connectionProfile.getHandshakeTimeout();
                final Version version = executeHandshake(node, channel, handshakeTimeout);
                nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
                openConnections.add(nodeChannels); <-- published 
                transportServiceAdapter.onConnectionOpened(nodeChannels);

I guess the easiest would be to acquire the write lock, as this is a very rare situation and only relevant to temporary connections. Alternatively, we could introduce a dedicated lock.

@s1monw

s1monw Jun 12, 2017

Author Contributor

Why do we need this? This is only relevant for fully established connections.

@bleskes

bleskes Jun 12, 2017

Member

@s1monw and I discussed this and apparently transportServiceAdapter spawns threads and any point of synchronization is moot and only a source of trouble (blocking on netty is super tricky)

}
} finally {
disconnectFromNodeChannel(channel, "channel closed");

@bleskes

bleskes Jun 12, 2017

Member

question - why did you add this? It currently has a nasty side effect of spawning extra "disconnect" generic threads when we are already disconnecting. Say we have an issue on one channel: we find the relevant NodeChannels and close all other channels in it. Those will end up here, spawning another disconnectFromNodeChannel call, which will be a no-op (as we are cleaning it up), but we're creating all kinds of concurrency issues. I'm wondering why we need this.

@s1monw

s1monw Jun 12, 2017

Author Contributor

You need this since otherwise we might not notify the listeners waiting for a response on the connection. We might be able to reduce its footprint more. I can look, but it's already improved and preexisting, hence why I didn't want to touch it more. The call was hidden by other calls; that's why you don't see it.

@@ -374,24 +375,6 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
return nodeChannels;
}

private class ChannelCloseListener implements ChannelFutureListener {

@bleskes

bleskes Jun 12, 2017

Member

++ to push this up to TcpTransport

@@ -190,6 +194,13 @@ public void assertNoPendingHandshakes(Transport transport) {
}
}

public void assertPendingConnections(int numConnections, Transport transport) {

@bleskes

bleskes Jun 12, 2017

Member

❤️

@@ -1852,7 +1861,11 @@ public String executor() {
}
logger.debug("DONE");
serviceC.close();

// when we close C here we have to disconnect the service otherwise assertions mit trip with pending connections in tearDown

@bleskes

bleskes Jun 12, 2017

Member

typo: mit -> might

serviceC.close();
assertPendingConnections(0, serviceC.getOriginalTransport());
sendResponseLatch.countDown();
responseLatch.await();

@bleskes

bleskes Jun 12, 2017

Member

I would expect the request to be cancelled as the connection closed on it mid-flight. What is the role of sendResponseLatch? serviceC is closed now?

@s1monw

s1monw Jun 12, 2017

Author Contributor

If I don't keep this latch the test is useless, since we are not stopping in between. I can remove the countDown here and that would still work, but it feels wrong. Let it deal with what happens if you try to respond to a closed channel?

@s1monw

s1monw Jun 12, 2017

Author Contributor

It's also relevant for some implementations that we let things go thread-wise: if there is only one event thread, they will block.

@bleskes

bleskes Jun 12, 2017

Member

We talked about this one too - the extra sendResponseLatch.countDown() is needed to release a netty thread so that netty won't be annoyed. We agreed to add a comment.

s1monw added some commits Jun 12, 2017

@clintongormley clintongormley added v5.6.0 and removed v5.5.0 labels Jun 12, 2017

@tbrooks8
Contributor

left a comment

LGTM

@s1monw

Contributor Author

commented Jun 12, 2017

@elasticmachine test this please

@s1monw s1monw merged commit 186c16e into elastic:master Jun 13, 2017

2 checks passed

CLA Commit author is a member of Elasticsearch
Details
elasticsearch-ci Build finished.
Details

@s1monw s1monw deleted the s1monw:fix_notification_on_close branch Jun 13, 2017

s1monw added a commit that referenced this pull request Jun 13, 2017

Ensure pending transport handlers are invoked for all channel failures (
#25150)

Today, if a channel gets closed due to a disconnect, we notify the response
handler that the connection is closed and the node is disconnected. Unfortunately,
this is not a complete solution since it only works for published connections.
Connections that are unpublished, i.e. for discovery, can hang indefinitely since we
never invoke their handlers when we get a failure while a user is waiting for
the response. This change adds connection tracking to TcpTransport that ensures
we notify the corresponding connection if there is a failure on a channel.


@clintongormley clintongormley added v6.0.0-beta1 and removed v6.0.0 labels Jul 25, 2017
