Skip to content

Commit

Permalink
Introduced an index UUID which is added to the index's settings upon …
Browse files Browse the repository at this point in the history
…creation. Used that UUID to verify old and delayed shard started/failed events are not applied to newer indexes with the same name.

Also, exceptions while processing batched events do not stop the rest of the events from being processed.

Closes #3778
  • Loading branch information
bleskes committed Sep 25, 2013
1 parent 35990f5 commit 1644444
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 110 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -622,8 +623,11 @@ void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
replicaCounter++;
AtomicInteger counter = new AtomicInteger(replicaCounter);


IndexMetaData indexMetaData = clusterState.metaData().index(request.index());

if (newPrimaryShard != null) {
performOnReplica(response, counter, newPrimaryShard, newPrimaryShard.currentNodeId());
performOnReplica(response, counter, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData);
}

shardIt.reset(); // reset the iterator
Expand All @@ -647,10 +651,10 @@ void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
// yet that it was started. We will get an IllegalShardState exception if it's not started,
// and that's fine, we will ignore it
if (!doOnlyOnRelocating) {
performOnReplica(response, counter, shard, shard.currentNodeId());
performOnReplica(response, counter, shard, shard.currentNodeId(), indexMetaData);
}
if (shard.relocating()) {
performOnReplica(response, counter, shard, shard.relocatingNodeId());
performOnReplica(response, counter, shard, shard.relocatingNodeId(), indexMetaData);
}
}

Expand All @@ -662,7 +666,7 @@ void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
}
}

void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId, final IndexMetaData indexMetaData) {
// if we don't have that node, it means that it might have failed and will be created again. In
// this case, we don't have to do the operation, and can just let it fail over
if (!clusterState.nodes().nodeExists(nodeId)) {
Expand All @@ -685,7 +689,8 @@ public void handleResponse(TransportResponse.Empty vResponse) {
public void handleException(TransportException exp) {
if (!ignoreReplicaException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
}
finishIfPossible();
}
Expand All @@ -708,7 +713,8 @@ public void run() {
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
Expand All @@ -725,7 +731,8 @@ public boolean isForceExecution() {
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
// we want to decrement the counter here, in the failure handling, because we got rejected
// from executing on the thread pool
Expand All @@ -739,7 +746,8 @@ public boolean isForceExecution() {
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
Expand Down

0 comments on commit 1644444

Please sign in to comment.