Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add utility for asserting listeners are completed #109325

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
984e2fd
Add utility for asserting listeners are completed
nicktindall Jun 4, 2024
1b54b82
Fix formatting
nicktindall Jun 4, 2024
c94c829
Remove unnecessary escaping
nicktindall Jun 4, 2024
99d0e16
Update server/src/main/java/org/elasticsearch/action/ActionListenerIm…
nicktindall Jun 4, 2024
323b6f6
Remove/minimise assertBusy blocks
nicktindall Jun 4, 2024
a88ce9f
Remove unnecessary System.gc()
nicktindall Jun 4, 2024
1e84c2b
Merge branch 'main' into fix/108123_add_utility_assert_listeners_comp…
nicktindall Jun 5, 2024
bf99ff0
Only test for a single call to onResponse OR onFailure
nicktindall Jun 5, 2024
9171a55
Merge branch 'main' into fix/108123_add_utility_assert_listeners_comp…
nicktindall Jun 6, 2024
b90f16b
Use LeakTracker to detect non-completed ActionListeners
nicktindall Jun 9, 2024
4f0e438
Merge branch 'main' into fix/108123_add_utility_assert_listeners_comp…
nicktindall Jun 9, 2024
83f3145
fix whitespace
nicktindall Jun 9, 2024
3507494
Test that assertAtLeastOnce delegates correctly
nicktindall Jun 9, 2024
890df5d
Simplify regex
nicktindall Jun 9, 2024
df408e4
Remove variable
nicktindall Jun 9, 2024
94ab0e9
Merge branch 'main' into fix/108123_add_utility_assert_listeners_comp…
nicktindall Jun 11, 2024
3fbd9f5
Tidy tests, add tests for LeakTracker
nicktindall Jun 11, 2024
628034f
Tidy up
nicktindall Jun 11, 2024
2bc3635
Remove unnecessary code
nicktindall Jun 11, 2024
29947e5
De-verbose-ify
nicktindall Jun 11, 2024
ad3a53c
Merge branch 'main' into fix/108123_add_utility_assert_listeners_comp…
nicktindall Jun 11, 2024
884238d
Don't use #runBefore because it changes semantics to exactly-once
nicktindall Jun 11, 2024
344477f
LeakTrackerTest improvements
nicktindall Jun 11, 2024
3372f44
Delete LeakTrackerTests (to introduce in a separate PR)
nicktindall Jun 11, 2024
cbb962e
Add message to assertLeakDetected
nicktindall Jun 11, 2024
076ed95
Update server/src/test/java/org/elasticsearch/action/ActionListenerTe…
nicktindall Jun 13, 2024
07f7127
Review feedback
nicktindall Jun 13, 2024
e45718b
Merge branch 'main' into fix/108123_add_utility_assert_listeners_comp…
nicktindall Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,28 @@ public boolean equals(Object obj) {
}
}

/**
* @return A listener which (if assertions are enabled) wraps around the given delegate and asserts that it is called at least once.
*/
static <Response> ActionListener<Response> assertAtLeastOnce(ActionListener<Response> delegate) {
if (Assertions.ENABLED) {
return new ActionListenerImplementations.AssertAtLeastOnceActionListener<>(
delegate,
// We use maybeDieOnAnotherThread here because this will get executed
// on the Cleaner thread, which silently swallows Throwable
(listener, createdAt) -> ExceptionsHelper.maybeDieOnAnotherThread(
new AssertionError(
"Expected listener "
+ delegate
+ " to be called at least once, but it was never called. Created:"
+ ExceptionsHelper.formatStackTrace(createdAt.getStackTrace())
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
)
)
);
}
return delegate;
}

/**
* Execute the given action in a {@code try/catch} block which feeds all exceptions to the given listener's {@link #onFailure} method.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

package org.elasticsearch.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -24,6 +26,8 @@
*/
class ActionListenerImplementations {

private static final Cleaner cleaner = Cleaner.create();
nicktindall marked this conversation as resolved.
Show resolved Hide resolved

private ActionListenerImplementations() {
// no instances
}
Expand Down Expand Up @@ -353,4 +357,84 @@ public String toString() {
return "notifyOnce[" + get() + "]";
}
}

/**
* An {@link ActionListener} that wraps a delegate, registering itself with a {@link Cleaner} that will check that
* the delegate was called at least once prior to being garbage collected.
* If the listener was not completed prior to becoming unreachable it will invoke the passed {@link NotCalledListener}
* from the Cleaner thread.
*/
static final class AssertAtLeastOnceActionListener<Response> extends DelegatingActionListener<Response, Response> {

interface NotCalledListener {

/**
* Called when {@link CallTrackingActionListener#run()} is called prior to the listener being
* called at least once.
*
* @param listener The listener that was not called at least once
* @param createdAt The stack trace of where the listener was created
*/
void onListenerNotCalled(ActionListener<?> listener, ElasticsearchException createdAt);
}

/**
* This is the delegate that keeps track of whether {@link #onResponse(Object)} or {@link #onFailure(Exception)}
* were called, and invokes the {@link NotCalledListener} if neither was called.
* Because it is used as the "cleanup" action by the cleaner, it must be static, so it does not hold a reference
* to the wrapper and prevent it becoming "phantom reachable".
* See <a href="https://inside.java/2022/05/25/clean-cleaner/">the guidelines</a> for more details.
*/
private static final class CallTrackingActionListener<R> implements Runnable, ActionListener<R> {

private final ActionListener<R> delegate;
private final ElasticsearchException created;
private final NotCalledListener listener;
private volatile boolean wasCalled = false;

private CallTrackingActionListener(ActionListener<R> delegate, NotCalledListener listener) {
this.delegate = delegate;
this.created = new ElasticsearchException("Listener created at...");
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
this.listener = listener;
}

@Override
public void run() {
if (wasCalled == false) {
listener.onListenerNotCalled(delegate, created);
}
}

@Override
public void onResponse(R r) {
wasCalled = true;
delegate.onResponse(r);
}

@Override
public void onFailure(Exception e) {
wasCalled = true;
delegate.onFailure(e);
}
}
nicktindall marked this conversation as resolved.
Show resolved Hide resolved

/**
* @param delegate The listener to wrap
* @param listener The action to perform if we detect that it wasn't called (called on the Cleaner thread)
*/
AssertAtLeastOnceActionListener(ActionListener<Response> delegate, NotCalledListener listener) {
super(new CallTrackingActionListener<>(delegate, listener));
cleaner.register(this, (Runnable) this.delegate);
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onResponse(Response response) {
delegate.onResponse(response);
}

@Override
public String toString() {
return delegate.toString();
}
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@
*/
package org.elasticsearch.action;

import org.apache.logging.log4j.Level;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.ReachabilityChecker;
import org.hamcrest.Matcher;

Expand All @@ -28,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
Expand Down Expand Up @@ -370,6 +374,75 @@ public void onFailure(Exception e) {
assertThat(exReference.get(), instanceOf(IllegalArgumentException.class));
}

public void testAssertAtLeastOnceWillLogAssertionErrorWhenNotResolved() throws Exception {
try (MockLog mockLog = MockLog.capture(ExceptionsHelper.class)) {
mockLog.addExpectation(
new MockLog.PatternSeenEventExpectation(
"action listener not called",
ExceptionsHelper.class.getName(),
Level.ERROR,
".*Expected listener NoopActionListener to be called at least once, but it was never called\\. Created:(.*|\\s)*"
)
);
final AtomicReference<ActionListener<Object>> listenerRef = new AtomicReference<>(
ActionListener.assertAtLeastOnce(ActionListener.noop())
);
assertBusy(() -> {
// Nullify reference so it becomes unreachable
listenerRef.set(null);
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
System.gc();
mockLog.assertAllExpectationsMatched();
});
}
}
nicktindall marked this conversation as resolved.
Show resolved Hide resolved

public void testAssertAtLeastOnceWillInvokeListenerWhenNotResolved() throws Exception {
AtomicReference<Exception> listenerCreatedAt = new AtomicReference<>();
AtomicReference<ActionListener<?>> notCalledListener = new AtomicReference<>();
AtomicReference<ActionListener<Object>> listenerRef = new AtomicReference<>(
new ActionListenerImplementations.AssertAtLeastOnceActionListener<>(ActionListener.noop(), (listener, createdAt) -> {
logger.info("NotCalledListener called!");
notCalledListener.set(listener);
listenerCreatedAt.set(createdAt);
})
);
assertBusy(() -> {
// Nullify reference so it becomes unreachable
listenerRef.set(null);
System.gc();
assertNotNull(notCalledListener.get());
assertNotNull(listenerCreatedAt.get());
});
}
nicktindall marked this conversation as resolved.
Show resolved Hide resolved

public void testAssertAtLeastOnceWillNotLogWhenResolvedOrFailed() throws Exception {
final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();
final AtomicBoolean notCalledListenerCalled = new AtomicBoolean();
AtomicReference<ActionListener<Object>> listenerRef = new AtomicReference<>(
reachabilityChecker.register(
new ActionListenerImplementations.AssertAtLeastOnceActionListener<>(
ActionListener.noop(),
(listener, createdAt) -> notCalledListenerCalled.set(true)
)
)
);
// Call onResponse and/or onFailure at least once
IntStream.range(0, 1 + randomInt(3)).forEach(i -> {
if (randomBoolean()) {
listenerRef.get().onResponse("succeeded");
} else {
listenerRef.get().onFailure(new RuntimeException("Failed"));
}
});
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
assertBusy(() -> {
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
// Nullify reference so it becomes unreachable
listenerRef.set(null);
System.gc();
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
reachabilityChecker.ensureUnreachable(); // Only proceed once we know the object isn't reachable
assertFalse(notCalledListenerCalled.get());
});
}

/**
* Test that map passes the output of the function to its delegate listener and that exceptions in the function are propagated to the
* onFailure handler. Also verify that exceptions from ActionListener.onResponse does not invoke onFailure, since it is the
Expand Down