Skip to content

Commit

Permalink
Verify some more invariants in IndexShardRoutingTable (#84582)
Browse files Browse the repository at this point in the history
Every `ShardRouting` in an `IndexShardRoutingTable` should have the same
`ShardId`, and there shouldn't be multiple primaries. This commit adds
assertions to that effect, and fixes the tests that were violating them.
  • Loading branch information
DaveCTurner committed Mar 3, 2022
1 parent 7b94e6a commit 5564ca1
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,10 @@ public void testShardActiveElseWhere() throws Exception {
ClusterState currentState = clusterApplierService.state();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
for (int j = 0; j < numShards; j++) {
final var shardId = new ShardId(index, j);
indexRoutingTableBuilder.addIndexShard(
new IndexShardRoutingTable.Builder(new ShardId(index, j)).addShard(
TestShardRouting.newShardRouting("test", j, masterId, true, ShardRoutingState.STARTED)
new IndexShardRoutingTable.Builder(shardId).addShard(
TestShardRouting.newShardRouting(shardId, masterId, true, ShardRoutingState.STARTED)
).build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.elasticsearch.core.CheckedConsumer;

import java.util.concurrent.TimeUnit;

public class PlainActionFuture<T> extends AdapterActionFuture<T, T> {

public static <T> PlainActionFuture<T> newFuture() {
Expand All @@ -22,6 +24,12 @@ public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T
return fut.actionGet();
}

/**
 * Creates a new future, passes it to the given consumer, and then returns the result of calling
 * {@code actionGet(timeout, unit)} on that future.
 *
 * @param e       consumer that receives the newly created future
 * @param timeout timeout value forwarded to {@code actionGet}
 * @param unit    unit of {@code timeout}, forwarded to {@code actionGet}
 * @return whatever {@code actionGet(timeout, unit)} returns for the future
 * @throws E if the consumer throws while accepting the future
 */
public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T>, E> e, long timeout, TimeUnit unit) throws E {
    final PlainActionFuture<T> future = newFuture();
    // Hand the future to the caller-supplied code before waiting on it.
    e.accept(future);
    return future.actionGet(timeout, unit);
}

@Override
protected T convert(T listenerResponse) {
return listenerResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ public Builder removeReplica() {
}

/**
 * Registers the given shard routing table in this builder, keyed by its numeric shard id.
 * With assertions enabled, the table's index must equal this builder's {@code index}.
 *
 * @param indexShard the shard routing table to add
 * @return this builder
 */
public Builder addIndexShard(IndexShardRoutingTable indexShard) {
    final var tableShardId = indexShard.shardId();
    assert tableShardId.getIndex().equals(index)
        : "cannot add shard routing table for " + tableShardId + " to index routing table for " + index;
    shards.put(tableShardId.id(), indexShard);
    return this;
}
Expand All @@ -549,6 +551,7 @@ public Builder addIndexShard(IndexShardRoutingTable indexShard) {
* if it needs to be created.
*/
public Builder addShard(ShardRouting shard) {
assert shard.index().equals(index) : "cannot add [" + shard + "] to routing table for " + index;
IndexShardRoutingTable indexShard = shards.get(shard.id());
if (indexShard == null) {
indexShard = new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
boolean allShardsStarted = true;
for (ShardRouting shard : shards) {
if (shard.primary()) {
assert primary == null : "duplicate primary: " + primary + " vs " + shard;
primary = shard;
} else {
replicas.add(shard);
Expand Down Expand Up @@ -580,6 +581,7 @@ public Builder(ShardId shardId) {
}

/**
 * Adds a shard routing entry to this builder's collection of shards.
 *
 * @param shardEntry the shard routing to add; asserted to have the same {@code ShardId} as this builder
 * @return this builder
 */
public Builder addShard(ShardRouting shardEntry) {
// Evaluated only when assertions are enabled: every entry must belong to the shard this builder was created for.
assert shardEntry.shardId().equals(shardId) : "cannot add [" + shardEntry + "] to routing table for " + shardId;
shards.add(shardEntry);
return this;
}
Expand All @@ -592,6 +594,7 @@ public Builder removeShard(ShardRouting shardEntry) {
/**
 * Creates an {@link IndexShardRoutingTable} from this builder's {@code shardId} and its current shards.
 * With assertions enabled, the shards are first checked with {@code distinctNodes} and {@code noDuplicatePrimary}.
 *
 * @return a new routing table for this builder's shard
 */
public IndexShardRoutingTable build() {
// don't allow more than one shard copy with same id to be allocated to same node
assert distinctNodes(shards) : "more than one shard with same id assigned to same node (shards: " + shards + ")";
// rejects two or more primaries; noDuplicatePrimary currently still accepts a table with no primary at all
assert noDuplicatePrimary(shards) : "expected but did not find unique primary in shard routing table: " + shards;
return new IndexShardRoutingTable(shardId, shards);
}

Expand All @@ -612,6 +615,21 @@ static boolean distinctNodes(List<ShardRouting> shards) {
return true;
}

/**
 * Checks that the given shard copies contain at most one primary.
 *
 * @param shards the shard copies to inspect
 * @return {@code false} as soon as a second primary is encountered, {@code true} otherwise
 *         (including when there is no primary at all)
 */
static boolean noDuplicatePrimary(List<ShardRouting> shards) {
    int primariesSeen = 0;
    for (final var copy : shards) {
        // Stop at the second primary; later entries are not examined.
        if (copy.primary() && ++primariesSeen > 1) {
            return false;
        }
    }
    // Ideally this would require exactly one primary, but many tests build routing tables without any primary
    // (e.g. empty ones), so a missing primary is tolerated for now. TODO fix those tests and stop being lenient here.
    return true;
}

public static IndexShardRoutingTable readFrom(StreamInput in) throws IOException {
Index index = new Index(in);
return readFromThin(in, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;

Expand Down Expand Up @@ -89,7 +88,7 @@ ClusterState randomClusterStateWithInitializingShards(String index, final int in
shardRoutingStates.set(0, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING));
}

final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ShardId shardId = new ShardId(indexMetadata.getIndex(), 0);
final IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(indexMetadata.getIndex());

// Primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,9 @@ private ClusterState buildClusterState() throws IOException {
.add(
IndexRoutingTable.builder(new Index("index", "indexUUID"))
.addIndexShard(
new IndexShardRoutingTable.Builder(new ShardId("index", "_na_", 1)).addShard(
new IndexShardRoutingTable.Builder(new ShardId("index", "indexUUID", 1)).addShard(
TestShardRouting.newShardRouting(
new ShardId("index", "_na_", 1),
new ShardId("index", "indexUUID", 1),
"nodeId2",
true,
ShardRoutingState.STARTED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2594,8 +2594,7 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
try {
initialEngine = createEngine(defaultSettings, store, createTempDir(), newLogMergePolicy(), null);
final ShardRouting primary = TestShardRouting.newShardRouting(
"test",
shardId.id(),
shardId,
"node1",
null,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,7 @@ public void testTwoNoopEngines() throws IOException {
public void testNoopAfterRegularEngine() throws IOException {
int docs = randomIntBetween(1, 10);
ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
ShardRouting routing = TestShardRouting.newShardRouting(
"test",
shardId.id(),
"node",
null,
true,
ShardRoutingState.STARTED,
allocationId
);
ShardRouting routing = TestShardRouting.newShardRouting(shardId, "node", null, true, ShardRoutingState.STARTED, allocationId);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down Expand Up @@ -165,15 +157,7 @@ public void testNoOpEngineStats() throws Exception {

public void testTrimUnreferencedTranslogFiles() throws Exception {
final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
ShardRouting routing = TestShardRouting.newShardRouting(
"test",
shardId.id(),
"node",
null,
true,
ShardRoutingState.STARTED,
allocationId
);
ShardRouting routing = TestShardRouting.newShardRouting(shardId, "node", null, true, ShardRoutingState.STARTED, allocationId);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,71 +42,60 @@ public void createLocalNode() {
localNode = new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
}

public void testShardCanBeDeletedNoShardRouting() throws Exception {
public void testShardCanBeDeletedNoShardRouting() {
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", "_na_", 1));
assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
}

public void testShardCanBeDeletedNoShardStarted() throws Exception {
int numShards = randomIntBetween(1, 7);
int numReplicas = randomInt(2);

IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", "_na_", 1));

for (int i = 0; i < numShards; i++) {
int unStartedShard = randomInt(numReplicas);
for (int j = 0; j <= numReplicas; j++) {
ShardRoutingState state;
if (j == unStartedShard) {
state = randomFrom(NOT_STARTED_STATES);
} else {
state = randomFrom(ShardRoutingState.values());
}
UnassignedInfo unassignedInfo = null;
if (state == ShardRoutingState.UNASSIGNED) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
String currentNodeId = state == ShardRoutingState.UNASSIGNED ? null : randomAlphaOfLength(10);
String relocatingNodeId = state == ShardRoutingState.RELOCATING ? randomAlphaOfLength(10) : null;
routingTable.addShard(
TestShardRouting.newShardRouting("test", i, currentNodeId, relocatingNodeId, j == 0, state, unassignedInfo)
);
public void testShardCanBeDeletedNoShardStarted() {
final var numShardCopies = randomInt(3);
final var shardId = new ShardId("test", "_na_", 0);
final var routingTable = new IndexShardRoutingTable.Builder(shardId);
final var unStartedShard = randomInt(numShardCopies);
for (int j = 0; j <= numShardCopies; j++) {
ShardRoutingState state;
if (j == unStartedShard) {
state = randomFrom(NOT_STARTED_STATES);
} else {
state = randomFrom(ShardRoutingState.values());
}
UnassignedInfo unassignedInfo = null;
if (state == ShardRoutingState.UNASSIGNED) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
String currentNodeId = state == ShardRoutingState.UNASSIGNED ? null : randomAlphaOfLength(10);
String relocatingNodeId = state == ShardRoutingState.RELOCATING ? randomAlphaOfLength(10) : null;
routingTable.addShard(
TestShardRouting.newShardRouting(shardId, currentNodeId, relocatingNodeId, j == 0, state, unassignedInfo)
);
}

assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
}

public void testShardCanBeDeletedShardExistsLocally() throws Exception {
int numShards = randomIntBetween(1, 7);
int numReplicas = randomInt(2);

IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", "_na_", 1));
int localShardId = randomInt(numShards - 1);
for (int i = 0; i < numShards; i++) {
int localNodeIndex = randomInt(numReplicas);
boolean primaryOnLocalNode = i == localShardId && localNodeIndex == numReplicas;
public void testShardCanBeDeletedShardExistsLocally() {
final var numReplicas = randomInt(2);
final var shardId = new ShardId("test", "_na_", 1);
final var routingTable = new IndexShardRoutingTable.Builder(shardId);
final var localNodeIndex = randomInt(numReplicas);
final var primaryOnLocalNode = localNodeIndex == numReplicas;
routingTable.addShard(
TestShardRouting.newShardRouting(
shardId,
primaryOnLocalNode ? localNode.getId() : randomAlphaOfLength(10),
true,
ShardRoutingState.STARTED
)
);
for (int j = 0; j < numReplicas; j++) {
final var replicaOnLocalNode = localNodeIndex == j;
routingTable.addShard(
TestShardRouting.newShardRouting(
"test",
i,
primaryOnLocalNode ? localNode.getId() : randomAlphaOfLength(10),
true,
shardId,
replicaOnLocalNode ? localNode.getId() : randomAlphaOfLength(10),
false,
ShardRoutingState.STARTED
)
);
for (int j = 0; j < numReplicas; j++) {
boolean replicaOnLocalNode = i == localShardId && localNodeIndex == j;
routingTable.addShard(
TestShardRouting.newShardRouting(
"test",
i,
replicaOnLocalNode ? localNode.getId() : randomAlphaOfLength(10),
false,
ShardRoutingState.STARTED
)
);
}
}

// Shard exists locally, can't delete shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,9 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
private void applyClusterState(final String reason, final Function<ClusterState, ClusterState> applier) {
PlainActionFuture.<Void, RuntimeException>get(
future -> clusterService.getClusterApplierService()
.onNewClusterState(reason, () -> applier.apply(clusterService.state()), future)
.onNewClusterState(reason, () -> applier.apply(clusterService.state()), future),
10,
TimeUnit.SECONDS
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.ccr.Ccr;
Expand Down Expand Up @@ -715,10 +716,14 @@ public void testGetLeaderIndicesToFollow() {
Settings.Builder builder = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, indexName);
imdBuilder.put(IndexMetadata.builder("metrics-" + i).settings(builder).numberOfShards(1).numberOfReplicas(0));

ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING)
.moveToStarted();
imdBuilder.put(IndexMetadata.builder(indexName).settings(builder).numberOfShards(1).numberOfReplicas(0));

ShardRouting shardRouting = TestShardRouting.newShardRouting(
new ShardId(imdBuilder.get(indexName).getIndex(), 0),
"1",
true,
ShardRoutingState.INITIALIZING
).moveToStarted();
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(imdBuilder.get(indexName).getIndex())
.addShard(shardRouting)
.build();
Expand Down Expand Up @@ -2191,8 +2196,12 @@ private static ClusterState createRemoteClusterState(
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().put(indexMetadata, true).version(metadataVersion));

ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING)
.moveToStarted();
ShardRouting shardRouting = TestShardRouting.newShardRouting(
new ShardId(indexMetadata.getIndex(), 0),
"1",
true,
ShardRoutingState.INITIALIZING
).moveToStarted();
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
}
Expand All @@ -2217,7 +2226,14 @@ private static ClusterState createRemoteClusterState(final ClusterState previous
metadataBuilder.put(indexMetadata, true);
routingTableBuilder.add(
IndexRoutingTable.builder(indexMetadata.getIndex())
.addShard(TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted())
.addShard(
TestShardRouting.newShardRouting(
new ShardId(indexMetadata.getIndex(), 0),
"1",
true,
ShardRoutingState.INITIALIZING
).moveToStarted()
)
.build()
);
}
Expand Down Expand Up @@ -2287,8 +2303,12 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));

ShardRouting shardRouting = TestShardRouting.newShardRouting(dataStreamName, 0, "1", true, ShardRoutingState.INITIALIZING)
.moveToStarted();
ShardRouting shardRouting = TestShardRouting.newShardRouting(
new ShardId(indexMetadata.getIndex(), 0),
"1",
true,
ShardRoutingState.INITIALIZING
).moveToStarted();
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
}
Expand Down

0 comments on commit 5564ca1

Please sign in to comment.