Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ New Features

* SOLR-16812: Support CBOR format for update/query (noble)

* SOLR-16855: Solr now provides a MigrateReplicas API at `POST /api/cluster/replicas/migrate` (v2), to move replicas
off of a given set of nodes. This extends the functionality of the existing ReplaceNode API. (Houston Putman)

Improvements
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.apache.solr.common.params.CollectionParams.CollectionAction.INSTALLSHARDDATA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE_REPLICAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
Expand Down Expand Up @@ -143,6 +144,7 @@ private CommandMap(OverseerNodePrioritizer overseerPrioritizer, CollectionComman
commandMap =
Map.ofEntries(
Map.entry(REPLACENODE, new ReplaceNodeCmd(ccc)),
Map.entry(MIGRATE_REPLICAS, new MigrateReplicasCmd(ccc)),
Map.entry(BALANCE_REPLICAS, new BalanceReplicasCmd(ccc)),
Map.entry(DELETENODE, new DeleteNodeCmd(ccc)),
Map.entry(BACKUP, new BackupCmd(ccc)),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.solr.cloud.api.collections;

import static org.apache.solr.common.params.CommonAdminParams.ASYNC;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.NamedList;

public class MigrateReplicasCmd implements CollApiCmds.CollectionApiCommand {

private final CollectionCommandContext ccc;

public MigrateReplicasCmd(CollectionCommandContext ccc) {
this.ccc = ccc;
}

@Override
public void call(ClusterState state, ZkNodeProps message, NamedList<Object> results)
throws Exception {
ZkStateReader zkStateReader = ccc.getZkStateReader();
Set<String> sourceNodes = getNodesFromParam(message, CollectionParams.SOURCE_NODES);
Set<String> targetNodes = getNodesFromParam(message, CollectionParams.TARGET_NODES);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
if (sourceNodes.isEmpty()) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "sourceNodes is a required param");
}
String async = message.getStr(ASYNC);
int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
boolean parallel = message.getBool("parallel", false);
ClusterState clusterState = zkStateReader.getClusterState();

for (String sourceNode : sourceNodes) {
if (!clusterState.liveNodesContain(sourceNode)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + sourceNode + " is not live");
}
Comment on lines +66 to +69
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be allowed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the ReplaceNode/BalanceReplicas logic does not allow for moving replicas off of non-live nodes. I think that's something that we should think about for a separate ticket. Especially now that cores are not deleted on startup if they don't exist in Zookeeper. #1321

Copy link
Copy Markdown
Member

@tflobbe tflobbe Jun 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how this will work currently. My thinking is, nodes that are removed, you want to "vacate" them and have the replicas be created somewhere else, so that:

  1. You don't see the node anymore in the "nodes" page and
  2. You get your replication factor back to whatever it was before

Would this API be the one to use in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I do think this is the correct API for that, but it (REPLACENODE) is not currently able to do that. AFAIK.

I think we should make this a param as a part of MigrateReplicas and ReplaceNode. (migrateFromNonLiveNodes or something like that). That would somewhat necessitate solr.deleteUnknownCores, from the above linked PR, to be set to true, so that when the node is started back up, it deletes the data directories from those nodes.

Again, I think this should be done in a separate JIRA/PR.

}
for (String targetNode : targetNodes) {
if (!clusterState.liveNodesContain(targetNode)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + targetNode + " is not live");
}
}

if (targetNodes.isEmpty()) {
// If no target nodes are provided, use all other live nodes that are not the sourceNodes
targetNodes =
clusterState.getLiveNodes().stream()
.filter(n -> !sourceNodes.contains(n))
.collect(Collectors.toSet());
if (targetNodes.isEmpty()) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"No nodes other than the source nodes are live, therefore replicas cannot be migrated");
}
}
List<Replica> sourceReplicas =
ReplicaMigrationUtils.getReplicasOfNodes(sourceNodes, clusterState);
Map<Replica, String> replicaMovements = CollectionUtil.newHashMap(sourceReplicas.size());

if (targetNodes.size() > 1) {
List<Assign.AssignRequest> assignRequests = new ArrayList<>(sourceReplicas.size());
List<String> targetNodeList = new ArrayList<>(targetNodes);
for (Replica sourceReplica : sourceReplicas) {
Replica.Type replicaType = sourceReplica.getType();
Assign.AssignRequest assignRequest =
new Assign.AssignRequestBuilder()
.forCollection(sourceReplica.getCollection())
.forShard(Collections.singletonList(sourceReplica.getShard()))
.assignNrtReplicas(replicaType == Replica.Type.NRT ? 1 : 0)
.assignTlogReplicas(replicaType == Replica.Type.TLOG ? 1 : 0)
.assignPullReplicas(replicaType == Replica.Type.PULL ? 1 : 0)
.onNodes(targetNodeList)
.build();
assignRequests.add(assignRequest);
}
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
List<ReplicaPosition> replicaPositions =
assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
int position = 0;
for (Replica sourceReplica : sourceReplicas) {
replicaMovements.put(sourceReplica, replicaPositions.get(position++).node);
}
} else {
String targetNode = targetNodes.stream().findFirst().get();
for (Replica sourceReplica : sourceReplicas) {
replicaMovements.put(sourceReplica, targetNode);
}
}

boolean migrationSuccessful =
ReplicaMigrationUtils.migrateReplicas(
ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results);
if (migrationSuccessful) {
results.add(
"success",
"MIGRATE_REPLICAS action completed successfully from : ["
+ String.join(",", sourceNodes)
+ "] to : ["
+ String.join(",", targetNodes)
+ "]");
}
}

@SuppressWarnings({"unchecked"})
protected Set<String> getNodesFromParam(ZkNodeProps message, String paramName) {
Object rawParam = message.get(paramName);
if (rawParam == null) {
return Collections.emptySet();
} else if (rawParam instanceof Set) {
return (Set<String>) rawParam;
} else if (rawParam instanceof Collection) {
return new HashSet<>((Collection<String>) rawParam);
} else if (rawParam instanceof String) {
return Set.of(((String) rawParam).split(","));
} else {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"'"
+ paramName
+ "' was not passed as a correct type (Set/List/String): "
+ rawParam.getClass().getName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,20 @@ static boolean cleanupReplicas(
return cleanupLatch.await(5, TimeUnit.MINUTES);
}

static List<Replica> getReplicasOfNodes(Collection<String> nodeNames, ClusterState state) {
List<Replica> sourceReplicas = new ArrayList<>();
for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
for (Slice slice : e.getValue().getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (nodeNames.contains(replica.getNodeName())) {
sourceReplicas.add(replica);
}
}
}
}
return sourceReplicas;
}

static List<Replica> getReplicasOfNode(String nodeName, ClusterState state) {
List<Replica> sourceReplicas = new ArrayList<>();
for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface PlacementPlanFactory {
* org.apache.solr.cloud.api.collections.CreateShardCmd}, {@link
* org.apache.solr.cloud.api.collections.ReplaceNodeCmd}, {@link
* org.apache.solr.cloud.api.collections.MoveReplicaCmd}, {@link
* org.apache.solr.cloud.api.collections.MigrateReplicasCmd}, {@link
* org.apache.solr.cloud.api.collections.SplitShardCmd}, {@link
* org.apache.solr.cloud.api.collections.RestoreCmd}, {@link
* org.apache.solr.cloud.api.collections.MigrateCmd} as well as of {@link
Expand Down
Loading