diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 876ad6c0e688c..93c9f63fc5e63 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -29,8 +29,10 @@ import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.UpdateForV9; @@ -73,7 +75,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -431,8 +432,8 @@ public void addUnresponsiveRule(TransportAddress transportAddress) { transport().addSendBehavior(transportAddress, new StubbableTransport.SendRequestBehavior() { - private final Semaphore sendSemaphore = new Semaphore(Integer.MAX_VALUE); private final Set toClose = ConcurrentHashMap.newKeySet(); + private final RefCounted refs = AbstractRefCounted.of(this::closeConnections); @Override public void sendRequest( @@ -444,10 +445,10 @@ public void sendRequest( ) throws IOException { if (connection.isClosed()) { throw new NodeNotConnectedException(connection.getNode(), "connection already closed"); - } else if (sendSemaphore.tryAcquire()) { + } else if (refs.tryIncRef()) { // don't send anything, the receiving node is unresponsive toClose.add(connection); - sendSemaphore.release(); + refs.decRef(); } else { connection.sendRequest(requestId, action, request, options); } @@ -455,13 +456,18 @@ public void sendRequest( @Override public void clearCallback() { + // close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually + // responds). + refs.decRef(); + } + + private void closeConnections() { // close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually // responds). try { - sendSemaphore.acquire(Integer.MAX_VALUE); IOUtils.close(toClose); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); + } catch (IOException e) { + throw new AssertionError(e); } } });