Skip to content

Commit

Permalink
use ref
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed May 15, 2024
1 parent b2847b3 commit f7e2ae8
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
Expand Down Expand Up @@ -73,7 +75,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -431,8 +432,8 @@ public void addUnresponsiveRule(TransportAddress transportAddress) {

transport().addSendBehavior(transportAddress, new StubbableTransport.SendRequestBehavior() {

private final Semaphore sendSemaphore = new Semaphore(Integer.MAX_VALUE);
private final Set<Transport.Connection> toClose = ConcurrentHashMap.newKeySet();
private final RefCounted refs = AbstractRefCounted.of(this::closeConnections);

@Override
public void sendRequest(
Expand All @@ -444,24 +445,29 @@ public void sendRequest(
) throws IOException {
if (connection.isClosed()) {
throw new NodeNotConnectedException(connection.getNode(), "connection already closed");
} else if (sendSemaphore.tryAcquire()) {
} else if (refs.tryIncRef()) {
// don't send anything, the receiving node is unresponsive
toClose.add(connection);
sendSemaphore.release();
refs.decRef();
} else {
connection.sendRequest(requestId, action, request, options);
}
}

@Override
public void clearCallback() {
// close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually
// responds).
refs.decRef();
}

private void closeConnections() {
// close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually
// responds).
try {
sendSemaphore.acquire(Integer.MAX_VALUE);
IOUtils.close(toClose);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
Expand Down

0 comments on commit f7e2ae8

Please sign in to comment.