Skip to content

Commit

Permalink
Fix RoutingTable Lookup by Index (#75530) (#75575)
Browse files Browse the repository at this point in the history
This is likely one source of bugs in at least snapshotting as it can lead
to looking up the wrong index from an old shard id (if an index has been
deleted and a new index is created in its place concurrently)
  • Loading branch information
original-brownbear committed Jul 21, 2021
1 parent dc9d9b4 commit 6443571
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,17 @@ public boolean hasIndex(String index) {
}

public boolean hasIndex(Index index) {
IndexRoutingTable indexRouting = index(index.getName());
return indexRouting != null && indexRouting.getIndex().equals(index);
IndexRoutingTable indexRouting = index(index);
return indexRouting != null;
}

public IndexRoutingTable index(String index) {
return indicesRouting.get(index);
}

public IndexRoutingTable index(Index index) {
return indicesRouting.get(index.getName());
IndexRoutingTable indexRouting = index(index.getName());
return indexRouting != null && indexRouting.getIndex().equals(index) ? indexRouting : null;
}

public ImmutableOpenMap<String, IndexRoutingTable> indicesRouting() {
Expand Down Expand Up @@ -134,8 +135,8 @@ public IndexShardRoutingTable shardRoutingTable(String index, int shardId) {
* @throws ShardNotFoundException if provided shard id is unknown
*/
public IndexShardRoutingTable shardRoutingTable(ShardId shardId) {
IndexRoutingTable indexRouting = index(shardId.getIndexName());
if (indexRouting == null || indexRouting.getIndex().equals(shardId.getIndex()) == false) {
IndexRoutingTable indexRouting = index(shardId.getIndex());
if (indexRouting == null) {
throw new IndexNotFoundException(shardId.getIndex());
}
IndexShardRoutingTable shard = indexRouting.shard(shardId.id());
Expand All @@ -147,7 +148,7 @@ public IndexShardRoutingTable shardRoutingTable(ShardId shardId) {

@Nullable
public ShardRouting getByAllocationId(ShardId shardId, String allocationId) {
final IndexRoutingTable indexRoutingTable = index(shardId.getIndexName());
final IndexRoutingTable indexRoutingTable = index(shardId.getIndex());
if (indexRoutingTable == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void testHasIndex() {

public void testIndex() {
assertThat(clusterState.routingTable().index(TEST_INDEX_1).getIndex().getName(), is(TEST_INDEX_1));
assertThat(clusterState.routingTable().index(new Index(TEST_INDEX_1, UUIDs.randomBase64UUID())), is(nullValue()));
assertThat(clusterState.routingTable().index("foobar"), is(nullValue()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.UUID;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -178,10 +176,7 @@ public void testManagerSkipsIndicesWithRedStatus() {
SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState();
markShardsUnavailable(clusterStateBuilder);

assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.UNHEALTHY));
assertThat(manager.getUpgradeStatus(markShardsUnavailable(createClusterState()), DESCRIPTOR), equalTo(UpgradeStatus.UNHEALTHY));
}

/**
Expand All @@ -192,10 +187,7 @@ public void testManagerSkipsIndicesWithOutdatedFormat() {
SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState(5);
markShardsAvailable(clusterStateBuilder);

assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_UPGRADE));
assertThat(manager.getUpgradeStatus(markShardsAvailable(createClusterState(5)), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_UPGRADE));
}

/**
Expand All @@ -205,10 +197,7 @@ public void testManagerSkipsIndicesWithUpToDateMappings() {
SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState();
markShardsAvailable(clusterStateBuilder);

assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.UP_TO_DATE));
assertThat(manager.getUpgradeStatus(markShardsAvailable(createClusterState()), DESCRIPTOR), equalTo(UpgradeStatus.UP_TO_DATE));
}

/**
Expand All @@ -218,10 +207,10 @@ public void testManagerProcessesIndicesWithOutdatedMappings() {
SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState(Strings.toString(getMappings("1.0.0")));
markShardsAvailable(clusterStateBuilder);

assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE));
assertThat(
manager.getUpgradeStatus(markShardsAvailable(createClusterState(Strings.toString(getMappings("1.0.0")))), DESCRIPTOR),
equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE)
);
}

/**
Expand All @@ -231,10 +220,10 @@ public void testManagerProcessesIndicesWithNullVersionMetadata() {
SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState(Strings.toString(getMappings(null)));
markShardsAvailable(clusterStateBuilder);

assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE));
assertThat(
manager.getUpgradeStatus(markShardsAvailable(createClusterState(Strings.toString(getMappings(null)))), DESCRIPTOR),
equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE)
);
}

/**
Expand All @@ -244,10 +233,7 @@ public void testManagerSubmitsPutRequest() {
SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState(Strings.toString(getMappings("1.0.0")));
markShardsAvailable(clusterStateBuilder);

manager.clusterChanged(event(clusterStateBuilder));
manager.clusterChanged(event(markShardsAvailable(createClusterState(Strings.toString(getMappings("1.0.0"))))));

verify(client, times(1)).execute(any(PutMappingAction.class), any(PutMappingRequest.class), any());
}
Expand All @@ -259,10 +245,10 @@ public void testCanHandleIntegerMetaVersion() {
SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState(Strings.toString(getMappings(3)));
markShardsAvailable(clusterStateBuilder);

assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE));
assertThat(
manager.getUpgradeStatus(markShardsAvailable(createClusterState(Strings.toString(getMappings(3)))), DESCRIPTOR),
equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE)
);
}

private static ClusterState.Builder createClusterState() {
Expand Down Expand Up @@ -294,12 +280,16 @@ private static ClusterState.Builder createClusterState(String mappings, int form
return ClusterState.builder(state()).metadata(metadataBuilder.build());
}

private void markShardsAvailable(ClusterState.Builder clusterStateBuilder) {
clusterStateBuilder.routingTable(buildIndexRoutingTable(DESCRIPTOR.getPrimaryIndex()));
private ClusterState markShardsAvailable(ClusterState.Builder clusterStateBuilder) {
final ClusterState cs = clusterStateBuilder.build();
return ClusterState.builder(cs)
.routingTable(buildIndexRoutingTable(cs.metadata().index(DESCRIPTOR.getPrimaryIndex()).getIndex()))
.build();
}

private void markShardsUnavailable(ClusterState.Builder clusterStateBuilder) {
final RoutingTable routingTable = buildIndexRoutingTable(DESCRIPTOR.getPrimaryIndex());
private ClusterState markShardsUnavailable(ClusterState.Builder clusterStateBuilder) {
final ClusterState cs = clusterStateBuilder.build();
final RoutingTable routingTable = buildIndexRoutingTable(cs.metadata().index(DESCRIPTOR.getPrimaryIndex()).getIndex());

Index prevIndex = routingTable.index(DESCRIPTOR.getPrimaryIndex()).getIndex();

Expand All @@ -321,7 +311,7 @@ private void markShardsUnavailable(ClusterState.Builder clusterStateBuilder) {
)
.build();

clusterStateBuilder.routingTable(unavailableRoutingTable);
return ClusterState.builder(cs).routingTable(unavailableRoutingTable).build();
}

private static ClusterState state() {
Expand Down Expand Up @@ -367,8 +357,7 @@ private static IndexMetadata.Builder getIndexMetadata(
return indexMetadata;
}

private static RoutingTable buildIndexRoutingTable(String indexName) {
Index index = new Index(indexName, UUID.randomUUID().toString());
private static RoutingTable buildIndexRoutingTable(Index index) {
ShardRouting shardRouting = ShardRouting.newUnassigned(
new ShardId(index, 0),
true,
Expand All @@ -382,8 +371,8 @@ private static RoutingTable buildIndexRoutingTable(String indexName) {
return RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(table).build()).build();
}

private ClusterChangedEvent event(ClusterState.Builder clusterStateBuilder) {
return new ClusterChangedEvent("test-event", clusterStateBuilder.build(), EMPTY_CLUSTER_STATE);
private ClusterChangedEvent event(ClusterState clusterState) {
return new ClusterChangedEvent("test-event", clusterState, EMPTY_CLUSTER_STATE);
}

private static Settings getSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
Expand Down Expand Up @@ -268,7 +267,7 @@ private ClusterState getClusterStateWithSecurityIndex() {
metadata = SecurityTestUtils.addAliasToMetadata(metadata, securityIndexName);
}

Index index = new Index(securityIndexName, UUID.randomUUID().toString());
Index index = metadata.index(securityIndexName).getIndex();
ShardRouting shardRouting = ShardRouting.newUnassigned(new ShardId(index, 0), true,
RecoverySource.ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(Reason.INDEX_CREATED, ""));
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0))
Expand Down

0 comments on commit 6443571

Please sign in to comment.