diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java index 6b5a150f38c4f..322035450090b 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.action.index; import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -32,7 +31,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -76,44 +74,23 @@ public void remove(Listener listener) { public void nodeIndexDeleted(final ClusterState clusterState, final String index, final Settings indexSettings, final String nodeId) throws ElasticsearchException { final DiscoveryNodes nodes = clusterState.nodes(); - if (nodes.localNodeMaster()) { - threadPool.generic().execute(new AbstractRunnable() { - - @Override - public void onFailure(Throwable t) { - logger.warn("[{}]failed to ack index store deleted for index", t, index); - } - - @Override - protected void doRun() throws Exception { - innerNodeIndexDeleted(index, nodeId); - if (nodes.localNode().isDataNode() == false) { - logger.trace("[{}] not acking store deletion (not a data node)"); - return; - } - lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings); - - } - }); - } else { - transportService.sendRequest(clusterState.nodes().masterNode(), - INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); - if (nodes.localNode().isDataNode() == false) { - logger.trace("[{}] not acking store deletion (not a data node)"); - return; + transportService.sendRequest(clusterState.nodes().masterNode(), + INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); + if (nodes.localNode().isDataNode() == false) { + logger.trace("[{}] not acking store deletion (not a data node)"); + return; + } + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + logger.warn("[{}]failed to ack index store deleted for index", t, index); } - threadPool.generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - logger.warn("[{}]failed to ack index store deleted for index", t, index); - } - - @Override - protected void doRun() throws Exception { - lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings); - } - }); - } + + @Override + protected void doRun() throws Exception { + lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings); + } + }); } private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, Settings indexSettings) throws IOException { @@ -123,30 +100,14 @@ private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, // due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be // deleted by the time we get the lock indicesService.processPendingDeletes(new Index(index), indexSettings, new TimeValue(30, TimeUnit.MINUTES)); - if (nodes.localNodeMaster()) { - innerNodeIndexStoreDeleted(index, nodeId); - } else { - transportService.sendRequest(clusterState.nodes().masterNode(), - INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); - } + transportService.sendRequest(clusterState.nodes().masterNode(), + INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); } catch (LockObtainFailedException exc) { logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index); } } - private void innerNodeIndexDeleted(String index, String nodeId) { - for (Listener listener : listeners) { - listener.onNodeIndexDeleted(index, nodeId); - } - } - - private void innerNodeIndexStoreDeleted(String index, String nodeId) { - for (Listener listener : listeners) { - listener.onNodeIndexStoreDeleted(index, nodeId); - } - } - - public static interface Listener { + public interface Listener { void onNodeIndexDeleted(String index, String nodeId); void onNodeIndexStoreDeleted(String index, String nodeId); @@ -161,7 +122,9 @@ public NodeIndexDeletedMessage newInstance() { @Override public void messageReceived(NodeIndexDeletedMessage message, TransportChannel channel) throws Exception { - innerNodeIndexDeleted(message.index, message.nodeId); + for (Listener listener : listeners) { + listener.onNodeIndexDeleted(message.index, message.nodeId); + } channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -180,7 +143,9 @@ public NodeIndexStoreDeletedMessage newInstance() { @Override public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception { - innerNodeIndexStoreDeleted(message.index, message.nodeId); + for (Listener listener : listeners) { + listener.onNodeIndexStoreDeleted(message.index, message.nodeId); + } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java index 4a43f5b0f2df4..c14452a0feffd 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java @@ -61,17 +61,7 @@ public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefres logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types())); return; } - - if (nodes.localNodeMaster()) { - innerMappingRefresh(request); - } else { - transportService.sendRequest(nodes.masterNode(), - ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } - } - - private void innerMappingRefresh(NodeMappingRefreshRequest request) { - metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types()); + transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); } private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler { @@ -83,7 +73,7 @@ public NodeMappingRefreshRequest newInstance() { @Override public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception { - innerMappingRefresh(request); + metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types()); channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 6c26201bee298..feb72fc7078c1 100644 --- a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -92,17 +92,13 @@ public void resendShardFailed(final ShardRouting shardRouting, final String inde private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) { ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason); - if (clusterService.localNode().equals(masterNode)) { - innerShardFailed(shardRoutingEntry); - } else { - transportService.sendRequest(masterNode, - SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.warn("failed to send failed shard to {}", exp, masterNode); - } - }); - } + transportService.sendRequest(masterNode, + SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.warn("failed to send failed shard to {}", exp, masterNode); + } + }); } public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException { @@ -120,20 +116,17 @@ public void shardStarted(final ShardRouting shardRouting, String indexUUID, fina logger.debug("sending shard started for {}", shardRoutingEntry); - if (clusterService.localNode().equals(masterNode)) { - innerShardStarted(shardRoutingEntry); - } else { - transportService.sendRequest(masterNode, - SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.warn("failed to send shard started to [{}]", exp, masterNode); - } - }); - } + transportService.sendRequest(masterNode, + SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.warn("failed to send shard started to [{}]", exp, masterNode); + } + + }); } - private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) { + private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); failedShardQueue.add(shardRoutingEntry); clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() { @@ -196,7 +189,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) { + private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.debug("received shard started for {}", shardRoutingEntry); // buffer shard started requests, and the state update tasks will simply drain it // this is to optimize the number of "started" events we generate, and batch them @@ -303,7 +296,7 @@ public ShardRoutingEntry newInstance() { @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - innerShardFailed(request); + handleShardFailureOnMaster(request); channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -322,7 +315,7 @@ public ShardRoutingEntry newInstance() { @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - innerShardStarted(request); + shardStartedOnMaster(request); channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 6eef633050470..398c4b0aa54b0 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -156,7 +156,7 @@ protected void doStart() throws ElasticsearchException { DiscoveryNode localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version); DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id()); this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).blocks(initialBlocks).build(); - + this.transportService.setLocalNode(localNode); } @Override diff --git a/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 7b9e99d95b50c..65eed00ac2c84 100644 --- a/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -428,12 +428,8 @@ public void indexShardRestoreCompleted(SnapshotId snapshotId, ShardId shardId) { logger.trace("[{}] successfully restored shard [{}]", snapshotId, shardId); UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId, new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.SUCCESS)); - if (clusterService.state().nodes().localNodeMaster()) { - innerUpdateRestoreState(request); - } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } } public final static class RestoreCompletionResponse { @@ -459,7 +455,7 @@ public RestoreInfo getRestoreInfo() { * * @param request update shard status request */ - private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest request) { + private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) { clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() { private RestoreInfo restoreInfo = null; @@ -654,7 +650,7 @@ private void processDeletedIndices(ClusterChangedEvent event) { if (shardsToFail != null) { for (ShardId shardId : shardsToFail) { logger.trace("[{}] failing running shard restore [{}]", entry.snapshotId(), shardId); - innerUpdateRestoreState(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted"))); + updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted"))); } } } @@ -668,12 +664,8 @@ public void failRestore(SnapshotId snapshotId, ShardId shardId) { logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId); UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId, new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE)); - if (clusterService.state().nodes().localNodeMaster()) { - innerUpdateRestoreState(request); - } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } } private boolean failed(Snapshot snapshot, String index) { @@ -998,7 +990,7 @@ public UpdateIndexShardRestoreStatusRequest newInstance() { @Override public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception { - innerUpdateRestoreState(request); + updateRestoreStateOnMaster(request); channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index 968b775030e62..630c86aa19e88 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -36,13 +36,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.*; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; @@ -94,6 +92,9 @@ protected boolean removeEldestEntry(Map.Entry eldest) { volatile String[] tracelLogExclude; private final ApplySettings settingsListener = new ApplySettings(); + /** if set will call requests sent to this id to shortcut and executed locally */ + volatile String localNodeId = null; + public TransportService(Transport transport, ThreadPool threadPool) { this(EMPTY_SETTINGS, transport, threadPool); } @@ -109,6 +110,21 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa adapter = createAdapter(); } + /** + * makes the transport service aware of the local node. this allows it to optimize requests sent + * from the local node to it self and by pass the network stack/ serialization + * + * @param localNode + */ + public void setLocalNode(DiscoveryNode localNode) { + localNodeId = localNode.id(); + } + + // for testing + String getLocalNodeId() { + return localNodeId; + } + protected Adapter createAdapter() { return new Adapter(); } @@ -209,18 +225,27 @@ public BoundTransportAddress boundAddress() { } public boolean nodeConnected(DiscoveryNode node) { - return transport.nodeConnected(node); + return node.id().equals(localNodeId) || transport.nodeConnected(node); } public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + if (node.id().equals(localNodeId)) { + return; + } transport.connectToNode(node); } public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + if (node.id().equals(localNodeId)) { + return; + } transport.connectToNodeLight(node); } public void disconnectFromNode(DiscoveryNode node) { + if (node.id().equals(localNodeId)) { + return; + } transport.disconnectFromNode(node); } @@ -273,7 +298,11 @@ public void sendRequest(final DiscoveryNode node, assert options.timeout() != null; timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler); } - transport.sendRequest(node, requestId, action, request, options); + if (node.id().equals(localNodeId)) { + sendLocalRequest(requestId, action, request); + } else { + transport.sendRequest(node, requestId, action, request, options); + } } catch (final Throwable e) { // usually happen either because we failed to connect to the node // or because we failed serializing the message @@ -294,6 +323,53 @@ public void run() { } } + private void sendLocalRequest(long requestId, final String action, final TransportRequest request) { + final DirectResponseChannel channel = new DirectResponseChannel(action, requestId, adapter, threadPool); + try { + final TransportRequestHandler handler = adapter.handler(action); + if (handler == null) { + throw new ActionNotFoundTransportException("Action [" + action + "] not found"); + } + final String executor = handler.executor(); + if (ThreadPool.Names.SAME.equals(executor)) { + //noinspection unchecked + handler.messageReceived(request, channel); + } else { + threadPool.executor(executor).execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + //noinspection unchecked + handler.messageReceived(request, channel); + } + + @Override + public boolean isForceExecution() { + return handler.isForceExecution(); + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("failed to notify channel of error message for action [" + action + "]", e1); + logger.warn("actual exception", e); + } + } + }); + } + + } catch (Throwable e) { + try { + channel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("failed to notify channel of error message for action [" + action + "]", e1); + logger.warn("actual exception", e1); + } + } + + } + private boolean shouldTraceAction(String action) { if (tracerLogInclude.length > 0) { if (Regex.simpleMatch(tracerLogInclude, action) == false) { @@ -609,4 +685,89 @@ public void cancelTimeout() { } } } + + static class DirectResponseChannel implements TransportChannel { + final private String action; + final private long requestId; + final TransportServiceAdapter adapter; + final ThreadPool threadPool; + + public DirectResponseChannel(String action, long requestId, TransportServiceAdapter adapter, ThreadPool threadPool) { + this.action = action; + this.requestId = requestId; + this.adapter = adapter; + this.threadPool = threadPool; + } + + @Override + public String action() { + return action; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + sendResponse(response, TransportResponseOptions.EMPTY); + } + + @Override + public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException { + final TransportResponseHandler handler = adapter.onResponseReceived(requestId); + // ignore if its null, the adapter logs it + if (handler != null) { + final String executor = handler.executor(); + if (ThreadPool.Names.SAME.equals(executor)) { + processResponse(handler, response); + } else { + threadPool.executor(executor).execute(new Runnable() { + @SuppressWarnings({"unchecked"}) + @Override + public void run() { + processResponse(handler, response); + } + }); + } + } + } + + protected void processResponse(TransportResponseHandler handler, TransportResponse response) { + try { + handler.handleResponse(response); + } catch (Throwable e) { + handler.handleException(new ResponseHandlerFailureTransportException(e)); + } + } + + @Override + public void sendResponse(Throwable error) throws IOException { + final TransportResponseHandler handler = adapter.onResponseReceived(requestId); + // ignore if its null, the adapter logs it + if (handler != null) { + if (!(error instanceof RemoteTransportException)) { + error = new RemoteTransportException(error.getMessage(), error); + } + final RemoteTransportException rtx = (RemoteTransportException) error; + final String executor = handler.executor(); + if (ThreadPool.Names.SAME.equals(executor)) { + processException(handler, rtx); + } else { + threadPool.executor(handler.executor()).execute(new Runnable() { + @SuppressWarnings({"unchecked"}) + @Override + public void run() { + processException(handler, rtx); + } + }); + } + } + } + + protected void processException(final TransportResponseHandler handler, final RemoteTransportException rtx) { + try { + handler.handleException(rtx); + } catch (Throwable e) { + handler.handleException(new ResponseHandlerFailureTransportException(e)); + } + } + } + } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java index 34313abaf1714..7494e9336b6b3 100644 --- a/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java +++ b/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -80,7 +80,8 @@ public void setUp() throws Exception { // wait till all nodes are properly connected and the event has been sent, so tests in this class // will not get this callback called on the connections done in this setup - final CountDownLatch latch = new CountDownLatch(4); + final boolean useLocalNode = randomBoolean(); + final CountDownLatch latch = new CountDownLatch(useLocalNode ? 2 : 4); TransportConnectionListener waitForConnection = new TransportConnectionListener() { @Override public void onNodeConnected(DiscoveryNode node) { @@ -95,10 +96,18 @@ public void onNodeDisconnected(DiscoveryNode node) { serviceA.addConnectionListener(waitForConnection); serviceB.addConnectionListener(waitForConnection); + if (useLocalNode) { + logger.info("--> using local node optimization"); + serviceA.setLocalNode(nodeA); + serviceB.setLocalNode(nodeB); + } else { + logger.info("--> actively connecting to local node"); + serviceA.connectToNode(nodeA); + serviceB.connectToNode(nodeB); + } + serviceA.connectToNode(nodeB); - serviceA.connectToNode(nodeA); serviceB.connectToNode(nodeA); - serviceB.connectToNode(nodeB); assertThat("failed to wait for all nodes to connect", latch.await(5, TimeUnit.SECONDS), equalTo(true)); serviceA.removeConnectionListener(waitForConnection); @@ -204,6 +213,64 @@ public void handleException(TransportException exp) { serviceA.removeHandler("sayHello"); } + @Test + public void testLocalNodeConnection() throws InterruptedException { + assertTrue("serviceA is not connected to nodeA", serviceA.nodeConnected(nodeA)); + if (((TransportService) serviceA).getLocalNodeId() != null) { + // this should be a noop + serviceA.disconnectFromNode(nodeA); + } + final AtomicReference exception = new AtomicReference<>(); + serviceA.registerHandler("localNode", new BaseTransportRequestHandler() { + @Override + public StringMessageRequest newInstance() { + return new StringMessageRequest(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) { + try { + channel.sendResponse(new StringMessageResponse(request.message)); + } catch (IOException e) { + exception.set(e); + } + } + }); + final AtomicReference responseString = new AtomicReference<>(); + final CountDownLatch responseLatch = new CountDownLatch(1); + serviceA.sendRequest(nodeA, "localNode", new StringMessageRequest("test"), new TransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } + + @Override + public void handleResponse(StringMessageResponse response) { + responseString.set(response.message); + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + exception.set(exp); + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }); + responseLatch.await(); + assertNull(exception.get()); + assertThat(responseString.get(), equalTo("test")); + } + @Test public void testVoidMessageCompressed() { serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { @@ -367,7 +434,7 @@ public void handleException(TransportException exp) { res.txGet(); fail("exception should be thrown"); } catch (Exception e) { - assertThat("bad message !!!", equalTo(e.getCause().getMessage())); + assertThat(e.getCause().getMessage(), equalTo("bad message !!!")); } serviceA.removeHandler("sayHelloException");