Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexShard.routingEntry should only be updated once all internal state is ready #26776

Merged
merged 2 commits into from Sep 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 8 additions & 7 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -437,10 +437,10 @@ public void updateShardState(final ShardRouting newRouting,
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
state == IndexShardState.CLOSED :
"routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
this.shardRouting = newRouting;
persistMetadata(path, indexSettings, newRouting, currentRouting, logger);
final CountDownLatch shardStateUpdated = new CountDownLatch(1);

if (shardRouting.primary()) {
if (newRouting.primary()) {
if (newPrimaryTerm != primaryTerm) {
assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
/* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
Expand All @@ -456,9 +456,9 @@ public void updateShardState(final ShardRouting newRouting,
* We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
* potentially preventing re-allocation.
*/
assert shardRouting.initializing() == false :
assert newRouting.initializing() == false :
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "shard " + newRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newPrimaryTerm + "]";
assert newPrimaryTerm > primaryTerm :
Expand All @@ -468,7 +468,6 @@ public void updateShardState(final ShardRouting newRouting,
* increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
// to prevent primary relocation handoff while resync is not completed
boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
if (resyncStarted == false) {
Expand All @@ -478,7 +477,7 @@ public void updateShardState(final ShardRouting newRouting,
30,
TimeUnit.MINUTES,
() -> {
latch.await();
shardStateUpdated.await();
try {
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of
Expand Down Expand Up @@ -521,9 +520,11 @@ public void onFailure(Exception e) {
e -> failShard("exception during primary term transition", e));
getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
primaryTerm = newPrimaryTerm;
latch.countDown();
}
}
// set this last, once we finished updating all internal state.
this.shardRouting = newRouting;
shardStateUpdated.countDown();
}
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
indexEventListener.afterIndexShardStarted(this);
Expand Down
Expand Up @@ -136,7 +136,6 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.max;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
Expand All @@ -151,6 +150,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
Expand Down Expand Up @@ -410,6 +410,52 @@ public void onFailure(Exception e) {
closeShards(indexShard);
}

/**
* This test makes sure that people can use the shard routing entry to check whether a shard was already promoted to
* a primary. Concretely this means, that when we publish the routing entry via {@link IndexShard#routingEntry()} the following
* should have happened
* 1) Internal state (ala GlobalCheckpointTracker) have been updated
* 2) Primary term is set to the new term
*/
public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final long promotedTerm = indexShard.getPrimaryTerm() + 1;
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean stop = new AtomicBoolean();
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
while(stop.get() == false) {
if (indexShard.routingEntry().primary()) {
assertThat(indexShard.getPrimaryTerm(), equalTo(promotedTerm));
assertThat(indexShard.getEngine().seqNoService().getReplicationGroup(), notNullValue());
}
}
});
thread.start();

final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
ShardRoutingState.STARTED, replicaRouting.allocationId());


final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
final IndexShardRoutingTable routingTable =
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
barrier.await();
// promote the replica
indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
Collections.emptySet());

stop.set(true);
thread.join();
closeShards(indexShard);
}


public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
final IndexShard indexShard = newStartedShard(false);

Expand Down