Skip to content

Commit

Permalink
Fix ListenableFuture Resolving Listeners under Mutex (#71943) (#71963)
Browse files Browse the repository at this point in the history
We shouldn't loop over the listeners under the mutex in `done` since in most use-cases we used `DirectExecutorService`
with this class.
Also, no need to create an `AbstractRunnable` for direct execution. We use this listener on the hot path in authentication
making this a worthwhile optimization I think.
Lastly, no need to clear and thus loop over `listeners`, the list is not used again after the `done` call returns anyway
so no point in retaining it at all (especially when in a number of use cases we add listeners only after the `done` call
so we can also save the instantiation by making the field non-final).
  • Loading branch information
original-brownbear committed Apr 21, 2021
1 parent 0a17f56 commit 2fdcc9d
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action;

import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;

Expand Down Expand Up @@ -105,7 +104,7 @@ public Response result() {
* Registers the given listener to be notified with the result of this step.
*/
public void addListener(ActionListener<Response> listener) {
delegate.addListener(listener, EsExecutors.newDirectExecutorService());
delegate.addListener(listener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho
}
}
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}, listener::onFailure), threadPool.generic());
}, listener::onFailure), threadPool.generic(), null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -1458,7 +1457,7 @@ public void onFailure(Exception e) {
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
publishListener.onFailure(exception);
}
}, EsExecutors.newDirectExecutorService(), transportService.getThreadPool().getThreadContext());
}, null, transportService.getThreadPool().getThreadContext());
}

private void cancelTimeoutHandlers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;

import java.util.ArrayList;
Expand All @@ -30,31 +30,33 @@
public final class ListenableFuture<V> extends BaseFuture<V> implements ActionListener<V> {

private volatile boolean done = false;
private final List<Tuple<ActionListener<V>, ExecutorService>> listeners = new ArrayList<>();

private List<Tuple<ActionListener<V>, ExecutorService>> listeners;

/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* notified of a response or exception on the thread completing this future.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
*/
public void addListener(ActionListener<V> listener, ExecutorService executor) {
addListener(listener, executor, null);
public void addListener(ActionListener<V> listener) {
addListener(listener, null, null);
}

/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* notified of a response or exception in a runnable submitted to the ExecutorService provided
* if one is provided. If a null executor is provided the listener will be executed directly
* on the thread completing the future.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
*
* It will apply the provided ThreadContext (if not null) when executing the listening.
*/
public void addListener(ActionListener<V> listener, ExecutorService executor, ThreadContext threadContext) {
public void addListener(ActionListener<V> listener, @Nullable ExecutorService executor, ThreadContext threadContext) {
assert executor != EsExecutors.newDirectExecutorService() : "using direct executor here instead of null is needless overhead";
if (done) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListener(listener, EsExecutors.newDirectExecutorService());
notifyListenerDirectly(listener);
} else {
final boolean run;
// check done under lock since it could have been modified and protect modifications
Expand All @@ -69,36 +71,60 @@ public void addListener(ActionListener<V> listener, ExecutorService executor, Th
} else {
wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
}
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(new Tuple<>(wrappedListener, executor));
run = false;
}
}

if (run) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListener(listener, EsExecutors.newDirectExecutorService());
notifyListenerDirectly(listener);
}
}
}

@Override
protected synchronized void done(boolean ignored) {
done = true;
listeners.forEach(t -> notifyListener(t.v1(), t.v2()));
// release references to any listeners as we no longer need them and will live
// much longer than the listeners in most cases
listeners.clear();
protected void done(boolean ignored) {
final List<Tuple<ActionListener<V>, ExecutorService>> existingListeners;
synchronized (this) {
done = true;
existingListeners = listeners;
if (existingListeners == null) {
return;
}
listeners = null;
}
for (Tuple<ActionListener<V>, ExecutorService> t : existingListeners) {
final ExecutorService executorService = t.v2();
final ActionListener<V> listener = t.v1();
if (executorService == null) {
notifyListenerDirectly(listener);
} else {
notifyListener(listener, executorService);
}
}
}

private void notifyListenerDirectly(ActionListener<V> listener) {
try {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
}
}

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try {
executorService.execute(new ActionRunnable<V>(listener) {
executorService.execute(new Runnable() {
@Override
protected void doRun() {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
public void run() {
notifyListenerDirectly(listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;

Expand Down Expand Up @@ -41,7 +40,7 @@ public synchronized ActionListener<Void> markReceivedAndCreateListener(long requ
if (checkpointTracker.hasProcessed(requestSeqNo)) {
final ListenableFuture<Void> existingFuture = ongoingRequests.get(requestSeqNo);
if (existingFuture != null) {
existingFuture.addListener(listener, EsExecutors.newDirectExecutorService());
existingFuture.addListener(listener);
} else {
listener.onResponse(null);
}
Expand All @@ -53,7 +52,7 @@ public synchronized ActionListener<Void> markReceivedAndCreateListener(long requ
future.addListener(listener.delegateFailure((l, v) -> {
ongoingRequests.remove(requestSeqNo);
l.onResponse(v);
}), EsExecutors.newDirectExecutorService());
}));
return future;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -130,7 +129,7 @@ public StartRecoveryRequest getRequest() {
}

public void addListener(ActionListener<RecoveryResponse> listener) {
future.addListener(listener, EsExecutors.newDirectExecutorService());
future.addListener(listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -115,14 +114,14 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
if (existingListener != null) {
try {
// wait on previous entry to complete connection attempt
existingListener.addListener(listener, EsExecutors.newDirectExecutorService());
existingListener.addListener(listener);
} finally {
connectingRefCounter.decRef();
}
return;
}

currentListener.addListener(listener, EsExecutors.newDirectExecutorService());
currentListener.addListener(listener);

final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testListenableFutureNotifiesListeners() {
AtomicInteger notifications = new AtomicInteger(0);
final int numberOfListeners = scaledRandomIntBetween(1, 12);
for (int i = 0; i < numberOfListeners; i++) {
future.addListener(ActionListener.wrap(notifications::incrementAndGet), EsExecutors.newDirectExecutorService(), threadContext);
future.addListener(ActionListener.wrap(notifications::incrementAndGet), null, threadContext);
}

future.onResponse("");
Expand All @@ -56,7 +56,7 @@ public void testListenableFutureNotifiesListenersOnException() {
future.addListener(ActionListener.wrap(s -> fail("this should never be called"), e -> {
assertEquals(exception, e);
notifications.incrementAndGet();
}), EsExecutors.newDirectExecutorService(), threadContext);
}), null, threadContext);
}

future.onFailure(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.watcher.FileChangesListener;
Expand Down Expand Up @@ -809,7 +808,7 @@ void triggerReload(ActionListener<Void> toNotify) {
future = reloadFutureRef.get();
}
}
future.addListener(toNotify, EsExecutors.newDirectExecutorService(), null);
future.addListener(toNotify);
}

void reloadAsync(final ListenableFuture<Void> future) {
Expand Down

0 comments on commit 2fdcc9d

Please sign in to comment.