fix potential blocking of NettyTransport connect and disconnect methods
Currently, NettyTransport keeps the locks used for connecting and disconnecting channels in a fixed pool of 500 monitor objects. A thread acquires the lock for a node id, and which of the 500 locks is returned is determined by a hash computed from that id.
Unfortunately, a collision between two ids can cause a deadlock as follows:

Scenario: one master (no data), one datanode (only data node)

DiscoveryNode id of master is X
DiscoveryNode id of datanode is Y

Both X and Y cause the same lock to be returned by NettyTransport#connectLock()
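
To make the collision concrete, here is a self-contained sketch that mirrors the connectLock() striping removed in the diff below (the demo class and the example ids are illustrative, not part of the commit; "Aa" and "BB" are a well-known String.hashCode collision pair):

    public class StripedLockCollision {
        static final Object[] connectMutex = new Object[500];
        static {
            for (int i = 0; i < connectMutex.length; i++) {
                connectMutex[i] = new Object();
            }
        }

        static Object connectLock(String nodeId) {
            int hash = nodeId.hashCode();
            if (hash == Integer.MIN_VALUE) {
                hash = 0; // Math.abs(Integer.MIN_VALUE) stays negative
            }
            return connectMutex[Math.abs(hash) % connectMutex.length];
        }

        public static void main(String[] args) {
            // "Aa" and "BB" both hash to 2112, so they are guaranteed to
            // share a stripe -- just as two DiscoveryNode ids can.
            System.out.println(connectLock("Aa") == connectLock("BB")); // true
        }
    }

Even without equal hash codes, any two distinct ids land on the same stripe with probability of roughly 1/500, so collisions in a long-running cluster are only a matter of time.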

Both are up and running; all is fine until the master stops.

Thread 1: The master fault detection on the datanode is notified (onNodeDisconnected()), which in turn leads the node to try to reconnect to the master via the call stack titled "Thread 1" below.

-> connectToNode() is called and the lock for X is acquired. The method then waits up to 45s for the channels to reconnect.

Furthermore, Thread 1 holds the NettyTransport#masterNodeMutex.

Thread 2: The connection attempt fails with an exception (connection refused, see the call stack below) because the master has already shut down. The exception is handled in NettyTransport#exceptionCaught, which calls NettyTransport#disconnectFromNodeChannel. This method acquires the lock for Y (see Thread 2 below).

If X and Y mapped to different locks, thread 2 would acquire its lock, disconnect the channels, and notify thread 1. But since X and Y share the same lock, thread 2 is blocked behind thread 1, which keeps waiting for the full 45s.

During this time no other thread can acquire the masterNodeMutex (held by thread 1), so the node cannot, for example, be stopped.
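
The shape of the stall can be reproduced with two threads and a single shared monitor. This is a minimal model, not the transport code itself; all names are illustrative:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class SharedStripeStall {
        public static void main(String[] args) throws InterruptedException {
            final Object sharedStripe = new Object(); // connectLock(X) == connectLock(Y)
            final CountDownLatch channelsClosed = new CountDownLatch(1);

            Thread thread1 = new Thread(new Runnable() { // the reconnect attempt
                public void run() {
                    synchronized (sharedStripe) { // lock for X
                        try {
                            // stands in for the 45s awaitUninterruptibly on the channel future
                            channelsClosed.await(45, TimeUnit.SECONDS);
                        } catch (InterruptedException ignored) {
                        }
                    }
                }
            });

            Thread thread2 = new Thread(new Runnable() { // the exception handler
                public void run() {
                    synchronized (sharedStripe) { // lock for Y: blocked for the full 45s
                        channelsClosed.countDown(); // would have let thread 1 proceed
                    }
                }
            });

            thread1.start();
            Thread.sleep(100); // make sure thread 1 grabs the stripe first
            thread2.start();
        }
    }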

This commit introduces a mechanism that guarantees a unique lock per unique id. The lock is not reentrant and therefore ensures that threads cannot end up in an infinite recursion (see Thread 3 below for an example of how a thread could acquire a lock twice).
While this is not a problem right now, it is potentially dangerous to leave it that way, because the call stacks are complex as they are and slight changes might cause unexpected recursions.

Thread 1
----

	owns: Object  (id=114)
	owns: Object  (id=118)
	waiting for: DefaultChannelFuture  (id=140)
	Object.wait(long) line: not available [native method]
	DefaultChannelFuture(Object).wait(long, int) line: 461
	DefaultChannelFuture.await0(long, boolean) line: 311
	DefaultChannelFuture.awaitUninterruptibly(long) line: 285
	NettyTransport.connectToChannels(NettyTransport$NodeChannels, DiscoveryNode) line: 672
	NettyTransport.connectToNode(DiscoveryNode, boolean) line: 609
	NettyTransport.connectToNode(DiscoveryNode) line: 579
	TransportService.connectToNode(DiscoveryNode) line: 129
	MasterFaultDetection.handleTransportDisconnect(DiscoveryNode) line: 195
	MasterFaultDetection.access$0(MasterFaultDetection, DiscoveryNode) line: 188
	MasterFaultDetection$FDConnectionListener.onNodeDisconnected(DiscoveryNode) line: 245
	TransportService$Adapter$2.run() line: 298
	EsThreadPoolExecutor(ThreadPoolExecutor).runWorker(ThreadPoolExecutor$Worker) line: 1145
	ThreadPoolExecutor$Worker.run() line: 615
	Thread.run() line: 724

Thread 2
-------

	waiting for: Object  (id=114)
	NettyTransport.disconnectFromNodeChannel(Channel, Throwable) line: 790
	NettyTransport.exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 495
	MessageChannelHandler.exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 228
	MessageChannelHandler(SimpleChannelUpstreamHandler).handleUpstream(ChannelHandlerContext, ChannelEvent) line: 112
	DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 564
	DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(ChannelEvent) line: 791
	SizeHeaderFrameDecoder(FrameDecoder).exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 377
	SizeHeaderFrameDecoder(SimpleChannelUpstreamHandler).handleUpstream(ChannelHandlerContext, ChannelEvent) line: 112
	DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 564
	DefaultChannelPipeline.sendUpstream(ChannelEvent) line: 559
	Channels.fireExceptionCaught(Channel, Throwable) line: 525
	NioClientBoss.processSelectedKeys(Set<SelectionKey>) line: 110
	NioClientBoss.process(Selector) line: 79
	NioClientBoss(AbstractNioSelector).run() line: 312
	NioClientBoss.run() line: 42
	ThreadRenamingRunnable.run() line: 108
	DeadLockProofWorker$1.run() line: 42
	ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1145
	ThreadPoolExecutor$Worker.run() line: 615
	Thread.run() line: 724

Thread 3
---------

        org.elasticsearch.transport.netty.NettyTransport.disconnectFromNode(NettyTransport.java:772)
        org.elasticsearch.transport.netty.NettyTransport.access$1200(NettyTransport.java:92)
        org.elasticsearch.transport.netty.NettyTransport$ChannelCloseListener.operationComplete(NettyTransport.java:830)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:418)
        org.jboss.netty.channel.DefaultChannelFuture.setSuccess(DefaultChannelFuture.java:362)
        org.jboss.netty.channel.AbstractChannel$ChannelCloseFuture.setClosed(AbstractChannel.java:355)
        org.jboss.netty.channel.AbstractChannel.setClosed(AbstractChannel.java:185)
        org.jboss.netty.channel.socket.nio.AbstractNioChannel.setClosed(AbstractNioChannel.java:197)
        org.jboss.netty.channel.socket.nio.NioSocketChannel.setClosed(NioSocketChannel.java:84)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:357)
        org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:58)
        org.jboss.netty.channel.Channels.close(Channels.java:812)
        org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
        org.elasticsearch.transport.netty.NettyTransport$NodeChannels.closeChannelsAndWait(NettyTransport.java:892)
        org.elasticsearch.transport.netty.NettyTransport$NodeChannels.close(NettyTransport.java:879)
        org.elasticsearch.transport.netty.NettyTransport.disconnectFromNode(NettyTransport.java:778)
        org.elasticsearch.transport.netty.NettyTransport.access$1200(NettyTransport.java:92)
        org.elasticsearch.transport.netty.NettyTransport$ChannelCloseListener.operationComplete(NettyTransport.java:830)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
        org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:418)
        org.jboss.netty.channel.DefaultChannelFuture.setSuccess(DefaultChannelFuture.java:362)
        org.jboss.netty.channel.AbstractChannel$ChannelCloseFuture.setClosed(AbstractChannel.java:355)
        org.jboss.netty.channel.AbstractChannel.setClosed(AbstractChannel.java:185)
        org.jboss.netty.channel.socket.nio.AbstractNioChannel.setClosed(AbstractNioChannel.java:197)
        org.jboss.netty.channel.socket.nio.NioSocketChannel.setClosed(NioSocketChannel.java:84)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:357)
        org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
        org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
        org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
        org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:724)
s1monw authored and brwe committed Sep 6, 2013
1 parent 4155741 commit 8203d4d
Showing 3 changed files with 289 additions and 43 deletions.
96 changes: 96 additions & 0 deletions src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java
@@ -0,0 +1,96 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.ElasticSearchIllegalStateException;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * This class manages locks. Locks can be accessed with an identifier and are
 * created the first time they are acquired and removed once no thread holds
 * the lock. The latter is important to ensure that the set of locks does not
 * grow without bound.
 *
 * A thread can hold at most one lock at a time; the locks are not reentrant,
 * so a second acquire from the same thread throws.
 */

public class KeyedLock<T> {

    private final ConcurrentMap<T, KeyLock> map = new ConcurrentHashMap<T, KeyLock>();

    private final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<KeyedLock.KeyLock>();

    public void acquire(T key) {
        while (true) {
            if (threadLocal.get() != null) {
                // if we are here, the thread already has the lock
                throw new ElasticSearchIllegalStateException("Lock already acquired in thread " + Thread.currentThread().getId()
                        + " for key " + key);
            }
            KeyLock perNodeLock = map.get(key);
            if (perNodeLock == null) {
                KeyLock newLock = new KeyLock();
                perNodeLock = map.putIfAbsent(key, newLock);
                if (perNodeLock == null) {
                    newLock.lock();
                    threadLocal.set(newLock);
                    return;
                }
            }
            assert perNodeLock != null;
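            // the lock already exists in the map: try to bump its reference
            // count. A count of 0 means another thread is just removing the
            // mapping, so retry the whole loop in that case.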
            int i = perNodeLock.count.get();
            if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
                perNodeLock.lock();
                threadLocal.set(perNodeLock);
                return;
            }
        }
    }

    public void release(T key) {
        KeyLock lock = threadLocal.get();
        if (lock == null) {
            throw new ElasticSearchIllegalStateException("Lock not acquired");
        }
        assert lock.isHeldByCurrentThread();
        assert lock == map.get(key);
        lock.unlock();
        threadLocal.set(null);
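        // drop our reference; the thread that takes the count to 0 removes the
        // mapping. remove(key, lock) is conditional and only unmaps this exact
        // lock instance, never a newer one registered under the same key.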
        int decrementAndGet = lock.count.decrementAndGet();
        if (decrementAndGet == 0) {
            map.remove(key, lock);
        }
    }

    @SuppressWarnings("serial")
    private final static class KeyLock extends ReentrantLock {
        private final AtomicInteger count = new AtomicInteger(1);
    }

    public boolean hasLockedKeys() {
        return !map.isEmpty();
    }

}
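
For reference, this is the acquire/release discipline the transport code below follows, shown as a hypothetical caller (NodeLockExample and withNodeLock are our names, not part of the commit):

    import org.elasticsearch.common.util.concurrent.KeyedLock;

    public class NodeLockExample {
        private final KeyedLock<String> connectionLock = new KeyedLock<String>();

        void withNodeLock(String nodeId, Runnable action) {
            connectionLock.acquire(nodeId); // blocks until the lock for this id is free
            try {
                action.run(); // a second acquire() from this thread -- for any key --
                              // throws instead of silently deadlocking or recursing
            } finally {
                connectionLock.release(nodeId);
            }
        }
    }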
95 changes: 52 additions & 43 deletions src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
@@ -44,6 +44,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@@ -151,7 +152,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem

private volatile BoundTransportAddress boundAddress;

private final Object[] connectMutex;
private final KeyedLock<String> connectionLock = new KeyedLock<String>();

// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
@@ -167,11 +169,6 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n
System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
}

this.connectMutex = new Object[500];
for (int i = 0; i < connectMutex.length; i++) {
connectMutex[i] = new Object();
}

this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2);
this.bossCount = componentSettings.getAsInt("boss_count", 1);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
@@ -591,15 +588,17 @@ public void connectToNode(DiscoveryNode node, boolean light) {
if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
}
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
connectionLock.acquire(node.id());
try {
if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
}
try {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}


if (light) {
nodeChannels = connectToChannelsLight(node);
@@ -629,6 +628,8 @@ public void connectToNode(DiscoveryNode node, boolean light) {
} catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e);
}
} finally {
connectionLock.release(node.id());
}
} finally {
globalLock.readLock().unlock();
@@ -750,15 +751,18 @@ private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {

@Override
public void disconnectFromNode(DiscoveryNode node) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
connectionLock.acquire(node.id());
try {
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
connectionLock.release(node.id());
}
}
}
@@ -767,15 +771,22 @@ public void disconnectFromNode(DiscoveryNode node) {
* Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/
private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)){ //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node);
} else {
try {
nodeChannels.close();
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
logger.debug("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
connectionLock.release(node.id());
}
}
}
@@ -786,15 +797,22 @@ private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
*/
private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
for (DiscoveryNode node : connectedNodes.keySet()) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
assert !connectedNodes.containsKey(node);
} else {
try {
nodeChannels.close();
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}] on channel failure", failure, node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
logger.debug("disconnected from [{}] on channel failure", failure, node);
transportServiceAdapter.raiseNodeDisconnected(node);
connectionLock.release(node.id());
}
}
}
@@ -809,15 +827,6 @@ private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options)
return nodeChannels.channel(options.type());
}

private Object connectLock(String nodeId) {
int hash = nodeId.hashCode();
// abs returns Integer.MIN_VALUE, so we need to protect against it...
if (hash == Integer.MIN_VALUE) {
hash = 0;
}
return connectMutex[Math.abs(hash) % connectMutex.length];
}

private class ChannelCloseListener implements ChannelFutureListener {

private final DiscoveryNode node;
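A side note on the removed connectLock() above: the special-casing of Integer.MIN_VALUE is needed because two's complement has no positive counterpart for it, so Math.abs returns it unchanged and the modulo would produce a negative array index. A one-line demonstration (AbsMinValueDemo is a hypothetical demo class):

    public class AbsMinValueDemo {
        public static void main(String[] args) {
            System.out.println(Math.abs(Integer.MIN_VALUE)); // prints -2147483648
            // an index like Math.abs(Integer.MIN_VALUE) % 500 would be -148
        }
    }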
