Skip to content

Commit

Permalink
Pull index routing into strategy object (#77211)
Browse files Browse the repository at this point in the history
* Pull index routing into strategy object

This pulls the calculation of the shard id for an (id, routing) pair
into a little strategy class, `IndexRouting`. This is easier to test and
should be easier to extend.

My hope is that this is an incremental readability improvement. My
ulterior motive is that this is where I want to land our new
routing-by-dimensions work for tsdb.

* WIP

* Switch build check
  • Loading branch information
nik9000 committed Sep 10, 2021
1 parent 95a8c80 commit b0b5cbd
Show file tree
Hide file tree
Showing 11 changed files with 561 additions and 392 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexingPressure;
Expand Down Expand Up @@ -423,6 +424,7 @@ protected void doRun() {
Metadata metadata = clusterState.metadata();
// Group the requests by ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
Map<Index, IndexRouting> indexRoutings = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
//the request can only be null because we set it to null in the previous step, so it gets ignored
Expand Down Expand Up @@ -474,8 +476,13 @@ protected void doRun() {
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex.getName(),
docWriteRequest.id(), docWriteRequest.routing()).shardId();
IndexRouting indexRouting = indexRoutings.computeIfAbsent(
concreteIndex,
idx -> IndexRouting.fromIndexMetadata(clusterState.metadata().getIndexSafe(idx))
);
ShardId shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex.getName(), indexRouting, docWriteRequest.id(), docWriteRequest.routing())
.shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
Expand Down Expand Up @@ -157,8 +159,13 @@ protected ShardIterator shards(ClusterState clusterState, UpdateRequest request)
if (request.getShardId() != null) {
return clusterState.routingTable().index(request.concreteIndex()).shard(request.getShardId().getId()).primaryShardIt();
}
IndexMetadata indexMetadata = clusterState.metadata().index(request.concreteIndex());
if (indexMetadata == null) {
throw new IndexNotFoundException(request.concreteIndex());
}
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
ShardIterator shardIterator = clusterService.operationRouting()
.indexShards(clusterState, request.concreteIndex(), request.id(), request.routing());
.indexShards(clusterState, request.concreteIndex(), indexRouting, request.id(), request.routing());
ShardRouting shard;
while ((shard = shardIterator.nextOrNull()) != null) {
if (shard.primary()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
Expand All @@ -22,15 +23,8 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.allocation.IndexMetadataUpdater;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.MapBuilder;
Expand All @@ -41,6 +35,14 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.gateway.MetadataStateFormat;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -1623,7 +1625,7 @@ public IndexMetadata fromXContent(XContentParser parser) throws IOException {

/**
* Returns the number of shards that should be used for routing. This basically defines the hash space we use in
* {@link org.elasticsearch.cluster.routing.OperationRouting#generateShardId(IndexMetadata, String, String)} to route documents
* {@link IndexRouting#shardId(String, String)} to route documents
* to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only
* changes if and index is shrunk.
*/
Expand Down Expand Up @@ -1736,7 +1738,7 @@ public static Set<ShardId> selectShrinkShards(int shardId, IndexMetadata sourceI
/**
* Returns the routing factor for and shrunk index with the given number of target shards.
* This factor is used in the hash function in
* {@link org.elasticsearch.cluster.routing.OperationRouting#generateShardId(IndexMetadata, String, String)} to guarantee consistent
* {@link IndexRouting#shardId(String, String)} to guarantee consistent
* hashing / routing of documents even if the number of shards changed (ie. a shrunk index).
*
* @param sourceNumberOfShards the total number of shards in the source index
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.Nullable;

import java.util.function.IntConsumer;

/**
* Generates the shard id for {@code (id, routing)} pairs.
*/
public abstract class IndexRouting {
/**
* Build the routing from {@link IndexMetadata}.
*/
public static IndexRouting fromIndexMetadata(IndexMetadata indexMetadata) {
if (indexMetadata.isRoutingPartitionedIndex()) {
return new Partitioned(
indexMetadata.getRoutingNumShards(),
indexMetadata.getRoutingFactor(),
indexMetadata.getRoutingPartitionSize()
);
}
return new Unpartitioned(indexMetadata.getRoutingNumShards(), indexMetadata.getRoutingFactor());
}

private final int routingNumShards;
private final int routingFactor;

private IndexRouting(int routingNumShards, int routingFactor) {
this.routingNumShards = routingNumShards;
this.routingFactor = routingFactor;
}

/**
* Generate the single shard id that should contain a document with the
* provided {@code id} and {@code routing}.
*/
public abstract int shardId(String id, @Nullable String routing);

/**
* Collect all of the shard ids that *may* contain documents with the
* provided {@code routing}. Indices with a {@code routing_partition}
* will collect more than one shard. Indices without a partition
* will collect the same shard id as would be returned
* by {@link #shardId}.
*/
public abstract void collectSearchShards(String routing, IntConsumer consumer);

/**
* Convert a hash generated from an {@code (id, routing}) pair into a
* shard id.
*/
protected final int hashToShardId(int hash) {
return Math.floorMod(hash, routingNumShards) / routingFactor;
}

/**
* Convert a routing value into a hash.
*/
private static int effectiveRoutingToHash(String effectiveRouting) {
return Murmur3HashFunction.hash(effectiveRouting);
}

/**
* Strategy for indices that are not partitioned.
*/
private static class Unpartitioned extends IndexRouting {
Unpartitioned(int routingNumShards, int routingFactor) {
super(routingNumShards, routingFactor);
}

@Override
public int shardId(String id, @Nullable String routing) {
return hashToShardId(effectiveRoutingToHash(routing == null ? id : routing));
}

@Override
public void collectSearchShards(String routing, IntConsumer consumer) {
consumer.accept(hashToShardId(effectiveRoutingToHash(routing)));
}
}

/**
* Strategy for partitioned indices.
*/
private static class Partitioned extends IndexRouting {
private final int routingPartitionSize;

Partitioned(int routingNumShards, int routingFactor, int routingPartitionSize) {
super(routingNumShards, routingFactor);
this.routingPartitionSize = routingPartitionSize;
}

@Override
public int shardId(String id, @Nullable String routing) {
if (routing == null) {
throw new IllegalArgumentException("A routing value is required for gets from a partitioned index");
}
int offset = Math.floorMod(effectiveRoutingToHash(id), routingPartitionSize);
return hashToShardId(effectiveRoutingToHash(routing) + offset);
}

@Override
public void collectSearchShards(String routing, IntConsumer consumer) {
int hash = effectiveRoutingToHash(routing);
for (int i = 0; i < routingPartitionSize; i++) {
consumer.accept(hashToShardId(hash + i));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.ResponseCollectorService;
Expand Down Expand Up @@ -45,14 +45,30 @@ void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection;
}

public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
return shards(clusterState, index, id, routing).shardsIt();
public ShardIterator indexShards(
ClusterState clusterState,
String index,
IndexRouting indexRouting,
String id,
@Nullable String routing
) {
return shards(clusterState, index, indexRouting, id, routing).shardsIt();
}

/**
* Shards to use for a {@code GET} operation.
*/
public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,
@Nullable String preference) {
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),
clusterState.nodes(), preference, null, null);
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata(clusterState, index));
return preferenceActiveShardIterator(
shards(clusterState, index, indexRouting, id, routing),
clusterState.nodes().getLocalNodeId(),
clusterState.nodes(),
preference,
null,
null
);
}

public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) {
Expand Down Expand Up @@ -100,18 +116,16 @@ private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterSt
final Set<IndexShardRoutingTable> set = new HashSet<>();
// we use set here and not list since we might get duplicates
for (String index : concreteIndices) {
final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
final IndexRoutingTable indexRoutingTable = indexRoutingTable(clusterState, index);
final IndexMetadata indexMetadata = indexMetadata(clusterState, index);
final Set<String> effectiveRouting = routing.get(index);
if (effectiveRouting != null) {
for (String r : effectiveRouting) {
final int routingPartitionSize = indexMetadata.getRoutingPartitionSize();
for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset)));
}
final Set<String> indexSearchRouting = routing.get(index);
if (indexSearchRouting != null) {
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
for (String r : indexSearchRouting) {
indexRouting.collectSearchShards(r, s -> set.add(RoutingTable.shardRoutingTable(indexRoutingTable, s)));
}
} else {
for (IndexShardRoutingTable indexShard : indexRouting) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
set.add(indexShard);
}
}
Expand Down Expand Up @@ -198,51 +212,20 @@ protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String
return indexRouting;
}

protected IndexMetadata indexMetadata(ClusterState clusterState, String index) {
private IndexMetadata indexMetadata(ClusterState clusterState, String index) {
IndexMetadata indexMetadata = clusterState.metadata().index(index);
if (indexMetadata == null) {
throw new IndexNotFoundException(index);
}
return indexMetadata;
}

protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);
return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
private IndexShardRoutingTable shards(ClusterState clusterState, String index, IndexRouting indexRouting, String id, String routing) {
return clusterState.getRoutingTable().shardRoutingTable(index, indexRouting.shardId(id, routing));
}

public ShardId shardId(ClusterState clusterState, String index, String id, @Nullable String routing) {
IndexMetadata indexMetadata = indexMetadata(clusterState, index);
return new ShardId(indexMetadata.getIndex(), generateShardId(indexMetadata, id, routing));
}

public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;

if (routing == null) {
assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}

if (indexMetadata.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}

return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
return new ShardId(indexMetadata.getIndex(), IndexRouting.fromIndexMetadata(indexMetadata).shardId(id, routing));
}

private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
}

}

0 comments on commit b0b5cbd

Please sign in to comment.