From 1c367573132809a09d1058be32897d6d0d68100f Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jun 2023 11:49:36 -0400 Subject: [PATCH 01/15] Add migrate replicas command and API --- .../cloud/api/collections/CollApiCmds.java | 2 + .../api/collections/MigrateReplicasCmd.java | 157 ++++++++++++++++++ .../collections/ReplicaMigrationUtils.java | 14 ++ .../placement/PlacementPlanFactory.java | 1 + .../handler/admin/api/MigrateReplicasAPI.java | 146 ++++++++++++++++ .../solr/common/params/CollectionParams.java | 4 + 6 files changed, 324 insertions(+) create mode 100644 solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java create mode 100644 solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java index e35023023b17..1595904bc4f9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java @@ -51,6 +51,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.INSTALLSHARDDATA; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE_REPLICAS; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK; @@ -143,6 +144,7 @@ private CommandMap(OverseerNodePrioritizer overseerPrioritizer, CollectionComman commandMap = Map.ofEntries( Map.entry(REPLACENODE, new ReplaceNodeCmd(ccc)), + 
Map.entry(MIGRATE_REPLICAS, new MigrateReplicasCmd(ccc)), Map.entry(BALANCE_REPLICAS, new BalanceReplicasCmd(ccc)), Map.entry(DELETENODE, new DeleteNodeCmd(ccc)), Map.entry(BACKUP, new BackupCmd(ccc)), diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java new file mode 100644 index 000000000000..fa44cb418646 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ReplicaPosition; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonAdminParams; +import org.apache.solr.common.util.CollectionUtil; +import org.apache.solr.common.util.NamedList; + +public class MigrateReplicasCmd implements CollApiCmds.CollectionApiCommand { + + private final CollectionCommandContext ccc; + + public MigrateReplicasCmd(CollectionCommandContext ccc) { + this.ccc = ccc; + } + + @Override + public void call(ClusterState state, ZkNodeProps message, NamedList results) + throws Exception { + ZkStateReader zkStateReader = ccc.getZkStateReader(); + Set sourceNodes = getNodesFromParam(message, CollectionParams.SOURCE_NODES); + Set targetNodes = getNodesFromParam(message, CollectionParams.TARGET_NODES); + boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); + if (sourceNodes.isEmpty()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "sourceNodes is a required param"); + } + String async = message.getStr(ASYNC); + int timeout = message.getInt("timeout", 10 * 60); // 10 minutes + boolean parallel = message.getBool("parallel", false); + ClusterState clusterState = zkStateReader.getClusterState(); + + for (String sourceNode : sourceNodes) { + if (!clusterState.liveNodesContain(sourceNode)) { + throw new SolrException( + 
SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + sourceNode + " is not live"); + } + } + for (String targetNode : targetNodes) { + if (!clusterState.liveNodesContain(targetNode)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + targetNode + " is not live"); + } + } + + if (targetNodes.isEmpty()) { + // If no target nodes are provided, use all other live nodes that are not the sourceNodes + targetNodes = + clusterState.getLiveNodes().stream() + .filter(n -> !sourceNodes.contains(n)) + .collect(Collectors.toSet()); + if (targetNodes.isEmpty()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "No nodes other than the source nodes are live, therefore replicas cannot be migrated"); + } + } + List sourceReplicas = + ReplicaMigrationUtils.getReplicasOfNodes(sourceNodes, clusterState); + Map replicaMovements = CollectionUtil.newHashMap(sourceReplicas.size()); + + if (targetNodes.size() > 1) { + List assignRequests = new ArrayList<>(sourceReplicas.size()); + List targetNodeList = new ArrayList<>(targetNodes); + for (Replica sourceReplica : sourceReplicas) { + Replica.Type replicaType = sourceReplica.getType(); + Assign.AssignRequest assignRequest = + new Assign.AssignRequestBuilder() + .forCollection(sourceReplica.getCollection()) + .forShard(Collections.singletonList(sourceReplica.getShard())) + .assignNrtReplicas(replicaType == Replica.Type.NRT ? 1 : 0) + .assignTlogReplicas(replicaType == Replica.Type.TLOG ? 1 : 0) + .assignPullReplicas(replicaType == Replica.Type.PULL ? 
1 : 0) + .onNodes(targetNodeList) + .build(); + assignRequests.add(assignRequest); + } + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); + List replicaPositions = + assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests); + int position = 0; + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, replicaPositions.get(position++).node); + } + } else { + String targetNode = targetNodes.stream().findFirst().get(); + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, targetNode); + } + } + + boolean migrationSuccessful = + ReplicaMigrationUtils.migrateReplicas( + ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results); + if (migrationSuccessful) { + results.add( + "success", + "MIGRATE_REPLICAS action completed successfully from : [" + + String.join(",", sourceNodes) + + "] to : [" + + String.join(",", targetNodes) + + "]"); + } + } + + protected Set getNodesFromParam(ZkNodeProps message, String paramName) { + Object rawParam = message.get(paramName); + if (rawParam == null) { + return Collections.emptySet(); + } else if (rawParam instanceof Set) { + return (Set) rawParam; + } else if (rawParam instanceof Collection) { + return new HashSet<>((Collection) rawParam); + } else if (rawParam instanceof String) { + return Set.of(((String) rawParam).split(",")); + } else { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "'" + + paramName + + "' was not passed as a correct type (Set/List/String): " + + rawParam.getClass().getName()); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java index 903d509da18f..6013dea0c2d1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java +++ 
b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java @@ -304,6 +304,20 @@ static boolean cleanupReplicas( return cleanupLatch.await(5, TimeUnit.MINUTES); } + static List getReplicasOfNodes(Collection nodeNames, ClusterState state) { + List sourceReplicas = new ArrayList<>(); + for (Map.Entry e : state.getCollectionsMap().entrySet()) { + for (Slice slice : e.getValue().getSlices()) { + for (Replica replica : slice.getReplicas()) { + if (nodeNames.contains(replica.getNodeName())) { + sourceReplicas.add(replica); + } + } + } + } + return sourceReplicas; + } + static List getReplicasOfNode(String nodeName, ClusterState state) { List sourceReplicas = new ArrayList<>(); for (Map.Entry e : state.getCollectionsMap().entrySet()) { diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java index 1a539c09b0a8..8ed2abe9a9da 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java @@ -39,6 +39,7 @@ public interface PlacementPlanFactory { * org.apache.solr.cloud.api.collections.CreateShardCmd}, {@link * org.apache.solr.cloud.api.collections.ReplaceNodeCmd}, {@link * org.apache.solr.cloud.api.collections.MoveReplicaCmd}, {@link + * org.apache.solr.cloud.api.collections.MigrateReplicasCmd}, {@link * org.apache.solr.cloud.api.collections.SplitShardCmd}, {@link * org.apache.solr.cloud.api.collections.RestoreCmd}, {@link * org.apache.solr.cloud.api.collections.MigrateCmd} as well as of {@link diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java new file mode 100644 index 000000000000..0e52f471eea7 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java @@ -0,0 +1,146 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.admin.api; + +import static org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2; +import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION; +import static org.apache.solr.common.params.CollectionParams.SOURCE_NODES; +import static org.apache.solr.common.params.CollectionParams.TARGET_NODES; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; +import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT; +import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.parameters.RequestBody; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import javax.inject.Inject; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import org.apache.solr.client.solrj.SolrResponse; +import 
org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.params.CollectionParams.CollectionAction; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.handler.admin.CollectionsHandler; +import org.apache.solr.jersey.JacksonReflectMapWriter; +import org.apache.solr.jersey.PermissionName; +import org.apache.solr.jersey.SolrJerseyResponse; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; + +/** V2 API for migrating replicas from a set of nodes to another set of nodes. */ +@Path("cluster/replicas/migrate") +public class MigrateReplicasAPI extends AdminAPIBase { + + @Inject + public MigrateReplicasAPI( + CoreContainer coreContainer, + SolrQueryRequest solrQueryRequest, + SolrQueryResponse solrQueryResponse) { + super(coreContainer, solrQueryRequest, solrQueryResponse); + } + + @POST + @Produces({"application/json", "application/xml", BINARY_CONTENT_TYPE_V2}) + @PermissionName(COLL_EDIT_PERM) + @Operation(summary = "Migrate Replicas from a given set of nodes.") + public SolrJerseyResponse balanceReplicas( + @RequestBody(description = "Contains user provided parameters", required = true) + MigrateReplicasRequestBody requestBody) + throws Exception { + final SolrJerseyResponse response = instantiateJerseyResponse(SolrJerseyResponse.class); + final CoreContainer coreContainer = fetchAndValidateZooKeeperAwareCoreContainer(); + // TODO Record node for log and tracing + final ZkNodeProps remoteMessage = createRemoteMessage(requestBody); + final SolrResponse remoteResponse = + CollectionsHandler.submitCollectionApiCommand( + coreContainer, + coreContainer.getDistributedCollectionCommandRunner(), + remoteMessage, + CollectionAction.MIGRATE_REPLICAS, + DEFAULT_COLLECTION_OP_TIMEOUT); + if (remoteResponse.getException() != null) { + throw remoteResponse.getException(); + } + + disableResponseCaching(); + return response; + } + + public ZkNodeProps 
createRemoteMessage(MigrateReplicasRequestBody requestBody) { + final Map remoteMessage = new HashMap<>(); + if (requestBody != null) { + if (requestBody.sourceNodes == null || requestBody.sourceNodes.isEmpty()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "No 'sourceNodes' provided in the request body. The MigrateReplicas API requires a 'sourceNodes' list in the request body."); + } + insertIfNotNull(remoteMessage, SOURCE_NODES, requestBody.sourceNodes); + insertIfNotNull(remoteMessage, TARGET_NODES, requestBody.targetNodes); + insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState); + insertIfNotNull(remoteMessage, ASYNC, requestBody.async); + } else { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "No request body sent with request. The MigrateReplicas API requires a body."); + } + remoteMessage.put(QUEUE_OPERATION, CollectionAction.MIGRATE_REPLICAS.toLower()); + + return new ZkNodeProps(remoteMessage); + } + + public static class MigrateReplicasRequestBody implements JacksonReflectMapWriter { + + public MigrateReplicasRequestBody() {} + + public MigrateReplicasRequestBody( + Set sourceNodes, Set targetNodes, Boolean waitForFinalState, String async) { + this.sourceNodes = sourceNodes; + this.targetNodes = targetNodes; + this.waitForFinalState = waitForFinalState; + this.async = async; + } + + @Schema(description = "The set of nodes which all replicas will be migrated off of.") + @JsonProperty(value = "sourceNodes", required = true) + public Set sourceNodes; + + @Schema( + description = + "A set of nodes to migrate the replicas to. If this is not provided, then the API will use the live data nodes not in 'sourceNodes'.") + @JsonProperty(value = "targetNodes") + public Set targetNodes; + + @Schema( + description = + "If true, the request will complete only when all affected replicas become active. 
" + + "If false, the API will return the status of the single action, which may be " + + "before the new replicas are online and active.") + @JsonProperty("waitForFinalState") + public Boolean waitForFinalState = false; + + @Schema(description = "Request ID to track this action which will be processed asynchronously.") + @JsonProperty("async") + public String async; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java index 516fdbfb919b..73e2e9518a1d 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java @@ -37,6 +37,8 @@ public interface CollectionParams { String SOURCE_NODE = "sourceNode"; String TARGET_NODE = "targetNode"; + String SOURCE_NODES = "sourceNodes"; + String TARGET_NODES = "targetNodes"; String NODES = "nodes"; String MAX_BALANCE_SKEW = "maxBalanceSkew"; @@ -130,6 +132,8 @@ enum CollectionAction { // TODO when we have a node level lock use it here REPLACENODE(true, LockLevel.NONE), // TODO when we have a node level lock use it here + MIGRATE_REPLICAS(true, LockLevel.NONE), + // TODO when we have a node level lock use it here BALANCE_REPLICAS(true, LockLevel.NONE), DELETENODE(true, LockLevel.NONE), MOCK_REPLICA_TASK(false, LockLevel.REPLICA), From 3e652286ee4b74a33c6271bac959609deacc3143 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jun 2023 12:09:39 -0400 Subject: [PATCH 02/15] Add docs for migrateReplicas, cleanup related docs --- .../pages/cluster-node-management.adoc | 113 +++++++++++++++++- 1 file changed, 107 insertions(+), 6 deletions(-) diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc index d1f79a569685..117d066add4e 100644 --- 
a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc @@ -365,6 +365,13 @@ At this point, if you run a query on a node having e.g., `rack=rack1`, Solr will Shuffle the replicas across the given set of Solr nodes until an equilibrium is reached. +The configured xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] +will be used to decide: + +* Which replicas should be moved for the balancing +* Which nodes those replicas should be placed on +* When the cluster has reached an "equilibrium" + [example.tab-pane#v2balancereplicas] ==== [.tab-label]*V2 API* @@ -414,17 +421,17 @@ If `false`, the API will return when the bare minimum replicas are active, such + Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously]. +=== BalanceReplicas Response + +The response will include the status of the request. +If the status is anything other than "0", an error message will explain why the request failed. + [IMPORTANT] ==== This operation does not hold necessary locks on the replicas that belong to on the source node. So don't perform other collection operations in this period. ==== -=== BalanceReplicas Response - -The response will include the status of the request. -If the status is anything other than "0", an error message will explain why the request failed. - [[balanceshardunique]] == BALANCESHARDUNIQUE: Balance a Property Across Nodes @@ -534,14 +541,106 @@ http://localhost:8983/solr/admin/collections?action=BALANCESHARDUNIQUE&collectio Examining the clusterstate after issuing this call should show exactly one replica in each shard that has this property. +[[migratereplicas]] +== Migrate Replicas + +Migrate all replicas off of a given set of source nodes. 
++ +If more than one node is used as a targetNode (either explicitly, or by default), then the configured +xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] will be used to determine +which targetNode should be used for each migrated replica. + +[example.tab-pane#v2migratereplicas] +==== +[.tab-label]*V2 API* + +[source,bash] +---- +curl -X POST http://localhost:8983/api/cluster/replicas/migrate -H 'Content-Type: application/json' -d ' + { + "sourceNodes": ["localhost:8983_solr", "localhost:8984_solr"], + "targetNodes": ["localhost:8985_solr", "localhost:8986_solr"], + "async": "migrate-replicas-1" + } +' +---- +==== + +=== Parameters + + +`sourceNodes`:: ++ +[%autowidth,frame=none] +|=== +|Required |Default: none +|=== ++ +The nodes which all replicas will be migrated off of. +Replicas that live outside this set of nodes will not be moved by this operation. ++ +This parameter is required, and every node in the list must be live. + +`targetNodes`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: `[]` +|=== ++ +The nodes which the migrated replicas will be moved to. +If none is provided, then the API will use all live nodes not provided in `sourceNodes`. ++ +If there is more than one node to migrate the replicas to, then the configured Replica Placement Plugin will select one of these nodes for each migrated replica. + +`waitForFinalState`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: `false` +|=== ++ +If `true`, the request will complete only when all affected replicas become active. +If `false`, the API will return when the bare minimum replicas are active, such as the affected leader replicas. + +`async`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: none +|=== ++ +Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously]. + +=== MigrateReplicas Response + +The response will include the status of the request. 
+If the status is anything other than "0", an error message will explain why the request failed. + +[IMPORTANT] +==== +This operation does not hold the necessary locks on the replicas that belong to the source nodes. +So don't perform other collection operations in this period. +==== + [[replacenode]] == REPLACENODE: Move All Replicas in a Node to Another +[WARNING] +==== +This API's functionality has been replaced and enhanced by <<migratereplicas>>, please consider using the new +API instead, as this API may be removed in a future version. +==== + This command recreates replicas in one node (the source) on another node(s) (the target). After each replica is copied, the replicas in the source node are deleted. For source replicas that are also shard leaders the operation will wait for the number of seconds set with the `timeout` parameter to make sure there's an active replica that can become a leader, either an existing replica becoming a leader or the new replica completing recovery and becoming a leader). +If no targetNode is provided, then the configured +xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] will be used to determine +which node each recreated replica should be placed on. + [.dynamic-tabs] -- [example.tab-pane#v1replacenode] @@ -592,7 +691,9 @@ The source node from which the replicas need to be copied from. |=== + The target node where replicas will be copied. -If this parameter is not provided, Solr will identify nodes automatically based on policies or number of cores in each node. +If this parameter is not provided, Solr will use all live nodes except for the `sourceNode`. +The configured xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] +will be used to determine which node will be used for each replica. 
`parallel`:: + From f548b91d3e3ef894d539411e89cd07c5090e4cf3 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jun 2023 12:11:03 -0400 Subject: [PATCH 03/15] Add changelog entry --- solr/CHANGES.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index e472e4b7cf23..b2a83d16ed2d 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -72,6 +72,9 @@ New Features * SOLR-16827: Add min/max scaling to the reranker (Joel Bernstein) +* SOLR-16855: Solr now provides a MigrateReplicas API at `POST /api/cluster/replicas/migrate` (v2), to move replicas + off of a given set of nodes. This extends the functionality of the existing ReplaceNode API. (Houston Putman) + Improvements --------------------- From e0e50f4dc95ec7fda6731bf2bc6853c6c2f38a35 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jun 2023 12:21:39 -0400 Subject: [PATCH 04/15] Add first tests --- .../api/collections/MigrateReplicasCmd.java | 1 + .../handler/admin/api/MigrateReplicasAPI.java | 2 +- .../admin/api/MigrateReplicasAPITest.java | 119 ++++++++++++++++++ 3 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java index fa44cb418646..29616737574c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java @@ -135,6 +135,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList resu } } + @SuppressWarnings({"unchecked"}) protected Set getNodesFromParam(ZkNodeProps message, String paramName) { Object rawParam = message.get(paramName); if (rawParam == null) { diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java 
b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java index 0e52f471eea7..da1188ef89df 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java @@ -64,7 +64,7 @@ public MigrateReplicasAPI( @Produces({"application/json", "application/xml", BINARY_CONTENT_TYPE_V2}) @PermissionName(COLL_EDIT_PERM) @Operation(summary = "Migrate Replicas from a given set of nodes.") - public SolrJerseyResponse balanceReplicas( + public SolrJerseyResponse migrateReplicas( @RequestBody(description = "Contains user provided parameters", required = true) MigrateReplicasRequestBody requestBody) throws Exception { diff --git a/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java new file mode 100644 index 000000000000..492974d6ad24 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.handler.admin.api; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.MigrateReplicasTest; +import org.apache.solr.cloud.OverseerSolrResponse; +import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit tests for {@link MigrateReplicasAPI} */ +public class MigrateReplicasAPITest extends SolrTestCaseJ4 { + + private CoreContainer mockCoreContainer; + private SolrQueryRequest mockQueryRequest; + private SolrQueryResponse queryResponse; + private MigrateReplicasAPI migrateReplicasAPI; + private DistributedCollectionConfigSetCommandRunner mockCommandRunner; + private ArgumentCaptor messageCapturer; + + @BeforeClass + public static void ensureWorkingMockito() { + assumeWorkingMockito(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + mockCoreContainer = mock(CoreContainer.class); + mockCommandRunner = mock(DistributedCollectionConfigSetCommandRunner.class); + when(mockCoreContainer.getDistributedCollectionCommandRunner()) + .thenReturn(Optional.of(mockCommandRunner)); + when(mockCommandRunner.runCollectionCommand(any(), any(), anyLong())) + .thenReturn(new OverseerSolrResponse(new NamedList<>())); + 
mockQueryRequest = mock(SolrQueryRequest.class); + queryResponse = new SolrQueryResponse(); + migrateReplicasAPI = new MigrateReplicasAPI(mockCoreContainer, mockQueryRequest, queryResponse); + messageCapturer = ArgumentCaptor.forClass(ZkNodeProps.class); + + when(mockCoreContainer.isZooKeeperAware()).thenReturn(true); + } + + @Test + public void testCreatesValidOverseerMessage() throws Exception { + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody = + new MigrateReplicasAPI.MigrateReplicasRequestBody(Set.of("demoSourceNode"), Set.of("demoTargetNode"), false, "async"); + migrateReplicasAPI.migrateReplicas(requestBody); + verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong()); + + final ZkNodeProps createdMessage = messageCapturer.getValue(); + final Map createdMessageProps = createdMessage.getProperties(); + assertEquals(5, createdMessageProps.size()); + assertEquals(Set.of("demoSourceNode"), createdMessageProps.get("sourceNodes")); + assertEquals(Set.of("demoTargetNode"), createdMessageProps.get("targetNodes")); + assertEquals(false, createdMessageProps.get("waitForFinalState")); + assertEquals("async", createdMessageProps.get("async")); + assertEquals("migrate_replicas", createdMessageProps.get("operation")); + } + + @Test + public void testNoTargetNodes() throws Exception { + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody = + new MigrateReplicasAPI.MigrateReplicasRequestBody(Set.of("demoSourceNode"), null, null, null); + migrateReplicasAPI.migrateReplicas(requestBody); + verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong()); + + final ZkNodeProps createdMessage = messageCapturer.getValue(); + final Map createdMessageProps = createdMessage.getProperties(); + assertEquals(2, createdMessageProps.size()); + assertEquals(Set.of("demoSourceNode"), createdMessageProps.get("sourceNodes")); + assertEquals("migrate_replicas", createdMessageProps.get("operation")); + } + + @Test + 
public void testNoSourceNodesThrowsError() throws Exception { + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody1 = + new MigrateReplicasAPI.MigrateReplicasRequestBody(Collections.emptySet(), Set.of("demoTargetNode"), null, null); + assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody1)); + MigrateReplicasAPI.MigrateReplicasRequestBody requestBody2 = + new MigrateReplicasAPI.MigrateReplicasRequestBody(null, Set.of("demoTargetNode"), null, null); + assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody2)); + } +} From 5742d45237da5d9399e96f8f217105cc5684eb0f Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jun 2023 13:02:58 -0400 Subject: [PATCH 05/15] Add integration test --- .../handler/admin/CollectionsHandler.java | 2 + .../solr/cloud/MigrateReplicasTest.java | 377 ++++++++++++++++++ 2 files changed, 379 insertions(+) create mode 100644 solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 9413389ca92b..23a0d094ed62 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -188,6 +188,7 @@ import org.apache.solr.handler.admin.api.ListCollectionSnapshotsAPI; import org.apache.solr.handler.admin.api.ListCollectionsAPI; import org.apache.solr.handler.admin.api.MigrateDocsAPI; +import org.apache.solr.handler.admin.api.MigrateReplicasAPI; import org.apache.solr.handler.admin.api.ModifyCollectionAPI; import org.apache.solr.handler.admin.api.MoveReplicaAPI; import org.apache.solr.handler.admin.api.RebalanceLeadersAPI; @@ -1382,6 +1383,7 @@ public Collection> getJerseyResources() { ReloadCollectionAPI.class, RenameCollectionAPI.class, ReplaceNodeAPI.class, + MigrateReplicasAPI.class, 
BalanceReplicasAPI.class, RestoreCollectionAPI.class, SyncShardAPI.class, diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java new file mode 100644 index 000000000000..b82785060f92 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud; + +import com.codahale.metrics.Metric; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudLegacySolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.CoreAdminRequest; +import org.apache.solr.client.solrj.response.CoreAdminResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.common.util.Utils; +import org.apache.solr.embedded.JettySolrRunner; +import org.apache.solr.handler.admin.api.BalanceReplicasAPI; +import org.apache.solr.handler.admin.api.MigrateReplicasAPI; +import org.apache.solr.jersey.SolrJerseyResponse; +import org.apache.solr.metrics.MetricsMap; +import org.apache.solr.metrics.SolrMetricManager; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.noggit.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.solr.common.params.CollectionParams.SOURCE_NODE; +import static org.apache.solr.common.params.CollectionParams.TARGET_NODE; + +public class MigrateReplicasTest extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @BeforeClass + public static void setupCluster() { + System.setProperty("metricsEnabled", "true"); + } + + @Before + public void clearPreviousCluster() throws Exception { + // Clear the previous cluster before each test, since they use different numbers of nodes. + shutdownCluster(); + } + + @Test + public void test() throws Exception { + configureCluster(6) + .addConfig( + "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) + .configure(); + String coll = "replacenodetest_coll"; + if (log.isInfoEnabled()) { + log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); + } + + CloudSolrClient cloudClient = cluster.getSolrClient(); + Set liveNodes = cloudClient.getClusterState().getLiveNodes(); + ArrayList l = new ArrayList<>(liveNodes); + Collections.shuffle(l, random()); + String emptyNode = l.remove(0); + String nodeToBeDecommissioned = l.get(0); + CollectionAdminRequest.Create create; + // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we + // never have to worry about null checking when comparing the Create command with the final + // Slices + + // TODO: tlog replicas do not work correctly in tests due to fault + // TestInjection#waitForInSyncWithLeader + create = + pickRandom( + CollectionAdminRequest.createCollection(coll, "conf1", 5, 2, 0, 0), + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0), + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1), + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1), + // 
CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0), + // check also replicationFactor 1 + CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 0, 0) + // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0) + ); + create.setCreateNodeSet(StrUtils.join(l, ',')); + cloudClient.request(create); + + cluster.waitForActiveCollection( + coll, + 5, + 5 + * (create.getNumNrtReplicas() + + create.getNumPullReplicas() + + create.getNumTlogReplicas())); + + DocCollection collection = cloudClient.getClusterState().getCollection(coll); + log.debug("### Before decommission: {}", collection); + log.info("excluded_node : {} ", emptyNode); + Map response = callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of(nodeToBeDecommissioned), + Set.of(emptyNode), + true, + null + ) + ); + assertEquals("MigrateReplicas request was unsuccessful", 0L, ((Map)response.get("responseHeader")).get("status")); + ZkStateReader zkStateReader = ZkStateReader.from(cloudClient); + try (SolrClient coreClient = + getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(nodeToBeDecommissioned))) { + CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreClient); + assertEquals(0, status.getCoreStatus().size()); + } + + Thread.sleep(5000); + collection = cloudClient.getClusterState().getCollection(coll); + log.debug("### After decommission: {}", collection); + // check what are replica states on the decommissioned node + List replicas = collection.getReplicas(nodeToBeDecommissioned); + if (replicas == null) { + replicas = Collections.emptyList(); + } + log.debug("### Existing replicas on decommissioned node: {}", replicas); + + // let's do it back - this time wait for recoveries + response = callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of(emptyNode), + Set.of(nodeToBeDecommissioned), + true, + null + ) + ); + assertEquals("MigrateReplicas request was unsuccessful", 0L, 
((Map)response.get("responseHeader")).get("status")); + + try (SolrClient coreClient = + getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(emptyNode))) { + CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreClient); + assertEquals( + "Expecting no cores but found some: " + status.getCoreStatus(), + 0, + status.getCoreStatus().size()); + } + + collection = cluster.getSolrClient().getClusterState().getCollection(coll); + assertEquals(create.getNumShards().intValue(), collection.getSlices().size()); + for (Slice s : collection.getSlices()) { + assertEquals( + create.getNumNrtReplicas().intValue(), + s.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); + assertEquals( + create.getNumTlogReplicas().intValue(), + s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); + assertEquals( + create.getNumPullReplicas().intValue(), + s.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); + } + // make sure all newly created replicas on node are active + List newReplicas = collection.getReplicas(nodeToBeDecommissioned); + replicas.forEach(r -> newReplicas.removeIf(nr -> nr.getName().equals(r.getName()))); + assertFalse(newReplicas.isEmpty()); + for (Replica r : newReplicas) { + assertEquals(r.toString(), Replica.State.ACTIVE, r.getState()); + } + // make sure all replicas on emptyNode are not active + replicas = collection.getReplicas(emptyNode); + if (replicas != null) { + for (Replica r : replicas) { + assertNotEquals(r.toString(), Replica.State.ACTIVE, r.getState()); + } + } + + // check replication metrics on this jetty - see SOLR-14924 + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + if (jetty.getCoreContainer() == null) { + continue; + } + SolrMetricManager metricManager = jetty.getCoreContainer().getMetricManager(); + String registryName = null; + for (String name : metricManager.registryNames()) { + if (name.startsWith("solr.core.")) { + registryName = name; + } + } + Map metrics = metricManager.registry(registryName).getMetrics(); + if 
(!metrics.containsKey("REPLICATION./replication.fetcher")) { + continue; + } + MetricsMap fetcherGauge = + (MetricsMap) + ((SolrMetricManager.GaugeWrapper) metrics.get("REPLICATION./replication.fetcher")) + .getGauge(); + assertNotNull("no IndexFetcher gauge in metrics", fetcherGauge); + Map value = fetcherGauge.getValue(); + if (value.isEmpty()) { + continue; + } + assertNotNull("isReplicating missing: " + value, value.get("isReplicating")); + assertTrue( + "isReplicating should be a boolean: " + value, + value.get("isReplicating") instanceof Boolean); + if (value.get("indexReplicatedAt") == null) { + continue; + } + assertNotNull("timesIndexReplicated missing: " + value, value.get("timesIndexReplicated")); + assertTrue( + "timesIndexReplicated should be a number: " + value, + value.get("timesIndexReplicated") instanceof Number); + } + } + + @Test + public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { + configureCluster(5) + .addConfig( + "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) + .configure(); + String coll = "migratereplicastest_notarget_coll"; + if (log.isInfoEnabled()) { + log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); + } + + CloudSolrClient cloudClient = cluster.getSolrClient(); + Set liveNodes = cloudClient.getClusterState().getLiveNodes(); + List l = new ArrayList<>(liveNodes); + Collections.shuffle(l, random()); + List nodesToBeDecommissioned = l.subList(0, 2); + List eventualTargetNodes = l.subList(2, l.size()); + + // TODO: tlog replicas do not work correctly in tests due to fault + // TestInjection#waitForInSyncWithLeader + CollectionAdminRequest.Create create = + CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0); + create.setCreateNodeSet(StrUtils.join(l, ',')); + cloudClient.request(create); + + cluster.waitForActiveCollection( + coll, + create.getNumShards(), + create.getNumShards() + * (create.getNumNrtReplicas() + + create.getNumPullReplicas() + + 
create.getNumTlogReplicas())); + + DocCollection initialCollection = cloudClient.getClusterState().getCollection(coll); + log.debug("### Before decommission: {}", initialCollection); + List initialReplicaCounts = + l.stream() + .map(node -> initialCollection.getReplicas(node).size()) + .collect(Collectors.toList()); + Map response = callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + new HashSet<>(nodesToBeDecommissioned), + Collections.emptySet(), + true, + null) + ); + assertEquals("MigrateReplicas request was unsuccessful", 0L, ((Map)response.get("responseHeader")).get("status")); + + DocCollection collection = cloudClient.getClusterState().getCollectionOrNull(coll, false); + assertNotNull("Collection cannot be null: " + coll, collection); + log.debug("### After decommission: {}", collection); + // check what are replica states on the decommissioned nodes + for (String nodeToBeDecommissioned : nodesToBeDecommissioned) { + List replicas = collection.getReplicas(nodeToBeDecommissioned); + if (replicas == null) { + replicas = Collections.emptyList(); + } + assertEquals( + "There should be no more replicas on the sourceNode after a migrateReplicas request.", + Collections.emptyList(), + replicas); + } + + for (String node : eventualTargetNodes) { + assertEquals( + "The non-source node '" + node + "' has the wrong number of replicas after the migration", 2, collection.getReplicas(node).size()); + } + } + + @Test + public void testFailOnSingleNode() throws Exception { + configureCluster(1) + .addConfig( + "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) + .configure(); + String coll = "migratereplicastest_singlenode_coll"; + if (log.isInfoEnabled()) { + log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); + } + + CloudSolrClient cloudClient = cluster.getSolrClient(); + cloudClient.request(CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 0, 0)); + + 
cluster.waitForActiveCollection(coll, 5, 5); + + String liveNode = cloudClient.getClusterState().getLiveNodes().iterator().next(); + Map response = callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of(liveNode), + Collections.emptySet(), + true, + null) + ); + assertNotNull("No error in response, when the request should have failed", response.get("error")); + assertEquals("Wrong error message", "No nodes other than the source nodes are live, therefore replicas cannot be migrated", ((Map)response.get("error")).get("msg")); + } + + public Map callMigrateReplicas(CloudSolrClient cloudClient, MigrateReplicasAPI.MigrateReplicasRequestBody body) + throws IOException { + HttpEntityEnclosingRequestBase httpRequest = null; + HttpEntity entity; + String response = null; + Map r = null; + + String uri = cluster.getJettySolrRunners().get(0).getBaseUrl().toString().replace("/solr", "") + "/api/cluster/replicas/migrate"; + try { + httpRequest = new HttpPost(uri); + + httpRequest.setEntity(new ByteArrayEntity(Utils.toJSON(body), ContentType.APPLICATION_JSON)); + httpRequest.setHeader("Accept", "application/json"); + entity = + ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpRequest).getEntity(); + try { + response = EntityUtils.toString(entity, UTF_8); + r = (Map) Utils.fromJSONString(response); + assertNotNull("No response given from MigrateReplicas API", r); + assertNotNull("No responseHeader given from MigrateReplicas API", r.get("responseHeader")); + } catch (JSONParser.ParseException e) { + log.error("err response: {}", response); + throw new AssertionError(e); + } + } finally { + httpRequest.releaseConnection(); + } + return r; + } +} From dc26b3df268d197e31bbe6944e3b6d55af78ad5b Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 27 Jun 2023 13:03:52 -0400 Subject: [PATCH 06/15] tidy --- .../solr/cloud/MigrateReplicasTest.java | 121 +++++++++--------- .../admin/api/MigrateReplicasAPITest.java | 34 ++--- 2 
files changed, 76 insertions(+), 79 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java index b82785060f92..30da6f03e845 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java @@ -17,7 +17,19 @@ package org.apache.solr.cloud; +import static java.nio.charset.StandardCharsets.UTF_8; + import com.codahale.metrics.Metric; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.http.HttpEntity; import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; import org.apache.http.client.methods.HttpPost; @@ -30,20 +42,14 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.response.CoreAdminResponse; -import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.CollectionParams; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; import org.apache.solr.embedded.JettySolrRunner; -import org.apache.solr.handler.admin.api.BalanceReplicasAPI; import org.apache.solr.handler.admin.api.MigrateReplicasAPI; -import org.apache.solr.jersey.SolrJerseyResponse; import org.apache.solr.metrics.MetricsMap; import org.apache.solr.metrics.SolrMetricManager; import 
org.junit.Before; @@ -53,21 +59,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.solr.common.params.CollectionParams.SOURCE_NODE; -import static org.apache.solr.common.params.CollectionParams.TARGET_NODE; - public class MigrateReplicasTest extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -131,16 +122,15 @@ public void test() throws Exception { DocCollection collection = cloudClient.getClusterState().getCollection(coll); log.debug("### Before decommission: {}", collection); log.info("excluded_node : {} ", emptyNode); - Map response = callMigrateReplicas( + Map response = + callMigrateReplicas( cloudClient, new MigrateReplicasAPI.MigrateReplicasRequestBody( - Set.of(nodeToBeDecommissioned), - Set.of(emptyNode), - true, - null - ) - ); - assertEquals("MigrateReplicas request was unsuccessful", 0L, ((Map)response.get("responseHeader")).get("status")); + Set.of(nodeToBeDecommissioned), Set.of(emptyNode), true, null)); + assertEquals( + "MigrateReplicas request was unsuccessful", + 0L, + ((Map) response.get("responseHeader")).get("status")); ZkStateReader zkStateReader = ZkStateReader.from(cloudClient); try (SolrClient coreClient = getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(nodeToBeDecommissioned))) { @@ -159,16 +149,15 @@ public void test() throws Exception { log.debug("### Existing replicas on decommissioned node: {}", replicas); // let's do it back - this time wait for recoveries - response = callMigrateReplicas( + response = + callMigrateReplicas( cloudClient, new 
MigrateReplicasAPI.MigrateReplicasRequestBody( - Set.of(emptyNode), - Set.of(nodeToBeDecommissioned), - true, - null - ) - ); - assertEquals("MigrateReplicas request was unsuccessful", 0L, ((Map)response.get("responseHeader")).get("status")); + Set.of(emptyNode), Set.of(nodeToBeDecommissioned), true, null)); + assertEquals( + "MigrateReplicas request was unsuccessful", + 0L, + ((Map) response.get("responseHeader")).get("status")); try (SolrClient coreClient = getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(emptyNode))) { @@ -274,7 +263,7 @@ public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { cluster.waitForActiveCollection( coll, create.getNumShards(), - create.getNumShards() + create.getNumShards() * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas())); @@ -285,15 +274,15 @@ public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { l.stream() .map(node -> initialCollection.getReplicas(node).size()) .collect(Collectors.toList()); - Map response = callMigrateReplicas( + Map response = + callMigrateReplicas( cloudClient, new MigrateReplicasAPI.MigrateReplicasRequestBody( - new HashSet<>(nodesToBeDecommissioned), - Collections.emptySet(), - true, - null) - ); - assertEquals("MigrateReplicas request was unsuccessful", 0L, ((Map)response.get("responseHeader")).get("status")); + new HashSet<>(nodesToBeDecommissioned), Collections.emptySet(), true, null)); + assertEquals( + "MigrateReplicas request was unsuccessful", + 0L, + ((Map) response.get("responseHeader")).get("status")); DocCollection collection = cloudClient.getClusterState().getCollectionOrNull(coll, false); assertNotNull("Collection cannot be null: " + coll, collection); @@ -305,14 +294,16 @@ public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { replicas = Collections.emptyList(); } assertEquals( - "There should be no more replicas on the sourceNode after a migrateReplicas request.", - Collections.emptyList(), - 
replicas); + "There should be no more replicas on the sourceNode after a migrateReplicas request.", + Collections.emptyList(), + replicas); } for (String node : eventualTargetNodes) { assertEquals( - "The non-source node '" + node + "' has the wrong number of replicas after the migration", 2, collection.getReplicas(node).size()); + "The non-source node '" + node + "' has the wrong number of replicas after the migration", + 2, + collection.getReplicas(node).size()); } } @@ -333,33 +324,37 @@ public void testFailOnSingleNode() throws Exception { cluster.waitForActiveCollection(coll, 5, 5); String liveNode = cloudClient.getClusterState().getLiveNodes().iterator().next(); - Map response = callMigrateReplicas( - cloudClient, - new MigrateReplicasAPI.MigrateReplicasRequestBody( - Set.of(liveNode), - Collections.emptySet(), - true, - null) - ); - assertNotNull("No error in response, when the request should have failed", response.get("error")); - assertEquals("Wrong error message", "No nodes other than the source nodes are live, therefore replicas cannot be migrated", ((Map)response.get("error")).get("msg")); + Map response = + callMigrateReplicas( + cloudClient, + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of(liveNode), Collections.emptySet(), true, null)); + assertNotNull( + "No error in response, when the request should have failed", response.get("error")); + assertEquals( + "Wrong error message", + "No nodes other than the source nodes are live, therefore replicas cannot be migrated", + ((Map) response.get("error")).get("msg")); } - public Map callMigrateReplicas(CloudSolrClient cloudClient, MigrateReplicasAPI.MigrateReplicasRequestBody body) - throws IOException { + public Map callMigrateReplicas( + CloudSolrClient cloudClient, MigrateReplicasAPI.MigrateReplicasRequestBody body) + throws IOException { HttpEntityEnclosingRequestBase httpRequest = null; HttpEntity entity; String response = null; Map r = null; - String uri = 
cluster.getJettySolrRunners().get(0).getBaseUrl().toString().replace("/solr", "") + "/api/cluster/replicas/migrate"; + String uri = + cluster.getJettySolrRunners().get(0).getBaseUrl().toString().replace("/solr", "") + + "/api/cluster/replicas/migrate"; try { httpRequest = new HttpPost(uri); httpRequest.setEntity(new ByteArrayEntity(Utils.toJSON(body), ContentType.APPLICATION_JSON)); httpRequest.setHeader("Accept", "application/json"); entity = - ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpRequest).getEntity(); + ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpRequest).getEntity(); try { response = EntityUtils.toString(entity, UTF_8); r = (Map) Utils.fromJSONString(response); diff --git a/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java index 492974d6ad24..e87a4c675327 100644 --- a/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java +++ b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java @@ -16,8 +16,17 @@ */ package org.apache.solr.handler.admin.api; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.cloud.MigrateReplicasTest; import org.apache.solr.cloud.OverseerSolrResponse; import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner; import org.apache.solr.common.SolrException; @@ -31,17 +40,6 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -import java.util.Collections; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import static 
org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - /** Unit tests for {@link ReplaceNodeAPI} */ public class MigrateReplicasAPITest extends SolrTestCaseJ4 { @@ -79,7 +77,8 @@ public void setUp() throws Exception { @Test public void testCreatesValidOverseerMessage() throws Exception { MigrateReplicasAPI.MigrateReplicasRequestBody requestBody = - new MigrateReplicasAPI.MigrateReplicasRequestBody(Set.of("demoSourceNode"), Set.of("demoTargetNode"), false, "async"); + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of("demoSourceNode"), Set.of("demoTargetNode"), false, "async"); migrateReplicasAPI.migrateReplicas(requestBody); verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong()); @@ -96,7 +95,8 @@ public void testCreatesValidOverseerMessage() throws Exception { @Test public void testNoTargetNodes() throws Exception { MigrateReplicasAPI.MigrateReplicasRequestBody requestBody = - new MigrateReplicasAPI.MigrateReplicasRequestBody(Set.of("demoSourceNode"), null, null, null); + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Set.of("demoSourceNode"), null, null, null); migrateReplicasAPI.migrateReplicas(requestBody); verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong()); @@ -110,10 +110,12 @@ public void testNoTargetNodes() throws Exception { @Test public void testNoSourceNodesThrowsError() throws Exception { MigrateReplicasAPI.MigrateReplicasRequestBody requestBody1 = - new MigrateReplicasAPI.MigrateReplicasRequestBody(Collections.emptySet(), Set.of("demoTargetNode"), null, null); + new MigrateReplicasAPI.MigrateReplicasRequestBody( + Collections.emptySet(), Set.of("demoTargetNode"), null, null); assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody1)); 
MigrateReplicasAPI.MigrateReplicasRequestBody requestBody2 = - new MigrateReplicasAPI.MigrateReplicasRequestBody(null, Set.of("demoTargetNode"), null, null); + new MigrateReplicasAPI.MigrateReplicasRequestBody( + null, Set.of("demoTargetNode"), null, null); assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody2)); } } From 4eea7685653c1b709c9dac927602f9c61e9b646f Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Thu, 29 Jun 2023 15:30:18 -0400 Subject: [PATCH 07/15] Fix orderedPlacement computation for bad edge case --- .../plugins/OrderedNodePlacementPlugin.java | 180 ++++++++++++++---- .../solr/cloud/MigrateReplicasTest.java | 26 +-- 2 files changed, 151 insertions(+), 55 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index a9d1f4ea0488..d81df115a603 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -18,10 +18,12 @@ package org.apache.solr.cluster.placement.plugins; import java.lang.invoke.MethodHandles; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -66,34 +68,32 @@ public List computePlacements( List placementPlans = new ArrayList<>(requests.size()); Set allNodes = new HashSet<>(); Set allCollections = new HashSet<>(); + + Deque pendingRequests = new ArrayDeque<>(requests.size()); for (PlacementRequest request : requests) { + PendingPlacementRequest pending = new PendingPlacementRequest(request); + pendingRequests.add(pending); + placementPlans.add( + placementContext + .getPlacementPlanFactory() + 
.createPlacementPlan(request, pending.getComputedPlacementSet())); allNodes.addAll(request.getTargetNodes()); allCollections.add(request.getCollection()); } + Collection weightedNodes = getWeightedNodes(placementContext, allNodes, allCollections, true).values(); - for (PlacementRequest request : requests) { - int totalReplicasPerShard = 0; - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - totalReplicasPerShard += request.getCountReplicasToCreate(rt); - } + while (!pendingRequests.isEmpty()) { + PendingPlacementRequest request = pendingRequests.poll(); List nodesForRequest = - weightedNodes.stream() - .filter(wn -> request.getTargetNodes().contains(wn.getNode())) - .collect(Collectors.toList()); - - Set replicaPlacements = - CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size()); + weightedNodes.stream().filter(request::isTargetingNode).collect(Collectors.toList()); SolrCollection solrCollection = request.getCollection(); // Now place randomly all replicas of all shards on available nodes - for (String shardName : request.getShardNames()) { - for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - int replicaCount = request.getCountReplicasToCreate(replicaType); - if (replicaCount == 0) { - continue; - } + for (String shardName : request.getPendingShards()) { + for (Replica.ReplicaType replicaType : request.getPendingReplicaTypes(shardName)) { + int replicaCount = request.getPendingReplicas(shardName, replicaType); if (log.isDebugEnabled()) { log.debug( "Placing {} replicas for Collection: {}, Shard: {}, ReplicaType: {}", @@ -113,36 +113,40 @@ public List computePlacements( }); int replicasPlaced = 0; + boolean retryRequestLater = false; while (!nodesForReplicaType.isEmpty() && replicasPlaced < replicaCount) { WeightedNode node = nodesForReplicaType.poll(); - if (!node.canAddReplica(pr)) { - if (log.isDebugEnabled()) { - log.debug( - "Node can no longer accept replica, removing from selection list: {}", - 
node.getNode()); - } - continue; - } + if (node.hasWeightChangedSinceSort()) { - if (log.isDebugEnabled()) { - log.debug( - "Node's sort is out-of-date, adding back to selection list: {}", - node.getNode()); - } + log.debug("Node's sort is out-of-date, adding back to selection list: {}", node); node.addToSortedCollection(nodesForReplicaType); // The node will be re-sorted, // so go back to the top of the loop to get the new lowest-sorted node continue; } - if (log.isDebugEnabled()) { - log.debug("Node chosen to host replica: {}", node.getNode()); + // If there is a tie, we want to come back later and try again, but only if the request + // can be requeued + // TODO: Make this logic better + if (!pendingRequests.isEmpty() + && request.canBeRequeued() + && !nodesForReplicaType.isEmpty()) { + while (nodesForReplicaType.peek().hasWeightChangedSinceSort()) { + nodesForReplicaType.poll().addToSortedCollection(nodesForReplicaType); + } + if (nodesForReplicaType.peek().lastSortedWeight == node.lastSortedWeight) { + log.debug( + "There is a tie for best weight, try this placement request later: {}", node); + retryRequestLater = true; + break; + } } + log.debug("Node chosen to host replica: {}", node); boolean needsToResortAll = node.addReplica( createProjectedReplica(solrCollection, shardName, replicaType, node.getNode())); replicasPlaced += 1; - replicaPlacements.add( + request.addPlacement( placementContext .getPlacementPlanFactory() .createReplicaPlacement( @@ -150,9 +154,7 @@ public List computePlacements( // Only update the priorityQueue if there are still replicas to be placed if (replicasPlaced < replicaCount) { if (needsToResortAll) { - if (log.isDebugEnabled()) { - log.debug("Replica addition requires re-sorting of entire selection list"); - } + log.debug("Replica addition requires re-sorting of entire selection list"); List nodeList = new ArrayList<>(nodesForReplicaType); nodesForReplicaType.clear(); nodeList.forEach(n -> 
n.addToSortedCollection(nodesForReplicaType)); @@ -167,7 +169,7 @@ public List computePlacements( } } - if (replicasPlaced < replicaCount) { + if (!retryRequestLater && replicasPlaced < replicaCount) { throw new PlacementException( String.format( Locale.ROOT, @@ -180,11 +182,10 @@ public List computePlacements( } } } - - placementPlans.add( - placementContext - .getPlacementPlanFactory() - .createPlacementPlan(request, replicaPlacements)); + if (request.isPending()) { + request.requeue(); + pendingRequests.add(request); + } } return placementPlans; } @@ -613,6 +614,11 @@ public boolean equals(Object o) { } } } + + @Override + public String toString() { + return node.getName(); + } } /** @@ -699,4 +705,94 @@ public Node getNode() { } }; } + + /** Context for a placement request still has replicas that need to be placed. */ + class PendingPlacementRequest { + boolean hasBeenRequeued; + + final SolrCollection collection; + + final Set targetNodes; + + final Set computedPlacements; + + final Map> replicasToPlaceForShards; + + public PendingPlacementRequest(PlacementRequest request) { + hasBeenRequeued = false; + collection = request.getCollection(); + targetNodes = request.getTargetNodes(); + Set shards = request.getShardNames(); + replicasToPlaceForShards = CollectionUtil.newHashMap(shards.size()); + shards.forEach(s -> replicasToPlaceForShards.put(s, new HashMap<>())); + int totalShardReplicas = 0; + for (Replica.ReplicaType type : Replica.ReplicaType.values()) { + int count = request.getCountReplicasToCreate(type); + if (count > 0) { + totalShardReplicas += count; + shards.forEach(s -> replicasToPlaceForShards.get(s).put(type, count)); + } + } + computedPlacements = CollectionUtil.newHashSet(totalShardReplicas * shards.size()); + } + + public boolean isPending() { + return !replicasToPlaceForShards.isEmpty(); + } + + public SolrCollection getCollection() { + return collection; + } + + public boolean isTargetingNode(WeightedNode node) { + return 
targetNodes.contains(node.getNode()); + } + + public Set getComputedPlacementSet() { + return computedPlacements; + } + + public Set getPendingShards() { + return new HashSet<>(replicasToPlaceForShards.keySet()); + } + + public Set getPendingReplicaTypes(String shard) { + return new HashSet<>( + replicasToPlaceForShards.getOrDefault(shard, Collections.emptyMap()).keySet()); + } + + public int getPendingReplicas(String shard, Replica.ReplicaType type) { + return Optional.ofNullable(replicasToPlaceForShards.get(shard)) + .map(m -> m.get(type)) + .orElse(0); + } + + /** + * Only allow one requeue + * + * @return true if the request has not been requeued already + */ + public boolean canBeRequeued() { + return !hasBeenRequeued; + } + + public void requeue() { + hasBeenRequeued = true; + } + + public void addPlacement(ReplicaPlacement replica) { + computedPlacements.add(replica); + replicasToPlaceForShards.computeIfPresent( + replica.getShardName(), + (shard, replicaTypes) -> { + replicaTypes.computeIfPresent( + replica.getReplicaType(), (type, count) -> (count == 1) ? 
null : count - 1); + if (replicaTypes.size() > 0) { + return replicaTypes; + } else { + return null; + } + }); + } + } } diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java index 30da6f03e845..0935770b9f5c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java @@ -135,18 +135,19 @@ public void test() throws Exception { try (SolrClient coreClient = getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(nodeToBeDecommissioned))) { CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreClient); - assertEquals(0, status.getCoreStatus().size()); + assertEquals( + "There should not be any cores left on decommissioned node", + 0, + status.getCoreStatus().size()); } Thread.sleep(5000); - collection = cloudClient.getClusterState().getCollection(coll); + collection = cloudClient.getClusterState().getCollectionOrNull(coll, false); log.debug("### After decommission: {}", collection); // check what are replica states on the decommissioned node - List replicas = collection.getReplicas(nodeToBeDecommissioned); - if (replicas == null) { - replicas = Collections.emptyList(); - } - log.debug("### Existing replicas on decommissioned node: {}", replicas); + assertNull( + "There should not be any replicas left on decommissioned node", + collection.getReplicas(nodeToBeDecommissioned)); // let's do it back - this time wait for recoveries response = @@ -183,13 +184,13 @@ public void test() throws Exception { } // make sure all newly created replicas on node are active List newReplicas = collection.getReplicas(nodeToBeDecommissioned); - replicas.forEach(r -> newReplicas.removeIf(nr -> nr.getName().equals(r.getName()))); - assertFalse(newReplicas.isEmpty()); + assertNotNull("There should be replicas on the migrated-to node", newReplicas); + assertFalse("There should be replicas on the 
migrated-to node", newReplicas.isEmpty()); for (Replica r : newReplicas) { assertEquals(r.toString(), Replica.State.ACTIVE, r.getState()); } // make sure all replicas on emptyNode are not active - replicas = collection.getReplicas(emptyNode); + List replicas = collection.getReplicas(emptyNode); if (replicas != null) { for (Replica r : replicas) { assertNotEquals(r.toString(), Replica.State.ACTIVE, r.getState()); @@ -257,7 +258,6 @@ public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { // TestInjection#waitForInSyncWithLeader CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0); - create.setCreateNodeSet(StrUtils.join(l, ',')); cloudClient.request(create); cluster.waitForActiveCollection( @@ -269,7 +269,7 @@ public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { + create.getNumTlogReplicas())); DocCollection initialCollection = cloudClient.getClusterState().getCollection(coll); - log.debug("### Before decommission: {}", initialCollection); + log.info("### Before decommission: {}", initialCollection); List initialReplicaCounts = l.stream() .map(node -> initialCollection.getReplicas(node).size()) @@ -286,7 +286,7 @@ public void testGoodSpreadDuringAssignWithNoTarget() throws Exception { DocCollection collection = cloudClient.getClusterState().getCollectionOrNull(coll, false); assertNotNull("Collection cannot be null: " + coll, collection); - log.debug("### After decommission: {}", collection); + log.info("### After decommission: {}", collection); // check what are replica states on the decommissioned nodes for (String nodeToBeDecommissioned : nodesToBeDecommissioned) { List replicas = collection.getReplicas(nodeToBeDecommissioned); From e83b96ce73e18cdc4fb01a548d13b496e212461c Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Thu, 29 Jun 2023 15:34:59 -0400 Subject: [PATCH 08/15] fixes --- .../cluster/placement/plugins/OrderedNodePlacementPlugin.java | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index b11aa30eaded..3e6e3a29e9aa 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -725,7 +725,7 @@ public String toString() { } /** Context for a placement request still has replicas that need to be placed. */ - class PendingPlacementRequest { + static class PendingPlacementRequest { boolean hasBeenRequeued; final SolrCollection collection; From b9659279a56c4c9db39456c1801885deb3605241 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Thu, 29 Jun 2023 15:43:14 -0400 Subject: [PATCH 09/15] Tidy --- .../placement/plugins/OrderedNodePlacementPlugin.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index 3e6e3a29e9aa..bda83411b51c 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -617,7 +617,12 @@ public boolean equals(Object o) { @Override public String toString() { - return "WeightedNode{" + "node=" + node.getName() + ", lastSortedWeight=" + lastSortedWeight + '}'; + return "WeightedNode{" + + "node=" + + node.getName() + + ", lastSortedWeight=" + + lastSortedWeight + + '}'; } } From 60458d2024a50abbcd5244f17bd9cf7a70af010b Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Fri, 30 Jun 2023 13:44:34 -0400 Subject: [PATCH 10/15] Make tie-logic full-featured - Remove old WeightedNode sorting logic that is no longer 
needed - Remove Balancing stashed weight logic that isn't used --- .../plugins/OrderedNodePlacementPlugin.java | 260 ++++++++++-------- .../plugins/RandomPlacementFactory.java | 10 +- 2 files changed, 146 insertions(+), 124 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index bda83411b51c..c26f129c6d33 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -27,15 +27,16 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.PriorityQueue; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.IntSupplier; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.solr.cluster.Node; import org.apache.solr.cluster.Replica; @@ -103,42 +104,37 @@ public List computePlacements( replicaType); } Replica pr = createProjectedReplica(solrCollection, shardName, replicaType, null); - PriorityQueue nodesForReplicaType = new PriorityQueue<>(); + NodeHeap nodesForReplicaType = new NodeHeap(n -> n.calcRelevantWeightWithReplica(pr)); nodesForRequest.stream() .filter(n -> n.canAddReplica(pr)) - .forEach( - n -> { - n.sortByRelevantWeightWithReplica(pr); - n.addToSortedCollection(nodesForReplicaType); - }); + .forEach(nodesForReplicaType::add); int replicasPlaced = 0; boolean retryRequestLater = false; while (!nodesForReplicaType.isEmpty() && replicasPlaced < replicaCount) { WeightedNode node = nodesForReplicaType.poll(); - if (node.hasWeightChangedSinceSort()) { - 
log.debug("Node's sort is out-of-date, adding back to selection list: {}", node); - node.addToSortedCollection(nodesForReplicaType); - // The node will be re-sorted, - // so go back to the top of the loop to get the new lowest-sorted node + if (!node.canAddReplica(pr)) { + log.debug("Node can no-longer add the given replica, move on to next node: {}", node); continue; } - // If there is a tie, we want to come back later and try again, but only if the request - // can be requeued - // TODO: Make this logic better + + // If there is a tie, and there are more node options than we have replicas to place, + // then we want to come back later and try again. If there are ties, but less tie + // options than + // we have replicas to place, that's ok, because the replicas will be put on all the tie + // options probably. + // Only skip the request if it can be requeued. if (!pendingRequests.isEmpty() && request.canBeRequeued() - && !nodesForReplicaType.isEmpty()) { - while (nodesForReplicaType.peek().hasWeightChangedSinceSort()) { - nodesForReplicaType.poll().addToSortedCollection(nodesForReplicaType); - } - if (nodesForReplicaType.peek().lastSortedWeight == node.lastSortedWeight) { - log.debug( - "There is a tie for best weight, try this placement request later: {}", node); - retryRequestLater = true; - break; - } + && nodesForReplicaType.peekTies() > (replicaCount - replicasPlaced)) { + log.debug( + "There is a tie for best weight. 
There are more options ({}) than replicas to place ({}), so try this placement request later: {}", + nodesForReplicaType.peekTies(), + replicaCount - replicasPlaced, + node); + retryRequestLater = true; + break; } log.debug("Node chosen to host replica: {}", node); @@ -155,9 +151,7 @@ public List computePlacements( if (replicasPlaced < replicaCount) { if (needsToResortAll) { log.debug("Replica addition requires re-sorting of entire selection list"); - List nodeList = new ArrayList<>(nodesForReplicaType); - nodesForReplicaType.clear(); - nodeList.forEach(n -> n.addToSortedCollection(nodesForReplicaType)); + nodesForReplicaType.resortAll(); } // Add the chosen node back to the list if it can accept another replica of the // shard/replicaType. @@ -195,23 +189,17 @@ public BalancePlan computeBalancing( BalanceRequest balanceRequest, PlacementContext placementContext) throws PlacementException { Map replicaMovements = new HashMap<>(); TreeSet orderedNodes = new TreeSet<>(); - Collection weightedNodes = + orderedNodes.addAll( getWeightedNodes( placementContext, balanceRequest.getNodes(), placementContext.getCluster().collections(), true) - .values(); - // This is critical to store the last sort weight for this node - weightedNodes.forEach( - node -> { - node.sortWithoutChanges(); - node.addToSortedCollection(orderedNodes); - }); + .values()); // While the node with the lowest weight still has room to take a replica from the node with the // highest weight, loop - Map newReplicaMovements = new HashMap<>(); + Map newReplicaMovements = CollectionUtil.newHashMap(1); ArrayList traversedHighNodes = new ArrayList<>(orderedNodes.size() - 1); while (orderedNodes.size() > 1 && orderedNodes.first().calcWeight() < orderedNodes.last().calcWeight()) { @@ -219,22 +207,7 @@ public BalancePlan computeBalancing( if (lowestWeight == null) { break; } - if (lowestWeight.hasWeightChangedSinceSort()) { - if (log.isDebugEnabled()) { - log.debug( - "Re-sorting lowest weighted node: {}, sorting 
weight is out-of-date.", - lowestWeight.getNode().getName()); - } - // Re-sort this node and go back to find the lowest weight - lowestWeight.addToSortedCollection(orderedNodes); - continue; - } - if (log.isDebugEnabled()) { - log.debug( - "Lowest weighted node: {}, weight: {}", - lowestWeight.getNode().getName(), - lowestWeight.calcWeight()); - } + log.debug("Highest weighted node: {}", lowestWeight); newReplicaMovements.clear(); // If a compatible node was found to move replicas, break and find the lowest weighted node @@ -246,22 +219,7 @@ public BalancePlan computeBalancing( if (highestWeight == null) { break; } - if (highestWeight.hasWeightChangedSinceSort()) { - if (log.isDebugEnabled()) { - log.debug( - "Re-sorting highest weighted node: {}, sorting weight is out-of-date.", - highestWeight.getNode().getName()); - } - // Re-sort this node and go back to find the highest weight - highestWeight.addToSortedCollection(orderedNodes); - continue; - } - if (log.isDebugEnabled()) { - log.debug( - "Highest weighted node: {}, weight: {}", - highestWeight.getNode().getName(), - highestWeight.calcWeight()); - } + log.debug("Highest weighted node: {}", highestWeight); traversedHighNodes.add(highestWeight); // select a replica from the node with the most cores to move to the node with the least @@ -299,13 +257,11 @@ public BalancePlan computeBalancing( highestWeight.addReplica(r); continue; } - if (log.isDebugEnabled()) { - log.debug( - "Replica Movement chosen. From: {}, To: {}, Replica: {}", - highestWeight.getNode().getName(), - lowestWeight.getNode().getName(), - r); - } + log.debug( + "Replica Movement chosen. From: {}, To: {}, Replica: {}", + highestWeight, + lowestWeight, + r); newReplicaMovements.put(r, lowestWeight.getNode()); // Do not go beyond here, do another loop and see if other nodes can move replicas. 
@@ -322,12 +278,12 @@ public BalancePlan computeBalancing( // Add back in the traversed highNodes that we did not select replicas from, // they might have replicas to move to the next lowestWeighted node - traversedHighNodes.forEach(n -> n.addToSortedCollection(orderedNodes)); + orderedNodes.addAll(traversedHighNodes); traversedHighNodes.clear(); if (newReplicaMovements.size() > 0) { replicaMovements.putAll(newReplicaMovements); // There are no replicas to move to the lowestWeight, remove it from our loop - lowestWeight.addToSortedCollection(orderedNodes); + orderedNodes.add(lowestWeight); } } @@ -438,22 +394,10 @@ protected void verifyDeleteReplicas( public abstract static class WeightedNode implements Comparable { private final Node node; private final Map>> replicas; - private IntSupplier sortWeightCalculator; - private int lastSortedWeight; public WeightedNode(Node node) { this.node = node; this.replicas = new HashMap<>(); - this.lastSortedWeight = 0; - this.sortWeightCalculator = this::calcWeight; - } - - public void sortByRelevantWeightWithReplica(Replica replica) { - sortWeightCalculator = () -> calcRelevantWeightWithReplica(replica); - } - - public void sortWithoutChanges() { - sortWeightCalculator = this::calcWeight; } public Node getNode() { @@ -491,11 +435,6 @@ public Set getReplicasForShardOnNode(Shard shard) { .orElseGet(Collections::emptySet); } - public void addToSortedCollection(Collection collection) { - stashSortedWeight(); - collection.add(this); - } - public abstract int calcWeight(); public abstract int calcRelevantWeightWithReplica(Replica replica); @@ -572,14 +511,6 @@ public final void removeReplica(Replica replica) { protected abstract void removeProjectedReplicaWeights(Replica replica); - private void stashSortedWeight() { - lastSortedWeight = sortWeightCalculator.getAsInt(); - } - - protected boolean hasWeightChangedSinceSort() { - return lastSortedWeight != sortWeightCalculator.getAsInt(); - } - @SuppressWarnings({"rawtypes"}) protected 
Comparable getTiebreaker() { return node.getName(); @@ -588,7 +519,7 @@ protected Comparable getTiebreaker() { @Override @SuppressWarnings({"unchecked"}) public int compareTo(WeightedNode o) { - int comp = Integer.compare(this.lastSortedWeight, o.lastSortedWeight); + int comp = Integer.compare(this.calcWeight(), o.calcWeight()); if (comp == 0 && !equals(o)) { // TreeSets do not like a 0 comp for non-equal members. comp = getTiebreaker().compareTo(o.getTiebreaker()); @@ -617,12 +548,7 @@ public boolean equals(Object o) { @Override public String toString() { - return "WeightedNode{" - + "node=" - + node.getName() - + ", lastSortedWeight=" - + lastSortedWeight - + '}'; + return "WeightedNode{" + "node=" + node.getName() + ", weight=" + calcWeight() + '}'; } } @@ -729,6 +655,106 @@ public String toString() { }; } + private static class NodeHeap { + final Function weightFunc; + + final TreeMap> nodesByWeight; + + List currentLowestList; + int currentLowestWeight; + + int size = 0; + + protected NodeHeap(Function weightFunc) { + this.weightFunc = weightFunc; + nodesByWeight = new TreeMap<>(); + currentLowestList = null; + currentLowestWeight = -1; + } + + protected WeightedNode poll() { + updateLowestWeightedList(); + if (currentLowestList == null) { + return null; + } else { + size--; + return currentLowestList.remove(0); + } + } + + /** + * PeekTies should only be called after poll(). + * + * @return the number of nodes that have a weight tied with the WeightedNode returned in poll() + */ + protected int peekTies() { + return currentLowestList == null ? 
1 : currentLowestList.size() + 1; + } + + private void updateLowestWeightedList() { + if (currentLowestList != null) { + currentLowestList.removeIf( + node -> { + if (weightFunc.apply(node) != currentLowestWeight) { + log.debug("Node's sort is out-of-date, re-sorting: {}", node); + add(node); + return true; + } + return false; + }); + } + while (currentLowestList == null || currentLowestList.isEmpty()) { + Map.Entry> lowestEntry = nodesByWeight.pollFirstEntry(); + if (lowestEntry == null) { + currentLowestList = null; + currentLowestWeight = -1; + } else { + currentLowestList = lowestEntry.getValue(); + currentLowestWeight = lowestEntry.getKey(); + currentLowestList.removeIf( + node -> { + if (weightFunc.apply(node) != currentLowestWeight) { + log.debug("Node's sort is out-of-date, re-sorting: {}", node); + add(node); + return true; + } + return false; + }); + } + } + } + + public void add(WeightedNode node) { + size++; + int nodeWeight = weightFunc.apply(node); + if (currentLowestWeight == nodeWeight) { + currentLowestList.add(node); + } else { + nodesByWeight.computeIfAbsent(nodeWeight, w -> new LinkedList<>()).add(node); + } + } + + public int size() { + return size; + } + + public boolean isEmpty() { + return size == 0; + } + + public void resortAll() { + ArrayList temp = new ArrayList<>(size); + if (currentLowestList != null) { + temp.addAll(currentLowestList); + currentLowestList.clear(); + } + nodesByWeight.values().forEach(temp::addAll); + currentLowestWeight = -1; + nodesByWeight.clear(); + temp.forEach(this::add); + } + } + /** Context for a placement request still has replicas that need to be placed. 
*/ static class PendingPlacementRequest { boolean hasBeenRequeued; @@ -775,13 +801,15 @@ public Set getComputedPlacementSet() { return computedPlacements; } - public Set getPendingShards() { - return new HashSet<>(replicasToPlaceForShards.keySet()); + public Collection getPendingShards() { + return new ArrayList<>(replicasToPlaceForShards.keySet()); } - public Set getPendingReplicaTypes(String shard) { - return new HashSet<>( - replicasToPlaceForShards.getOrDefault(shard, Collections.emptyMap()).keySet()); + public Collection getPendingReplicaTypes(String shard) { + return Optional.ofNullable(replicasToPlaceForShards.get(shard)) + .map(Map::keySet) + .>map(TreeSet::new) + .orElseGet(Collections::emptyList); } public int getPendingReplicas(String shard, Replica.ReplicaType type) { diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java index 1e0f6a2f5ba4..0b2279b34fac 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java @@ -17,7 +17,6 @@ package org.apache.solr.cluster.placement.plugins; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -108,20 +107,15 @@ public int calcRelevantWeightWithReplica(Replica replica) { @Override protected boolean addProjectedReplicaWeights(Replica replica) { + randomTiebreaker = random.nextInt(); // NO-OP return false; } @Override protected void removeProjectedReplicaWeights(Replica replica) { - // NO-OP - } - - @Override - public void addToSortedCollection( - Collection collection) { randomTiebreaker = random.nextInt(); - super.addToSortedCollection(collection); + // NO-OP } } } From 577f566909eca9417b88fddf1478410e0397035d Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Fri, 30 Jun 2023 13:48:03 -0400 
Subject: [PATCH 11/15] Small fixes --- .../placement/plugins/OrderedNodePlacementPlugin.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index c26f129c6d33..31f3efb0c472 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -125,12 +125,13 @@ public List computePlacements( // we have replicas to place, that's ok, because the replicas will be put on all the tie // options probably. // Only skip the request if it can be requeued. + int numWeightTies = nodesForReplicaType.peekTies(); if (!pendingRequests.isEmpty() && request.canBeRequeued() - && nodesForReplicaType.peekTies() > (replicaCount - replicasPlaced)) { + && numWeightTies > (replicaCount - replicasPlaced)) { log.debug( "There is a tie for best weight. There are more options ({}) than replicas to place ({}), so try this placement request later: {}", - nodesForReplicaType.peekTies(), + numWeightTies, replicaCount - replicasPlaced, node); retryRequestLater = true; From 7203d4b31e94a913f35eab102d38abaa848ef7e3 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Fri, 30 Jun 2023 13:56:11 -0400 Subject: [PATCH 12/15] Refactor shared code into method. 
--- .../plugins/OrderedNodePlacementPlugin.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index 31f3efb0c472..feb816e3e2d4 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -693,6 +693,22 @@ protected int peekTies() { } private void updateLowestWeightedList() { + recheckLowestWeights(); + while (currentLowestList == null || currentLowestList.isEmpty()) { + Map.Entry> lowestEntry = nodesByWeight.pollFirstEntry(); + if (lowestEntry == null) { + currentLowestList = null; + currentLowestWeight = -1; + break; + } else { + currentLowestList = lowestEntry.getValue(); + currentLowestWeight = lowestEntry.getKey(); + recheckLowestWeights(); + } + } + } + + private void recheckLowestWeights() { if (currentLowestList != null) { currentLowestList.removeIf( node -> { @@ -704,25 +720,6 @@ private void updateLowestWeightedList() { return false; }); } - while (currentLowestList == null || currentLowestList.isEmpty()) { - Map.Entry> lowestEntry = nodesByWeight.pollFirstEntry(); - if (lowestEntry == null) { - currentLowestList = null; - currentLowestWeight = -1; - } else { - currentLowestList = lowestEntry.getValue(); - currentLowestWeight = lowestEntry.getKey(); - currentLowestList.removeIf( - node -> { - if (weightFunc.apply(node) != currentLowestWeight) { - log.debug("Node's sort is out-of-date, re-sorting: {}", node); - add(node); - return true; - } - return false; - }); - } - } } public void add(WeightedNode node) { From dd1b36e8c200665848d1de24582b285dd858129e Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Fri, 30 Jun 2023 14:28:30 -0400 Subject: [PATCH 13/15] Use dequeue instead of 
linkedList --- .../plugins/OrderedNodePlacementPlugin.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index feb816e3e2d4..e4e9853178e1 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -659,9 +658,9 @@ public String toString() { private static class NodeHeap { final Function weightFunc; - final TreeMap> nodesByWeight; + final TreeMap> nodesByWeight; - List currentLowestList; + Deque currentLowestList; int currentLowestWeight; int size = 0; @@ -675,11 +674,11 @@ protected NodeHeap(Function weightFunc) { protected WeightedNode poll() { updateLowestWeightedList(); - if (currentLowestList == null) { + if (currentLowestList == null || currentLowestList.isEmpty()) { return null; } else { size--; - return currentLowestList.remove(0); + return currentLowestList.pollFirst(); } } @@ -695,7 +694,7 @@ protected int peekTies() { private void updateLowestWeightedList() { recheckLowestWeights(); while (currentLowestList == null || currentLowestList.isEmpty()) { - Map.Entry> lowestEntry = nodesByWeight.pollFirstEntry(); + Map.Entry> lowestEntry = nodesByWeight.pollFirstEntry(); if (lowestEntry == null) { currentLowestList = null; currentLowestWeight = -1; @@ -728,7 +727,7 @@ public void add(WeightedNode node) { if (currentLowestWeight == nodeWeight) { currentLowestList.add(node); } else { - nodesByWeight.computeIfAbsent(nodeWeight, w -> new LinkedList<>()).add(node); + nodesByWeight.computeIfAbsent(nodeWeight, w 
-> new ArrayDeque<>()).addLast(node); } } From c55611588a5bd2f635c1112b8db41efd49ba90af Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Fri, 30 Jun 2023 15:46:11 -0400 Subject: [PATCH 14/15] Fix scenario when the placementRequest doesn't actually request any replicas... --- .../plugins/OrderedNodePlacementPlugin.java | 12 +++++++++--- .../collections/TestRequestStatusCollectionAPI.java | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index e4e9853178e1..cef46b7b200d 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -85,12 +85,15 @@ public List computePlacements( getWeightedNodes(placementContext, allNodes, allCollections, true).values(); while (!pendingRequests.isEmpty()) { PendingPlacementRequest request = pendingRequests.poll(); + if (!request.isPending()) { + continue; + } List nodesForRequest = weightedNodes.stream().filter(request::isTargetingNode).collect(Collectors.toList()); SolrCollection solrCollection = request.getCollection(); - // Now place randomly all replicas of all shards on available nodes + // Now place all replicas of all shards on available nodes for (String shardName : request.getPendingShards()) { for (Replica.ReplicaType replicaType : request.getPendingReplicaTypes(shardName)) { int replicaCount = request.getPendingReplicas(shardName, replicaType); @@ -770,13 +773,16 @@ public PendingPlacementRequest(PlacementRequest request) { targetNodes = request.getTargetNodes(); Set shards = request.getShardNames(); replicasToPlaceForShards = CollectionUtil.newHashMap(shards.size()); - shards.forEach(s -> replicasToPlaceForShards.put(s, new HashMap<>())); int totalShardReplicas = 0; for 
(Replica.ReplicaType type : Replica.ReplicaType.values()) { int count = request.getCountReplicasToCreate(type); if (count > 0) { totalShardReplicas += count; - shards.forEach(s -> replicasToPlaceForShards.get(s).put(type, count)); + shards.forEach( + s -> + replicasToPlaceForShards + .computeIfAbsent(s, sh -> CollectionUtil.newHashMap(3)) + .put(type, count)); } } computedPlacements = CollectionUtil.newHashSet(totalShardReplicas * shards.size()); diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java index bf4be82d6b74..6d3f208761a1 100644 --- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java @@ -219,6 +219,7 @@ private NamedList sendStatusRequestWithRetry(ModifiableSolrParams params try { Thread.sleep(1000); } catch (InterruptedException e) { + break; } } // Return last state? 
From b18e749cd3f6ee84e328ba2e5985a320ff351705 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 5 Jul 2023 13:01:19 -0400 Subject: [PATCH 15/15] Add docs for the new internal classes --- .../plugins/OrderedNodePlacementPlugin.java | 104 ++++++++++++++++-- 1 file changed, 97 insertions(+), 7 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index cef46b7b200d..de83db8e967b 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -106,6 +106,11 @@ public List computePlacements( replicaType); } Replica pr = createProjectedReplica(solrCollection, shardName, replicaType, null); + + // Create a NodeHeap so that we have access to the number of ties for the lowestWeighted + // node. + // Sort this heap by the relevant weight of the node given that the replica has been + // added. NodeHeap nodesForReplicaType = new NodeHeap(n -> n.calcRelevantWeightWithReplica(pr)); nodesForRequest.stream() .filter(n -> n.canAddReplica(pr)) @@ -123,10 +128,11 @@ public List computePlacements( // If there is a tie, and there are more node options than we have replicas to place, // then we want to come back later and try again. If there are ties, but less tie - // options than - // we have replicas to place, that's ok, because the replicas will be put on all the tie - // options probably. - // Only skip the request if it can be requeued. + // options than we have replicas to place, that's ok, because the replicas will likely + // be put on all the tie options. + // + // Only skip the request if it can be requeued, and there are other pending requests to + // compute. 
int numWeightTies = nodesForReplicaType.peekTies(); if (!pendingRequests.isEmpty() && request.canBeRequeued() @@ -658,6 +664,11 @@ public String toString() { }; } + /** + * A heap that stores Nodes, sorting them by a given function. + * + *

<p>A normal Java heap class cannot be used, because the {@link #peekTies()} method is required. + */ private static class NodeHeap { final Function weightFunc; @@ -675,6 +686,12 @@ protected NodeHeap(Function weightFunc) { currentLowestWeight = -1; } + /** + * Remove and return the node with the lowest weight. There is no guarantee on the ordering of + * nodes that have equal weights. + * + * @return the node with the lowest weight + */ protected WeightedNode poll() { updateLowestWeightedList(); if (currentLowestList == null || currentLowestList.isEmpty()) { @@ -686,14 +703,18 @@ protected WeightedNode poll() { } /** - * PeekTies should only be called after poll(). + * Return the number of Nodes that are tied for the current lowest weight (using the given + * sorting function). + * + *

<p>PeekTies should only be called after poll(). * - * @return the number of nodes that have a weight tied with the WeightedNode returned in poll() + * @return the number of nodes that are tied for the lowest weight */ protected int peekTies() { return currentLowestList == null ? 1 : currentLowestList.size() + 1; } + /** Make sure that the list that contains the nodes with the lowest weights is correct. */ private void updateLowestWeightedList() { recheckLowestWeights(); while (currentLowestList == null || currentLowestList.isEmpty()) { @@ -710,6 +731,10 @@ private void updateLowestWeightedList() { } } + /** + * Go through the list of Nodes with the lowest weight, and make sure that they are still the + * same weight. If their weight has increased, re-add the node to the heap. + */ private void recheckLowestWeights() { if (currentLowestList != null) { currentLowestList.removeIf( @@ -724,6 +749,11 @@ private void recheckLowestWeights() { } } + /** + * Add a node to the heap. + * + * @param node the node to add + */ public void add(WeightedNode node) { size++; int nodeWeight = weightFunc.apply(node); @@ -734,14 +764,29 @@ public void add(WeightedNode node) { } } + /** + * Get the number of nodes in the heap. + * + * @return number of nodes + */ public int size() { return size; } + /** + * Check if the heap is empty. + * + * @return if the heap has no nodes + */ public boolean isEmpty() { return size == 0; } + /** + * Re-sort all nodes in the heap, because their weights can no longer be trusted. This is only + * necessary if nodes in the heap may have had their weights decrease. If the nodes just had + * their weights increase, then calling this is not required. 
+ */ public void resortAll() { ArrayList temp = new ArrayList<>(size); if (currentLowestList != null) { @@ -763,8 +808,10 @@ static class PendingPlacementRequest { final Set targetNodes; + // A running list of placements already computed final Set computedPlacements; + // A live view on how many replicas still need to be placed for each shard & replica type final Map> replicasToPlaceForShards; public PendingPlacementRequest(PlacementRequest request) { @@ -788,6 +835,12 @@ public PendingPlacementRequest(PlacementRequest request) { computedPlacements = CollectionUtil.newHashSet(totalShardReplicas * shards.size()); } + /** + * Determine if this request is not yet complete, and there are requested replicas that have not + * had placements computed. + * + * @return if there are still replica placements that need to be computed + */ public boolean isPending() { return !replicasToPlaceForShards.isEmpty(); } @@ -800,21 +853,52 @@ public boolean isTargetingNode(WeightedNode node) { return targetNodes.contains(node.getNode()); } + /** + * The set of ReplicaPlacements computed for this request. + * + *

<p>The set that is returned is the same set used internally, so it will be augmented until + * {@link #isPending()} returns false. + * + * @return The live set of replicaPlacements for this request. + */ public Set getComputedPlacementSet() { return computedPlacements; } + /** + * Fetch the list of shards that still have replicas that need placements computed. If all the + * requested replicas for a shard are represented in {@link #getComputedPlacementSet()}, then + * that shard will not be returned by this method. + * + * @return list of unfinished shards + */ public Collection getPendingShards() { return new ArrayList<>(replicasToPlaceForShards.keySet()); } + /** + * For the given shard, return the replica types that still have placements that need to be + * computed. + * + * @param shard name of the shard to check for uncomputed placements + * @return the set of unfinished replica types + */ public Collection getPendingReplicaTypes(String shard) { return Optional.ofNullable(replicasToPlaceForShards.get(shard)) .map(Map::keySet) + // Use a sorted TreeSet to make sure that tests are repeatable .>map(TreeSet::new) .orElseGet(Collections::emptyList); } + /** + * Fetch the number of replicas that still need to be placed for the given shard and replica + * type. + * + * @param shard name of shard to be placed + * @param type type of replica to be placed + * @return the number of replicas that have not yet had placements computed + */ public int getPendingReplicas(String shard, Replica.ReplicaType type) { return Optional.ofNullable(replicasToPlaceForShards.get(shard)) .map(m -> m.get(type)) @@ -822,7 +906,7 @@ public int getPendingReplicas(String shard, Replica.ReplicaType type) { } /** - * Only allow one requeue + * Currently, only one requeue is allowed per pending request. 
* * @return true if the request has not been requeued already */ @@ -830,10 +914,16 @@ public boolean canBeRequeued() { return !hasBeenRequeued; } + /** Let the pending request know that it has been requeued */ public void requeue() { hasBeenRequeued = true; } + /** + * Track the given replica placement for this pending request. + * + * @param replica placement that has been made for the pending request + */ public void addPlacement(ReplicaPlacement replica) { computedPlacements.add(replica); replicasToPlaceForShards.computeIfPresent(