Transport: shortcut local execution #10350

Closed
wants to merge 4 commits
Changes from 1 commit
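The commit shown here removes the per-caller "am I the master?" branches: instead of checking `nodes.localNodeMaster()` (or comparing the local node against the master node) and invoking a private `innerXxx(...)` helper directly, callers now always go through `transportService.sendRequest(...)` and rely on the transport layer to execute a request addressed to the local node in-process. The sketch below illustrates that shortcut with simplified, made-up types; it is not the real `TransportService` API, only the shape of the idea.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of "shortcut local execution"; all names here are hypothetical.
final class ToyTransportService {

    interface Handler<T> {
        void messageReceived(T request) throws Exception;
    }

    private final Map<String, Handler<Object>> handlers = new ConcurrentHashMap<>();
    private volatile String localNodeId;

    // Mirrors the new setLocalNode(...) call in the discovery change below:
    // the transport layer has to know which node is "self" before it can
    // shortcut anything.
    void setLocalNode(String nodeId) {
        this.localNodeId = nodeId;
    }

    @SuppressWarnings("unchecked")
    <T> void registerHandler(String action, Handler<T> handler) {
        handlers.put(action, (Handler<Object>) handler);
    }

    // Callers no longer branch on "the local node is the master"; they always send.
    <T> void sendRequest(String targetNodeId, String action, T request) throws Exception {
        if (targetNodeId.equals(localNodeId)) {
            // local shortcut: invoke the registered handler directly, no serialization
            handlers.get(action).messageReceived(request);
            return;
        }
        // otherwise serialize and send over the wire (not implemented in this sketch)
        throw new UnsupportedOperationException("network path omitted in this sketch");
    }
}
```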
@@ -20,7 +20,6 @@
package org.elasticsearch.cluster.action.index;

import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -32,7 +31,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@@ -76,44 +74,23 @@ public void remove(Listener listener) {

public void nodeIndexDeleted(final ClusterState clusterState, final String index, final Settings indexSettings, final String nodeId) throws ElasticsearchException {
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new AbstractRunnable() {

@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}

@Override
protected void doRun() throws Exception {
innerNodeIndexDeleted(index, nodeId);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);

}
});
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}

@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
}

@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
}

private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, Settings indexSettings) throws IOException {
@@ -123,30 +100,14 @@ private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId,
// due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
// deleted by the time we get the lock
indicesService.processPendingDeletes(new Index(index), indexSettings, new TimeValue(30, TimeUnit.MINUTES));
if (nodes.localNodeMaster()) {
innerNodeIndexStoreDeleted(index, nodeId);
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
} catch (LockObtainFailedException exc) {
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
}
}

private void innerNodeIndexDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexDeleted(index, nodeId);
}
}

private void innerNodeIndexStoreDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexStoreDeleted(index, nodeId);
}
}

public static interface Listener {
public interface Listener {
void onNodeIndexDeleted(String index, String nodeId);

void onNodeIndexStoreDeleted(String index, String nodeId);
@@ -161,7 +122,9 @@ public NodeIndexDeletedMessage newInstance() {

@Override
public void messageReceived(NodeIndexDeletedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexDeleted(message.index, message.nodeId);
for (Listener listener : listeners) {
listener.onNodeIndexDeleted(message.index, message.nodeId);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -180,7 +143,9 @@ public NodeIndexStoreDeletedMessage newInstance() {

@Override
public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexStoreDeleted(message.index, message.nodeId);
for (Listener listener : listeners) {
listener.onNodeIndexStoreDeleted(message.index, message.nodeId);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

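The hunks above are from NodeIndexDeletedAction: the `innerNodeIndexDeleted` and `innerNodeIndexStoreDeleted` helpers disappear, the master is always notified via `sendRequest`, and the listener loops move into the transport handlers' `messageReceived` methods. A small demo built on the toy types from the sketch near the top (the action name and demo classes are made up) shows why a node that is its own master needs no special case:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class NodeIndexDeletedDemo {

    interface Listener {
        void onNodeIndexDeleted(String index, String nodeId);
    }

    record NodeIndexDeletedMessage(String index, String nodeId) {}

    public static void main(String[] args) throws Exception {
        List<Listener> listeners = new CopyOnWriteArrayList<>();
        listeners.add((index, nodeId) -> System.out.println("index [" + index + "] deleted on [" + nodeId + "]"));

        ToyTransportService transport = new ToyTransportService();
        transport.setLocalNode("node_1");

        // What used to live in innerNodeIndexDeleted(...) now sits in the handler.
        transport.registerHandler("demo/nodeIndexDeleted", (NodeIndexDeletedMessage m) -> {
            for (Listener listener : listeners) {
                listener.onNodeIndexDeleted(m.index(), m.nodeId());
            }
        });

        // On a node that is also the master, sendRequest short-circuits and the
        // handler (and therefore every listener) still runs in-process.
        transport.sendRequest("node_1", "demo/nodeIndexDeleted", new NodeIndexDeletedMessage("test", "node_1"));
    }
}
```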
@@ -61,17 +61,7 @@ public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefres
logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types()));
return;
}

if (nodes.localNodeMaster()) {
innerMappingRefresh(request);
} else {
transportService.sendRequest(nodes.masterNode(),
ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}

private void innerMappingRefresh(NodeMappingRefreshRequest request) {
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}

private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler<NodeMappingRefreshRequest> {
Expand All @@ -83,7 +73,7 @@ public NodeMappingRefreshRequest newInstance() {

@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
innerMappingRefresh(request);
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

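NodeMappingRefreshAction gets the same treatment in the hunks above: `innerMappingRefresh` is removed, the caller unconditionally sends the refresh request to the master, and `metaDataMappingService.refreshMapping(...)` is now invoked only from the transport handler, which the local shortcut presumably runs in-process whenever this node is the master.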
@@ -92,17 +92,13 @@ public void resendShardFailed(final ShardRouting shardRouting, final String inde

private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
if (clusterService.localNode().equals(masterNode)) {
innerShardFailed(shardRoutingEntry);
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException {
@@ -120,20 +116,17 @@ public void shardStarted(final ShardRouting shardRouting, String indexUUID, fina

logger.debug("sending shard started for {}", shardRoutingEntry);

if (clusterService.localNode().equals(masterNode)) {
innerShardStarted(shardRoutingEntry);
} else {
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}

});
}

private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@@ -196,7 +189,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) {
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
// this is to optimize the number of "started" events we generate, and batch them
@@ -303,7 +296,7 @@ public ShardRoutingEntry newInstance() {

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardFailed(request);
handleShardFailureOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -322,7 +315,7 @@ public ShardRoutingEntry newInstance() {

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardStarted(request);
shardStartedOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

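ShardStateAction loses its local-master branches around `SHARD_FAILED_ACTION_NAME` and `SHARD_STARTED_ACTION_NAME` as well, and the master-side entry points are renamed to `handleShardFailureOnMaster` and `shardStartedOnMaster`, making it clearer that they now run only behind the transport handlers. Those handlers keep the existing buffering behaviour: entries are queued and the cluster-state update tasks drain the queue, so a burst of notifications collapses into few routing updates. A rough, self-contained sketch of that queue-and-drain pattern (simplified names, not the real ClusterService API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hedged sketch of the queue-and-drain batching used for shard started/failed
// notifications; the names and structure here are simplified stand-ins.
final class ShardStartedBatcher {

    private final ConcurrentLinkedQueue<String> startedShards = new ConcurrentLinkedQueue<>();
    // Stand-in for the single cluster-state update thread.
    private final ExecutorService updateThread = Executors.newSingleThreadExecutor();

    // Called from the transport handler, possibly many times in a burst.
    void onShardStarted(String shardId) {
        startedShards.add(shardId);
        // Stand-in for clusterService.submitStateUpdateTask(...): the task runs
        // later and drains everything queued so far, so several notifications
        // can be applied as a single routing-table update.
        updateThread.execute(this::drainAndApply);
    }

    private void drainAndApply() {
        List<String> drained = new ArrayList<>();
        for (String shardId; (shardId = startedShards.poll()) != null; ) {
            drained.add(shardId);
        }
        if (drained.isEmpty() == false) {
            System.out.println("applying a single routing update for " + drained);
        }
    }
}
```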
@@ -156,7 +156,7 @@ protected void doStart() throws ElasticsearchException {
DiscoveryNode localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).blocks(initialBlocks).build();

this.transportService.setLocalNode(localNode);
}

@Override
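The one-line addition in what looks like LocalDiscovery's `doStart()` is the wiring that makes the shortcut possible for local (in-JVM) clusters: `transportService.setLocalNode(localNode)` tells the transport service which node is "self", which is exactly what a local-execution shortcut has to compare the request target against (the `setLocalNode` call in the toy sketch near the top plays the same role).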
14 changes: 3 additions & 11 deletions src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -428,12 +428,8 @@ public void indexShardRestoreCompleted(SnapshotId snapshotId, ShardId shardId) {
logger.trace("[{}] successfully restored shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.SUCCESS));
if (clusterService.state().nodes().localNodeMaster()) {
innerUpdateRestoreState(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}

public final static class RestoreCompletionResponse {
Expand All @@ -459,7 +455,7 @@ public RestoreInfo getRestoreInfo() {
*
* @param request update shard status request
*/
private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest request) {
private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) {
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {

private RestoreInfo restoreInfo = null;
@@ -654,7 +650,7 @@ private void processDeletedIndices(ClusterChangedEvent event) {
if (shardsToFail != null) {
for (ShardId shardId : shardsToFail) {
logger.trace("[{}] failing running shard restore [{}]", entry.snapshotId(), shardId);
innerUpdateRestoreState(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
}
}
}
@@ -668,12 +664,8 @@ public void failRestore(SnapshotId snapshotId, ShardId shardId) {
logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
if (clusterService.state().nodes().localNodeMaster()) {
innerUpdateRestoreState(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}

private boolean failed(Snapshot snapshot, String index) {
@@ -998,7 +990,7 @@ public UpdateIndexShardRestoreStatusRequest newInstance() {

@Override
public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception {
innerUpdateRestoreState(request);
updateRestoreStateOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

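RestoreService follows the same pattern: shard-level restore status updates are always sent to the master via `UPDATE_RESTORE_ACTION_NAME`, and the renamed `updateRestoreStateOnMaster` is reached only from the transport handler or from code that already runs on the master (`processDeletedIndices`), rather than from a `localNodeMaster()` branch.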