Skip to content

Commit

Permalink
Before deleting a local unused shard copy, verify we're connected to …
Browse files Browse the repository at this point in the history
…the node it's supposed to be on

This is yet another safety guard to make sure we don't delete data if the local copy is the only one (even if it's not part of the cluster state any more)

Closes #6191
  • Loading branch information
bleskes committed May 20, 2014
1 parent 541acc7 commit 05d131c
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/main/java/org/elasticsearch/indices/store/IndicesStore.java
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -37,7 +38,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.File;

Expand Down Expand Up @@ -78,6 +79,7 @@ public void onRefreshSettings(Settings settings) {
private final IndicesService indicesService;

private final ClusterService clusterService;
private final TransportService transportService;

private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
Expand All @@ -86,12 +88,15 @@ public void onRefreshSettings(Settings settings) {
private final ApplySettings applySettings = new ApplySettings();

@Inject
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) {
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService,
ClusterService clusterService, TransportService transportService) {
super(settings);
this.nodeEnv = nodeEnv;
this.nodeSettingsService = nodeSettingsService;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;

// we limit with 20MB / sec by default with a default type set to merge sice 0.90.1
this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name());
rateLimiting.setType(rateLimitingType);
Expand Down Expand Up @@ -141,15 +146,17 @@ public void clusterChanged(ClusterChangedEvent event) {
break;
}

// if the allocated or relocation node id doesn't exists in the cluster state, its a stale
// node, make sure we don't do anything with this until the routing table has properly been
// if the allocated or relocation node id doesn't exists in the cluster state or we're not connected to it
// it may be a stale node, make sure we don't do anything with this until the routing table has properly been
// rerouted to reflect the fact that the node does not exists
if (!event.state().nodes().nodeExists(shardRouting.currentNodeId())) {
DiscoveryNode node = event.state().nodes().get(shardRouting.currentNodeId());
if (node == null || !transportService.nodeConnected(node)) {
shardCanBeDeleted = false;
break;
}
if (shardRouting.relocatingNodeId() != null) {
if (!event.state().nodes().nodeExists(shardRouting.relocatingNodeId())) {
node = event.state().nodes().get(shardRouting.relocatingNodeId());
if (node == null || !transportService.nodeConnected(node)) {
shardCanBeDeleted = false;
break;
}
Expand Down

0 comments on commit 05d131c

Please sign in to comment.