Skip to content

Commit

Permalink
Bulk process of shard started/failed should not execute on already pr…
Browse files Browse the repository at this point in the history
…ocessed events

closes #5061
  • Loading branch information
kimchy committed Feb 10, 2014
1 parent 85b7604 commit 83b09d2
Showing 1 changed file with 11 additions and 0 deletions.
Expand Up @@ -119,6 +119,9 @@ private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (shardRoutingEntry.processed) {
return currentState;
}

List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
failedShardQueue.drainTo(shardRoutingEntries);
Expand All @@ -133,6 +136,7 @@ public ClusterState execute(ClusterState currentState) {
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<ShardRouting>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
shardRoutingEntry.processed = true;
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
// if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
Expand Down Expand Up @@ -176,6 +180,10 @@ private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) {
@Override
public ClusterState execute(ClusterState currentState) {

if (shardRoutingEntry.processed) {
return currentState;
}

List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
startedShardsQueue.drainTo(shardRoutingEntries);

Expand All @@ -191,6 +199,7 @@ public ClusterState execute(ClusterState currentState) {

for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
shardRoutingEntry.processed = true;
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
try {
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
Expand Down Expand Up @@ -306,6 +315,8 @@ static class ShardRoutingEntry extends TransportRequest {

private String reason;

volatile boolean processed; // state field, no need to serialize

private ShardRoutingEntry() {
}

Expand Down

0 comments on commit 83b09d2

Please sign in to comment.