Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand Down Expand Up @@ -243,8 +244,19 @@ private <T extends TransportResponse> void executeRetryableAction(String action,

@Override
public void tryAction(ActionListener<T> listener) {
transportService.sendRequest(targetNode, action, request, options,
new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC));
if (request.tryIncRef()) {
transportService.sendRequest(
targetNode,
action,
request,
options,
new ActionListenerResponseHandler<>(
ActionListener.runBefore(listener, request::decRef),
reader,
ThreadPool.Names.GENERIC));
} else {
listener.onFailure(new AlreadyClosedException("already closed"));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void incRef() {

@Override
public boolean tryIncRef() {
return bytes.decRef();
return bytes.tryIncRef();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
Expand Down Expand Up @@ -81,8 +82,17 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long
Version version = Version.min(this.version, channelVersion);
OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action,
requestId, isHandshake, compressRequest);
ActionListener<Void> listener = ActionListener.wrap(() ->
messageListener.onRequestSent(node, requestId, action, request, options));
if (request.tryIncRef() == false) {
assert false : "request [" + request + "] has been released already";
throw new AlreadyClosedException("request [" + request + "] has been released already");
}
ActionListener<Void> listener = ActionListener.wrap(() -> {
try {
messageListener.onRequestSent(node, requestId, action, request, options);
} finally {
request.decRef();
}
});
sendMessage(channel, message, listener);
}

Expand Down