Skip to content

Commit

Permalink
[TRANSPORT] never send requests after transport service is stopped
Browse files Browse the repository at this point in the history
With local transport, or any transport that doesn't necessarily send a
notification when connections are closed, we might miss a node
disconnection, and the request handler then hangs forever (or until the
timeout kicks in). This window only exists during shutdown and is likely
unproblematic in practice, but tests might run into this problem when
local transport is used.
  • Loading branch information
s1monw committed Sep 25, 2014
1 parent 9987897 commit b10ff38
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,11 @@ public void phase1(final SnapshotIndexCommit snapshot) throws ElasticsearchExcep
pool.execute(new Runnable() {
@Override
public void run() {
IndexInput indexInput = null;
store.incRef();
final StoreFileMetaData md = recoverySourceMetadata.get(name);
try {
try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
byte[] buf = new byte[BUFFER_SIZE];
indexInput = store.directory().openInput(name, IOContext.READONCE);
final byte[] buf = new byte[BUFFER_SIZE];
boolean shouldCompressRequest = recoverySettings.compress();
if (CompressorFactory.isCompressed(indexInput)) {
shouldCompressRequest = false;
Expand Down Expand Up @@ -249,7 +247,6 @@ public void run() {
exceptions.add(0, e); // last exceptions first
}
} finally {
IOUtils.closeWhileHandlingException(indexInput);
try {
store.decRef();
} finally {
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/org/elasticsearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
Expand All @@ -50,6 +51,7 @@
*/
public class TransportService extends AbstractLifecycleComponent<TransportService> {

private final AtomicBoolean started = new AtomicBoolean(false);
protected final Transport transport;
protected final ThreadPool threadPool;

Expand Down Expand Up @@ -92,10 +94,14 @@ protected void doStart() throws ElasticsearchException {
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
}
boolean setStarted = started.compareAndSet(false, true);
assert setStarted : "service was already started";
}

@Override
protected void doStop() throws ElasticsearchException {
final boolean setStopped = started.compareAndSet(true, false);
assert setStopped : "service has already been stopped";
try {
transport.stop();
} finally {
Expand Down Expand Up @@ -191,11 +197,16 @@ public <T extends TransportResponse> void sendRequest(final DiscoveryNode node,
final long requestId = newRequestId();
TimeoutHandler timeoutHandler = null;
try {
clientHandlers.put(requestId, new RequestHolder<>(handler, node, action, timeoutHandler));
if (started.get() == false) {
// if we are not started, the exception handling will remove the RequestHolder again and call the handler to notify the caller.
// it will only notify if the doStop code hasn't done the work yet.
throw new TransportException("TransportService is closed stopped can't send request");
}
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId);
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
clientHandlers.put(requestId, new RequestHolder<>(handler, node, action, timeoutHandler));
transport.sendRequest(node, requestId, action, request, options);
} catch (final Throwable e) {
// usually happen either because we failed to connect to the node
Expand Down

0 comments on commit b10ff38

Please sign in to comment.