Skip to content

Commit

Permalink
Reduce indirection around cancellable transport tasks (#85372)
Browse files Browse the repository at this point in the history
No need for noop wrapping of response handlers or notifying empty listener lists.
The empty listener notifying just creates needless method size as does the needless
wrapping. The send and response handling path is obviousyly very hot code and it's
nice to reduce indirection and allocation here if not for performance now but at
least to make profiling easier to interpret.
Also, nesting handlers less makes the logging of those handlers less verbose while
still providing the same amount of information.
  • Loading branch information
original-brownbear committed Mar 29, 2022
1 parent 55af878 commit 6a60888
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 41 deletions.
28 changes: 15 additions & 13 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -156,13 +157,13 @@ public <Request extends ActionRequest, Response extends ActionResponse> Task reg
if (request.getParentTask().isSet()) {
unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);
} else {
unregisterChildNode = () -> {};
unregisterChildNode = null;
}
final Task task;
try {
task = register(type, action.actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
Releasables.close(unregisterChildNode);
throw e;
}
// NOTE: ActionListener cannot infer Response, see https://bugs.openjdk.java.net/browse/JDK-8203195
Expand Down Expand Up @@ -258,7 +259,10 @@ public Task unregister(Task task) {
/**
* Register a connection on which a child task will execute on the target connection. The returned {@link Releasable} must be called
* to unregister the child connection once the child task is completed or failed.
*
* @return Releasable that must be closed once the child task completes or {@code null} if no cancellable task for the given id exists
*/
@Nullable
public Releasable registerChildConnection(long taskId, Transport.Connection childConnection) {
assert TransportService.unwrapConnection(childConnection) == childConnection : "Child connection must be unwrapped";
final CancellableTaskHolder holder = cancellableTasks.get(taskId);
Expand All @@ -270,7 +274,7 @@ public Releasable registerChildConnection(long taskId, Transport.Connection chil
holder.unregisterChildConnection(childConnection);
});
}
return () -> {};
return null;
}

/**
Expand Down Expand Up @@ -546,12 +550,11 @@ public void finish() {
final List<Runnable> listeners;
synchronized (this) {
this.finished = true;
if (cancellationListeners != null) {
listeners = cancellationListeners;
cancellationListeners = null;
} else {
listeners = Collections.emptyList();
if (cancellationListeners == null) {
return;
}
listeners = cancellationListeners;
cancellationListeners = null;
}
// We need to call the listener outside of the synchronised section to avoid potential bottle necks
// in the listener synchronization
Expand Down Expand Up @@ -595,12 +598,11 @@ void unregisterChildConnection(Transport.Connection node) {
if (childTasksPerConnection.addTo(node, -1) == 0) {
childTasksPerConnection.remove(node);
}
if (childTasksPerConnection.isEmpty() && this.childTaskCompletedListeners != null) {
listeners = childTaskCompletedListeners;
childTaskCompletedListeners = null;
} else {
listeners = Collections.emptyList();
if (childTasksPerConnection.isEmpty() == false || this.childTaskCompletedListeners == null) {
return;
}
listeners = childTaskCompletedListeners;
childTaskCompletedListeners = null;
}
notifyListeners(listeners);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,34 +723,11 @@ public final <T extends TransportResponse> void sendRequest(
// unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
final Transport.Connection unwrappedConn = unwrapConnection(connection);
final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
delegate = new TransportResponseHandler<>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}

@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}

@Override
public String executor() {
return handler.executor();
}

@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}

@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
if (unregisterChildNode == null) {
delegate = handler;
} else {
delegate = new UnregisterChildTransportResponseHandler<>(unregisterChildNode, handler, action);
}
} else {
delegate = handler;
}
Expand Down Expand Up @@ -1618,4 +1595,32 @@ Releasable withRef() {
assert Version.CURRENT.major == Version.V_7_0_0.major + 1; // we can remove this whole block in v9
}

private record UnregisterChildTransportResponseHandler<T extends TransportResponse> (
Releasable unregisterChildNode,
TransportResponseHandler<T> handler,
String action
) implements TransportResponseHandler<T> {

@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}

@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}

@Override
public String executor() {
return handler.executor();
}

@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -26,6 +27,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -127,6 +129,11 @@ public void setParentTask(TaskId taskId) {
public TaskId getParentTask() {
return TaskId.EMPTY_TASK_ID;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
});

transportService.sendChildRequest(
Expand Down

0 comments on commit 6a60888

Please sign in to comment.