Skip to content

Commit

Permalink
IndexShard.routingEntry should only be updated once all internal st…
Browse files Browse the repository at this point in the history
…ate is ready (#26776)

The routing entry is used by external components to check whether the shard is ready to perform as primary. Most notably, the peer recovery source handler delays recoveries until the shard routing entry says the shard is ready. 

When a shard is promoted to primary, we currently update the shard's routing entry before we finish all the work relating to the promotion. This can cause recoveries to fail later on because the `GlobalCheckpointTracker` isn't set (yet) to primary mode. 

This commit fixes this issue by updating the routing entry last.
  • Loading branch information
bleskes committed Sep 25, 2017
1 parent 4bff8a1 commit 3980235
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
15 changes: 8 additions & 7 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -437,10 +437,10 @@ public void updateShardState(final ShardRouting newRouting,
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
state == IndexShardState.CLOSED :
"routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
this.shardRouting = newRouting;
persistMetadata(path, indexSettings, newRouting, currentRouting, logger);
final CountDownLatch shardStateUpdated = new CountDownLatch(1);

if (shardRouting.primary()) {
if (newRouting.primary()) {
if (newPrimaryTerm != primaryTerm) {
assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
/* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
Expand All @@ -456,9 +456,9 @@ public void updateShardState(final ShardRouting newRouting,
* We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
* potentially preventing re-allocation.
*/
assert shardRouting.initializing() == false :
assert newRouting.initializing() == false :
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "shard " + newRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newPrimaryTerm + "]";
assert newPrimaryTerm > primaryTerm :
Expand All @@ -468,7 +468,6 @@ public void updateShardState(final ShardRouting newRouting,
* increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
// to prevent primary relocation handoff while resync is not completed
boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
if (resyncStarted == false) {
Expand All @@ -478,7 +477,7 @@ public void updateShardState(final ShardRouting newRouting,
30,
TimeUnit.MINUTES,
() -> {
latch.await();
shardStateUpdated.await();
try {
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of
Expand Down Expand Up @@ -521,9 +520,11 @@ public void onFailure(Exception e) {
e -> failShard("exception during primary term transition", e));
getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
primaryTerm = newPrimaryTerm;
latch.countDown();
}
}
// set this last, once we finished updating all internal state.
this.shardRouting = newRouting;
shardStateUpdated.countDown();
}
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
indexEventListener.afterIndexShardStarted(this);
Expand Down
Expand Up @@ -136,7 +136,6 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.max;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
Expand All @@ -151,6 +150,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
Expand Down Expand Up @@ -410,6 +410,52 @@ public void onFailure(Exception e) {
closeShards(indexShard);
}

/**
* This test makes sure that people can use the shard routing entry to check whether a shard was already promoted to
* a primary. Concretely this means, that when we publish the routing entry via {@link IndexShard#routingEntry()} the following
* should have happened
* 1) Internal state (ala GlobalCheckpointTracker) have been updated
* 2) Primary term is set to the new term
*/
public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final long promotedTerm = indexShard.getPrimaryTerm() + 1;
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean stop = new AtomicBoolean();
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
while(stop.get() == false) {
if (indexShard.routingEntry().primary()) {
assertThat(indexShard.getPrimaryTerm(), equalTo(promotedTerm));
assertThat(indexShard.getEngine().seqNoService().getReplicationGroup(), notNullValue());
}
}
});
thread.start();

final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
ShardRoutingState.STARTED, replicaRouting.allocationId());


final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
final IndexShardRoutingTable routingTable =
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
barrier.await();
// promote the replica
indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
Collections.emptySet());

stop.set(true);
thread.join();
closeShards(indexShard);
}


public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
final IndexShard indexShard = newStartedShard(false);

Expand Down

0 comments on commit 3980235

Please sign in to comment.