diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index edd37aa5c1739..9f0f6b84b459f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -437,10 +437,10 @@ public void updateShardState(final ShardRouting newRouting, assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED : "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state; - this.shardRouting = newRouting; persistMetadata(path, indexSettings, newRouting, currentRouting, logger); + final CountDownLatch shardStateUpdated = new CountDownLatch(1); - if (shardRouting.primary()) { + if (newRouting.primary()) { if (newPrimaryTerm != primaryTerm) { assert currentRouting.primary() == false : "term is only increased as part of primary promotion"; /* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned @@ -456,9 +456,9 @@ public void updateShardState(final ShardRouting newRouting, * We could fail the shard in that case, but this will cause it to be removed from the insync allocations list * potentially preventing re-allocation. */ - assert shardRouting.initializing() == false : + assert newRouting.initializing() == false : "a started primary shard should never update its term; " - + "shard " + shardRouting + ", " + + "shard " + newRouting + ", " + "current term [" + primaryTerm + "], " + "new term [" + newPrimaryTerm + "]"; assert newPrimaryTerm > primaryTerm : @@ -468,7 +468,6 @@ public void updateShardState(final ShardRouting newRouting, * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is * incremented. */ - final CountDownLatch latch = new CountDownLatch(1); // to prevent primary relocation handoff while resync is not completed boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); if (resyncStarted == false) { @@ -478,7 +477,7 @@ public void updateShardState(final ShardRouting newRouting, 30, TimeUnit.MINUTES, () -> { - latch.await(); + shardStateUpdated.await(); try { /* * If this shard was serving as a replica shard when another shard was promoted to primary then the state of @@ -521,9 +520,11 @@ public void onFailure(Exception e) { e -> failShard("exception during primary term transition", e)); getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint()); primaryTerm = newPrimaryTerm; - latch.countDown(); } } + // set this last, once we finished updating all internal state. + this.shardRouting = newRouting; + shardStateUpdated.countDown(); } if (currentRouting != null && currentRouting.active() == false && newRouting.active()) { indexEventListener.afterIndexShardStarted(this); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 92b5e74a479a8..a01f0230f22ca 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -136,7 +136,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static java.util.Collections.max; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; @@ -151,6 +150,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; /** @@ -410,6 +410,52 @@ public void onFailure(Exception e) { closeShards(indexShard); } + /** + * This test makes sure that people can use the shard routing entry to check whether a shard was already promoted to + * a primary. Concretely this means, that when we publish the routing entry via {@link IndexShard#routingEntry()} the following + * should have happened + * 1) Internal state (ala GlobalCheckpointTracker) have been updated + * 2) Primary term is set to the new term + */ + public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException { + final IndexShard indexShard = newStartedShard(false); + final long promotedTerm = indexShard.getPrimaryTerm() + 1; + final CyclicBarrier barrier = new CyclicBarrier(2); + final AtomicBoolean stop = new AtomicBoolean(); + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + while(stop.get() == false) { + if (indexShard.routingEntry().primary()) { + assertThat(indexShard.getPrimaryTerm(), equalTo(promotedTerm)); + assertThat(indexShard.getEngine().seqNoService().getReplicationGroup(), notNullValue()); + } + } + }); + thread.start(); + + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, + ShardRoutingState.STARTED, replicaRouting.allocationId()); + + + final Set inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId()); + final IndexShardRoutingTable routingTable = + new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(); + barrier.await(); + // promote the replica + indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable, + Collections.emptySet()); + + stop.set(true); + thread.join(); + closeShards(indexShard); + } + + public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception { final IndexShard indexShard = newStartedShard(false);