diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 5328f5ac5f1e8..2f325581dbd1a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -243,8 +244,19 @@ private void executeRetryableAction(String action, @Override public void tryAction(ActionListener listener) { - transportService.sendRequest(targetNode, action, request, options, - new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC)); + if (request.tryIncRef()) { + transportService.sendRequest( + targetNode, + action, + request, + options, + new ActionListenerResponseHandler<>( + ActionListener.runBefore(listener, request::decRef), + reader, + ThreadPool.Names.GENERIC)); + } else { + listener.onFailure(new AlreadyClosedException("already closed")); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java index c07d97b362538..b650659e63680 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java @@ -67,7 +67,7 @@ public void incRef() { @Override public boolean tryIncRef() { - return bytes.decRef(); + return bytes.tryIncRef(); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index a1b829ce5ac21..c704bff65f416 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NotifyOnceListener; @@ -81,8 +82,17 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long Version version = Version.min(this.version, channelVersion); OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action, requestId, isHandshake, compressRequest); - ActionListener listener = ActionListener.wrap(() -> - messageListener.onRequestSent(node, requestId, action, request, options)); + if (request.tryIncRef() == false) { + assert false : "request [" + request + "] has been released already"; + throw new AlreadyClosedException("request [" + request + "] has been released already"); + } + ActionListener listener = ActionListener.wrap(() -> { + try { + messageListener.onRequestSent(node, requestId, action, request, options); + } finally { + request.decRef(); + } + }); sendMessage(channel, message, listener); }