Skip to content

Commit

Permalink
Bulk process of shard started/failed should not execute on already pr…
Browse files Browse the repository at this point in the history
…ocessed events

closes #5061
  • Loading branch information
kimchy committed Feb 10, 2014
1 parent b80acd2 commit a881d89
Showing 1 changed file with 11 additions and 0 deletions.
Expand Up @@ -118,6 +118,9 @@ private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (shardRoutingEntry.processed) {
return currentState;
}

List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
failedShardQueue.drainTo(shardRoutingEntries);
Expand All @@ -132,6 +135,7 @@ public ClusterState execute(ClusterState currentState) {
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<ShardRouting>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
shardRoutingEntry.processed = true;
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
// if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
Expand Down Expand Up @@ -175,6 +179,10 @@ private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) {
@Override
public ClusterState execute(ClusterState currentState) {

if (shardRoutingEntry.processed) {
return currentState;
}

List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
startedShardsQueue.drainTo(shardRoutingEntries);

Expand All @@ -190,6 +198,7 @@ public ClusterState execute(ClusterState currentState) {

for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
shardRoutingEntry.processed = true;
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
try {
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
Expand Down Expand Up @@ -305,6 +314,8 @@ static class ShardRoutingEntry extends TransportRequest {

private String reason;

volatile boolean processed; // state field, no need to serialize

private ShardRoutingEntry() {
}

Expand Down

0 comments on commit a881d89

Please sign in to comment.