Skip to content

Commit

Permalink
Fix connect concurrency, can cause connection nodes to close
Browse files Browse the repository at this point in the history
Looking at the connect code, if two threads try to connect to the same node at the same time and both enter the connectionLock code block one after the other, the second one would try to put its connection in the map and close the replaced channels, which would cause the existing connection to close as well (since it removes the node from the connectedNodes map).
To fix this, simply make sure we properly check for the existence of the connection within the connectionLock block, so that there won't be concurrent connection attempts to the same node going on.
While doing this, I also went over all the mutation code that handles disconnections and made sure those mutations are only performed within a connection lock.
closes #6964
  • Loading branch information
kimchy committed Jul 22, 2014
1 parent 72b3d6e commit 88f3afe
Showing 1 changed file with 38 additions and 59 deletions.
97 changes: 38 additions & 59 deletions src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
Expand Up @@ -21,10 +21,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand Down Expand Up @@ -598,48 +595,38 @@ public void connectToNode(DiscoveryNode node, boolean light) {
}
globalLock.readLock().lock();
try {
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
connectionLock.acquire(node.id());
try {
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
try {


if (light) {
nodeChannels = connectToChannelsLight(node);
} else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
try {
connectToChannels(nodeChannels, node);
} catch (Exception e) {
} catch (Throwable e) {
logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
nodeChannels.close();
throw e;
}
}

NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels);
if (existing != null) {
// we are already connected to a node, close this ones
nodeChannels.close();
} else {
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}

transportServiceAdapter.raiseNodeConnected(node);
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e);
throw new ConnectTransportException(node, "general node connection failure", e);
}
} finally {
connectionLock.release(node.id());
Expand Down Expand Up @@ -759,70 +746,62 @@ private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {

@Override
public void disconnectFromNode(DiscoveryNode node) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
connectionLock.acquire(node.id());
try {
connectionLock.acquire(node.id());
try {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
try {
logger.debug("disconnecting from [{}] due to explicit disconnect call", node);
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}]", node);
logger.trace("disconnected from [{}] due to explicit disconnect call", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
connectionLock.release(node.id());
}
} finally {
connectionLock.release(node.id());
}
}

/**
* Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/
private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
private boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
// this might be called multiple times from all the node channels, so do a lightweight
// check outside of the lock
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node);
} else {
try {
try {
nodeChannels = connectedNodes.get(node);
// check again within the connection lock, if its still applicable to remove it
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
try {
logger.debug("disconnecting from [{}], {}", node, reason);
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}], {}", node, reason);
logger.trace("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
connectionLock.release(node.id());
return true;
}
} finally {
connectionLock.release(node.id());
}
}
return false;
}

/**
* Disconnects from a node if a channel is found as part of that nodes channels.
*/
private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
for (DiscoveryNode node : connectedNodes.keySet()) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node);
} else {
try {
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}] on channel failure", failure, node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
connectionLock.release(node.id());
}
}
if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
break;
}
}
}
Expand Down

0 comments on commit 88f3afe

Please sign in to comment.