Skip to content

Commit

Permalink
auto_expand_replicas: [0-auto] can cause data loss when nodes are rem…
Browse files Browse the repository at this point in the history
…oved, closes elastic#934.

This is caused because of a race condition between when to handle the removed node and move a replica to a primary mode, and when to remove the replica because of the 0-auto setting.
  • Loading branch information
kimchy committed May 16, 2011
1 parent 70e8698 commit e15c34f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 27 deletions.
25 changes: 25 additions & 0 deletions .idea/projectCodeStyle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -54,6 +54,11 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/
OperationRouting operationRouting();

/**
* Adds a priority listener for updated cluster states.
*/
void addPriority(ClusterStateListener listener);

/**
* Adds a listener for updated cluster states.
*/
Expand Down
Expand Up @@ -39,6 +39,8 @@
*/
public class RoutingService extends AbstractLifecycleComponent<RoutingService> implements ClusterStateListener {

private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";

private final ThreadPool threadPool;

private final ClusterService clusterService;
Expand All @@ -60,7 +62,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
}

@Override protected void doStart() throws ElasticSearchException {
clusterService.add(this);
clusterService.addPriority(this);
}

@Override protected void doStop() throws ElasticSearchException {
Expand All @@ -75,7 +77,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
}

@Override public void clusterChanged(ClusterChangedEvent event) {
if (event.source().equals(RoutingTableUpdater.CLUSTER_UPDATE_TASK_SOURCE)) {
if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
// that's us, ignore this event
return;
}
Expand All @@ -93,7 +95,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
// also, if the routing table changed, it means that we have new indices, or shard have started
// or failed, we want to apply this as fast as possible
routingTableDirty = true;
threadPool.cached().execute(new RoutingTableUpdater());
reroute();
} else {
if (event.nodesAdded()) {
routingTableDirty = true;
Expand All @@ -107,32 +109,34 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
}
}

private class RoutingTableUpdater implements Runnable {
private void reroute() {
try {
if (!routingTableDirty) {
return;
}
if (lifecycle.stopped()) {
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
if (!routingResult.changed()) {
// no state changed
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
routingTableDirty = false;
} catch (Exception e) {
logger.warn("Failed to reroute routing table", e);
}
}

private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";
private class RoutingTableUpdater implements Runnable {

@Override public void run() {
try {
if (!routingTableDirty) {
return;
}
if (lifecycle.stopped()) {
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
if (!routingResult.changed()) {
// no state changed
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
routingTableDirty = false;
} catch (Exception e) {
logger.warn("Failed to reroute routing table", e);
}
reroute();
}
}
}
Expand Up @@ -65,6 +65,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe

private volatile ExecutorService updateTasksExecutor;

private final List<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();

private final Queue<NotifyTimeout> onGoingTimeouts = new LinkedTransferQueue<NotifyTimeout>();
Expand Down Expand Up @@ -127,13 +128,17 @@ public ClusterState state() {
return this.clusterState;
}

public void addPriority(ClusterStateListener listener) {
priorityClusterStateListeners.add(listener);
}

public void add(ClusterStateListener listener) {
clusterStateListeners.add(listener);
}

public void remove(ClusterStateListener listener) {
clusterStateListeners.remove(listener);
for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext();) {
for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext(); ) {
NotifyTimeout timeout = it.next();
if (timeout.listener.equals(listener)) {
timeout.cancel();
Expand Down Expand Up @@ -226,6 +231,9 @@ public void submitStateUpdateTask(final String source, final ClusterStateUpdateT
}
}

for (ClusterStateListener listener : priorityClusterStateListeners) {
listener.clusterChanged(clusterChangedEvent);
}
for (ClusterStateListener listener : clusterStateListeners) {
listener.clusterChanged(clusterChangedEvent);
}
Expand Down

0 comments on commit e15c34f

Please sign in to comment.