Skip to content

Commit

Permalink
When sending shard start/failed message due to a cluster state change…
Browse files Browse the repository at this point in the history
…, use the master indicated in the new state rather than current

This commit also adds extra protection in other cases against a master node being de-elected and thus being null.

Closes #6189
  • Loading branch information
bleskes committed May 15, 2014
1 parent 84593f0 commit 1f28cd0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -76,37 +76,51 @@ public ShardStateAction(Settings settings, ClusterService clusterService, Transp
}

public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) throws ElasticsearchException {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.debug("can't send shard failed for {}. no master known.", shardRouting);
}
shardFailed(shardRouting, indexUUID, reason, masterNode);
}

public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) throws ElasticsearchException {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
logger.warn("{} sending failed shard for {}", shardRouting.shardId(), shardRoutingEntry);
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
if (clusterService.localNode().equals(masterNode)) {
innerShardFailed(shardRoutingEntry);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardFailedTransportHandler.ACTION, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to [{}]", exp, clusterService.state().nodes().masterNode());
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.debug("can't send shard started for {}. no master known.", shardRouting);
}
shardStarted(shardRouting, indexUUID, reason, masterNode);
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) throws ElasticsearchException {

ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);

logger.debug("sending shard started for {}", shardRoutingEntry);

DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
if (clusterService.localNode().equals(masterNode)) {
innerShardStarted(shardRoutingEntry);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
transportService.sendRequest(masterNode,
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, clusterService.state().nodes().masterNode());
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
Expand Down
Expand Up @@ -517,8 +517,12 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela
// the master thinks we are started, but we don't have this shard at all, mark it as failed
logger.warn("[{}][{}] master [{}] marked shard as started, but shard has not been created, mark shard as failed", shardRouting.index(), shardId, nodes.masterNode());
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed");
if (nodes.masterNode() != null) {
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed",
nodes.masterNode()
);
}
}
continue;
}
Expand Down Expand Up @@ -606,11 +610,14 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
// for master to confirm a shard started message (either master failover, or a cluster event before
// we managed to tell the master we started), mark us as started
if (logger.isTraceEnabled()) {
logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started",
indexShard.shardId(), indexShard.state());
logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}",
indexShard.shardId(), indexShard.state(), nodes.masterNode());
}
if (nodes.masterNode() != null) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started",
nodes.masterNode());
}
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started");
return;
} else {
if (indexShard.ignoreRecoveryAttempt()) {
Expand Down Expand Up @@ -676,7 +683,13 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
}
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to create shard, message [" + detailedMessage(e) + "]");
if (nodes.masterNode() != null) {
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to create shard, message [" + detailedMessage(e) + "]",
nodes.masterNode()
);
} else {
logger.debug("can't send shard failed for {} as there is no current master", shardRouting.shardId());
}
return;
}
}
Expand Down

0 comments on commit 1f28cd0

Please sign in to comment.