Skip to content

Commit

Permalink
Add utility for asserting listeners are completed (#109325)
Browse files Browse the repository at this point in the history
Closes #108123
  • Loading branch information
nicktindall committed Jun 13, 2024
1 parent d8ce1c1 commit 855b0b6
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.transport.LeakTracker;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -425,6 +426,16 @@ public boolean equals(Object obj) {
}
}

/**
* @return A listener which (if assertions are enabled) wraps around the given delegate and asserts that it is called at least once.
*/
static <Response> ActionListener<Response> assertAtLeastOnce(ActionListener<Response> delegate) {
if (Assertions.ENABLED) {
return new ActionListenerImplementations.RunBeforeActionListener<>(delegate, LeakTracker.INSTANCE.track(delegate)::close);
}
return delegate;
}

/**
* Execute the given action in a {@code try/catch} block which feeds all exceptions to the given listener's {@link #onFailure} method.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
Expand Down Expand Up @@ -370,6 +371,52 @@ public void onFailure(Exception e) {
assertThat(exReference.get(), instanceOf(IllegalArgumentException.class));
}

public void testAssertAtLeastOnceWillLogAssertionErrorWhenNotResolved() throws Exception {
assumeTrue("assertAtLeastOnce will be a no-op when assertions are disabled", Assertions.ENABLED);
ActionListener<Object> listenerRef = ActionListener.assertAtLeastOnce(ActionListener.running(() -> {
// Do nothing, but don't use ActionListener.noop() as it'll never be garbage collected
}));
// Nullify reference so it becomes unreachable
listenerRef = null;
assertBusy(() -> {
System.gc();
assertLeakDetected("LEAK: resource was not cleaned up before it was garbage-collected\\.(.*|\\s)*");
});
}

public void testAssertAtLeastOnceWillNotLogWhenResolvedOrFailed() {
assumeTrue("assertAtLeastOnce will be a no-op when assertions are disabled", Assertions.ENABLED);
ReachabilityChecker reachabilityChecker = new ReachabilityChecker();
ActionListener<Object> listenerRef = reachabilityChecker.register(ActionListener.assertAtLeastOnce(ActionListener.running(() -> {
// Do nothing, but don't use ActionListener.noop() as it'll never be garbage collected
})));
// Call onResponse and/or onFailure at least once
int times = randomIntBetween(1, 3);
for (int i = 0; i < times; i++) {
if (randomBoolean()) {
listenerRef.onResponse("succeeded");
} else {
listenerRef.onFailure(new RuntimeException("Failed"));
}
}
// Nullify reference so it becomes unreachable
listenerRef = null;
reachabilityChecker.ensureUnreachable();
}

public void testAssertAtLeastOnceWillDelegateResponses() {
final var response = new Object();
assertSame(response, safeAwait(SubscribableListener.newForked(l -> ActionListener.assertAtLeastOnce(l).onResponse(response))));
}

public void testAssertAtLeastOnceWillDelegateFailures() {
final var exception = new RuntimeException();
assertSame(
exception,
safeAwaitFailure(SubscribableListener.newForked(l -> ActionListener.assertAtLeastOnce(l).onFailure(exception)))
);
}

/**
* Test that map passes the output of the function to its delegate listener and that exceptions in the function are propagated to the
* onFailure handler. Also verify that exceptions from ActionListener.onResponse does not invoke onFailure, since it is the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -788,6 +789,21 @@ protected static void checkStaticState() throws Exception {
}
}

/**
* Assert that a leak was detected, also remove the leak from the list of detected leaks
* so the test won't fail for that specific leak.
*
* @param expectedPattern A pattern that matches the detected leak's exception
*/
protected static void assertLeakDetected(String expectedPattern) {
synchronized (loggedLeaks) {
assertTrue(
"No leak detected matching the pattern: " + expectedPattern,
loggedLeaks.removeIf(leakText -> Pattern.matches(expectedPattern, leakText))
);
}
}

// this must be a separate method from other ensure checks above so suite scoped integ tests can call...TODO: fix that
public final void ensureAllSearchContextsReleased() throws Exception {
assertBusy(() -> MockSearchService.assertNoInFlightContext());
Expand Down

0 comments on commit 855b0b6

Please sign in to comment.