Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -202,20 +203,22 @@ private void connectToNodeOrRetry(
ActionListener.wrap(
conn -> connectionValidator.validate(conn, resolvedProfile, ActionListener.runAfter(ActionListener.wrap(ignored -> {
assert Transports.assertNotTransportThread("connection validator success");
final RefCounted managerRefs = AbstractRefCounted.of(conn::onRemoved);
try {
if (connectedNodes.putIfAbsent(node, conn) != null) {
assert false : "redundant connection to " + node;
logger.warn("existing connection to node [{}], closing new redundant connection", node);
IOUtils.closeWhileHandlingException(conn);
} else {
logger.debug("connected to node [{}]", node);
managerRefs.incRef();
try {
connectionListener.onNodeConnected(node, conn);
} finally {
conn.addCloseListener(ActionListener.wrap(() -> {
connectedNodes.remove(node, conn);
connectionListener.onNodeDisconnected(node, conn);
conn.onRemoved();
managerRefs.decRef();
}));

conn.addCloseListener(ActionListener.wrap(() -> {
Expand All @@ -236,6 +239,7 @@ private void connectToNodeOrRetry(
} finally {
ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
assert future == currentListener : "Listener in pending map is different than the expected listener";
managerRefs.decRef();
releaseOnce.run();
future.onResponse(conn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -116,15 +118,16 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti

assertFalse(connectionManager.nodeConnected(node));

AtomicReference<Transport.Connection> connectionRef = new AtomicReference<>();
AtomicReference<Transport.Connection> validatedConnectionRef = new AtomicReference<Transport.Connection>();
ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
connectionRef.set(c);
validatedConnectionRef.set(c);
l.onResponse(null);
};
PlainActionFuture.get(fut -> connectionManager.connectToNode(node, connectionProfile, validator, fut.map(x -> null)));

assertFalse(connection.isClosed());
assertTrue(connectionManager.nodeConnected(node));
assertSame(connection, validatedConnectionRef.get());
assertSame(connection, connectionManager.getConnection(node));
assertEquals(1, connectionManager.size());
assertEquals(1, nodeConnectedCount.get());
Expand Down Expand Up @@ -494,35 +497,44 @@ public void testConcurrentConnectsAndDisconnects() throws Exception {
});
};

final Semaphore pendingConnections = new Semaphore(between(1, 1000));
final int connectionCount = between(1, 1000);
final int disconnectionCount = randomFrom(connectionCount, connectionCount - 1, between(0, connectionCount - 1));
final Semaphore connectionPermits = new Semaphore(connectionCount);
final Semaphore disconnectionPermits = new Semaphore(disconnectionCount);
final int threadCount = between(1, 10);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

final Runnable action = new Runnable() {
@Override
public void run() {
if (pendingConnections.tryAcquire()) {
if (connectionPermits.tryAcquire()) {
connectionManager.connectToNode(node, null, validator, new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
if (connectionManager.nodeConnected(node) == false) {
final String description = releasable.toString();
fail(description);
}
Releasables.close(releasable);
threadPool.generic().execute(() -> run());
if (disconnectionPermits.tryAcquire()) {
Releasables.close(releasable);
}
runAgain();
}

@Override
public void onFailure(Exception e) {
if (e instanceof ConnectTransportException
&& e.getMessage().contains("concurrently connecting and disconnecting")) {
pendingConnections.release();
threadPool.generic().execute(() -> run());
connectionPermits.release();
runAgain();
} else {
throw new AssertionError("unexpected", e);
}
}

private void runAgain() {
threadPool.generic().execute(() -> run());
}
});
} else {
countDownLatch.countDown();
Expand All @@ -536,7 +548,116 @@ public void onFailure(Exception e) {

assertTrue("threads did not all complete", countDownLatch.await(10, TimeUnit.SECONDS));
assertTrue("validatorPermits not all released", validatorPermits.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
assertFalse("node still connected", connectionManager.nodeConnected(node));
assertEquals("node still connected", disconnectionCount < connectionCount, connectionManager.nodeConnected(node));
connectionManager.close();
}

/**
 * Stress test that races repeated {@code connectToNode} calls against repeated closes of the connection to the same node.
 * <p>
 * A random number of "connect" actions keep calling {@code connectionManager.connectToNode} while a random number of "close" actions
 * keep looking up the current connection and closing it, up to a random budget of close permits. When the close actions have all
 * finished, one final connection attempt is made and the test asserts that the node is reported as connected afterwards, that every
 * action terminated, and that every validator invocation completed.
 */
@TestLogging(reason = "ignore copious 'closed by remote' messages", value = "org.elasticsearch.transport.ClusterConnectionManager:WARN")
public void testConcurrentConnectsAndCloses() throws Exception {
final DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
// Every openConnection call for this node completes its listener (argument index 2) with a brand new TestConnect.
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(new TestConnect(node));
return null;
}).when(transport).openConnection(eq(node), any(), anyActionListener());

// Tracks in-flight validations: a permit is taken when validation starts and returned once its listener has been completed.
final Semaphore validatorPermits = new Semaphore(Integer.MAX_VALUE);

// Validator that always succeeds, completing either inline (SAME) or on the generic pool, chosen at random per invocation.
final ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
assertTrue(validatorPermits.tryAcquire());
threadPool.executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME)).execute(() -> {
try {
l.onResponse(null);
} finally {
validatorPermits.release();
}
});
};

// Budget for close attempts: one permit is consumed per lookup-and-close, and handed back if the node was not connected.
final Semaphore closePermits = new Semaphore(between(1, 1000));
final int connectThreadCount = between(1, 3);
final int closeThreadCount = between(1, 3);
// One count per connect action and per close action; each counts down when it stops re-scheduling itself.
final CountDownLatch countDownLatch = new CountDownLatch(connectThreadCount + closeThreadCount);

// Completed by the final connection attempt below with the value of nodeConnected(node) at that moment.
final PlainActionFuture<Boolean> cleanlyOpenedConnectionFuture = new PlainActionFuture<>();
// NOTE(review): the callback given to AbstractRefCounted.of is presumably run once when the last reference is released, and the
// instance presumably starts out holding one reference - confirm against AbstractRefCounted. Under that assumption the final
// connectToNode here only starts after every close action has dropped its reference.
final RefCounted closingRefs = AbstractRefCounted.of(
() -> connectionManager.connectToNode(
node,
null,
validator,
cleanlyOpenedConnectionFuture.map(r -> connectionManager.nodeConnected(node))
)
);

// Keeps (re)connecting until the final connection attempt has completed, then counts down the latch.
final Runnable connectAction = new Runnable() {
private void runAgain() {
threadPool.generic().execute(this);
}

@Override
public void run() {
if (cleanlyOpenedConnectionFuture.isDone() == false) {
connectionManager.connectToNode(node, null, validator, new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
// The returned releasable is not closed here; the action simply goes round again.
runAgain();
}

@Override
public void onFailure(Exception e) {
// Only the "concurrently connecting and disconnecting" failure is tolerated (and retried); anything else fails.
if (e instanceof ConnectTransportException
&& e.getMessage().contains("concurrently connecting and disconnecting")) {
runAgain();
} else {
throw new AssertionError("unexpected", e);
}
}

});
} else {
countDownLatch.countDown();
}
}
};

// Repeatedly closes whatever connection is currently registered for the node, while close permits remain.
final Runnable closeAction = new Runnable() {
private void runAgain() {
threadPool.generic().execute(this);
}

@Override
public void run() {
// Drop the reference held for this iteration: either the incRef() from the start-up loop below or the tryIncRef() of the
// previous iteration.
closingRefs.decRef();
if (closePermits.tryAcquire() && closingRefs.tryIncRef()) {
try {
Transport.Connection connection = connectionManager.getConnection(node);
// Go round again only after the connection reports that it has been removed.
connection.addRemovedListener(ActionListener.wrap(this::runAgain));
connection.close();
} catch (NodeNotConnectedException e) {
// Nothing to close right now: give the permit back and retry.
closePermits.release();
runAgain();
}
} else {
// Either the close budget is used up or no further reference could be taken: this action is finished.
countDownLatch.countDown();
}
}
};

// The first iteration of every action runs on the calling thread; later iterations are re-scheduled via runAgain().
for (int i = 0; i < connectThreadCount; i++) {
connectAction.run();
}
for (int i = 0; i < closeThreadCount; i++) {
// Each close action starts out holding one reference, which its first run() releases.
closingRefs.incRef();
closeAction.run();
}
// Presumably releases the reference held since creation (see the NOTE on closingRefs above).
closingRefs.decRef();

assertTrue("threads did not all complete", countDownLatch.await(10, TimeUnit.SECONDS));
assertFalse(closingRefs.hasReferences());
// Zero timeout: the final connection attempt is expected to have completed already, with the node reported as connected.
assertTrue(cleanlyOpenedConnectionFuture.actionGet(0, TimeUnit.SECONDS));

// Taking every permit only succeeds once all validator invocations have released theirs.
assertTrue("validatorPermits not all released", validatorPermits.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
connectionManager.close();
}

Expand Down