Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -107,7 +106,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
if (iterator == null) {
return null;
}
return PlainShardIterator.allSearchableShards(iterator);
return ShardIterator.allSearchableShards(iterator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -111,7 +110,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
if (iterator == null) {
return null;
}
return PlainShardIterator.allSearchableShards(iterator);
return ShardIterator.allSearchableShards(iterator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
package org.elasticsearch.action.search;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.util.Countable;
import org.elasticsearch.common.util.PlainIterator;
import org.elasticsearch.core.Nullable;
Expand All @@ -24,7 +24,7 @@
import java.util.Objects;

/**
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
* Iterator for shards used in the search api, which also holds the {@link OriginalIndices}
* of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as
* the cluster alias.
* @see OriginalIndices
Expand All @@ -42,7 +42,7 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
private final PlainIterator<String> targetNodesIterator;

/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
* this the a given <code>shardId</code>.
*
* @param clusterAlias the alias of the cluster where the shard is located
Expand All @@ -55,7 +55,7 @@ public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<
}

/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
*
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
Expand Down Expand Up @@ -103,6 +103,9 @@ public String getClusterAlias() {
return clusterAlias;
}

/**
* Returns the next shard, or {@code null} if none available.
*/
SearchShardTarget nextOrNull() {
final String nodeId = targetNodesIterator.nextOrNull();
if (nodeId != null) {
Expand All @@ -111,6 +114,11 @@ SearchShardTarget nextOrNull() {
return null;
}

/**
* Return the number of shards remaining in this {@link ShardsIterator}
*
* @return number of shard remaining
*/
int remaining() {
return targetNodesIterator.remaining();
}
Expand All @@ -130,6 +138,9 @@ List<String> getTargetNodeIds() {
return targetNodesIterator.asList();
}

/**
* Resets the iterator to its initial state.
*/
void reset() {
targetNodesIterator.reset();
}
Expand All @@ -155,11 +166,19 @@ boolean prefiltered() {
return prefiltered;
}

/**
* The number of shard routing instances.
*
* @return number of shard routing instances in this iterator
*/
@Override
public int size() {
return targetNodesIterator.size();
}

/**
* The shard id this group relates to.
*/
ShardId shardId() {
return shardId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ public List<ShardRouting> unpromotableShards() {
}

public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(Arrays.asList(shards)));
return new ShardIterator(shardId, shuffler.shuffle(Arrays.asList(shards)));
}

public ShardIterator shardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(Arrays.asList(shards), seed));
return new ShardIterator(shardId, shuffler.shuffle(Arrays.asList(shards), seed));
}

/**
Expand All @@ -210,12 +210,12 @@ public ShardIterator activeInitializingShardsRandomIt() {
*/
public ShardIterator activeInitializingShardsIt(int seed) {
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
return new ShardIterator(shardId, shuffler.shuffle(activeShards, seed));
}
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
ordered.addAll(shuffler.shuffle(activeShards, seed));
ordered.addAll(allInitializingShards);
return new PlainShardIterator(shardId, ordered);
return new ShardIterator(shardId, ordered);
}

/**
Expand All @@ -229,18 +229,15 @@ public ShardIterator activeInitializingShardsRankedIt(
) {
final int seed = shuffler.nextSeed();
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(
shardId,
rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts)
);
return new ShardIterator(shardId, rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts));
}

ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
List<ShardRouting> rankedActiveShards = rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts);
ordered.addAll(rankedActiveShards);
List<ShardRouting> rankedInitializingShards = rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts);
ordered.addAll(rankedInitializingShards);
return new PlainShardIterator(shardId, ordered);
return new ShardIterator(shardId, ordered);
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
Expand Down Expand Up @@ -398,9 +395,9 @@ public int compare(ShardRouting s1, ShardRouting s2) {
*/
public ShardIterator primaryShardIt() {
if (primary != null) {
return new PlainShardIterator(shardId, Collections.singletonList(primary));
return new ShardIterator(shardId, Collections.singletonList(primary));
}
return new PlainShardIterator(shardId, Collections.emptyList());
return new ShardIterator(shardId, Collections.emptyList());
}

public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
Expand All @@ -416,7 +413,7 @@ public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ordered.add(shardRouting);
}
}
return new PlainShardIterator(shardId, ordered);
return new ShardIterator(shardId, ordered);
}

public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttributes, DiscoveryNodes discoveryNodes) {
Expand Down Expand Up @@ -451,7 +448,7 @@ public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String[] nodeAtt
);
throw new IllegalArgumentException(message);
}
return new PlainShardIterator(shardId, ordered);
return new ShardIterator(shardId, ordered);
}

public ShardIterator preferNodeActiveInitializingShardsIt(Set<String> nodeIds) {
Expand All @@ -469,7 +466,7 @@ public ShardIterator preferNodeActiveInitializingShardsIt(Set<String> nodeIds) {
if (allInitializingShards.isEmpty() == false) {
preferred.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, preferred);
return new ShardIterator(shardId, preferred);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public ShardIterator useOnlyPromotableShardsForStateless(ShardIterator shards) {
// If it is stateless, only route promotable shards. This is a temporary workaround until a more cohesive solution can be
// implemented for search shards.
if (isStateless && shards != null) {
return new PlainShardIterator(
return new ShardIterator(
shards.shardId(),
shards.getShardRoutings().stream().filter(ShardRouting::isPromotableToPrimary).collect(Collectors.toList())
);
Expand Down Expand Up @@ -126,10 +126,10 @@ public List<ShardIterator> searchShards(
nodeCounts
);
if (iterator != null) {
set.add(PlainShardIterator.allSearchableShards(iterator));
set.add(ShardIterator.allSearchableShards(iterator));
}
}
var res = new ArrayList<>(set);
List<ShardIterator> res = new ArrayList<>(set);
CollectionUtil.timSort(res);
return res;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private List<ShardIterator> allSatisfyingPredicateShardsGrouped(
if (predicate.test(shardRouting)) {
set.add(shardRouting.shardsIt());
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(shardRouting.shardId(), Collections.emptyList()));
set.add(new ShardIterator(shardRouting.shardId(), Collections.emptyList()));
}
}
}
Expand Down Expand Up @@ -301,7 +301,7 @@ public List<ShardIterator> activePrimaryShardsGrouped(String[] indices, boolean
if (primary.active()) {
set.add(primary.shardsIt());
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(primary.shardId(), Collections.emptyList()));
set.add(new ShardIterator(primary.shardId(), Collections.emptyList()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,65 @@

import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.List;

/**
* Allows to iterate over a set of shard instances (routing) within a shard id group.
* The {@link ShardIterator} is a {@link ShardsIterator} which iterates all
* shards of a given {@link ShardId shard id}
*/
public interface ShardIterator extends ShardsIterator, Comparable<ShardIterator> {
public final class ShardIterator extends PlainShardsIterator implements Comparable<ShardIterator> {

private final ShardId shardId;

public static ShardIterator allSearchableShards(ShardIterator shardIterator) {
return new ShardIterator(shardIterator.shardId(), shardsThatCanHandleSearches(shardIterator));
}

private static List<ShardRouting> shardsThatCanHandleSearches(ShardIterator iterator) {
final List<ShardRouting> shardsThatCanHandleSearches = new ArrayList<>(iterator.size());
for (ShardRouting shardRouting : iterator) {
if (shardRouting.isSearchable()) {
shardsThatCanHandleSearches.add(shardRouting);
}
}
return shardsThatCanHandleSearches;
}

/**
* The shard id this group relates to.
* Creates a {@link ShardIterator} instance that iterates all shards
* of a given <code>shardId</code>.
*
* @param shardId shard id of the group
* @param shards shards to iterate
*/
ShardId shardId();
public ShardIterator(ShardId shardId, List<ShardRouting> shards) {
super(shards);
this.shardId = shardId;
}

/**
* Resets the iterator.
* The shard id this group relates to.
*/
public ShardId shardId() {
return this.shardId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardIterator that = (ShardIterator) o;
return shardId.equals(that.shardId());
}

@Override
public int hashCode() {
return shardId.hashCode();
}

@Override
void reset();
public int compareTo(ShardIterator o) {
return shardId.compareTo(o.shardId());
}
}
Loading