diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 64a22878f23a..d6fb81551382 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -86,6 +86,9 @@ New Features * SOLR-16812: Support CBOR format for update/query (noble) +* SOLR-16855: Solr now provides a MigrateReplicas API at `POST /api/cluster/replicas/migrate` (v2), to move replicas + off of a given set of nodes. This extends the functionality of the existing ReplaceNode API. (Houston Putman) + Improvements --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java index e35023023b17..1595904bc4f9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java @@ -51,6 +51,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.INSTALLSHARDDATA; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE_REPLICAS; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK; @@ -143,6 +144,7 @@ private CommandMap(OverseerNodePrioritizer overseerPrioritizer, CollectionComman commandMap = Map.ofEntries( Map.entry(REPLACENODE, new ReplaceNodeCmd(ccc)), + Map.entry(MIGRATE_REPLICAS, new MigrateReplicasCmd(ccc)), Map.entry(BALANCE_REPLICAS, new BalanceReplicasCmd(ccc)), Map.entry(DELETENODE, new DeleteNodeCmd(ccc)), Map.entry(BACKUP, new BackupCmd(ccc)), diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java new file mode 100644 index 000000000000..29616737574c --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ReplicaPosition; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonAdminParams; +import org.apache.solr.common.util.CollectionUtil; +import org.apache.solr.common.util.NamedList; + +public class MigrateReplicasCmd implements CollApiCmds.CollectionApiCommand { + + private final CollectionCommandContext ccc; + + public MigrateReplicasCmd(CollectionCommandContext ccc) { + this.ccc = ccc; + } + + @Override + public void call(ClusterState state, ZkNodeProps message, NamedList results) + throws Exception { + ZkStateReader zkStateReader = ccc.getZkStateReader(); + Set sourceNodes = getNodesFromParam(message, CollectionParams.SOURCE_NODES); + Set targetNodes = getNodesFromParam(message, CollectionParams.TARGET_NODES); + boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); + if (sourceNodes.isEmpty()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "sourceNodes is a required param"); + } + String async = message.getStr(ASYNC); + int timeout = message.getInt("timeout", 10 * 60); // 10 minutes + boolean parallel = message.getBool("parallel", false); + ClusterState clusterState = zkStateReader.getClusterState(); + + for (String sourceNode : sourceNodes) { + if (!clusterState.liveNodesContain(sourceNode)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + sourceNode + " is not live"); + } + } + for (String targetNode : targetNodes) { + if (!clusterState.liveNodesContain(targetNode)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + targetNode + " is not live"); + } + } + + if (targetNodes.isEmpty()) { + // If no target nodes are provided, use all other live nodes that are not the sourceNodes + targetNodes = + clusterState.getLiveNodes().stream() + .filter(n -> !sourceNodes.contains(n)) + .collect(Collectors.toSet()); + if (targetNodes.isEmpty()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "No nodes other than the source nodes are live, therefore replicas cannot be migrated"); + } + } + List sourceReplicas = + ReplicaMigrationUtils.getReplicasOfNodes(sourceNodes, clusterState); + Map replicaMovements = CollectionUtil.newHashMap(sourceReplicas.size()); + + if (targetNodes.size() > 1) { + List assignRequests = new ArrayList<>(sourceReplicas.size()); + List targetNodeList = new ArrayList<>(targetNodes); + for (Replica sourceReplica : sourceReplicas) { + Replica.Type replicaType = sourceReplica.getType(); + Assign.AssignRequest assignRequest = + new Assign.AssignRequestBuilder() + .forCollection(sourceReplica.getCollection()) + .forShard(Collections.singletonList(sourceReplica.getShard())) + .assignNrtReplicas(replicaType == Replica.Type.NRT ? 1 : 0) + .assignTlogReplicas(replicaType == Replica.Type.TLOG ? 1 : 0) + .assignPullReplicas(replicaType == Replica.Type.PULL ? 1 : 0) + .onNodes(targetNodeList) + .build(); + assignRequests.add(assignRequest); + } + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); + List replicaPositions = + assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests); + int position = 0; + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, replicaPositions.get(position++).node); + } + } else { + String targetNode = targetNodes.stream().findFirst().get(); + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, targetNode); + } + } + + boolean migrationSuccessful = + ReplicaMigrationUtils.migrateReplicas( + ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results); + if (migrationSuccessful) { + results.add( + "success", + "MIGRATE_REPLICAS action completed successfully from : [" + + String.join(",", sourceNodes) + + "] to : [" + + String.join(",", targetNodes) + + "]"); + } + } + + @SuppressWarnings({"unchecked"}) + protected Set getNodesFromParam(ZkNodeProps message, String paramName) { + Object rawParam = message.get(paramName); + if (rawParam == null) { + return Collections.emptySet(); + } else if (rawParam instanceof Set) { + return (Set) rawParam; + } else if (rawParam instanceof Collection) { + return new HashSet<>((Collection) rawParam); + } else if (rawParam instanceof String) { + return Set.of(((String) rawParam).split(",")); + } else { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "'" + + paramName + + "' was not passed as a correct type (Set/List/String): " + + rawParam.getClass().getName()); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java index 903d509da18f..6013dea0c2d1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java @@ -304,6 +304,20 @@ static boolean cleanupReplicas( return cleanupLatch.await(5, TimeUnit.MINUTES); } + static List getReplicasOfNodes(Collection nodeNames, ClusterState state) { + List sourceReplicas = new ArrayList<>(); + for (Map.Entry e : state.getCollectionsMap().entrySet()) { + for (Slice slice : e.getValue().getSlices()) { + for (Replica replica : slice.getReplicas()) { + if (nodeNames.contains(replica.getNodeName())) { + sourceReplicas.add(replica); + } + } + } + } + return sourceReplicas; + } + static List getReplicasOfNode(String nodeName, ClusterState state) { List sourceReplicas = new ArrayList<>(); for (Map.Entry e : state.getCollectionsMap().entrySet()) { diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java index 1a539c09b0a8..8ed2abe9a9da 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java @@ -39,6 +39,7 @@ public interface PlacementPlanFactory { * org.apache.solr.cloud.api.collections.CreateShardCmd}, {@link * org.apache.solr.cloud.api.collections.ReplaceNodeCmd}, {@link * org.apache.solr.cloud.api.collections.MoveReplicaCmd}, {@link + * org.apache.solr.cloud.api.collections.MigrateReplicasCmd}, {@link * org.apache.solr.cloud.api.collections.SplitShardCmd}, {@link * org.apache.solr.cloud.api.collections.RestoreCmd}, {@link * org.apache.solr.cloud.api.collections.MigrateCmd} as well as of {@link diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index 0a3beff2706f..de83db8e967b 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -18,10 +18,12 @@ package org.apache.solr.cluster.placement.plugins; import java.lang.invoke.MethodHandles; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -29,11 +31,11 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.PriorityQueue; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.IntSupplier; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.solr.cluster.Node; import org.apache.solr.cluster.Replica; @@ -66,34 +68,35 @@ public List computePlacements( List placementPlans = new ArrayList<>(requests.size()); Set allNodes = new HashSet<>(); Set allCollections = new HashSet<>(); + + Deque pendingRequests = new ArrayDeque<>(requests.size()); for (PlacementRequest request : requests) { + PendingPlacementRequest pending = new PendingPlacementRequest(request); + pendingRequests.add(pending); + placementPlans.add( + placementContext + .getPlacementPlanFactory() + .createPlacementPlan(request, pending.getComputedPlacementSet())); allNodes.addAll(request.getTargetNodes()); allCollections.add(request.getCollection()); } + Collection weightedNodes = getWeightedNodes(placementContext, allNodes, allCollections, true).values(); - for (PlacementRequest request : requests) { - int totalReplicasPerShard = 0; - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - totalReplicasPerShard += request.getCountReplicasToCreate(rt); + while (!pendingRequests.isEmpty()) { + PendingPlacementRequest request = pendingRequests.poll(); + if (!request.isPending()) { + continue; } List nodesForRequest = - weightedNodes.stream() - .filter(wn -> request.getTargetNodes().contains(wn.getNode())) - .collect(Collectors.toList()); - - Set replicaPlacements = - CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size()); + weightedNodes.stream().filter(request::isTargetingNode).collect(Collectors.toList()); SolrCollection solrCollection = request.getCollection(); - // Now place randomly all replicas of all shards on available nodes - for (String shardName : request.getShardNames()) { - for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - int replicaCount = request.getCountReplicasToCreate(replicaType); - if (replicaCount == 0) { - continue; - } + // Now place all replicas of all shards on available nodes + for (String shardName : request.getPendingShards()) { + for (Replica.ReplicaType replicaType : request.getPendingReplicaTypes(shardName)) { + int replicaCount = request.getPendingReplicas(shardName, replicaType); if (log.isDebugEnabled()) { log.debug( "Placing {} replicas for Collection: {}, Shard: {}, ReplicaType: {}", @@ -103,46 +106,52 @@ public List computePlacements( replicaType); } Replica pr = createProjectedReplica(solrCollection, shardName, replicaType, null); - PriorityQueue nodesForReplicaType = new PriorityQueue<>(); + + // Create a NodeHeap so that we have access to the number of ties for the lowestWeighted + // node. + // Sort this heap by the relevant weight of the node given that the replica has been + // added. + NodeHeap nodesForReplicaType = new NodeHeap(n -> n.calcRelevantWeightWithReplica(pr)); nodesForRequest.stream() .filter(n -> n.canAddReplica(pr)) - .forEach( - n -> { - n.sortByRelevantWeightWithReplica(pr); - n.addToSortedCollection(nodesForReplicaType); - }); + .forEach(nodesForReplicaType::add); int replicasPlaced = 0; + boolean retryRequestLater = false; while (!nodesForReplicaType.isEmpty() && replicasPlaced < replicaCount) { WeightedNode node = nodesForReplicaType.poll(); + if (!node.canAddReplica(pr)) { - if (log.isDebugEnabled()) { - log.debug( - "Node can no longer accept replica, removing from selection list: {}", - node.getNode()); - } + log.debug("Node can no-longer add the given replica, move on to next node: {}", node); continue; } - if (node.hasWeightChangedSinceSort()) { - if (log.isDebugEnabled()) { - log.debug( - "Node's sort is out-of-date, adding back to selection list: {}", - node.getNode()); - } - node.addToSortedCollection(nodesForReplicaType); - // The node will be re-sorted, - // so go back to the top of the loop to get the new lowest-sorted node - continue; - } - if (log.isDebugEnabled()) { - log.debug("Node chosen to host replica: {}", node.getNode()); + + // If there is a tie, and there are more node options than we have replicas to place, + // then we want to come back later and try again. If there are ties, but less tie + // options than we have replicas to place, that's ok, because the replicas will likely + // be put on all the tie options. + // + // Only skip the request if it can be requeued, and there are other pending requests to + // compute. + int numWeightTies = nodesForReplicaType.peekTies(); + if (!pendingRequests.isEmpty() + && request.canBeRequeued() + && numWeightTies > (replicaCount - replicasPlaced)) { + log.debug( + "There is a tie for best weight. There are more options ({}) than replicas to place ({}), so try this placement request later: {}", + numWeightTies, + replicaCount - replicasPlaced, + node); + retryRequestLater = true; + break; } + log.debug("Node chosen to host replica: {}", node); boolean needsToResortAll = node.addReplica( createProjectedReplica(solrCollection, shardName, replicaType, node.getNode())); replicasPlaced += 1; - replicaPlacements.add( + request.addPlacement( placementContext .getPlacementPlanFactory() .createReplicaPlacement( @@ -150,12 +159,8 @@ public List computePlacements( // Only update the priorityQueue if there are still replicas to be placed if (replicasPlaced < replicaCount) { if (needsToResortAll) { - if (log.isDebugEnabled()) { - log.debug("Replica addition requires re-sorting of entire selection list"); - } - List nodeList = new ArrayList<>(nodesForReplicaType); - nodesForReplicaType.clear(); - nodeList.forEach(n -> n.addToSortedCollection(nodesForReplicaType)); + log.debug("Replica addition requires re-sorting of entire selection list"); + nodesForReplicaType.resortAll(); } // Add the chosen node back to the list if it can accept another replica of the // shard/replicaType. @@ -167,7 +172,7 @@ public List computePlacements( } } - if (replicasPlaced < replicaCount) { + if (!retryRequestLater && replicasPlaced < replicaCount) { throw new PlacementException( String.format( Locale.ROOT, @@ -180,11 +185,10 @@ public List computePlacements( } } } - - placementPlans.add( - placementContext - .getPlacementPlanFactory() - .createPlacementPlan(request, replicaPlacements)); + if (request.isPending()) { + request.requeue(); + pendingRequests.add(request); + } } return placementPlans; } @@ -194,23 +198,17 @@ public BalancePlan computeBalancing( BalanceRequest balanceRequest, PlacementContext placementContext) throws PlacementException { Map replicaMovements = new HashMap<>(); TreeSet orderedNodes = new TreeSet<>(); - Collection weightedNodes = + orderedNodes.addAll( getWeightedNodes( placementContext, balanceRequest.getNodes(), placementContext.getCluster().collections(), true) - .values(); - // This is critical to store the last sort weight for this node - weightedNodes.forEach( - node -> { - node.sortWithoutChanges(); - node.addToSortedCollection(orderedNodes); - }); + .values()); // While the node with the lowest weight still has room to take a replica from the node with the // highest weight, loop - Map newReplicaMovements = new HashMap<>(); + Map newReplicaMovements = CollectionUtil.newHashMap(1); ArrayList traversedHighNodes = new ArrayList<>(orderedNodes.size() - 1); while (orderedNodes.size() > 1 && orderedNodes.first().calcWeight() < orderedNodes.last().calcWeight()) { @@ -218,22 +216,7 @@ public BalancePlan computeBalancing( if (lowestWeight == null) { break; } - if (lowestWeight.hasWeightChangedSinceSort()) { - if (log.isDebugEnabled()) { - log.debug( - "Re-sorting lowest weighted node: {}, sorting weight is out-of-date.", - lowestWeight.getNode().getName()); - } - // Re-sort this node and go back to find the lowest weight - lowestWeight.addToSortedCollection(orderedNodes); - continue; - } - if (log.isDebugEnabled()) { - log.debug( - "Lowest weighted node: {}, weight: {}", - lowestWeight.getNode().getName(), - lowestWeight.calcWeight()); - } + log.debug("Highest weighted node: {}", lowestWeight); newReplicaMovements.clear(); // If a compatible node was found to move replicas, break and find the lowest weighted node @@ -245,22 +228,7 @@ public BalancePlan computeBalancing( if (highestWeight == null) { break; } - if (highestWeight.hasWeightChangedSinceSort()) { - if (log.isDebugEnabled()) { - log.debug( - "Re-sorting highest weighted node: {}, sorting weight is out-of-date.", - highestWeight.getNode().getName()); - } - // Re-sort this node and go back to find the highest weight - highestWeight.addToSortedCollection(orderedNodes); - continue; - } - if (log.isDebugEnabled()) { - log.debug( - "Highest weighted node: {}, weight: {}", - highestWeight.getNode().getName(), - highestWeight.calcWeight()); - } + log.debug("Highest weighted node: {}", highestWeight); traversedHighNodes.add(highestWeight); // select a replica from the node with the most cores to move to the node with the least @@ -298,13 +266,11 @@ public BalancePlan computeBalancing( highestWeight.addReplica(r); continue; } - if (log.isDebugEnabled()) { - log.debug( - "Replica Movement chosen. From: {}, To: {}, Replica: {}", - highestWeight.getNode().getName(), - lowestWeight.getNode().getName(), - r); - } + log.debug( + "Replica Movement chosen. From: {}, To: {}, Replica: {}", + highestWeight, + lowestWeight, + r); newReplicaMovements.put(r, lowestWeight.getNode()); // Do not go beyond here, do another loop and see if other nodes can move replicas. @@ -321,12 +287,12 @@ public BalancePlan computeBalancing( // Add back in the traversed highNodes that we did not select replicas from, // they might have replicas to move to the next lowestWeighted node - traversedHighNodes.forEach(n -> n.addToSortedCollection(orderedNodes)); + orderedNodes.addAll(traversedHighNodes); traversedHighNodes.clear(); if (newReplicaMovements.size() > 0) { replicaMovements.putAll(newReplicaMovements); // There are no replicas to move to the lowestWeight, remove it from our loop - lowestWeight.addToSortedCollection(orderedNodes); + orderedNodes.add(lowestWeight); } } @@ -437,22 +403,10 @@ protected void verifyDeleteReplicas( public abstract static class WeightedNode implements Comparable { private final Node node; private final Map>> replicas; - private IntSupplier sortWeightCalculator; - private int lastSortedWeight; public WeightedNode(Node node) { this.node = node; this.replicas = new HashMap<>(); - this.lastSortedWeight = 0; - this.sortWeightCalculator = this::calcWeight; - } - - public void sortByRelevantWeightWithReplica(Replica replica) { - sortWeightCalculator = () -> calcRelevantWeightWithReplica(replica); - } - - public void sortWithoutChanges() { - sortWeightCalculator = this::calcWeight; } public Node getNode() { @@ -490,11 +444,6 @@ public Set getReplicasForShardOnNode(Shard shard) { .orElseGet(Collections::emptySet); } - public void addToSortedCollection(Collection collection) { - stashSortedWeight(); - collection.add(this); - } - public abstract int calcWeight(); public abstract int calcRelevantWeightWithReplica(Replica replica); @@ -571,14 +520,6 @@ public final void removeReplica(Replica replica) { protected abstract void removeProjectedReplicaWeights(Replica replica); - private void stashSortedWeight() { - lastSortedWeight = sortWeightCalculator.getAsInt(); - } - - protected boolean hasWeightChangedSinceSort() { - return lastSortedWeight != sortWeightCalculator.getAsInt(); - } - @SuppressWarnings({"rawtypes"}) protected Comparable getTiebreaker() { return node.getName(); @@ -587,7 +528,7 @@ protected Comparable getTiebreaker() { @Override @SuppressWarnings({"unchecked"}) public int compareTo(WeightedNode o) { - int comp = Integer.compare(this.lastSortedWeight, o.lastSortedWeight); + int comp = Integer.compare(this.calcWeight(), o.calcWeight()); if (comp == 0 && !equals(o)) { // TreeSets do not like a 0 comp for non-equal members. comp = getTiebreaker().compareTo(o.getTiebreaker()); @@ -616,7 +557,7 @@ public boolean equals(Object o) { @Override public String toString() { - return "WeightedNode{" + "node=" + node + ", lastSortedWeight=" + lastSortedWeight + '}'; + return "WeightedNode{" + "node=" + node.getName() + ", weight=" + calcWeight() + '}'; } } @@ -722,4 +663,280 @@ public String toString() { } }; } + + /** + * A heap that stores Nodes, sorting them by a given function. + * + *

A normal Java heap class cannot be used, because the {@link #peekTies()} method is required. + */ + private static class NodeHeap { + final Function weightFunc; + + final TreeMap> nodesByWeight; + + Deque currentLowestList; + int currentLowestWeight; + + int size = 0; + + protected NodeHeap(Function weightFunc) { + this.weightFunc = weightFunc; + nodesByWeight = new TreeMap<>(); + currentLowestList = null; + currentLowestWeight = -1; + } + + /** + * Remove and return the node with the lowest weight. There is no guarantee to the sorting of + * nodes that have equal weights. + * + * @return the node with the lowest weight + */ + protected WeightedNode poll() { + updateLowestWeightedList(); + if (currentLowestList == null || currentLowestList.isEmpty()) { + return null; + } else { + size--; + return currentLowestList.pollFirst(); + } + } + + /** + * Return the number of Nodes that are tied for the current lowest weight (using the given + * sorting function). + * + *

PeekTies should only be called after poll(). + * + * @return the number of nodes that are tied for the lowest weight + */ + protected int peekTies() { + return currentLowestList == null ? 1 : currentLowestList.size() + 1; + } + + /** Make sure that the list that contains the nodes with the lowest weights is correct. */ + private void updateLowestWeightedList() { + recheckLowestWeights(); + while (currentLowestList == null || currentLowestList.isEmpty()) { + Map.Entry> lowestEntry = nodesByWeight.pollFirstEntry(); + if (lowestEntry == null) { + currentLowestList = null; + currentLowestWeight = -1; + break; + } else { + currentLowestList = lowestEntry.getValue(); + currentLowestWeight = lowestEntry.getKey(); + recheckLowestWeights(); + } + } + } + + /** + * Go through the list of Nodes with the lowest weight, and make sure that they are still the + * same weight. If their weight has increased, re-add the node to the heap. + */ + private void recheckLowestWeights() { + if (currentLowestList != null) { + currentLowestList.removeIf( + node -> { + if (weightFunc.apply(node) != currentLowestWeight) { + log.debug("Node's sort is out-of-date, re-sorting: {}", node); + add(node); + return true; + } + return false; + }); + } + } + + /** + * Add a node to the heap. + * + * @param node the node to add + */ + public void add(WeightedNode node) { + size++; + int nodeWeight = weightFunc.apply(node); + if (currentLowestWeight == nodeWeight) { + currentLowestList.add(node); + } else { + nodesByWeight.computeIfAbsent(nodeWeight, w -> new ArrayDeque<>()).addLast(node); + } + } + + /** + * Get the number of nodes in the heap. + * + * @return number of nodes + */ + public int size() { + return size; + } + + /** + * Check if the heap is empty. + * + * @return if the heap has no nodes + */ + public boolean isEmpty() { + return size == 0; + } + + /** + * Re-sort all nodes in the heap, because their weights can no-longer be trusted. This is only + * necessary if nodes in the heap may have had their weights decrease. If the nodes just had + * their weights increase, then calling this is not required. + */ + public void resortAll() { + ArrayList temp = new ArrayList<>(size); + if (currentLowestList != null) { + temp.addAll(currentLowestList); + currentLowestList.clear(); + } + nodesByWeight.values().forEach(temp::addAll); + currentLowestWeight = -1; + nodesByWeight.clear(); + temp.forEach(this::add); + } + } + + /** Context for a placement request still has replicas that need to be placed. */ + static class PendingPlacementRequest { + boolean hasBeenRequeued; + + final SolrCollection collection; + + final Set targetNodes; + + // A running list of placements already computed + final Set computedPlacements; + + // A live view on how many replicas still need to be placed for each shard & replica type + final Map> replicasToPlaceForShards; + + public PendingPlacementRequest(PlacementRequest request) { + hasBeenRequeued = false; + collection = request.getCollection(); + targetNodes = request.getTargetNodes(); + Set shards = request.getShardNames(); + replicasToPlaceForShards = CollectionUtil.newHashMap(shards.size()); + int totalShardReplicas = 0; + for (Replica.ReplicaType type : Replica.ReplicaType.values()) { + int count = request.getCountReplicasToCreate(type); + if (count > 0) { + totalShardReplicas += count; + shards.forEach( + s -> + replicasToPlaceForShards + .computeIfAbsent(s, sh -> CollectionUtil.newHashMap(3)) + .put(type, count)); + } + } + computedPlacements = CollectionUtil.newHashSet(totalShardReplicas * shards.size()); + } + + /** + * Determine if this request is not yet complete, and there are requested replicas that have not + * had placements computed. + * + * @return if there are still replica placements that need to be computed + */ + public boolean isPending() { + return !replicasToPlaceForShards.isEmpty(); + } + + public SolrCollection getCollection() { + return collection; + } + + public boolean isTargetingNode(WeightedNode node) { + return targetNodes.contains(node.getNode()); + } + + /** + * The set of ReplicaPlacements computed for this request. + * + *

The list that is returned is the same list used internally, so it will be augmented until + * {@link #isPending()} returns false. + * + * @return The live set of replicaPlacements for this request. + */ + public Set getComputedPlacementSet() { + return computedPlacements; + } + + /** + * Fetch the list of shards that still have replicas that need placements computed. If all the + * requested replicas for a shard are represented in {@link #getComputedPlacementSet()}, then + * that shard will not be returned by this method. + * + * @return list of unfinished shards + */ + public Collection getPendingShards() { + return new ArrayList<>(replicasToPlaceForShards.keySet()); + } + + /** + * For the given shard, return the replica types that still have placements that need to be + * computed. + * + * @param shard name of the shard to check for uncomputed placements + * @return the set of unfinished replica types + */ + public Collection getPendingReplicaTypes(String shard) { + return Optional.ofNullable(replicasToPlaceForShards.get(shard)) + .map(Map::keySet) + // Use a sorted TreeSet to make sure that tests are repeatable + .>map(TreeSet::new) + .orElseGet(Collections::emptyList); + } + + /** + * Fetch the number of replicas that still need to be placed for the given shard and replica + * type. + * + * @param shard name of shard to be place + * @param type type of replica to be placed + * @return the number of replicas that have not yet had placements computed + */ + public int getPendingReplicas(String shard, Replica.ReplicaType type) { + return Optional.ofNullable(replicasToPlaceForShards.get(shard)) + .map(m -> m.get(type)) + .orElse(0); + } + + /** + * Currently, only of requeue is allowed per pending request. + * + * @return true if the request has not been requeued already + */ + public boolean canBeRequeued() { + return !hasBeenRequeued; + } + + /** Let the pending request know that it has been requeued */ + public void requeue() { + hasBeenRequeued = true; + } + + /** + * Track the given replica placement for this pending request. + * + * @param replica placement that has been made for the pending request + */ + public void addPlacement(ReplicaPlacement replica) { + computedPlacements.add(replica); + replicasToPlaceForShards.computeIfPresent( + replica.getShardName(), + (shard, replicaTypes) -> { + replicaTypes.computeIfPresent( + replica.getReplicaType(), (type, count) -> (count == 1) ? null : count - 1); + if (replicaTypes.size() > 0) { + return replicaTypes; + } else { + return null; + } + }); + } + } } diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java index 1e0f6a2f5ba4..0b2279b34fac 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java @@ -17,7 +17,6 @@ package org.apache.solr.cluster.placement.plugins; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -108,20 +107,15 @@ public int calcRelevantWeightWithReplica(Replica replica) { @Override protected boolean addProjectedReplicaWeights(Replica replica) { + randomTiebreaker = random.nextInt(); // NO-OP return false; } @Override protected void removeProjectedReplicaWeights(Replica replica) { - // NO-OP - } - - @Override - public void addToSortedCollection( - Collection collection) { randomTiebreaker = random.nextInt(); - super.addToSortedCollection(collection); + // NO-OP } } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 9413389ca92b..23a0d094ed62 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -188,6 +188,7 @@ import org.apache.solr.handler.admin.api.ListCollectionSnapshotsAPI; import org.apache.solr.handler.admin.api.ListCollectionsAPI; import org.apache.solr.handler.admin.api.MigrateDocsAPI; +import org.apache.solr.handler.admin.api.MigrateReplicasAPI; import org.apache.solr.handler.admin.api.ModifyCollectionAPI; import org.apache.solr.handler.admin.api.MoveReplicaAPI; import org.apache.solr.handler.admin.api.RebalanceLeadersAPI; @@ -1382,6 +1383,7 @@ public Collection> getJerseyResources() { ReloadCollectionAPI.class, RenameCollectionAPI.class, ReplaceNodeAPI.class, + MigrateReplicasAPI.class, BalanceReplicasAPI.class, RestoreCollectionAPI.class, SyncShardAPI.class, diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java new file mode 100644 index 000000000000..da1188ef89df --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.admin.api; + +import static org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2; +import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION; +import static org.apache.solr.common.params.CollectionParams.SOURCE_NODES; +import static org.apache.solr.common.params.CollectionParams.TARGET_NODES; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; +import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT; +import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.parameters.RequestBody; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import javax.inject.Inject; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.params.CollectionParams.CollectionAction; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.handler.admin.CollectionsHandler; +import org.apache.solr.jersey.JacksonReflectMapWriter; +import org.apache.solr.jersey.PermissionName; +import org.apache.solr.jersey.SolrJerseyResponse; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; + +/** V2 API for migrating replicas from a set of nodes to another set of nodes. */ +@Path("cluster/replicas/migrate") +public class MigrateReplicasAPI extends AdminAPIBase { + + @Inject + public MigrateReplicasAPI( + CoreContainer coreContainer, + SolrQueryRequest solrQueryRequest, + SolrQueryResponse solrQueryResponse) { + super(coreContainer, solrQueryRequest, solrQueryResponse); + } + + @POST + @Produces({"application/json", "application/xml", BINARY_CONTENT_TYPE_V2}) + @PermissionName(COLL_EDIT_PERM) + @Operation(summary = "Migrate Replicas from a given set of nodes.") + public SolrJerseyResponse migrateReplicas( + @RequestBody(description = "Contains user provided parameters", required = true) + MigrateReplicasRequestBody requestBody) + throws Exception { + final SolrJerseyResponse response = instantiateJerseyResponse(SolrJerseyResponse.class); + final CoreContainer coreContainer = fetchAndValidateZooKeeperAwareCoreContainer(); + // TODO Record node for log and tracing + final ZkNodeProps remoteMessage = createRemoteMessage(requestBody); + final SolrResponse remoteResponse = + CollectionsHandler.submitCollectionApiCommand( + coreContainer, + coreContainer.getDistributedCollectionCommandRunner(), + remoteMessage, + CollectionAction.MIGRATE_REPLICAS, + DEFAULT_COLLECTION_OP_TIMEOUT); + if (remoteResponse.getException() != null) { + throw remoteResponse.getException(); + } + + disableResponseCaching(); + return response; + } + + public ZkNodeProps createRemoteMessage(MigrateReplicasRequestBody requestBody) { + final Map remoteMessage = new HashMap<>(); + if (requestBody != null) { + if (requestBody.sourceNodes == null || requestBody.sourceNodes.isEmpty()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "No 'sourceNodes' provided in the request body. The MigrateReplicas API requires a 'sourceNodes' list in the request body."); + } + insertIfNotNull(remoteMessage, SOURCE_NODES, requestBody.sourceNodes); + insertIfNotNull(remoteMessage, TARGET_NODES, requestBody.targetNodes); + insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState); + insertIfNotNull(remoteMessage, ASYNC, requestBody.async); + } else { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "No request body sent with request. The MigrateReplicas API requires a body."); + } + remoteMessage.put(QUEUE_OPERATION, CollectionAction.MIGRATE_REPLICAS.toLower()); + + return new ZkNodeProps(remoteMessage); + } + + public static class MigrateReplicasRequestBody implements JacksonReflectMapWriter { + + public MigrateReplicasRequestBody() {} + + public MigrateReplicasRequestBody( + Set sourceNodes, Set targetNodes, Boolean waitForFinalState, String async) { + this.sourceNodes = sourceNodes; + this.targetNodes = targetNodes; + this.waitForFinalState = waitForFinalState; + this.async = async; + } + + @Schema(description = "The set of nodes which all replicas will be migrated off of.") + @JsonProperty(value = "sourceNodes", required = true) + public Set sourceNodes; + + @Schema( + description = + "A set of nodes to migrate the replicas to. If this is not provided, then the API will use the live data nodes not in 'sourceNodes'.") + @JsonProperty(value = "targetNodes") + public Set targetNodes; + + @Schema( + description = + "If true, the request will complete only when all affected replicas become active. " + + "If false, the API will return the status of the single action, which may be " + + "before the new replicas are online and active.") + @JsonProperty("waitForFinalState") + public Boolean waitForFinalState = false; + + @Schema(description = "Request ID to track this action which will be processed asynchronously.") + @JsonProperty("async") + public String async; + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java new file mode 100644 index 000000000000..0935770b9f5c --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.codahale.metrics.Metric; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudLegacySolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.CoreAdminRequest; +import org.apache.solr.client.solrj.response.CoreAdminResponse; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.common.util.Utils; +import org.apache.solr.embedded.JettySolrRunner; +import org.apache.solr.handler.admin.api.MigrateReplicasAPI; +import org.apache.solr.metrics.MetricsMap; +import org.apache.solr.metrics.SolrMetricManager; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.noggit.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MigrateReplicasTest extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @BeforeClass + public static void setupCluster() { + System.setProperty("metricsEnabled", "true"); + } + + @Before + public void clearPreviousCluster() throws Exception { + // Clear the previous cluster before each test, since they use different numbers of nodes. + shutdownCluster(); + } + + @Test + public void test() throws Exception { + configureCluster(6) + .addConfig( + "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) + .configure(); + String coll = "replacenodetest_coll"; + if (log.isInfoEnabled()) { + log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); + } + + CloudSolrClient cloudClient = cluster.getSolrClient(); + Set liveNodes = cloudClient.getClusterState().getLiveNodes(); + ArrayList l = new ArrayList<>(liveNodes); + Collections.shuffle(l, random()); + String emptyNode = l.remove(0); + String nodeToBeDecommissioned = l.get(0); + CollectionAdminRequest.Create create; + // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we + // never have to worry about null checking when comparing the Create command with the final + // Slices + + // TODO: tlog replicas do not work correctly in tests due to fault + // TestInjection#waitForInSyncWithLeader + create = + pickRandom( + CollectionAdminRequest.createCollection(coll, "conf1", 5, 2, 0, 0), + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0), + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1), + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1), + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0), + // check also replicationFactor 1 + CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 0, 0) + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0) + ); + create.setCreateNodeSet(StrUtils.join(l, ',')); + cloudClient.request(create); + + cluster.waitForActiveCollection( + coll, + 5, + 5 + * (create.getNumNrtReplicas() + + create.getNumPullReplicas() + + create.getNumTlogReplicas())); + + DocCollection collection = cloudClient.getClusterState().getCollection(coll); + log.debug("### Before decommission: {}", collection); + log.info("excluded_node : {} ", emptyNode); + Map response = + callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of(nodeToBeDecommissioned), Set.of(emptyNode), true, null)); + assertEquals( + "MigrateReplicas request was unsuccessful", + 0L, + ((Map) response.get("responseHeader")).get("status")); + ZkStateReader zkStateReader = ZkStateReader.from(cloudClient); + try (SolrClient coreClient = + getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(nodeToBeDecommissioned))) { + CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreClient); + assertEquals( + "There should not be any cores left on decommissioned node", + 0, + status.getCoreStatus().size()); + } + + Thread.sleep(5000); + collection = cloudClient.getClusterState().getCollectionOrNull(coll, false); + log.debug("### After decommission: {}", collection); + // check what are replica states on the decommissioned node + assertNull( + "There should not be any replicas left on decommissioned node", + collection.getReplicas(nodeToBeDecommissioned)); + + // let's do it back - this time wait for recoveries + response = + callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of(emptyNode), Set.of(nodeToBeDecommissioned), true, null)); + assertEquals( + "MigrateReplicas request was unsuccessful", + 0L, + ((Map) response.get("responseHeader")).get("status")); + + try (SolrClient coreClient = + getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(emptyNode))) { + CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreClient); + assertEquals( + "Expecting no cores but found some: " + status.getCoreStatus(), + 0, + status.getCoreStatus().size()); + } + + collection = cluster.getSolrClient().getClusterState().getCollection(coll); + assertEquals(create.getNumShards().intValue(), collection.getSlices().size()); + for (Slice s : collection.getSlices()) { + assertEquals( + create.getNumNrtReplicas().intValue(), + s.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); + assertEquals( + create.getNumTlogReplicas().intValue(), + s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); + assertEquals( + create.getNumPullReplicas().intValue(), + s.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); + } + // make sure all newly created replicas on node are active + List newReplicas = collection.getReplicas(nodeToBeDecommissioned); + assertNotNull("There should be replicas on the migrated-to node", newReplicas); + assertFalse("There should be replicas on the migrated-to node", newReplicas.isEmpty()); + for (Replica r : newReplicas) { + assertEquals(r.toString(), Replica.State.ACTIVE, r.getState()); + } + // make sure all replicas on emptyNode are not active + List replicas = collection.getReplicas(emptyNode); + if (replicas != null) { + for (Replica r : replicas) { + assertNotEquals(r.toString(), Replica.State.ACTIVE, r.getState()); + } + } + + // check replication metrics on this jetty - see SOLR-14924 + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + if (jetty.getCoreContainer() == null) { + continue; + } + SolrMetricManager metricManager = jetty.getCoreContainer().getMetricManager(); + String registryName = null; + for (String name : metricManager.registryNames()) { + if (name.startsWith("solr.core.")) { + registryName = name; + } + } + Map metrics = metricManager.registry(registryName).getMetrics(); + if (!metrics.containsKey("REPLICATION./replication.fetcher")) { + continue; + } + MetricsMap fetcherGauge = + (MetricsMap) + ((SolrMetricManager.GaugeWrapper) metrics.get("REPLICATION./replication.fetcher")) + .getGauge(); + assertNotNull("no IndexFetcher gauge in metrics", fetcherGauge); + Map value = fetcherGauge.getValue(); + if (value.isEmpty()) { + continue; + } + assertNotNull("isReplicating missing: " + value, value.get("isReplicating")); + assertTrue( + "isReplicating should be a boolean: " + value, + value.get("isReplicating") instanceof Boolean); + if (value.get("indexReplicatedAt") == null) { + continue; + } + assertNotNull("timesIndexReplicated missing: " + value, value.get("timesIndexReplicated")); + assertTrue( + "timesIndexReplicated should be a number: " + value, + value.get("timesIndexReplicated") instanceof Number); + } + } + + @Test + public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { + configureCluster(5) + .addConfig( + "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) + .configure(); + String coll = "migratereplicastest_notarget_coll"; + if (log.isInfoEnabled()) { + log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); + } + + CloudSolrClient cloudClient = cluster.getSolrClient(); + Set liveNodes = cloudClient.getClusterState().getLiveNodes(); + List l = new ArrayList<>(liveNodes); + Collections.shuffle(l, random()); + List nodesToBeDecommissioned = l.subList(0, 2); + List eventualTargetNodes = l.subList(2, l.size()); + + // TODO: tlog replicas do not work correctly in tests due to fault + // TestInjection#waitForInSyncWithLeader + CollectionAdminRequest.Create create = + CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0); + cloudClient.request(create); + + cluster.waitForActiveCollection( + coll, + create.getNumShards(), + create.getNumShards() + * (create.getNumNrtReplicas() + + create.getNumPullReplicas() + + create.getNumTlogReplicas())); + + DocCollection initialCollection = cloudClient.getClusterState().getCollection(coll); + log.info("### Before decommission: {}", initialCollection); + List initialReplicaCounts = + l.stream() + .map(node -> initialCollection.getReplicas(node).size()) + .collect(Collectors.toList()); + Map response = + callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + new HashSet<>(nodesToBeDecommissioned), Collections.emptySet(), true, null)); + assertEquals( + "MigrateReplicas request was unsuccessful", + 0L, + ((Map) response.get("responseHeader")).get("status")); + + DocCollection collection = cloudClient.getClusterState().getCollectionOrNull(coll, false); + assertNotNull("Collection cannot be null: " + coll, collection); + log.info("### After decommission: {}", collection); + // check what are replica states on the decommissioned nodes + for (String nodeToBeDecommissioned : nodesToBeDecommissioned) { + List replicas = collection.getReplicas(nodeToBeDecommissioned); + if (replicas == null) { + replicas = Collections.emptyList(); + } + assertEquals( + "There should be no more replicas on the sourceNode after a migrateReplicas request.", + Collections.emptyList(), + replicas); + } + + for (String node : eventualTargetNodes) { + assertEquals( + "The non-source node '" + node + "' has the wrong number of replicas after the migration", + 2, + collection.getReplicas(node).size()); + } + } + + @Test + public void testFailOnSingleNode() throws Exception { + configureCluster(1) + .addConfig( + "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) + .configure(); + String coll = "migratereplicastest_singlenode_coll"; + if (log.isInfoEnabled()) { + log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); + } + + CloudSolrClient cloudClient = cluster.getSolrClient(); + cloudClient.request(CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 0, 0)); + + cluster.waitForActiveCollection(coll, 5, 5); + + String liveNode = cloudClient.getClusterState().getLiveNodes().iterator().next(); + Map response = + callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of(liveNode), Collections.emptySet(), true, null)); + assertNotNull( + "No error in response, when the request should have failed", response.get("error")); + assertEquals( + "Wrong error message", + "No nodes other than the source nodes are live, therefore replicas cannot be migrated", + ((Map) response.get("error")).get("msg")); + } + + public Map callMigrateReplicas( + CloudSolrClient cloudClient, MigrateReplicasAPI.MigrateReplicasRequestBody body) + throws IOException { + HttpEntityEnclosingRequestBase httpRequest = null; + HttpEntity entity; + String response = null; + Map r = null; + + String uri = + cluster.getJettySolrRunners().get(0).getBaseUrl().toString().replace("/solr", "") + + "/api/cluster/replicas/migrate"; + try { + httpRequest = new HttpPost(uri); + + httpRequest.setEntity(new ByteArrayEntity(Utils.toJSON(body), ContentType.APPLICATION_JSON)); + httpRequest.setHeader("Accept", "application/json"); + entity = + ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpRequest).getEntity(); + try { + response = EntityUtils.toString(entity, UTF_8); + r = (Map) Utils.fromJSONString(response); + assertNotNull("No response given from MigrateReplicas API", r); + assertNotNull("No responseHeader given from MigrateReplicas API", r.get("responseHeader")); + } catch (JSONParser.ParseException e) { + log.error("err response: {}", response); + throw new AssertionError(e); + } + } finally { + httpRequest.releaseConnection(); + } + return r; + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java index bf4be82d6b74..6d3f208761a1 100644 --- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java @@ -219,6 +219,7 @@ private NamedList sendStatusRequestWithRetry(ModifiableSolrParams params try { Thread.sleep(1000); } catch (InterruptedException e) { + break; } } // Return last state? diff --git a/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java new file mode 100644 index 000000000000..e87a4c675327 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.admin.api; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.OverseerSolrResponse; +import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +/** Unit tests for {@link ReplaceNodeAPI} */ +public class MigrateReplicasAPITest extends SolrTestCaseJ4 { + + private CoreContainer mockCoreContainer; + private SolrQueryRequest mockQueryRequest; + private SolrQueryResponse queryResponse; + private MigrateReplicasAPI migrateReplicasAPI; + private DistributedCollectionConfigSetCommandRunner mockCommandRunner; + private ArgumentCaptor messageCapturer; + + @BeforeClass + public static void ensureWorkingMockito() { + assumeWorkingMockito(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + mockCoreContainer = mock(CoreContainer.class); + mockCommandRunner = mock(DistributedCollectionConfigSetCommandRunner.class); + when(mockCoreContainer.getDistributedCollectionCommandRunner()) + .thenReturn(Optional.of(mockCommandRunner)); + when(mockCommandRunner.runCollectionCommand(any(), any(), anyLong())) + .thenReturn(new OverseerSolrResponse(new NamedList<>())); + mockQueryRequest = mock(SolrQueryRequest.class); + queryResponse = new SolrQueryResponse(); + migrateReplicasAPI = new MigrateReplicasAPI(mockCoreContainer, mockQueryRequest, queryResponse); + messageCapturer = ArgumentCaptor.forClass(ZkNodeProps.class); + + when(mockCoreContainer.isZooKeeperAware()).thenReturn(true); + } + + @Test + public void testCreatesValidOverseerMessage() throws Exception { + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody = + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of("demoSourceNode"), Set.of("demoTargetNode"), false, "async"); + migrateReplicasAPI.migrateReplicas(requestBody); + verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong()); + + final ZkNodeProps createdMessage = messageCapturer.getValue(); + final Map createdMessageProps = createdMessage.getProperties(); + assertEquals(5, createdMessageProps.size()); + assertEquals(Set.of("demoSourceNode"), createdMessageProps.get("sourceNodes")); + assertEquals(Set.of("demoTargetNode"), createdMessageProps.get("targetNodes")); + assertEquals(false, createdMessageProps.get("waitForFinalState")); + assertEquals("async", createdMessageProps.get("async")); + assertEquals("migrate_replicas", createdMessageProps.get("operation")); + } + + @Test + public void testNoTargetNodes() throws Exception { + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody = + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of("demoSourceNode"), null, null, null); + migrateReplicasAPI.migrateReplicas(requestBody); + verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong()); + + final ZkNodeProps createdMessage = messageCapturer.getValue(); + final Map createdMessageProps = createdMessage.getProperties(); + assertEquals(2, createdMessageProps.size()); + assertEquals(Set.of("demoSourceNode"), createdMessageProps.get("sourceNodes")); + assertEquals("migrate_replicas", createdMessageProps.get("operation")); + } + + @Test + public void testNoSourceNodesThrowsError() throws Exception { + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody1 = + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Collections.emptySet(), Set.of("demoTargetNode"), null, null); + assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody1)); + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody2 = + new MigrateReplicasAPI.MigrateReplicasRequestBody( + null, Set.of("demoTargetNode"), null, null); + assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody2)); + } +} diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc index d1f79a569685..117d066add4e 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc @@ -365,6 +365,13 @@ At this point, if you run a query on a node having e.g., `rack=rack1`, Solr will Shuffle the replicas across the given set of Solr nodes until an equilibrium is reached. +The configured xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] +will be used to decide: + +* Which replicas should be moved for the balancing +* Which nodes those replicas should be placed +* When the cluster has reached an "equilibrium" + [example.tab-pane#v2balancereplicas] ==== [.tab-label]*V2 API* @@ -414,17 +421,17 @@ If `false`, the API will return when the bare minimum replicas are active, such + Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously]. +=== BalanceReplicas Response + +The response will include the status of the request. +If the status is anything other than "0", an error message will explain why the request failed. + [IMPORTANT] ==== This operation does not hold necessary locks on the replicas that belong to on the source node. So don't perform other collection operations in this period. ==== -=== BalanceReplicas Response - -The response will include the status of the request. -If the status is anything other than "0", an error message will explain why the request failed. - [[balanceshardunique]] == BALANCESHARDUNIQUE: Balance a Property Across Nodes @@ -534,14 +541,106 @@ http://localhost:8983/solr/admin/collections?action=BALANCESHARDUNIQUE&collectio Examining the clusterstate after issuing this call should show exactly one replica in each shard that has this property. +[[migratereplicas]] +== Migrate Replicas + +Migrate all replicas off of a given set of source nodes. ++ +If more than one node is used as a targetNode (either explicitly, or by default), then the configured +xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] will be used to determine +which targetNode should be used for each migrated replica. + +[example.tab-pane#v2migratereplicas] +==== +[.tab-label]*V2 API* + +[source,bash] +---- +curl -X POST http://localhost:8983/api/cluster/replicas/migrate -H 'Content-Type: application/json' -d ' + { + "sourceNodes": ["localhost:8983_solr", "localhost:8984_solr"], + "targetNodes": ["localhost:8985_solr", "localhost:8986_solr"], + "async": "migrate-replicas-1" + } +' +---- +==== + +=== Parameters + + +`sourceNodes`:: ++ +[%autowidth,frame=none] +|=== +|Required |Default: `[]` +|=== ++ +The nodes over which replicas will be balanced. +Replicas that live outside this set of nodes will not be included in the balancing. ++ +If this parameter is not provided, all live data nodes will be used. + +`targetNodes`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: `[]` +|=== ++ +The nodes which the migrated replicas will be moved to. +If none is provided, then the API will use all live nodes not provided in `sourceNodes`. ++ +If there is more than one node to migrate the replicas to, then the configured PlacementPlugin replica will have one of these nodes selected + +`waitForFinalState`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: `false` +|=== ++ +If `true`, the request will complete only when all affected replicas become active. +If `false`, the API will return when the bare minimum replicas are active, such as the affected leader replicas. + +`async`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: none +|=== ++ +Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously]. + +=== MigrateReplicas Response + +The response will include the status of the request. +If the status is anything other than "0", an error message will explain why the request failed. + +[IMPORTANT] +==== +This operation does not hold necessary locks on the replicas that belong to on the source node. +So don't perform other collection operations in this period. +==== + [[replacenode]] == REPLACENODE: Move All Replicas in a Node to Another +[WARNING] +==== +This API's functionality has been replaced and enhanced by <>, please consider using the new +API instead, as this API may be removed in a future version. +==== + This command recreates replicas in one node (the source) on another node(s) (the target). After each replica is copied, the replicas in the source node are deleted. For source replicas that are also shard leaders the operation will wait for the number of seconds set with the `timeout` parameter to make sure there's an active replica that can become a leader, either an existing replica becoming a leader or the new replica completing recovery and becoming a leader). +If no targetNode is provided, then the configured +xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] will be used to determine +which node each recreated replica should be placed on. + [.dynamic-tabs] -- [example.tab-pane#v1replacenode] @@ -592,7 +691,9 @@ The source node from which the replicas need to be copied from. |=== + The target node where replicas will be copied. -If this parameter is not provided, Solr will identify nodes automatically based on policies or number of cores in each node. +If this parameter is not provided, Solr will use all live nodes except for the `sourceNode`. +The configured xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] +will be used to determine which node will be used for each replica. `parallel`:: + diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java index 516fdbfb919b..73e2e9518a1d 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java @@ -37,6 +37,8 @@ public interface CollectionParams { String SOURCE_NODE = "sourceNode"; String TARGET_NODE = "targetNode"; + String SOURCE_NODES = "sourceNodes"; + String TARGET_NODES = "targetNodes"; String NODES = "nodes"; String MAX_BALANCE_SKEW = "maxBalanceSkew"; @@ -130,6 +132,8 @@ enum CollectionAction { // TODO when we have a node level lock use it here REPLACENODE(true, LockLevel.NONE), // TODO when we have a node level lock use it here + MIGRATE_REPLICAS(true, LockLevel.NONE), + // TODO when we have a node level lock use it here BALANCE_REPLICAS(true, LockLevel.NONE), DELETENODE(true, LockLevel.NONE), MOCK_REPLICA_TASK(false, LockLevel.REPLICA),