diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 9c509123ff7..d8f8453e8d3 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -183,7 +183,9 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley) * SOLR-15834: Films example readme needs updating, including useParams support for multiple algorithms. (Eric Pugh) -* SOLR-15824 Improved Query Screen handling of raw query parameters. (Betul Ince via Tim Potter, Eric Pugh) +* SOLR-15824 Improved Query Screen handling of raw query parameters. (Betul Ince via Tim Potter, Eric Pugh) + +* SOLR-15803: Compute concurrent replica assignment requests together, using the shared context to better distribute replicas. (Houston Putman) Build --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java index 64c10a498dc..19aa9b9de38 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java @@ -347,7 +347,7 @@ public static List buildReplicaPositions(SolrCloudManager cloud int i = 0; for (Map.Entry entry : replicaTypeVsCount.entrySet()) { for (int j = 0; j < entry.getValue(); j++) { - positions.add(new ReplicaPosition(sliceName, i++, entry.getKey(), node)); + positions.add(new ReplicaPosition(collectionName, sliceName, i++, entry.getKey(), node)); } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java index c6702ef96d7..4dd2dbba24e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java @@ -22,10 +22,12 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; @@ -251,15 +253,20 @@ public static List getLiveOrLiveAndCreateNodeSetList(final Set l static class ReplicaCount { public final String nodeName; - public int thisCollectionNodes = 0; - public int totalNodes = 0; + public Map collectionReplicas; + public int totalReplicas = 0; ReplicaCount(String nodeName) { this.nodeName = nodeName; + this.collectionReplicas = new HashMap<>(); } - public int weight() { - return (thisCollectionNodes * 100) + totalNodes; + public int weight(String collection) { + return (collectionReplicas.getOrDefault(collection, 0) * 100) + totalReplicas; + } + + public String nodeName() { + return nodeName; } } @@ -274,8 +281,7 @@ public static List getNodesForNewReplicas(ClusterState clusterS CoreContainer coreContainer) throws IOException, InterruptedException, AssignmentException { log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}" , shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet); - DocCollection coll = clusterState.getCollection(collectionName); - List createNodeList = null; + List createNodeList; if (createNodeSet instanceof List) { createNodeList = (List) createNodeSet; @@ -297,28 +303,18 @@ public static List getNodesForNewReplicas(ClusterState clusterS .assignPullReplicas(pullReplicas) .onNodes(createNodeList) .build(); - AssignStrategy assignStrategy = createAssignStrategy(coreContainer, clusterState, coll); + AssignStrategy assignStrategy = createAssignStrategy(coreContainer); return assignStrategy.assign(cloudManager, assignRequest); } - static HashMap getNodeNameVsShardCount(String collectionName, - ClusterState clusterState, List createNodeList) { - HashMap nodeNameVsShardCount = new HashMap<>(); - List liveNodes = createNodeList == null || createNodeList.isEmpty() ? - new ArrayList<>(clusterState.getLiveNodes()) : - checkLiveNodes(createNodeList, clusterState); + static void addNodeNameVsShardCount(ClusterState clusterState, HashMap nodeNameVsShardCount) { + Collection liveNodes = clusterState.getLiveNodes(); for (String s : liveNodes) { - nodeNameVsShardCount.put(s, new ReplicaCount(s)); - } - - // if we were given a list, just use that, don't worry about counts - if (createNodeList != null) { - return nodeNameVsShardCount; + nodeNameVsShardCount.putIfAbsent(s, new ReplicaCount(s)); } // if we get here we were not given a createNodeList, build a map with real counts. - DocCollection coll = clusterState.getCollection(collectionName); Map collections = clusterState.getCollectionsMap(); for (Map.Entry entry : collections.entrySet()) { DocCollection c = entry.getValue(); @@ -328,16 +324,13 @@ static HashMap getNodeNameVsShardCount(String collectionNa for (Replica replica : replicas) { ReplicaCount count = nodeNameVsShardCount.get(replica.getNodeName()); if (count != null) { - count.totalNodes++; // Used to "weigh" whether this node should be used later. - if (entry.getKey().equals(collectionName)) { - count.thisCollectionNodes++; - } + // Used to "weigh" whether this node should be used later. + count.collectionReplicas.merge(entry.getKey(), 1, Integer::sum); + count.totalReplicas++; } } } } - - return nodeNameVsShardCount; } // throw an exception if any node in the supplied list is not live. @@ -407,12 +400,32 @@ public interface AssignStrategy { /** * Assign new replicas to nodes. + * If multiple {@link AssignRequest}s are provided, then every {@link ReplicaPosition} made for an + * {@link AssignRequest} will be applied to the {@link SolrCloudManager}'s state when processing subsequent {@link AssignRequest}s. + * Therefore, the order in which {@link AssignRequest}s are provided can and will affect the {@link ReplicaPosition}s returned. + * * @param solrCloudManager current instance of {@link SolrCloudManager}. - * @param assignRequest assign request. + * @param assignRequests assign request. * @return list of {@link ReplicaPosition}-s for new replicas. * @throws AssignmentException when assignment request cannot produce any valid assignments. */ - List assign(SolrCloudManager solrCloudManager, AssignRequest assignRequest) + default List assign(SolrCloudManager solrCloudManager, AssignRequest... assignRequests) + throws AssignmentException, IOException, InterruptedException { + return assign(solrCloudManager, Arrays.asList(assignRequests)); + } + + /** + * Assign new replicas to nodes. + * If multiple {@link AssignRequest}s are provided, then every {@link ReplicaPosition} made for an + * {@link AssignRequest} will be applied to the {@link SolrCloudManager}'s state when processing subsequent {@link AssignRequest}s. + * Therefore, the order in which {@link AssignRequest}s are provided can and will affect the {@link ReplicaPosition}s returned. + * + * @param solrCloudManager current instance of {@link SolrCloudManager}. + * @param assignRequests list of assign requests to process together (). + * @return list of {@link ReplicaPosition}-s for new replicas. + * @throws AssignmentException when assignment request cannot produce any valid assignments. + */ + List assign(SolrCloudManager solrCloudManager, List assignRequests) throws AssignmentException, IOException, InterruptedException; /** @@ -508,32 +521,45 @@ public AssignRequest build() { public static class LegacyAssignStrategy implements AssignStrategy { @Override - public List assign(SolrCloudManager solrCloudManager, AssignRequest assignRequest) throws Assign.AssignmentException, IOException, InterruptedException { + public List assign(SolrCloudManager solrCloudManager, List assignRequests) throws Assign.AssignmentException, IOException, InterruptedException { ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState(); - List nodeList = assignRequest.nodes; // can this be empty list? - - if (nodeList == null || nodeList.isEmpty()) { - HashMap nodeNameVsShardCount = - Assign.getNodeNameVsShardCount(assignRequest.collectionName, clusterState, nodeList); - // if nodelist was empty, this map will be empty too. (passing null above however gets a full map) - ArrayList sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values()); - sortedNodeList.sort(Comparator.comparingInt(Assign.ReplicaCount::weight)); - nodeList = sortedNodeList.stream().map(replicaCount -> replicaCount.nodeName).collect(Collectors.toList()); - } - - // Throw an error if there aren't any live nodes. - checkAnyLiveNodes(nodeList, solrCloudManager.getClusterStateProvider().getClusterState()); - int i = 0; List result = new ArrayList<>(); - for (String aShard : assignRequest.shardNames) { - for (Map.Entry e : countsPerReplicaType(assignRequest).entrySet()) { - for (int j = 0; j < e.getValue(); j++) { - result.add(new ReplicaPosition(aShard, j, e.getKey(), nodeList.get(i % nodeList.size()))); - i++; + + HashMap nodeNameVsShardCount = new HashMap<>(); + addNodeNameVsShardCount(clusterState, nodeNameVsShardCount); + for (AssignRequest assignRequest : assignRequests) { + Collection replicaCounts = nodeNameVsShardCount.values(); + + if (assignRequest.nodes != null && !assignRequest.nodes.isEmpty()) { + // Throw an error if there are any non-live nodes. + checkLiveNodes(assignRequest.nodes, clusterState); + HashSet nodeSet = new HashSet<>(assignRequest.nodes); + replicaCounts = replicaCounts.stream().filter(rc -> nodeSet.contains(rc.nodeName)).collect(Collectors.toList()); + } else if (nodeNameVsShardCount.values().isEmpty()) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There are no live nodes in the cluster"); + } + + for (String aShard : assignRequest.shardNames) { + // Reset the ordering of the nodes for each shard, using the replicas added in the previous shards and assign requests + List nodeList = replicaCounts.stream() + .sorted(Comparator.comparingInt(rc -> rc.weight(assignRequest.collectionName)).thenComparing(ReplicaCount::nodeName)) + .map(ReplicaCount::nodeName) + .collect(Collectors.toList()); + int i = 0; + for (Map.Entry e : countsPerReplicaType(assignRequest).entrySet()) { + for (int j = 0; j < e.getValue(); j++) { + String assignedNode = nodeList.get(i % nodeList.size()); + result.add(new ReplicaPosition(assignRequest.collectionName, aShard, j, e.getKey(), assignedNode)); + i++; + ReplicaCount replicaCount = nodeNameVsShardCount.computeIfAbsent(assignedNode, ReplicaCount::new); + replicaCount.totalReplicas++; + replicaCount.collectionReplicas.merge(assignRequest.collectionName, 1, Integer::sum); + } } } } + return result; } @@ -553,11 +579,11 @@ private ImmutableMap countsPerReplicaType(AssignRequest a *

If {@link PlacementPlugin} instance is null this call will return {@link LegacyAssignStrategy}, otherwise * {@link PlacementPluginAssignStrategy} will be used.

*/ - public static AssignStrategy createAssignStrategy(CoreContainer coreContainer, ClusterState clusterState, DocCollection collection) { + public static AssignStrategy createAssignStrategy(CoreContainer coreContainer) { PlacementPlugin placementPlugin = coreContainer.getPlacementPluginFactory().createPluginInstance(); if (placementPlugin != null) { // If a cluster wide placement plugin is configured (and that's the only way to define a placement plugin) - return new PlacementPluginAssignStrategy(collection, placementPlugin); + return new PlacementPluginAssignStrategy(placementPlugin); } else { return new LegacyAssignStrategy(); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java index 66c21b6173e..b7a0c978625 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java @@ -196,8 +196,7 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList replicaPositions; try { - replicaPositions = buildReplicaPositions(ccc.getCoreContainer(), ccc.getSolrCloudManager(), clusterState, newColl, - message, shardNames); + replicaPositions = buildReplicaPositions(ccc.getCoreContainer(), ccc.getSolrCloudManager(), clusterState, message, shardNames); } catch (Assign.AssignmentException e) { ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName); new DeleteCollectionCmd(ccc).call(clusterState, deleteMessage, results); @@ -389,7 +388,6 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList buildReplicaPositions(CoreContainer coreContainer, SolrCloudManager cloudManager, ClusterState clusterState, - DocCollection docCollection, ZkNodeProps message, List shardNames) throws IOException, InterruptedException, Assign.AssignmentException { final String collectionName = message.getStr(NAME); @@ -430,7 +428,7 @@ private static List buildReplicaPositions(CoreContainer coreCon .assignPullReplicas(numPullReplicas) .onNodes(nodeList) .build(); - Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(coreContainer, clusterState, docCollection); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(coreContainer); replicaPositions = assignStrategy.assign(cloudManager, assignRequest); } return replicaPositions; diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java index d1ae965a1d4..db96fa8771b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java @@ -93,7 +93,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList resu // verify the placement modifications caused by the deletion are allowed DocCollection coll = state.getCollectionOrNull(collection); if (coll != null) { - Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer(), state, coll); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); assignStrategy.verifyDeleteCollection(ccc.getSolrCloudManager(), coll); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java index 7c020c94091..f5e2eb8078f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java @@ -101,7 +101,7 @@ void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList> entry : shardToReplicasMapping.entrySet()) { Slice shardSlice = entry.getKey(); String shardId = shardSlice.getName(); @@ -163,7 +163,7 @@ void deleteReplicaBasedOnCount(ClusterState clusterState, for (String replica: replicas) { log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count); // don't verify with the placement plugin - we already did it - deleteCore(clusterState, coll, shardId, replica, message, results, onComplete, parallel, false); + deleteCore(coll, shardId, replica, message, results, onComplete, parallel, false); } results.add("shard_id", shardId); results.add("replicas_deleted", replicas); @@ -220,7 +220,7 @@ private void validateReplicaAvailability(Slice slice, String shard, String colle } } - void deleteCore(ClusterState clusterState, DocCollection coll, + void deleteCore(DocCollection coll, String shardId, String replicaName, ZkNodeProps message, @@ -249,7 +249,7 @@ void deleteCore(ClusterState clusterState, DocCollection coll, // verify that we are allowed to delete this replica if (verifyPlacement) { - Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer(), clusterState, coll); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); assignStrategy.verifyDeleteReplicas(ccc.getSolrCloudManager(), coll, shardId, Set.of(replica)); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java index 1d9307d40b4..7cd8dd7f8c6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java @@ -36,6 +36,7 @@ import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -102,29 +103,40 @@ public void call(ClusterState state, ZkNodeProps message, NamedList resu SolrCloseableLatch replicasToRecover = new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn()); - for (ZkNodeProps sourceReplica : sourceReplicas) { - String sourceCollection = sourceReplica.getStr(COLLECTION_PROP); - if (log.isInfoEnabled()) { - log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target); - } - String targetNode = target; - if (targetNode == null) { + List replicaPositions = null; + if (target == null || target.isEmpty()) { + List assignRequests = new ArrayList<>(sourceReplicas.size()); + for (ZkNodeProps sourceReplica : sourceReplicas) { Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE)); int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0; int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0; int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0; Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder() - .forCollection(sourceCollection) + .forCollection(sourceReplica.getStr(COLLECTION_PROP)) .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP))) .assignNrtReplicas(numNrtReplicas) .assignTlogReplicas(numTlogReplicas) .assignPullReplicas(numPullReplicas) .onNodes(ccc.getSolrCloudManager().getClusterStateProvider().getLiveNodes().stream().filter(node -> !node.equals(source)).collect(Collectors.toList())) .build(); - Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy( - ccc.getCoreContainer(), - clusterState, clusterState.getCollection(sourceCollection)); - targetNode = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequest).get(0).node; + assignRequests.add(assignRequest); + } + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); + replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests); + } + int replicaPositionIdx = 0; + for (ZkNodeProps sourceReplica : sourceReplicas) { + String sourceCollection = sourceReplica.getStr(COLLECTION_PROP); + if (log.isInfoEnabled()) { + log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target); + } + String targetNode; + // Use the assigned replica positions, if target is null or empty (checked above) + if (replicaPositions != null) { + targetNode = replicaPositions.get(replicaPositionIdx).node; + replicaPositionIdx++; + } else { + targetNode = target; } ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, targetNode); if (async != null) msg.getProperties().put(ASYNC, async); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java index 2732cb9816a..3ff38ae3b74 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java @@ -246,7 +246,7 @@ public void process(NamedList results, RestoreContext rc) throws Excepti List sliceNames = new ArrayList<>(); restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName())); - List replicaPositions = getReplicaPositions(restoreCollection, rc.nodeList, clusterState, sliceNames); + List replicaPositions = getReplicaPositions(rc.restoreCollectionName, rc.nodeList, sliceNames); createSingleReplicaPerShard(results, restoreCollection, rc.asyncId, clusterState, replicaPositions); Object failures = results.get("failure"); @@ -353,18 +353,16 @@ private void markAllShardsAsConstruction(DocCollection restoreCollection) throws } } - private List getReplicaPositions(DocCollection restoreCollection, List nodeList, ClusterState clusterState, List sliceNames) throws IOException, InterruptedException { + private List getReplicaPositions(String restoreCollection, List nodeList, List sliceNames) throws IOException, InterruptedException { Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder() - .forCollection(restoreCollection.getName()) + .forCollection(restoreCollection) .forShard(sliceNames) .assignNrtReplicas(numNrtReplicas) .assignTlogReplicas(numTlogReplicas) .assignPullReplicas(numPullReplicas) .onNodes(nodeList) .build(); - Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy( - ccc.getCoreContainer(), - clusterState, restoreCollection); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); return assignStrategy.assign(ccc.getSolrCloudManager(), assignRequest); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java index 602167a4c6a..043862d9b0c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java @@ -438,9 +438,7 @@ public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList(clusterState.getLiveNodes())) .build(); - Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy( - ccc.getCoreContainer(), - clusterState, collection); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); List replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequest); t.stop(); diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java index be4533bc190..7f3fb2105b7 100644 --- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java +++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java @@ -167,7 +167,7 @@ private void runRepair() { .incrementAndGet(); } }); - Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(cc, clusterState, coll); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(cc); lostReplicas.forEach((shard, types) -> { Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder() .forCollection(coll.getName()) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java index 97d195716e1..b996795c7e4 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java @@ -17,6 +17,10 @@ package org.apache.solr.cluster.placement; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + /** *

Implemented by external plugins to control replica placement and movement on the search cluster (as well as other things * such as cluster elasticity?) when cluster changes are required (initiated elsewhere, most likely following a Collection @@ -37,7 +41,29 @@ public interface PlacementPlugin { * @param placementRequest request for placing new replicas or moving existing replicas on the cluster. * @return plan satisfying the placement request. */ - PlacementPlan computePlacement(PlacementRequest placementRequest, PlacementContext placementContext) throws PlacementException, InterruptedException; + default PlacementPlan computePlacement(PlacementRequest placementRequest, PlacementContext placementContext) throws PlacementException, InterruptedException { + List placementPlans = computePlacements(Collections.singletonList(placementRequest), placementContext); + if (placementPlans == null || placementPlans.isEmpty()) { + return null; + } else { + return placementPlans.get(0); + } + } + + /** + *

Request from plugin code to compute multiple placements. + * If multiple placements are requested, then the {@link PlacementPlan} computed for each {@link PlacementRequest} + * will be used to affect the starting state for each subsequent {@link PlacementRequest} in the list. + * This means that each {@link PlacementRequest} is computed in the context of the previous + * {@link PlacementRequest}'s already having been implemented. + * Note this method must be reentrant as a plugin instance may (read will) get multiple such calls in parallel. + * + *

Configuration is passed upon creation of a new instance of this class by {@link PlacementPluginFactory#createPluginInstance}. + * + * @param placementRequests requests for placing new replicas or moving existing replicas on the cluster. + * @return plan satisfying all placement requests. + */ + List computePlacements(Collection placementRequests, PlacementContext placementContext) throws PlacementException, InterruptedException; /** * Verify that a collection layout modification doesn't violate constraints on replica placements diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java index 32abe84afd0..232539f747a 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java @@ -18,18 +18,19 @@ package org.apache.solr.cluster.placement.impl; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Set; import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.cloud.api.collections.Assign; -import org.apache.solr.cluster.SolrCollection; import org.apache.solr.cluster.placement.DeleteCollectionRequest; import org.apache.solr.cluster.placement.DeleteReplicasRequest; import org.apache.solr.cluster.placement.PlacementContext; import org.apache.solr.cluster.placement.PlacementException; import org.apache.solr.cluster.placement.PlacementPlugin; import org.apache.solr.cluster.placement.PlacementPlan; +import org.apache.solr.cluster.placement.PlacementRequest; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ReplicaPosition; @@ -40,34 +41,34 @@ public class PlacementPluginAssignStrategy implements Assign.AssignStrategy { private final PlacementPlugin plugin; - private final DocCollection collection; - /** - * @param collection the collection for which this assign request is done. In theory would be better to pass it into the - * {@link #assign} call below (which would allow reusing instances of {@link PlacementPluginAssignStrategy}, - * but for now doing it here in order not to change the other Assign.AssignStrategy implementations. - */ - public PlacementPluginAssignStrategy(DocCollection collection, PlacementPlugin plugin) { - this.collection = collection; + public PlacementPluginAssignStrategy(PlacementPlugin plugin) { this.plugin = plugin; } - public List assign(SolrCloudManager solrCloudManager, Assign.AssignRequest assignRequest) + public List assign(SolrCloudManager solrCloudManager, List assignRequests) throws Assign.AssignmentException, IOException, InterruptedException { PlacementContext placementContext = new SimplePlacementContextImpl(solrCloudManager); - SolrCollection solrCollection = placementContext.getCluster().getCollection(collection.getName()); - PlacementRequestImpl placementRequest = PlacementRequestImpl.toPlacementRequest(placementContext.getCluster(), solrCollection, assignRequest); + List placementRequests = new ArrayList<>(assignRequests.size()); + for (Assign.AssignRequest assignRequest : assignRequests) { + placementRequests.add(PlacementRequestImpl.toPlacementRequest(placementContext.getCluster(), placementContext.getCluster().getCollection(assignRequest.collectionName), assignRequest)); + } - final PlacementPlan placementPlan; + final List replicaPositions = new ArrayList<>(); try { - placementPlan = plugin.computePlacement(placementRequest, placementContext); + List placementPlans = plugin.computePlacements(placementRequests, placementContext); + if (placementPlans != null) { + for (PlacementPlan placementPlan : placementPlans) { + replicaPositions.addAll(ReplicaPlacementImpl.toReplicaPositions(placementPlan.getRequest().getCollection().getName(), placementPlan.getReplicaPlacements())); + } + } } catch (PlacementException pe) { throw new Assign.AssignmentException(pe); } - return ReplicaPlacementImpl.toReplicaPositions(placementPlan.getReplicaPlacements()); + return replicaPositions; } @Override diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java index 69d9718b3fe..e9bb9846015 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java @@ -69,7 +69,7 @@ public String toString() { * Translates a set of {@link ReplicaPlacement} returned by a plugin into a list of {@link ReplicaPosition} expected * by {@link org.apache.solr.cloud.api.collections.Assign.AssignStrategy} */ - static List toReplicaPositions(Set replicaPlacementSet) { + static List toReplicaPositions(String collection, Set replicaPlacementSet) { // The replica index in ReplicaPosition is not as strict a concept as it might seem. It is used in rules // based placement (for sorting replicas) but its presence in ReplicaPosition is not justified (and when the code // is executing here, it means rules based placement is not used). @@ -80,7 +80,7 @@ static List toReplicaPositions(Set replicaPla List replicaPositions = new ArrayList<>(replicaPlacementSet.size()); int index = 0; // This really an arbitrary value when adding replicas and a possible source of core name collisions for (ReplicaPlacement placement : replicaPlacementSet) { - replicaPositions.add(new ReplicaPosition(placement.getShardName(), index++, SimpleClusterAbstractionsImpl.ReplicaImpl.toCloudReplicaType(placement.getReplicaType()), placement.getNode().getName())); + replicaPositions.add(new ReplicaPosition(collection, placement.getShardName(), index++, SimpleClusterAbstractionsImpl.ReplicaImpl.toCloudReplicaType(placement.getReplicaType()), placement.getNode().getName())); } return replicaPositions; diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java index 6f1ddc9bff6..d958591e415 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java @@ -22,7 +22,6 @@ import org.apache.solr.cluster.*; import org.apache.solr.cluster.placement.*; import org.apache.solr.cluster.placement.impl.NodeMetricImpl; -import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.SuppressForbidden; import org.slf4j.Logger; @@ -202,69 +201,86 @@ private AffinityPlacementPlugin(long minimalFreeDiskGB, long prioritizedFreeDisk @Override @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") - public PlacementPlan computePlacement(PlacementRequest request, PlacementContext placementContext) throws PlacementException { - Set nodes = request.getTargetNodes(); - SolrCollection solrCollection = request.getCollection(); + public List computePlacements(Collection requests, PlacementContext placementContext) throws PlacementException { + List placementPlans = new ArrayList<>(requests.size()); + Set allNodes = new HashSet<>(); + for (PlacementRequest request : requests) { + allNodes.addAll(request.getTargetNodes()); + } - // Request all needed attributes + // Fetch attributes for a superset of all nodes requested amongst the placementRequests AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher(); attributeFetcher - .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP); + .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP); attributeFetcher - .requestNodeMetric(NodeMetricImpl.NUM_CORES) - .requestNodeMetric(NodeMetricImpl.FREE_DISK_GB); - attributeFetcher.fetchFrom(nodes); + .requestNodeMetric(NodeMetricImpl.NUM_CORES) + .requestNodeMetric(NodeMetricImpl.FREE_DISK_GB); + attributeFetcher.fetchFrom(allNodes); final AttributeValues attrValues = attributeFetcher.fetchAttributes(); - // filter out nodes that don't meet the `withCollection` constraint - nodes = filterNodesWithCollection(placementContext.getCluster(), request, attrValues, nodes); - // filter out nodes that don't match the "node types" specified in the collection props - nodes = filterNodesByNodeType(placementContext.getCluster(), request, attrValues, nodes); - - - // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap if nodes accept multiple replica types) - // These subsets sets are actually maps, because we capture the number of cores (of any replica type) present on each node. - // Also get the number of currently existing cores per node, so we can keep update as we place new cores to not end up - // always selecting the same node(s). - Pair>, Map> p = getNodesPerReplicaType(nodes, attrValues); - - EnumMap> replicaTypeToNodes = p.first(); - Map coresOnNodes = p.second(); - - // All available zones of live nodes. Due to some nodes not being candidates for placement, and some existing replicas - // being one availability zones that might be offline (i.e. their nodes are not live), this set might contain zones - // on which it is impossible to place replicas. That's ok. - Set availabilityZones = getZonesFromNodes(nodes, attrValues); - - // Build the replica placement decisions here - Set replicaPlacements = new HashSet<>(); - - // Let's now iterate on all shards to create replicas for and start finding home sweet homes for the replicas - for (String shardName : request.getShardNames()) { - // Inventory nodes (if any) that already have a replica of any type for the shard, because we can't be placing - // additional replicas on these. This data structure is updated after each replica to node assign and is used to - // make sure different replica types are not allocated to the same nodes (protecting same node assignments within - // a given replica type is done "by construction" in makePlacementDecisions()). - Set nodesWithReplicas = new HashSet<>(); - Shard shard = solrCollection.getShard(shardName); - if (shard != null) { - for (Replica r : shard.replicas()) { - nodesWithReplicas.add(r.getNode()); + // Get the number of currently existing cores per node, so we can update as we place new cores to not end up + // always selecting the same node(s). This is used across placement requests + Map allCoresOnNodes = getCoreCountPerNode(allNodes, attrValues); + + // Keep track with nodesWithReplicas across requests + Map>> allNodesWithReplicas = new HashMap<>(); + for (PlacementRequest request : requests) { + Set nodes = request.getTargetNodes(); + SolrCollection solrCollection = request.getCollection(); + + // filter out nodes that don't meet the `withCollection` constraint + nodes = filterNodesWithCollection(placementContext.getCluster(), request, attrValues, nodes); + // filter out nodes that don't match the "node types" specified in the collection props + nodes = filterNodesByNodeType(placementContext.getCluster(), request, attrValues, nodes); + + // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap if nodes accept multiple replica types) + // These subsets sets are actually maps, because we capture the number of cores (of any replica type) present on each node. + EnumMap> replicaTypeToNodes = getAvailableNodesForReplicaTypes(nodes, attrValues); + + // All available zones of live nodes. Due to some nodes not being candidates for placement, and some existing replicas + // being one availability zones that might be offline (i.e. their nodes are not live), this set might contain zones + // on which it is impossible to place replicas. That's ok. + Set availabilityZones = getZonesFromNodes(nodes, attrValues); + + // Build the replica placement decisions here + Set replicaPlacements = new HashSet<>(); + + // Let's now iterate on all shards to create replicas for and start finding home sweet homes for the replicas + for (String shardName : request.getShardNames()) { + // Inventory nodes (if any) that already have a replica of any type for the shard, because we can't be placing + // additional replicas on these. This data structure is updated after each replica to node assign and is used to + // make sure different replica types are not allocated to the same nodes (protecting same node assignments within + // a given replica type is done "by construction" in makePlacementDecisions()). + Set nodesWithReplicas = + allNodesWithReplicas + .computeIfAbsent(solrCollection.getName(), col -> new HashMap<>()) + .computeIfAbsent(shardName, s -> { + Set newNodeSet = new HashSet<>(); + Shard shard = solrCollection.getShard(s); + if (shard != null) { + // Prefill the set with the existing replicas + for (Replica r : shard.replicas()) { + newNodeSet.add(r.getNode()); + } + } + return newNodeSet; + }); + + + // Iterate on the replica types in the enum order. We place more strategic replicas first + // (NRT is more strategic than TLOG more strategic than PULL). This is in case we eventually decide that less + // strategic replica placement impossibility is not a problem that should lead to replica placement computation + // failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node). + for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { + makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType), + attrValues, replicaTypeToNodes, nodesWithReplicas, allCoresOnNodes, placementContext.getPlacementPlanFactory(), replicaPlacements); } } - - // Iterate on the replica types in the enum order. We place more strategic replicas first - // (NRT is more strategic than TLOG more strategic than PULL). This is in case we eventually decide that less - // strategic replica placement impossibility is not a problem that should lead to replica placement computation - // failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node). - for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType), - attrValues, replicaTypeToNodes, nodesWithReplicas, coresOnNodes, placementContext.getPlacementPlanFactory(), replicaPlacements); - } + placementPlans.add(placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements)); } - return placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements); + return placementPlans; } @Override @@ -392,11 +408,29 @@ private static class AzWithNodes { } } + /** + * Builds the number of existing cores on each node returned in the attrValues. + * Nodes for which the number of cores is not available for whatever reason are excluded from acceptable candidate nodes + * as it would not be possible to make any meaningful placement decisions. + * + * @param nodes all nodes on which this plugin should compute placement + * @param attrValues attributes fetched for the nodes. This method uses system property {@link AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as + * well as the number of cores on each node. + */ + private Map getCoreCountPerNode(Set nodes, final AttributeValues attrValues) { + Map coresOnNodes = new HashMap<>(); + + for (Node node : nodes) { + attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).ifPresent(count -> coresOnNodes.put(node, count)); + } + + return coresOnNodes; + } + /** * Given the set of all nodes on which to do placement and fetched attributes, builds the sets representing * candidate nodes for placement of replicas of each replica type. - * These sets are packaged and returned in an EnumMap keyed by replica type (1st member of the Pair). - * Also builds the number of existing cores on each node present in the returned EnumMap (2nd member of the returned Pair). + * These sets are packaged and returned in an EnumMap keyed by replica type. * Nodes for which the number of cores is not available for whatever reason are excluded from acceptable candidate nodes * as it would not be possible to make any meaningful placement decisions. * @@ -404,9 +438,8 @@ private static class AzWithNodes { * @param attrValues attributes fetched for the nodes. This method uses system property {@link AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as * well as the number of cores on each node. */ - private Pair>, Map> getNodesPerReplicaType(Set nodes, final AttributeValues attrValues) { + private EnumMap> getAvailableNodesForReplicaTypes(Set nodes, final AttributeValues attrValues) { EnumMap> replicaTypeToNodes = new EnumMap<>(Replica.ReplicaType.class); - Map coresOnNodes = new HashMap<>(); for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { replicaTypeToNodes.put(replicaType, new HashSet<>()); @@ -436,9 +469,6 @@ private Pair>, Map> getNod continue; } - Integer coresCount = attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).get(); - coresOnNodes.put(node, coresCount); - String supportedReplicaTypes = attrValues.getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).isPresent() ? attrValues.getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).get() : null; // If property not defined or is only whitespace on a node, assuming node can take any replica type if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) { @@ -454,7 +484,7 @@ private Pair>, Map> getNod } } } - return new Pair<>(replicaTypeToNodes, coresOnNodes); + return replicaTypeToNodes; } /** @@ -466,7 +496,7 @@ private Pair>, Map> getNod *

  • Balance as much as possible replicas of a given {@link org.apache.solr.cluster.Replica.ReplicaType} over available AZ's. * This balancing takes into account existing replicas of the corresponding replica type, if any.
  • *
  • Place replicas if possible on nodes having more than a certain amount of free disk space (note that nodes with a too small - * amount of free disk space were eliminated as placement targets earlier, in {@link #getNodesPerReplicaType}). There's + * amount of free disk space were eliminated as placement targets earlier, in {@link #getAvailableNodesForReplicaTypes(Set, AttributeValues)}). There's * a threshold here rather than sorting on the amount of free disk space, because sorting on that value would in * practice lead to never considering the number of cores on a node.
  • *
  • Place replicas on nodes having a smaller number of cores (the number of cores considered diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java index df7735ecdd8..ddd327f04d8 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java @@ -18,9 +18,12 @@ package org.apache.solr.cluster.placement.plugins; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.Map; @@ -51,66 +54,80 @@ static private class MinimizeCoresPlacementPlugin implements PlacementPlugin { @Override @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") - public PlacementPlan computePlacement(PlacementRequest request, PlacementContext placementContext) throws PlacementException { - int totalReplicasPerShard = 0; - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - totalReplicasPerShard += request.getCountReplicasToCreate(rt); + public List computePlacements(Collection requests, PlacementContext placementContext) throws PlacementException { + List placementPlans = new ArrayList<>(requests.size()); + Set allNodes = new HashSet<>(); + for (PlacementRequest request : requests) { + allNodes.addAll(request.getTargetNodes()); } - if (placementContext.getCluster().getLiveNodes().size() < totalReplicasPerShard) { - throw new PlacementException("Cluster size too small for number of replicas per shard"); - } - - // Get number of cores on each Node - TreeMultimap nodesByCores = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary()); - - Set nodes = request.getTargetNodes(); - + // Fetch attributes for a superset of all nodes requested amongst the placementRequests AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher(); attributeFetcher.requestNodeMetric(NodeMetricImpl.NUM_CORES); - attributeFetcher.fetchFrom(nodes); + attributeFetcher.fetchFrom(allNodes); AttributeValues attrValues = attributeFetcher.fetchAttributes(); - - - // Get the number of cores on each node and sort the nodes by increasing number of cores - for (Node node : nodes) { + Map coresPerNodeTotal = new HashMap<>(); + for (Node node : allNodes) { if (attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) { throw new PlacementException("Can't get number of cores in " + node); } - nodesByCores.put(attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).get(), node); + coresPerNodeTotal.put(node.getName(), attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).get()); } - Set replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size()); - - // Now place all replicas of all shards on nodes, by placing on nodes with the smallest number of cores and taking - // into account replicas placed during this computation. Note that for each shard we must place replicas on different - // nodes, when moving to the next shard we use the nodes sorted by their updated number of cores (due to replica - // placements for previous shards). - for (String shardName : request.getShardNames()) { - // Assign replicas based on the sort order of the nodesByCores tree multimap to put replicas on nodes with less - // cores first. We only need totalReplicasPerShard nodes given that's the number of replicas to place. - // We assign based on the passed nodeEntriesToAssign list so the right nodes get replicas. - ArrayList> nodeEntriesToAssign = new ArrayList<>(totalReplicasPerShard); - Iterator> treeIterator = nodesByCores.entries().iterator(); - for (int i = 0; i < totalReplicasPerShard; i++) { - nodeEntriesToAssign.add(treeIterator.next()); + for (PlacementRequest request : requests) { + int totalReplicasPerShard = 0; + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + totalReplicasPerShard += request.getCountReplicasToCreate(rt); } - // Update the number of cores each node will have once the assignments below got executed so the next shard picks the - // lowest loaded nodes for its replicas. - for (Map.Entry e : nodeEntriesToAssign) { - int coreCount = e.getKey(); - Node node = e.getValue(); - nodesByCores.remove(coreCount, node); - nodesByCores.put(coreCount + 1, node); + if (placementContext.getCluster().getLiveNodes().size() < totalReplicasPerShard) { + throw new PlacementException("Cluster size too small for number of replicas per shard"); } - for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - placeReplicas(request.getCollection(), nodeEntriesToAssign, placementContext.getPlacementPlanFactory(), replicaPlacements, shardName, request, replicaType); + // Get number of cores on each Node + TreeMultimap nodesByCores = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary()); + + Set nodes = request.getTargetNodes(); + + // Get the number of cores on each node and sort the nodes by increasing number of cores + for (Node node : nodes) { + nodesByCores.put(coresPerNodeTotal.get(node.getName()), node); } - } - return placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements); + Set replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size()); + + // Now place all replicas of all shards on nodes, by placing on nodes with the smallest number of cores and taking + // into account replicas placed during this computation. Note that for each shard we must place replicas on different + // nodes, when moving to the next shard we use the nodes sorted by their updated number of cores (due to replica + // placements for previous shards). + for (String shardName : request.getShardNames()) { + // Assign replicas based on the sort order of the nodesByCores tree multimap to put replicas on nodes with less + // cores first. We only need totalReplicasPerShard nodes given that's the number of replicas to place. + // We assign based on the passed nodeEntriesToAssign list so the right nodes get replicas. + ArrayList> nodeEntriesToAssign = new ArrayList<>(totalReplicasPerShard); + Iterator> treeIterator = nodesByCores.entries().iterator(); + for (int i = 0; i < totalReplicasPerShard; i++) { + nodeEntriesToAssign.add(treeIterator.next()); + } + + // Update the number of cores each node will have once the assignments below got executed so the next shard picks the + // lowest loaded nodes for its replicas. + for (Map.Entry e : nodeEntriesToAssign) { + int coreCount = e.getKey(); + Node node = e.getValue(); + nodesByCores.remove(coreCount, node); + nodesByCores.put(coreCount + 1, node); + coresPerNodeTotal.put(node.getName(), coreCount + 1); + } + + for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { + placeReplicas(request.getCollection(), nodeEntriesToAssign, placementContext.getPlacementPlanFactory(), replicaPlacements, shardName, request, replicaType); + } + } + + placementPlans.add(placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements)); + } + return placementPlans; } private void placeReplicas(SolrCollection solrCollection, ArrayList> nodeEntriesToAssign, diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java index e222e14f82b..2f03b5f21b2 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java @@ -18,8 +18,10 @@ package org.apache.solr.cluster.placement.plugins; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Random; import java.util.Set; @@ -53,30 +55,34 @@ private RandomPlacementPlugin() { } @Override - public PlacementPlan computePlacement(PlacementRequest request, PlacementContext placementContext) throws PlacementException { - int totalReplicasPerShard = 0; - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - totalReplicasPerShard += request.getCountReplicasToCreate(rt); - } + public List computePlacements(Collection requests, PlacementContext placementContext) throws PlacementException { + List placementPlans = new ArrayList<>(requests.size()); + for (PlacementRequest request : requests) { + int totalReplicasPerShard = 0; + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + totalReplicasPerShard += request.getCountReplicasToCreate(rt); + } - if (placementContext.getCluster().getLiveNodes().size() < totalReplicasPerShard) { - throw new PlacementException("Cluster size too small for number of replicas per shard"); - } + if (placementContext.getCluster().getLiveNodes().size() < totalReplicasPerShard) { + throw new PlacementException("Cluster size too small for number of replicas per shard"); + } - Set replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size()); + Set replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size()); - // Now place randomly all replicas of all shards on available nodes - for (String shardName : request.getShardNames()) { - // Shuffle the nodes for each shard so that replicas for a shard are placed on distinct yet random nodes - ArrayList nodesToAssign = new ArrayList<>(placementContext.getCluster().getLiveNodes()); - Collections.shuffle(nodesToAssign, replicaPlacementRandom); + // Now place randomly all replicas of all shards on available nodes + for (String shardName : request.getShardNames()) { + // Shuffle the nodes for each shard so that replicas for a shard are placed on distinct yet random nodes + ArrayList nodesToAssign = new ArrayList<>(placementContext.getCluster().getLiveNodes()); + Collections.shuffle(nodesToAssign, replicaPlacementRandom); - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - placeForReplicaType(request.getCollection(), nodesToAssign, placementContext.getPlacementPlanFactory(), replicaPlacements, shardName, request, rt); + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + placeForReplicaType(request.getCollection(), nodesToAssign, placementContext.getPlacementPlanFactory(), replicaPlacements, shardName, request, rt); + } } - } - return placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements); + placementPlans.add(placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements)); + } + return placementPlans; } private void placeForReplicaType(SolrCollection solrCollection, ArrayList nodesToAssign, PlacementPlanFactory placementPlanFactory, diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java index 917603a91ed..9202c0840d2 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java @@ -25,15 +25,16 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import com.codahale.metrics.Metric; +import org.apache.commons.lang3.StringUtils; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.response.CoreAdminResponse; -import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; @@ -106,19 +107,7 @@ public void test() throws Exception { DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll); log.debug("### Before decommission: {}", collection); log.info("excluded_node : {} ", emptyNode); - createReplaceNodeRequest(node2bdecommissioned, emptyNode, null).processAsync("000", cloudClient); - CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000"); - boolean success = false; - for (int i = 0; i < 300; i++) { - CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient); - if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { - success = true; - break; - } - assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED); - Thread.sleep(50); - } - assertTrue(success); + createReplaceNodeRequest(node2bdecommissioned, emptyNode, null).processAndWait("000", cloudClient, 15); try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(node2bdecommissioned))) { CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient); assertEquals(0, status.getCoreStatus().size()); @@ -137,19 +126,8 @@ public void test() throws Exception { //let's do it back - this time wait for recoveries CollectionAdminRequest.AsyncCollectionAdminRequest replaceNodeRequest = createReplaceNodeRequest(emptyNode, node2bdecommissioned, Boolean.TRUE); replaceNodeRequest.setWaitForFinalState(true); - replaceNodeRequest.processAsync("001", cloudClient); - requestStatus = CollectionAdminRequest.requestStatus("001"); - - for (int i = 0; i < 200; i++) { - CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient); - if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { - success = true; - break; - } - assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED); - Thread.sleep(50); - } - assertTrue(success); + replaceNodeRequest.processAndWait("001", cloudClient, 10); + try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(emptyNode))) { CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient); assertEquals("Expecting no cores but found some: " + status.getCoreStatus(), 0, status.getCoreStatus().size()); @@ -210,6 +188,54 @@ public void test() throws Exception { } + @Test + public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { + configureCluster(5) + .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) + .configure(); + String coll = "replacenodetest_coll"; + if (log.isInfoEnabled()) { + log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); + } + + CloudSolrClient cloudClient = cluster.getSolrClient(); + Set liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes(); + List l = new ArrayList<>(liveNodes); + Collections.shuffle(l, random()); + List emptyNodes = l.subList(0, 2); + l = l.subList(2, l.size()); + String node2bdecommissioned = l.get(0); + + // TODO: tlog replicas do not work correctly in tests due to fault TestInjection#waitForInSyncWithLeader + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 4, 3,0,0); + create.setCreateNodeSet(StrUtils.join(l, ',')); + cloudClient.request(create); + + cluster.waitForActiveCollection(coll, 4, 4 * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas())); + + DocCollection initialCollection = cloudClient.getZkStateReader().getClusterState().getCollection(coll); + log.debug("### Before decommission: {}", initialCollection); + log.info("excluded_nodes : {} ", emptyNodes); + List initialReplicaCounts = l.stream().map(node -> initialCollection.getReplicas(node).size()).collect(Collectors.toList()); + createReplaceNodeRequest(node2bdecommissioned, null, true).processAndWait("000", cloudClient, 15); + + DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll); + log.debug("### After decommission: {}", collection); + // check what are replica states on the decommissioned node + List replicas = collection.getReplicas(node2bdecommissioned); + if (replicas == null) { + replicas = Collections.emptyList(); + } + assertEquals("There should be no more replicas on the sourceNode after a replaceNode request.", Collections.emptyList(), replicas); + int sizeA = collection.getReplicas(emptyNodes.get(0)).size(); + int sizeB = collection.getReplicas(emptyNodes.get(1)).size(); + assertEquals("The empty nodes should have a similar number of replicas placed on each", sizeA, sizeB, 1); + assertEquals("The number of replicas on the two empty nodes should equal the number of replicas removed from the source node", initialReplicaCounts.get(0).intValue(), sizeA + sizeB); + for (int i = 1; i < l.size(); i++) { + assertEquals("The number of replicas on non-empty and non-source nodes should not change", initialReplicaCounts.get(i).intValue(), collection.getReplicas(l.get(i)).size()); + } + } + @Test public void testFailOnSingleNode() throws Exception { configureCluster(1) @@ -240,7 +266,9 @@ public static CollectionAdminRequest.AsyncCollectionAdminRequest createReplaceN public SolrParams getParams() { ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); params.set("source", sourceNode); - params.setNonNull("target", targetNode); + if (!StringUtils.isEmpty(targetNode)) { + params.setNonNull("target", targetNode); + } if (parallel != null) params.set("parallel", parallel.toString()); return params; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 981934a8624..4c677601312 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -27,6 +27,7 @@ import org.apache.solr.client.solrj.util.SolrIdentifierValidator; import org.apache.solr.common.MapWriter; import org.apache.solr.common.SolrException; +import org.apache.solr.common.StringUtils; import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkStateReader; @@ -656,7 +657,9 @@ public ReplaceNode setParallel(Boolean flag) { public SolrParams getParams() { ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); params.set(CollectionParams.SOURCE_NODE, sourceNode); - params.set(CollectionParams.TARGET_NODE, targetNode); + if (!StringUtils.isEmpty(targetNode)) { + params.set(CollectionParams.TARGET_NODE, targetNode); + } if (parallel != null) params.set("parallel", parallel.toString()); return params; } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ReplicaPosition.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ReplicaPosition.java index 62d876133a5..c5acd19fadb 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ReplicaPosition.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ReplicaPosition.java @@ -18,29 +18,39 @@ package org.apache.solr.common.cloud; +import java.util.Comparator; + public class ReplicaPosition implements Comparable { + public final String collection; public final String shard; public final int index; public final Replica.Type type; public String node; - public ReplicaPosition(String shard, int replicaIdx, Replica.Type type) { + public ReplicaPosition(String collection, String shard, int replicaIdx, Replica.Type type) { + this.collection = collection; this.shard = shard; this.index = replicaIdx; this.type = type; } - public ReplicaPosition(String shard, int replicaIdx, Replica.Type type, String node) { + public ReplicaPosition(String collection, String shard, int replicaIdx, Replica.Type type, String node) { + this.collection = collection; this.shard = shard; this.index = replicaIdx; this.type = type; this.node = node; } + private static final Comparator comparator = + Comparator + .comparing(rp -> rp.collection) + .thenComparing(rp -> rp.shard) + .thenComparing(rp -> rp.type) + .thenComparingInt(rp -> rp.index); + @Override public int compareTo(ReplicaPosition that) { - //this is to ensure that we try one replica from each shard first instead of - // all replicas from same shard - return Integer.compare(index, that.index); + return comparator.compare(this, that); } @Override