From 8203d4dbcf8e61a79b72d5e9ac86bda0ff447e77 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Tue, 3 Sep 2013 17:56:52 +0200
Subject: [PATCH] fix potential blocking of NettyTransport connect and disconnect methods

Currently, in NettyTransport the locks for connecting and disconnecting channels
are stored in a ConcurrentMap that has 500 entries. A thread can acquire a lock
for an id, and the lock returned is chosen by a hash function computed from that
id. Unfortunately, a collision of two ids can cause a deadlock as follows:

Scenario: one master (no data), one datanode (only data node)

DiscoveryNode id of master is X
DiscoveryNode id of datanode is Y

Both X and Y cause the same lock to be returned by NettyTransport#connectLock().
Both nodes are up and running, and all is fine until the master stops.

Thread 1: The master fault detection of the datanode is notified
(onNodeDisconnected()), which in turn leads the node to try and reconnect to the
master via the callstack titled "Thread 1" below.
-> connectToNode() is called and the lock for X is acquired. The method waits 45s
for the channels to reconnect. Furthermore, Thread 1 holds the
NettyTransport#masterNodeMutex.

Thread 2: The connection fails with an exception (connection refused, see
callstack below) because the master has already shut down. The exception is
handled in NettyTransport#exceptionCaught, which calls
NettyTransport#disconnectFromNodeChannel. This method acquires the lock for Y
(see Thread 2 below).

Now, if X and Y had two different locks, thread 2 would acquire its lock,
disconnect the channels and notify thread 1. But since X and Y map to the same
lock, thread 2 is blocked by thread 1, which waits for 45s. During this time no
thread can acquire the masterNodeMutex (held by thread 1), so the node can, for
example, not stop.

This commit introduces a mechanism that assures unique locks for unique ids.
This lock is not reentrant and therefore assures that threads cannot end up in
an infinite recursion (see Thread 3 below for an example of how a thread can
acquire a lock twice). While this is not a problem right now, it is potentially
dangerous to have it that way, because the callstacks are complex as is and
slight changes might cause unexpected recursions.
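For illustration, a minimal standalone sketch (not part of the patch) of the
pre-patch lock selection; the node ids below are made up, and any two ids whose
hash codes fall into the same one of the 500 buckets end up sharing a monitor:

    public class ConnectLockCollision {

        private static final Object[] CONNECT_MUTEX = new Object[500];
        static {
            for (int i = 0; i < CONNECT_MUTEX.length; i++) {
                CONNECT_MUTEX[i] = new Object();
            }
        }

        // mirrors NettyTransport#connectLock() before this patch
        static Object connectLock(String nodeId) {
            return CONNECT_MUTEX[bucket(nodeId)];
        }

        static int bucket(String nodeId) {
            int hash = nodeId.hashCode();
            if (hash == Integer.MIN_VALUE) { // Math.abs(Integer.MIN_VALUE) is still negative
                hash = 0;
            }
            return Math.abs(hash) % CONNECT_MUTEX.length;
        }

        public static void main(String[] args) {
            String master = "node-X"; // hypothetical DiscoveryNode id
            String datanode = null;   // search for an id that lands in the same bucket
            for (int i = 0; datanode == null; i++) {
                String candidate = "node-Y-" + i;
                if (bucket(candidate) == bucket(master)) {
                    datanode = candidate;
                }
            }
            // both ids now synchronize on the very same monitor
            System.out.println(datanode + " shares the lock of " + master + ": "
                    + (connectLock(master) == connectLock(datanode)));
        }
    }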
Thread 1
----
owns: Object (id=114)
owns: Object (id=118)
waiting for: DefaultChannelFuture (id=140)
Object.wait(long) line: not available [native method]
DefaultChannelFuture(Object).wait(long, int) line: 461
DefaultChannelFuture.await0(long, boolean) line: 311
DefaultChannelFuture.awaitUninterruptibly(long) line: 285
NettyTransport.connectToChannels(NettyTransport$NodeChannels, DiscoveryNode) line: 672
NettyTransport.connectToNode(DiscoveryNode, boolean) line: 609
NettyTransport.connectToNode(DiscoveryNode) line: 579
TransportService.connectToNode(DiscoveryNode) line: 129
MasterFaultDetection.handleTransportDisconnect(DiscoveryNode) line: 195
MasterFaultDetection.access$0(MasterFaultDetection, DiscoveryNode) line: 188
MasterFaultDetection$FDConnectionListener.onNodeDisconnected(DiscoveryNode) line: 245
TransportService$Adapter$2.run() line: 298
EsThreadPoolExecutor(ThreadPoolExecutor).runWorker(ThreadPoolExecutor$Worker) line: 1145
ThreadPoolExecutor$Worker.run() line: 615
Thread.run() line: 724

Thread 2
-------
waiting for: Object (id=114)
NettyTransport.disconnectFromNodeChannel(Channel, Throwable) line: 790
NettyTransport.exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 495
MessageChannelHandler.exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 228
MessageChannelHandler(SimpleChannelUpstreamHandler).handleUpstream(ChannelHandlerContext, ChannelEvent) line: 112
DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 564
DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(ChannelEvent) line: 791
SizeHeaderFrameDecoder(FrameDecoder).exceptionCaught(ChannelHandlerContext, ExceptionEvent) line: 377
SizeHeaderFrameDecoder(SimpleChannelUpstreamHandler).handleUpstream(ChannelHandlerContext, ChannelEvent) line: 112
DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 564
DefaultChannelPipeline.sendUpstream(ChannelEvent) line: 559
Channels.fireExceptionCaught(Channel, Throwable) line: 525
NioClientBoss.processSelectedKeys(Set) line: 110
NioClientBoss.process(Selector) line: 79
NioClientBoss(AbstractNioSelector).run() line: 312
NioClientBoss.run() line: 42
ThreadRenamingRunnable.run() line: 108
DeadLockProofWorker$1.run() line: 42
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1145
ThreadPoolExecutor$Worker.run() line: 615
Thread.run() line: 724

Thread 3
---------
org.elasticsearch.transport.netty.NettyTransport.disconnectFromNode(NettyTransport.java:772)
org.elasticsearch.transport.netty.NettyTransport.access$1200(NettyTransport.java:92)
org.elasticsearch.transport.netty.NettyTransport$ChannelCloseListener.operationComplete(NettyTransport.java:830)
org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:418)
org.jboss.netty.channel.DefaultChannelFuture.setSuccess(DefaultChannelFuture.java:362)
org.jboss.netty.channel.AbstractChannel$ChannelCloseFuture.setClosed(AbstractChannel.java:355)
org.jboss.netty.channel.AbstractChannel.setClosed(AbstractChannel.java:185)
org.jboss.netty.channel.socket.nio.AbstractNioChannel.setClosed(AbstractNioChannel.java:197)
org.jboss.netty.channel.socket.nio.NioSocketChannel.setClosed(NioSocketChannel.java:84)
org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:357)
org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:58)
org.jboss.netty.channel.Channels.close(Channels.java:812)
org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
org.elasticsearch.transport.netty.NettyTransport$NodeChannels.closeChannelsAndWait(NettyTransport.java:892)
org.elasticsearch.transport.netty.NettyTransport$NodeChannels.close(NettyTransport.java:879)
org.elasticsearch.transport.netty.NettyTransport.disconnectFromNode(NettyTransport.java:778)
org.elasticsearch.transport.netty.NettyTransport.access$1200(NettyTransport.java:92)
org.elasticsearch.transport.netty.NettyTransport$ChannelCloseListener.operationComplete(NettyTransport.java:830)
org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:418)
org.jboss.netty.channel.DefaultChannelFuture.setSuccess(DefaultChannelFuture.java:362)
org.jboss.netty.channel.AbstractChannel$ChannelCloseFuture.setClosed(AbstractChannel.java:355)
org.jboss.netty.channel.AbstractChannel.setClosed(AbstractChannel.java:185)
org.jboss.netty.channel.socket.nio.AbstractNioChannel.setClosed(AbstractNioChannel.java:197)
org.jboss.netty.channel.socket.nio.NioSocketChannel.setClosed(NioSocketChannel.java:84)
org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:357)
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93)
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:724)
---
 .../common/util/concurrent/KeyedLock.java     |  96 ++++++++++++
 .../transport/netty/NettyTransport.java       |  95 ++++++------
 .../unit/transport/netty/KeyedLockTests.java  | 141 ++++++++++++++++++
 3 files changed, 289 insertions(+), 43 deletions(-)
 create mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java
 create mode 100644 src/test/java/org/elasticsearch/test/unit/transport/netty/KeyedLockTests.java

diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java b/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java
new file mode 100644
index 0000000000000..6e279fbab0e52
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.ElasticSearchIllegalStateException;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class manages locks. Locks can be accessed with an identifier and are
+ * created the first time they are acquired and removed if no thread hold the
+ * lock. The latter is important to assure that the list of locks does not grow
+ * infinitely.
+ *
+ * A Thread can acquire a lock only once.
+ *
+ * */
+
+public class KeyedLock<T> {
+
+    private final ConcurrentMap<T, KeyLock> map = new ConcurrentHashMap<T, KeyLock>();
+
+    private final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<KeyLock>();
+
+    public void acquire(T key) {
+        while (true) {
+            if (threadLocal.get() != null) {
+                // if we are here, the thread already has the lock
+                throw new ElasticSearchIllegalStateException("Lock already accquired in Thread" + Thread.currentThread().getId()
+                        + " for key " + key);
+            }
+            KeyLock perNodeLock = map.get(key);
+            if (perNodeLock == null) {
+                KeyLock newLock = new KeyLock();
+                perNodeLock = map.putIfAbsent(key, newLock);
+                if (perNodeLock == null) {
+                    newLock.lock();
+                    threadLocal.set(newLock);
+                    return;
+                }
+            }
+            assert perNodeLock != null;
+            int i = perNodeLock.count.get();
+            if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
+                perNodeLock.lock();
+                threadLocal.set(perNodeLock);
+                return;
+            }
+        }
+    }
+
+    public void release(T key) {
+        KeyLock lock = threadLocal.get();
+        if (lock == null) {
+            throw new ElasticSearchIllegalStateException("Lock not accquired");
+        }
+        assert lock.isHeldByCurrentThread();
+        assert lock == map.get(key);
+        lock.unlock();
+        threadLocal.set(null);
+        int decrementAndGet = lock.count.decrementAndGet();
+        if (decrementAndGet == 0) {
+            map.remove(key, lock);
+        }
+    }
+
+    @SuppressWarnings("serial")
+    private final static class KeyLock extends ReentrantLock {
+        private final AtomicInteger count = new AtomicInteger(1);
+    }
+
+    public boolean hasLockedKeys() {
+        return !map.isEmpty();
+    }
+
+}
diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
index 9764831da785f..f641436b6ad44 100644
--- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
+++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
@@ -44,6 +44,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
@@ -151,7 +152,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
 
     private volatile BoundTransportAddress boundAddress;
 
-    private final Object[] connectMutex;
+    private final KeyedLock<String> connectionLock = new KeyedLock<String>();
+
     // this lock is here to make sure we close this transport and disconnect all the client nodes
     // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
     private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
@@ -167,11 +169,6 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n
             System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
         }
 
-        this.connectMutex = new Object[500];
-        for (int i = 0; i < connectMutex.length; i++) {
-            connectMutex[i] = new Object();
-        }
-
         this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2);
         this.bossCount = componentSettings.getAsInt("boss_count", 1);
         this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
@@ -591,15 +588,17 @@ public void connectToNode(DiscoveryNode node, boolean light) {
             if (!lifecycle.started()) {
                 throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
             }
-            synchronized (connectLock(node.id())) {
+            NodeChannels nodeChannels = connectedNodes.get(node);
+            if (nodeChannels != null) {
+                return;
+            }
+            connectionLock.acquire(node.id());
+            try {
                 if (!lifecycle.started()) {
                     throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
                 }
                 try {
-                    NodeChannels nodeChannels = connectedNodes.get(node);
-                    if (nodeChannels != null) {
-                        return;
-                    }
+
 
                     if (light) {
                         nodeChannels = connectToChannelsLight(node);
@@ -629,6 +628,8 @@ public void connectToNode(DiscoveryNode node, boolean light) {
                 } catch (Exception e) {
                     throw new ConnectTransportException(node, "General node connection failure", e);
                 }
+            } finally {
+                connectionLock.release(node.id());
             }
         } finally {
             globalLock.readLock().unlock();
@@ -750,15 +751,18 @@ private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
 
     @Override
     public void disconnectFromNode(DiscoveryNode node) {
-        synchronized (connectLock(node.id())) {
-            NodeChannels nodeChannels = connectedNodes.remove(node);
-            if (nodeChannels != null) {
-                try {
-                    nodeChannels.close();
-                } finally {
-                    logger.debug("disconnected from [{}]", node);
-                    transportServiceAdapter.raiseNodeDisconnected(node);
-                }
+        NodeChannels nodeChannels = connectedNodes.remove(node);
+        if (nodeChannels != null) {
+            connectionLock.acquire(node.id());
+            try {
+                try {
+                    nodeChannels.close();
+                } finally {
+                    logger.debug("disconnected from [{}]", node);
+                    transportServiceAdapter.raiseNodeDisconnected(node);
+                }
+            } finally {
+                connectionLock.release(node.id());
             }
         }
     }
@@ -767,15 +771,22 @@ public void disconnectFromNode(DiscoveryNode node) {
      * Disconnects from a node, only if the relevant channel is found to be part of the node channels.
      */
     private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
-        synchronized (connectLock(node.id())) {
-            NodeChannels nodeChannels = connectedNodes.get(node);
-            if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
-                connectedNodes.remove(node);
+        NodeChannels nodeChannels = connectedNodes.get(node);
+        if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
+            connectionLock.acquire(node.id());
+            if (!nodeChannels.hasChannel(channel)){ //might have been removed in the meanwhile, safety check
+                assert !connectedNodes.containsKey(node);
+            } else {
                 try {
-                    nodeChannels.close();
+                    connectedNodes.remove(node);
+                    try {
+                        nodeChannels.close();
+                    } finally {
+                        logger.debug("disconnected from [{}], {}", node, reason);
+                        transportServiceAdapter.raiseNodeDisconnected(node);
+                    }
                 } finally {
-                    logger.debug("disconnected from [{}], {}", node, reason);
-                    transportServiceAdapter.raiseNodeDisconnected(node);
+                    connectionLock.release(node.id());
                 }
             }
         }
@@ -786,15 +797,22 @@ private void disconnectFromNode(DiscoveryNode node, Channel channel, String reas
      */
     private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
         for (DiscoveryNode node : connectedNodes.keySet()) {
-            synchronized (connectLock(node.id())) {
-                NodeChannels nodeChannels = connectedNodes.get(node);
-                if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
-                    connectedNodes.remove(node);
+            NodeChannels nodeChannels = connectedNodes.get(node);
+            if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
+                connectionLock.acquire(node.id());
+                if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
+                    assert !connectedNodes.containsKey(node);
+                } else {
                     try {
-                        nodeChannels.close();
+                        connectedNodes.remove(node);
+                        try {
+                            nodeChannels.close();
+                        } finally {
+                            logger.debug("disconnected from [{}] on channel failure", failure, node);
+                            transportServiceAdapter.raiseNodeDisconnected(node);
+                        }
                     } finally {
-                        logger.debug("disconnected from [{}] on channel failure", failure, node);
-                        transportServiceAdapter.raiseNodeDisconnected(node);
+                        connectionLock.release(node.id());
                     }
                 }
             }
@@ -809,15 +827,6 @@ private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options)
         return nodeChannels.channel(options.type());
     }
 
-    private Object connectLock(String nodeId) {
-        int hash = nodeId.hashCode();
-        // abs returns Integer.MIN_VALUE, so we need to protect against it...
-        if (hash == Integer.MIN_VALUE) {
-            hash = 0;
-        }
-        return connectMutex[Math.abs(hash) % connectMutex.length];
-    }
-
     private class ChannelCloseListener implements ChannelFutureListener {
 
         private final DiscoveryNode node;
diff --git a/src/test/java/org/elasticsearch/test/unit/transport/netty/KeyedLockTests.java b/src/test/java/org/elasticsearch/test/unit/transport/netty/KeyedLockTests.java
new file mode 100644
index 0000000000000..a36c41e74938e
--- /dev/null
+++ b/src/test/java/org/elasticsearch/test/unit/transport/netty/KeyedLockTests.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.unit.transport.netty;
+
+import org.elasticsearch.ElasticSearchIllegalStateException;
+import org.elasticsearch.common.util.concurrent.KeyedLock;
+import org.elasticsearch.test.integration.ElasticsearchTestCase;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
+public class KeyedLockTests extends ElasticsearchTestCase {
+
+    @Test
+    public void checkIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException {
+        ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<String, Integer>();
+        ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<String, AtomicInteger>();
+        KeyedLock<String> connectionLock = new KeyedLock<String>();
+        String[] names = new String[randomIntBetween(1, 40)];
+        for (int i = 0; i < names.length; i++) {
+            names[i] = randomRealisticUnicodeOfLengthBetween(10, 20);
+        }
+        CountDownLatch startLatch = new CountDownLatch(1);
+        int numThreads = randomIntBetween(3, 10);
+        Thread[] threads = new Thread[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter);
+        }
+        for (int i = 0; i < numThreads; i++) {
+            threads[i].start();
+        }
+        startLatch.countDown();
+        for (int i = 0; i < numThreads; i++) {
+            threads[i].join();
+        }
+        assertThat(connectionLock.hasLockedKeys(), equalTo(false));
+
+        Set<Entry<String, Integer>> entrySet = counter.entrySet();
+        assertThat(counter.size(), equalTo(safeCounter.size()));
+        for (Entry<String, Integer> entry : entrySet) {
+            AtomicInteger atomicInteger = safeCounter.get(entry.getKey());
+            assertThat(atomicInteger, not(Matchers.nullValue()));
+            assertThat(atomicInteger.get(), equalTo(entry.getValue()));
+        }
+    }
+
+    @Test(expected = ElasticSearchIllegalStateException.class)
+    public void checkCannotAcquireTwoLocks() throws InterruptedException {
+        ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
+        ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<String, AtomicInteger>();
+        KeyedLock<String> connectionLock = new KeyedLock<String>();
+        String[] names = new String[randomIntBetween(1, 40)];
+        connectionLock = new KeyedLock<String>();
+        String name = randomRealisticUnicodeOfLength(atLeast(10));
+        connectionLock.acquire(name);
+        connectionLock.acquire(name);
+    }
+
+    @Test(expected = ElasticSearchIllegalStateException.class)
+    public void checkCannotReleaseUnacquiredLock() throws InterruptedException {
+        ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
+        ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<String, AtomicInteger>();
+        KeyedLock<String> connectionLock = new KeyedLock<String>();
+        String[] names = new String[randomIntBetween(1, 40)];
+        connectionLock = new KeyedLock<String>();
+        String name = randomRealisticUnicodeOfLength(atLeast(10));
+        connectionLock.release(name);
+    }
+
+    public static class AcquireAndReleaseThread extends Thread {
+        private CountDownLatch startLatch;
+        KeyedLock<String> connectionLock;
+        String[] names;
+        ConcurrentHashMap<String, Integer> counter;
+        ConcurrentHashMap<String, AtomicInteger> safeCounter;
+
+        public AcquireAndReleaseThread(CountDownLatch startLatch, KeyedLock<String> connectionLock, String[] names,
+                ConcurrentHashMap<String, Integer> counter, ConcurrentHashMap<String, AtomicInteger> safeCounter) {
+            this.startLatch = startLatch;
+            this.connectionLock = connectionLock;
+            this.names = names;
+            this.counter = counter;
+            this.safeCounter = safeCounter;
+        }
+
+        public void run() {
+            try {
+                startLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException();
+            }
+            int numRuns = atLeast(500);
+            for (int i = 0; i < numRuns; i++) {
+                String curName = names[randomInt(names.length - 1)];
+                connectionLock.acquire(curName);
+                try {
+                    Integer integer = counter.get(curName);
+                    if (integer == null) {
+                        counter.put(curName, 1);
+                    } else {
+                        counter.put(curName, integer.intValue() + 1);
+                    }
+                } finally {
+                    connectionLock.release(curName);
+                }
+                AtomicInteger atomicInteger = new AtomicInteger(0);
+                AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
+                if (value == null) {
+                    atomicInteger.incrementAndGet();
+                } else {
+                    value.incrementAndGet();
+                }
+            }
+        }
+    }
+}
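
Usage note (not part of the patch): a minimal, standalone sketch of the pattern
the new KeyedLock is meant for. acquire()/release() bracket the per-node work in
try/finally, and two distinct keys never contend, regardless of their hash codes.
The node ids are hypothetical; under the old 500-bucket scheme, two ids that
happened to collide would make the second thread block here for the full connect
timeout.

    import java.util.concurrent.CountDownLatch;

    import org.elasticsearch.common.util.concurrent.KeyedLock;

    public class KeyedLockUsageSketch {

        public static void main(String[] args) throws InterruptedException {
            final KeyedLock<String> connectionLock = new KeyedLock<String>();
            final CountDownLatch xHeld = new CountDownLatch(1);
            final CountDownLatch yDone = new CountDownLatch(1);

            // holds the lock for "node-X" while a slow connect attempt is in flight
            Thread connecting = new Thread(new Runnable() {
                public void run() {
                    connectionLock.acquire("node-X");
                    try {
                        xHeld.countDown();
                        yDone.await(); // stand-in for the 45s reconnect wait
                    } catch (InterruptedException ignored) {
                        // not relevant for the sketch
                    } finally {
                        connectionLock.release("node-X");
                    }
                }
            });

            // disconnects a different node while the first lock is still held
            Thread disconnecting = new Thread(new Runnable() {
                public void run() {
                    try {
                        xHeld.await();
                    } catch (InterruptedException ignored) {
                        // not relevant for the sketch
                    }
                    connectionLock.acquire("node-Y"); // different key -> different lock, no waiting
                    try {
                        yDone.countDown();
                    } finally {
                        connectionLock.release("node-Y");
                    }
                }
            });

            connecting.start();
            disconnecting.start();
            connecting.join();
            disconnecting.join();
            System.out.println("locked keys left: " + connectionLock.hasLockedKeys()); // false
        }
    }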