From f33f2363e2dcd505c2242532cad4a67e7c8f6941 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 14 May 2024 23:28:48 -0600 Subject: [PATCH] Ensure that mock transport does not swallow req Currently it is possible for the MockTransportService distrupt behavior to swallow requests if either the connection is already closed (in which case response pruning has already occurred) or if the behavior is added after the clear callback has been triggered. --- .../test/transport/MockTransportService.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index fc048bbe0758f..876ad6c0e688c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -48,6 +48,7 @@ import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; @@ -72,6 +73,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -428,6 +430,8 @@ public void addUnresponsiveRule(TransportAddress transportAddress) { ); transport().addSendBehavior(transportAddress, new StubbableTransport.SendRequestBehavior() { + + private final Semaphore sendSemaphore = new Semaphore(Integer.MAX_VALUE); private final Set toClose = ConcurrentHashMap.newKeySet(); @Override @@ -437,9 +441,16 @@ public void sendRequest( String action, TransportRequest request, TransportRequestOptions options - ) { - // don't send anything, the receiving node is unresponsive - toClose.add(connection); + ) throws IOException { + if (connection.isClosed()) { + throw new NodeNotConnectedException(connection.getNode(), "connection already closed"); + } else if (sendSemaphore.tryAcquire()) { + // don't send anything, the receiving node is unresponsive + toClose.add(connection); + sendSemaphore.release(); + } else { + connection.sendRequest(requestId, action, request, options); + } } @Override @@ -447,8 +458,9 @@ public void clearCallback() { // close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually // responds). try { + sendSemaphore.acquire(Integer.MAX_VALUE); IOUtils.close(toClose); - } catch (IOException e) { + } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } }