Skip to content

Commit

Permalink
Revert "Transport: shortcut local execution, #10350"
Browse files Browse the repository at this point in the history
This reverts commit d8bb760.

This causes BWC issues for some plugins
  • Loading branch information
bleskes committed Apr 8, 2015
1 parent cdaee9b commit b53afb0
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 288 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.action.index;

import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -74,23 +76,44 @@ public void remove(Listener listener) {

public void nodeIndexDeleted(final ClusterState clusterState, final String index, final Settings indexSettings, final String nodeId) throws ElasticsearchException {
final DiscoveryNodes nodes = clusterState.nodes();
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}

@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new AbstractRunnable() {

@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}

@Override
protected void doRun() throws Exception {
innerNodeIndexDeleted(index, nodeId);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);

}
});
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
});
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}

@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
}
}

private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, Settings indexSettings) throws IOException {
Expand All @@ -100,14 +123,30 @@ private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId,
// due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
// deleted by the time we get the lock
indicesService.processPendingDeletes(new Index(index), indexSettings, new TimeValue(30, TimeUnit.MINUTES));
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNodeMaster()) {
innerNodeIndexStoreDeleted(index, nodeId);
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
} catch (LockObtainFailedException exc) {
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
}
}

public interface Listener {
private void innerNodeIndexDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexDeleted(index, nodeId);
}
}

private void innerNodeIndexStoreDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexStoreDeleted(index, nodeId);
}
}

public static interface Listener {
void onNodeIndexDeleted(String index, String nodeId);

void onNodeIndexStoreDeleted(String index, String nodeId);
Expand All @@ -122,9 +161,7 @@ public NodeIndexDeletedMessage newInstance() {

@Override
public void messageReceived(NodeIndexDeletedMessage message, TransportChannel channel) throws Exception {
for (Listener listener : listeners) {
listener.onNodeIndexDeleted(message.index, message.nodeId);
}
innerNodeIndexDeleted(message.index, message.nodeId);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

Expand All @@ -143,9 +180,7 @@ public NodeIndexStoreDeletedMessage newInstance() {

@Override
public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception {
for (Listener listener : listeners) {
listener.onNodeIndexStoreDeleted(message.index, message.nodeId);
}
innerNodeIndexStoreDeleted(message.index, message.nodeId);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

Expand Down
Expand Up @@ -61,7 +61,17 @@ public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefres
logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types()));
return;
}
transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);

if (nodes.localNodeMaster()) {
innerMappingRefresh(request);
} else {
transportService.sendRequest(nodes.masterNode(),
ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}

private void innerMappingRefresh(NodeMappingRefreshRequest request) {
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
}

private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler<NodeMappingRefreshRequest> {
Expand All @@ -73,7 +83,7 @@ public NodeMappingRefreshRequest newInstance() {

@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
innerMappingRefresh(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

Expand Down
Expand Up @@ -92,13 +92,17 @@ public void resendShardFailed(final ShardRouting shardRouting, final String inde

private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
if (clusterService.localNode().equals(masterNode)) {
innerShardFailed(shardRoutingEntry);
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException {
Expand All @@ -116,17 +120,20 @@ public void shardStarted(final ShardRouting shardRouting, String indexUUID, fina

logger.debug("sending shard started for {}", shardRoutingEntry);

transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}

});
if (clusterService.localNode().equals(masterNode)) {
innerShardStarted(shardRoutingEntry);
} else {
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
}

private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
Expand Down Expand Up @@ -189,7 +196,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
// this is to optimize the number of "started" events we generate, and batch them
Expand Down Expand Up @@ -296,7 +303,7 @@ public ShardRoutingEntry newInstance() {

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardFailureOnMaster(request);
innerShardFailed(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

Expand All @@ -315,7 +322,7 @@ public ShardRoutingEntry newInstance() {

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
shardStartedOnMaster(request);
innerShardStarted(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

Expand Down
Expand Up @@ -155,7 +155,7 @@ protected void doStart() throws ElasticsearchException {
DiscoveryNode localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).blocks(initialBlocks).build();
this.transportService.setLocalNode(localNode);

}

@Override
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/org/elasticsearch/snapshots/RestoreService.java
Expand Up @@ -429,8 +429,12 @@ public void indexShardRestoreCompleted(SnapshotId snapshotId, ShardId shardId) {
logger.trace("[{}] successfully restored shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.SUCCESS));
if (clusterService.state().nodes().localNodeMaster()) {
innerUpdateRestoreState(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}

public final static class RestoreCompletionResponse {
Expand All @@ -456,7 +460,7 @@ public RestoreInfo getRestoreInfo() {
*
* @param request update shard status request
*/
private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) {
private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest request) {
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {

private RestoreInfo restoreInfo = null;
Expand Down Expand Up @@ -651,7 +655,7 @@ private void processDeletedIndices(ClusterChangedEvent event) {
if (shardsToFail != null) {
for (ShardId shardId : shardsToFail) {
logger.trace("[{}] failing running shard restore [{}]", entry.snapshotId(), shardId);
updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
innerUpdateRestoreState(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
}
}
}
Expand All @@ -665,8 +669,12 @@ public void failRestore(SnapshotId snapshotId, ShardId shardId) {
logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
if (clusterService.state().nodes().localNodeMaster()) {
innerUpdateRestoreState(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}

private boolean failed(Snapshot snapshot, String index) {
Expand Down Expand Up @@ -991,7 +999,7 @@ public UpdateIndexShardRestoreStatusRequest newInstance() {

@Override
public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception {
updateRestoreStateOnMaster(request);
innerUpdateRestoreState(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

Expand Down

0 comments on commit b53afb0

Please sign in to comment.