Transport: shortcut local execution
In several places in the code we need to notify a node that it should do something (typically the master). When that node is the local node, several call sites have an optimization that runs the handler code immediately instead of sending the request over the wire to the node itself. This is a shame, as we need to implement the same pattern again and again. On top of that, we may forget to do so (see the note below), and we end up writing extra cruft whenever the handler needs to run on a different thread pool.

This commit folds the optimization into the TransportService itself, shortcutting wire serialization when the target node is the local node.
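
The idea, roughly, is that the send path itself checks whether the target is the local node and, if so, skips serialization and hands the request straight to the locally registered handler on that handler's executor. The sketch below only illustrates that shape; LocalShortcutTransport, Handler and Registration are made-up names for this example, not the actual Elasticsearch TransportService API, and the real change also has to deal with response handlers, request copying and error propagation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Simplified sketch of the "shortcut local execution" idea (hypothetical types).
class LocalShortcutTransport {

    interface Handler {
        void handle(Object request) throws Exception;
    }

    // a registered action: the handler plus the executor it expects to run on
    private static class Registration {
        final Executor executor;
        final Handler handler;

        Registration(Executor executor, Handler handler) {
            this.executor = executor;
            this.handler = handler;
        }
    }

    private final Map<String, Registration> handlers = new ConcurrentHashMap<>();
    private volatile String localNodeId; // set once the local node is known, cf. setLocalNode in the diff below

    void setLocalNode(String nodeId) {
        this.localNodeId = nodeId;
    }

    void registerHandler(String action, Executor executor, Handler handler) {
        handlers.put(action, new Registration(executor, handler));
    }

    void sendRequest(String targetNodeId, String action, Object request) {
        if (targetNodeId.equals(localNodeId)) {
            // shortcut: no serialization, no wire; run the handler on its own executor
            final Registration reg = handlers.get(action);
            if (reg == null) {
                throw new IllegalStateException("no handler registered for action [" + action + "]");
            }
            reg.executor.execute(() -> {
                try {
                    reg.handler.handle(request);
                } catch (Exception e) {
                    // a real implementation would route this to the caller's response handler
                }
            });
        } else {
            // normal path: serialize the request and send it over the network (omitted)
        }
    }
}

With this in place, callers no longer need to know whether the target happens to be the local node; they always go through sendRequest.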

Note: this was discovered via elastic#10247, which tries to import a dangling index shortly after the cluster forms. When sending the import-dangling request to the master, the code didn't take into account the fact that the local node may itself be the master. If this happens quickly enough, the request fails with a NodeNotConnected exception and the dangling indices are not imported. The import only succeeds about 10s later, once InternalClusterService.ReconnectToNodes runs and actively connects the local node to itself (which is not needed), potentially after yet another cluster state update.

Closes elastic#10350
bleskes committed Apr 8, 2015
1 parent 4d33776 commit d8bb760
Showing 7 changed files with 288 additions and 120 deletions.
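
For orientation before the per-file diff: the caller-side pattern that this commit removes, next to what call sites look like afterwards. This is a condensed, hypothetical illustration (Transport, ACTION_NAME and the notifyMaster* methods are placeholder names), not an excerpt from the changed files.

// Condensed before/after of the caller-side pattern (placeholder names).
class CallerPatternExample {

    interface Transport {
        void sendRequest(String targetNode, String action, Object request);
    }

    static final String ACTION_NAME = "internal:example/notify_master";

    // Before: every caller special-cased the case where the local node is the master.
    static void notifyMasterBefore(Transport transport, String localNode, String masterNode, Object request) {
        if (localNode.equals(masterNode)) {
            handleOnMaster(request); // local shortcut duplicated at each call site
        } else {
            transport.sendRequest(masterNode, ACTION_NAME, request);
        }
    }

    // After: callers always send; the transport shortcuts local targets itself.
    static void notifyMasterAfter(Transport transport, String masterNode, Object request) {
        transport.sendRequest(masterNode, ACTION_NAME, request);
    }

    static void handleOnMaster(Object request) {
        // master-side handling (omitted)
    }
}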
@@ -20,7 +20,6 @@
package org.elasticsearch.cluster.action.index;

import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -32,7 +31,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@@ -76,44 +74,23 @@ public void remove(Listener listener) {

public void nodeIndexDeleted(final ClusterState clusterState, final String index, final Settings indexSettings, final String nodeId) throws ElasticsearchException {
final DiscoveryNodes nodes = clusterState.nodes();
-        if (nodes.localNodeMaster()) {
-            threadPool.generic().execute(new AbstractRunnable() {
-
-                @Override
-                public void onFailure(Throwable t) {
-                    logger.warn("[{}]failed to ack index store deleted for index", t, index);
-                }
-
-                @Override
-                protected void doRun() throws Exception {
-                    innerNodeIndexDeleted(index, nodeId);
-                    if (nodes.localNode().isDataNode() == false) {
-                        logger.trace("[{}] not acking store deletion (not a data node)");
-                        return;
-                    }
-                    lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
-
-                }
-            });
-        } else {
-            transportService.sendRequest(clusterState.nodes().masterNode(),
-                    INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
-            if (nodes.localNode().isDataNode() == false) {
-                logger.trace("[{}] not acking store deletion (not a data node)");
-                return;
-            }
-            threadPool.generic().execute(new AbstractRunnable() {
-                @Override
-                public void onFailure(Throwable t) {
-                    logger.warn("[{}]failed to ack index store deleted for index", t, index);
-                }
-
-                @Override
-                protected void doRun() throws Exception {
-                    lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
-                }
-            });
-        }
+        transportService.sendRequest(clusterState.nodes().masterNode(),
+                INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
+        if (nodes.localNode().isDataNode() == false) {
+            logger.trace("[{}] not acking store deletion (not a data node)");
+            return;
+        }
+        threadPool.generic().execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                logger.warn("[{}]failed to ack index store deleted for index", t, index);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
+            }
+        });
}

private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, Settings indexSettings) throws IOException {
@@ -123,30 +100,14 @@ private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId,
// due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
// deleted by the time we get the lock
indicesService.processPendingDeletes(new Index(index), indexSettings, new TimeValue(30, TimeUnit.MINUTES));
-            if (nodes.localNodeMaster()) {
-                innerNodeIndexStoreDeleted(index, nodeId);
-            } else {
-                transportService.sendRequest(clusterState.nodes().masterNode(),
-                        INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
-            }
+            transportService.sendRequest(clusterState.nodes().masterNode(),
+                    INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
} catch (LockObtainFailedException exc) {
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
}
}

-    private void innerNodeIndexDeleted(String index, String nodeId) {
-        for (Listener listener : listeners) {
-            listener.onNodeIndexDeleted(index, nodeId);
-        }
-    }
-
-    private void innerNodeIndexStoreDeleted(String index, String nodeId) {
-        for (Listener listener : listeners) {
-            listener.onNodeIndexStoreDeleted(index, nodeId);
-        }
-    }
-
-    public static interface Listener {
+    public interface Listener {
void onNodeIndexDeleted(String index, String nodeId);

void onNodeIndexStoreDeleted(String index, String nodeId);
@@ -161,7 +122,9 @@ public NodeIndexDeletedMessage newInstance() {

@Override
public void messageReceived(NodeIndexDeletedMessage message, TransportChannel channel) throws Exception {
-            innerNodeIndexDeleted(message.index, message.nodeId);
+            for (Listener listener : listeners) {
+                listener.onNodeIndexDeleted(message.index, message.nodeId);
+            }
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -180,7 +143,9 @@ public NodeIndexStoreDeletedMessage newInstance() {

@Override
public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception {
-            innerNodeIndexStoreDeleted(message.index, message.nodeId);
+            for (Listener listener : listeners) {
+                listener.onNodeIndexStoreDeleted(message.index, message.nodeId);
+            }
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -61,17 +61,7 @@ public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefres
logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types()));
return;
}

-        if (nodes.localNodeMaster()) {
-            innerMappingRefresh(request);
-        } else {
-            transportService.sendRequest(nodes.masterNode(),
-                    ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
-        }
+        transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
    }
-
-    private void innerMappingRefresh(NodeMappingRefreshRequest request) {
-        metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
-    }

private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler<NodeMappingRefreshRequest> {
@@ -83,7 +73,7 @@ public NodeMappingRefreshRequest newInstance() {

@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
-            innerMappingRefresh(request);
+            metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -92,17 +92,13 @@ public void resendShardFailed(final ShardRouting shardRouting, final String inde

private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
-        if (clusterService.localNode().equals(masterNode)) {
-            innerShardFailed(shardRoutingEntry);
-        } else {
-            transportService.sendRequest(masterNode,
-                    SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
-                        @Override
-                        public void handleException(TransportException exp) {
-                            logger.warn("failed to send failed shard to {}", exp, masterNode);
-                        }
-                    });
-        }
+        transportService.sendRequest(masterNode,
+                SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+                    @Override
+                    public void handleException(TransportException exp) {
+                        logger.warn("failed to send failed shard to {}", exp, masterNode);
+                    }
+                });
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException {
@@ -120,20 +116,17 @@ public void shardStarted(final ShardRouting shardRouting, String indexUUID, fina

logger.debug("sending shard started for {}", shardRoutingEntry);

-        if (clusterService.localNode().equals(masterNode)) {
-            innerShardStarted(shardRoutingEntry);
-        } else {
-            transportService.sendRequest(masterNode,
-                    SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
-                        @Override
-                        public void handleException(TransportException exp) {
-                            logger.warn("failed to send shard started to [{}]", exp, masterNode);
-                        }
-                    });
-        }
+        transportService.sendRequest(masterNode,
+                SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+                    @Override
+                    public void handleException(TransportException exp) {
+                        logger.warn("failed to send shard started to [{}]", exp, masterNode);
+                    }
+
+                });
}

-    private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
+    private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@@ -196,7 +189,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

-    private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) {
+    private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
// this is to optimize the number of "started" events we generate, and batch them
Expand Down Expand Up @@ -303,7 +296,7 @@ public ShardRoutingEntry newInstance() {

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
-            handleShardFailureOnMaster(request);
+            handleShardFailureOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -322,7 +315,7 @@ public ShardRoutingEntry newInstance() {

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
-            innerShardStarted(request);
+            shardStartedOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -155,7 +155,7 @@ protected void doStart() throws ElasticsearchException {
DiscoveryNode localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id());
        this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).blocks(initialBlocks).build();
-
+        this.transportService.setLocalNode(localNode);
}

@Override
14 changes: 3 additions & 11 deletions src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -429,12 +429,8 @@ public void indexShardRestoreCompleted(SnapshotId snapshotId, ShardId shardId) {
logger.trace("[{}] successfully restored shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.SUCCESS));
-        if (clusterService.state().nodes().localNodeMaster()) {
-            innerUpdateRestoreState(request);
-        } else {
        transportService.sendRequest(clusterService.state().nodes().masterNode(),
                UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
-        }
}

public final static class RestoreCompletionResponse {
@@ -460,7 +456,7 @@ public RestoreInfo getRestoreInfo() {
*
* @param request update shard status request
*/
-    private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest request) {
+    private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) {
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {

private RestoreInfo restoreInfo = null;
@@ -655,7 +651,7 @@ private void processDeletedIndices(ClusterChangedEvent event) {
if (shardsToFail != null) {
for (ShardId shardId : shardsToFail) {
logger.trace("[{}] failing running shard restore [{}]", entry.snapshotId(), shardId);
-                    innerUpdateRestoreState(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
+                    updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
}
}
}
@@ -669,12 +665,8 @@ public void failRestore(SnapshotId snapshotId, ShardId shardId) {
logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
-        if (clusterService.state().nodes().localNodeMaster()) {
-            innerUpdateRestoreState(request);
-        } else {
        transportService.sendRequest(clusterService.state().nodes().masterNode(),
                UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
-        }
}

private boolean failed(Snapshot snapshot, String index) {
Expand Down Expand Up @@ -999,7 +991,7 @@ public UpdateIndexShardRestoreStatusRequest newInstance() {

@Override
public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception {
-            innerUpdateRestoreState(request);
+            updateRestoreStateOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

