Optimize Iteration over IndexShardRoutingTable (#84987)
Move this to an array as well, like in #84955. This comes with a measurable speedup to reroute,
since we iterate over lots of very small lists in nested loops here and the iterator overhead hurts.

relates #77466
original-brownbear committed Mar 16, 2022
1 parent 3f6d460 commit f7dbca3
Showing 53 changed files with 440 additions and 445 deletions.
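The pattern applied across the changed call sites is the same throughout: replace iterator-based loops over IndexShardRoutingTable with indexed loops using the size() and shard(int) accessors seen in the diff below. A minimal before/after sketch follows; the enclosing class and the active-copy counting are illustrative only and not code from this commit.

// Illustrative sketch: size() and shard(int) are the accessors used in the
// diff below; the class and the counting logic are hypothetical examples.
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;

class ShardCopyCounts {

    // Before: enhanced for-loop, which allocates an Iterator and pays
    // per-element indirection for every one of these very small lists.
    static int activeCopiesViaIterator(IndexShardRoutingTable table) {
        int active = 0;
        for (ShardRouting shardRouting : table) {
            if (shardRouting.active()) {
                active++;
            }
        }
        return active;
    }

    // After: indexed loop, reading each copy directly via shard(int).
    static int activeCopiesIndexed(IndexShardRoutingTable table) {
        int active = 0;
        for (int copy = 0; copy < table.size(); copy++) {
            if (table.shard(copy).active()) {
                active++;
            }
        }
        return active;
    }
}

Call sites that still need a collection or a stream are handled below by copying into an ArrayList or by going through RoutingNodesHelper.asStream, as the testClusterInfoServiceInformationClearOnError and retention-lease hunks show.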

@@ -455,12 +455,12 @@ public void testCreateShrinkIndexFails() throws Exception {
assertBusy(() -> {
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
RoutingTable routingTables = clusterStateResponse.getState().routingTable();
- assertTrue(routingTables.index("target").shard(0).getShards().get(0).unassigned());
+ assertTrue(routingTables.index("target").shard(0).shard(0).unassigned());
assertEquals(
UnassignedInfo.Reason.ALLOCATION_FAILED,
- routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getReason()
+ routingTables.index("target").shard(0).shard(0).unassignedInfo().getReason()
);
- assertEquals(1, routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getNumFailedAllocations());
+ assertEquals(1, routingTables.index("target").shard(0).shard(0).unassignedInfo().getNumFailedAllocations());
});
client().admin()
.indices()
@@ -472,13 +472,7 @@ public void testCreateShrinkIndexFails() throws Exception {
refreshClusterInfo();
// kick off a retry and wait until it's done!
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
- long expectedShardSize = clusterRerouteResponse.getState()
- .routingTable()
- .index("target")
- .shard(0)
- .getShards()
- .get(0)
- .getExpectedShardSize();
+ long expectedShardSize = clusterRerouteResponse.getState().routingTable().index("target").shard(0).shard(0).getExpectedShardSize();
// we support the expected shard size in the allocator to sum up over the source index shards
assertTrue("expected shard size must be set but wasn't: " + expectedShardSize, expectedShardSize > 0);
ensureGreen();

@@ -17,6 +17,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
+ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -40,6 +41,7 @@
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

+ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -210,7 +212,7 @@ public void testClusterInfoServiceInformationClearOnError() {
prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)).get();
ensureGreen("test");

- final List<ShardRouting> shardRoutings = client().admin()
+ final IndexShardRoutingTable indexShardRoutingTable = client().admin()
.cluster()
.prepareState()
.clear()
@@ -219,8 +221,11 @@ public void testClusterInfoServiceInformationClearOnError() {
.getState()
.getRoutingTable()
.index("test")
- .shard(0)
- .shards();
+ .shard(0);
+ final List<ShardRouting> shardRoutings = new ArrayList<>(indexShardRoutingTable.size());
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ shardRoutings.add(indexShardRoutingTable.shard(copy));
+ }

InternalTestCluster internalTestCluster = internalCluster();
InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(

@@ -309,14 +309,16 @@ private IndexRoutingTable randomChangeToIndexRoutingTable(IndexRoutingTable orig
for (int i = 0; i < original.size(); i++) {
IndexShardRoutingTable indexShardRoutingTable = original.shard(i);
Set<String> availableNodes = Sets.newHashSet(nodes);
- for (ShardRouting shardRouting : indexShardRoutingTable.shards()) {
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ ShardRouting shardRouting = indexShardRoutingTable.shard(copy);
availableNodes.remove(shardRouting.currentNodeId());
if (shardRouting.relocating()) {
availableNodes.remove(shardRouting.relocatingNodeId());
}
}

- for (ShardRouting shardRouting : indexShardRoutingTable) {
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ ShardRouting shardRouting = indexShardRoutingTable.shard(copy);
final ShardRouting updatedShardRouting = randomChange(shardRouting, availableNodes);
availableNodes.remove(updatedShardRouting.currentNodeId());
if (shardRouting.relocating()) {

@@ -15,7 +15,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata.State;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
- import org.elasticsearch.cluster.routing.ShardRouting;
+ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
@@ -327,9 +327,10 @@ Map<String, Integer> computeShardCounts(ClusterState clusterState) {
Map<String, Integer> counts = new HashMap<>();

for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- for (ShardRouting shardRouting : indexRoutingTable.shard(i)) {
- counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ counts.merge(clusterState.nodes().get(indexShardRoutingTable.shard(copy).currentNodeId()).getName(), 1, Integer::sum);
}
}
}

@@ -13,6 +13,7 @@
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
+ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodesHelper;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -79,9 +80,10 @@ public void testDecommissionNodeNoReplicas() {
logger.info("--> verify all are allocated on node1 now");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- for (ShardRouting shardRouting : indexRoutingTable.shard(i)) {
- assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(node_0));
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ assertThat(clusterState.nodes().get(indexShardRoutingTable.shard(copy).currentNodeId()).getName(), equalTo(node_0));
}
}
}
@@ -138,9 +140,10 @@ public void testAutoExpandReplicasToFilteredNodes() {
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(clusterState.metadata().index("test").getNumberOfReplicas(), equalTo(0));
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- for (ShardRouting shardRouting : indexRoutingTable.shard(i)) {
- assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(node_0));
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ assertThat(clusterState.nodes().get(indexShardRoutingTable.shard(copy).currentNodeId()).getName(), equalTo(node_0));
}
}
}
@@ -185,9 +188,10 @@ public void testDisablingAllocationFiltering() {
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test");
int numShardsOnNode1 = 0;
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- for (ShardRouting shardRouting : indexRoutingTable.shard(i)) {
- if ("node1".equals(clusterState.nodes().get(shardRouting.currentNodeId()).getName())) {
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ if ("node1".equals(clusterState.nodes().get(indexShardRoutingTable.shard(copy).currentNodeId()).getName())) {
numShardsOnNode1++;
}
}
@@ -216,9 +220,10 @@ public void testDisablingAllocationFiltering() {
logger.info("--> verify all shards are allocated on node_1 now");
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
indexRoutingTable = clusterState.routingTable().index("test");
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- for (ShardRouting shardRouting : indexRoutingTable.shard(i)) {
- assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(node_1));
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ assertThat(clusterState.nodes().get(indexShardRoutingTable.shard(copy).currentNodeId()).getName(), equalTo(node_1));
}
}


@@ -18,6 +18,7 @@
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
+ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
@@ -196,8 +197,10 @@ private Set<ShardRouting> getShardRoutings(final String nodeId, final String ind
.getState()
.getRoutingTable()
.index(indexName);
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- for (ShardRouting shard : indexRoutingTable.shard(i).shards()) {
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId);
+ for (int copy = 0; copy < shardRoutingTable.size(); copy++) {
+ ShardRouting shard = shardRoutingTable.shard(copy);
assertThat(shard.state(), equalTo(ShardRoutingState.STARTED));
if (shard.currentNodeId().equals(nodeId)) {
shardRoutings.add(shard);

@@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
+ import org.elasticsearch.cluster.routing.RoutingNodesHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
@@ -465,12 +466,7 @@ public static void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexN
final ClusterService clusterService = internalCluster().clusterService();
assertBusy(() -> {
Index index = resolveIndex(indexName);
- Set<String> activeRetentionLeaseIds = clusterService.state()
- .routingTable()
- .index(index)
- .shard(0)
- .shards()
- .stream()
+ Set<String> activeRetentionLeaseIds = RoutingNodesHelper.asStream(clusterService.state().routingTable().index(index).shard(0))
.map(shardRouting -> ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId()))
.collect(Collectors.toSet());
for (String node : internalCluster().nodesInclude(indexName)) {

@@ -29,6 +29,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
+ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -498,8 +499,10 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
// ensure that no shard is actually allocated on the unlucky node
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
final IndexRoutingTable indexRoutingTable = clusterStateResponse.getState().getRoutingTable().index("test");
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- for (ShardRouting routing : indexRoutingTable.shard(i)) {
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ final ShardRouting routing = indexShardRoutingTable.shard(copy);
if (unluckyNode.getNode().getId().equals(routing.currentNodeId())) {
assertThat(routing.state(), not(equalTo(ShardRoutingState.STARTED)));
assertThat(routing.state(), not(equalTo(ShardRoutingState.RELOCATING)));

@@ -16,6 +16,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
+ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
@@ -391,7 +392,9 @@ private void iterateAssertCount(final int numberOfShards, final int iterations,
for (String id : ids) {
ShardId docShard = clusterService.operationRouting().shardId(state, "test", id, null);
if (docShard.id() == shard) {
- for (ShardRouting shardRouting : state.routingTable().shardRoutingTable("test", shard)) {
+ final IndexShardRoutingTable indexShardRoutingTable = state.routingTable().shardRoutingTable("test", shard);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ ShardRouting shardRouting = indexShardRoutingTable.shard(copy);
GetResponse response = client().prepareGet("test", id)
.setPreference("_only_nodes:" + shardRouting.currentNodeId())
.get();

@@ -383,7 +383,7 @@ protected String checkActiveShardCount() {
return null;
} else {
final String resolvedShards = waitForActiveShards == ActiveShardCount.ALL
- ? Integer.toString(shardRoutingTable.shards().size())
+ ? Integer.toString(shardRoutingTable.size())
: waitForActiveShards.toString();
logger.trace(
"[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], "

@@ -522,11 +522,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (IndexRoutingTable indexRoutingTable : routingTable()) {
builder.startObject(indexRoutingTable.getIndex().getName());
builder.startObject("shards");
- for (int i = 0; i < indexRoutingTable.size(); i++) {
- IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(i);
+ for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+ IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
builder.startArray(Integer.toString(indexShardRoutingTable.shardId().id()));
- for (ShardRouting shardRouting : indexShardRoutingTable) {
- shardRouting.toXContent(builder, params);
+ for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
+ indexShardRoutingTable.shard(copy).toXContent(builder, params);
}
builder.endArray();
}

@@ -85,7 +85,8 @@ public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardR
int computeRelocatingShards = 0;
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
- for (ShardRouting shardRouting : shardRoutingTable) {
+ for (int j = 0; j < shardRoutingTable.size(); j++) {
+ ShardRouting shardRouting = shardRoutingTable.shard(j);
if (shardRouting.active()) {
computeActiveShards++;
if (shardRouting.relocating()) {