Skip to content
4 changes: 3 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley)

* SOLR-15834: Films example readme needs updating, including useParams support for multiple algorithms. (Eric Pugh)

* SOLR-15824 Improved Query Screen handling of raw query parameters. (Betul Ince via Tim Potter, Eric Pugh)
* SOLR-15824: Improved Query Screen handling of raw query parameters. (Betul Ince via Tim Potter, Eric Pugh)

* SOLR-15803: Compute concurrent replica assignment requests together, using the shared context to better distribute replicas. (Houston Putman)

Build
---------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloud
int i = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypeVsCount.entrySet()) {
for (int j = 0; j < entry.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, i++, entry.getKey(), node));
positions.add(new ReplicaPosition(collectionName, sliceName, i++, entry.getKey(), node));
}
}
}
Expand Down
126 changes: 76 additions & 50 deletions solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -251,15 +253,20 @@ public static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> l

static class ReplicaCount {
public final String nodeName;
public int thisCollectionNodes = 0;
public int totalNodes = 0;
public Map<String, Integer> collectionReplicas;
public int totalReplicas = 0;

ReplicaCount(String nodeName) {
this.nodeName = nodeName;
this.collectionReplicas = new HashMap<>();
}

public int weight() {
return (thisCollectionNodes * 100) + totalNodes;
public int weight(String collection) {
return (collectionReplicas.getOrDefault(collection, 0) * 100) + totalReplicas;
}

public String nodeName() {
return nodeName;
}
}

Expand All @@ -274,8 +281,7 @@ public static List<ReplicaPosition> getNodesForNewReplicas(ClusterState clusterS
CoreContainer coreContainer) throws IOException, InterruptedException, AssignmentException {
log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}"
, shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet);
DocCollection coll = clusterState.getCollection(collectionName);
List<String> createNodeList = null;
List<String> createNodeList;

if (createNodeSet instanceof List) {
createNodeList = (List<String>) createNodeSet;
Expand All @@ -297,28 +303,18 @@ public static List<ReplicaPosition> getNodesForNewReplicas(ClusterState clusterS
.assignPullReplicas(pullReplicas)
.onNodes(createNodeList)
.build();
AssignStrategy assignStrategy = createAssignStrategy(coreContainer, clusterState, coll);
AssignStrategy assignStrategy = createAssignStrategy(coreContainer);
return assignStrategy.assign(cloudManager, assignRequest);
}

static HashMap<String, ReplicaCount> getNodeNameVsShardCount(String collectionName,
ClusterState clusterState, List<String> createNodeList) {
HashMap<String, ReplicaCount> nodeNameVsShardCount = new HashMap<>();
List<String> liveNodes = createNodeList == null || createNodeList.isEmpty() ?
new ArrayList<>(clusterState.getLiveNodes()) :
checkLiveNodes(createNodeList, clusterState);
static void addNodeNameVsShardCount(ClusterState clusterState, HashMap<String, ReplicaCount> nodeNameVsShardCount) {
Collection<String> liveNodes = clusterState.getLiveNodes();

for (String s : liveNodes) {
nodeNameVsShardCount.put(s, new ReplicaCount(s));
}

// if we were given a list, just use that, don't worry about counts
if (createNodeList != null) {
return nodeNameVsShardCount;
nodeNameVsShardCount.putIfAbsent(s, new ReplicaCount(s));
}

// if we get here we were not given a createNodeList, build a map with real counts.
DocCollection coll = clusterState.getCollection(collectionName);
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
DocCollection c = entry.getValue();
Expand All @@ -328,16 +324,13 @@ static HashMap<String, ReplicaCount> getNodeNameVsShardCount(String collectionNa
for (Replica replica : replicas) {
ReplicaCount count = nodeNameVsShardCount.get(replica.getNodeName());
if (count != null) {
count.totalNodes++; // Used to "weigh" whether this node should be used later.
if (entry.getKey().equals(collectionName)) {
count.thisCollectionNodes++;
}
// Used to "weigh" whether this node should be used later.
count.collectionReplicas.merge(entry.getKey(), 1, Integer::sum);
count.totalReplicas++;
}
}
}
}

return nodeNameVsShardCount;
}

// throw an exception if any node in the supplied list is not live.
Expand Down Expand Up @@ -407,12 +400,32 @@ public interface AssignStrategy {

/**
* Assign new replicas to nodes.
* If multiple {@link AssignRequest}s are provided, then every {@link ReplicaPosition} made for an
* {@link AssignRequest} will be applied to the {@link SolrCloudManager}'s state when processing subsequent {@link AssignRequest}s.
* Therefore, the order in which {@link AssignRequest}s are provided can and will affect the {@link ReplicaPosition}s returned.
*
* @param solrCloudManager current instance of {@link SolrCloudManager}.
* @param assignRequest assign request.
* @param assignRequests the assign requests to process together.
* @return list of {@link ReplicaPosition}-s for new replicas.
* @throws AssignmentException when assignment request cannot produce any valid assignments.
*/
List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, AssignRequest assignRequest)
default List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, AssignRequest... assignRequests)
throws AssignmentException, IOException, InterruptedException {
return assign(solrCloudManager, Arrays.asList(assignRequests));
}

/**
* Assign new replicas to nodes.
* If multiple {@link AssignRequest}s are provided, then every {@link ReplicaPosition} made for an
* {@link AssignRequest} will be applied to the {@link SolrCloudManager}'s state when processing subsequent {@link AssignRequest}s.
* Therefore, the order in which {@link AssignRequest}s are provided can and will affect the {@link ReplicaPosition}s returned.
*
* @param solrCloudManager current instance of {@link SolrCloudManager}.
* @param assignRequests list of assign requests to process together.
* @return list of {@link ReplicaPosition}-s for new replicas.
* @throws AssignmentException when assignment request cannot produce any valid assignments.
*/
List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, List<AssignRequest> assignRequests)
throws AssignmentException, IOException, InterruptedException;

/**
Expand Down Expand Up @@ -508,32 +521,45 @@ public AssignRequest build() {

public static class LegacyAssignStrategy implements AssignStrategy {
@Override
public List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, AssignRequest assignRequest) throws Assign.AssignmentException, IOException, InterruptedException {
public List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, List<AssignRequest> assignRequests) throws Assign.AssignmentException, IOException, InterruptedException {
ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
List<String> nodeList = assignRequest.nodes; // can this be empty list?

if (nodeList == null || nodeList.isEmpty()) {
HashMap<String, Assign.ReplicaCount> nodeNameVsShardCount =
Assign.getNodeNameVsShardCount(assignRequest.collectionName, clusterState, nodeList);
// if nodelist was empty, this map will be empty too. (passing null above however gets a full map)
ArrayList<Assign.ReplicaCount> sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values());
sortedNodeList.sort(Comparator.comparingInt(Assign.ReplicaCount::weight));
nodeList = sortedNodeList.stream().map(replicaCount -> replicaCount.nodeName).collect(Collectors.toList());
}

// Throw an error if there aren't any live nodes.
checkAnyLiveNodes(nodeList, solrCloudManager.getClusterStateProvider().getClusterState());

int i = 0;
List<ReplicaPosition> result = new ArrayList<>();
for (String aShard : assignRequest.shardNames) {
for (Map.Entry<Replica.Type, Integer> e : countsPerReplicaType(assignRequest).entrySet()) {
for (int j = 0; j < e.getValue(); j++) {
result.add(new ReplicaPosition(aShard, j, e.getKey(), nodeList.get(i % nodeList.size())));
i++;

HashMap<String, Assign.ReplicaCount> nodeNameVsShardCount = new HashMap<>();
addNodeNameVsShardCount(clusterState, nodeNameVsShardCount);
for (AssignRequest assignRequest : assignRequests) {
Collection<ReplicaCount> replicaCounts = nodeNameVsShardCount.values();

if (assignRequest.nodes != null && !assignRequest.nodes.isEmpty()) {
// Throw an error if there are any non-live nodes.
checkLiveNodes(assignRequest.nodes, clusterState);
HashSet<String> nodeSet = new HashSet<>(assignRequest.nodes);
replicaCounts = replicaCounts.stream().filter(rc -> nodeSet.contains(rc.nodeName)).collect(Collectors.toList());
} else if (nodeNameVsShardCount.values().isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There are no live nodes in the cluster");
}

for (String aShard : assignRequest.shardNames) {
// Reset the ordering of the nodes for each shard, using the replicas added in the previous shards and assign requests
List<String> nodeList = replicaCounts.stream()
.sorted(Comparator.<ReplicaCount>comparingInt(rc -> rc.weight(assignRequest.collectionName)).thenComparing(ReplicaCount::nodeName))
.map(ReplicaCount::nodeName)
.collect(Collectors.toList());
int i = 0;
for (Map.Entry<Replica.Type, Integer> e : countsPerReplicaType(assignRequest).entrySet()) {
for (int j = 0; j < e.getValue(); j++) {
String assignedNode = nodeList.get(i % nodeList.size());
result.add(new ReplicaPosition(assignRequest.collectionName, aShard, j, e.getKey(), assignedNode));
i++;
ReplicaCount replicaCount = nodeNameVsShardCount.computeIfAbsent(assignedNode, ReplicaCount::new);
replicaCount.totalReplicas++;
replicaCount.collectionReplicas.merge(assignRequest.collectionName, 1, Integer::sum);
}
}
}
}

return result;
}

Expand All @@ -553,11 +579,11 @@ private ImmutableMap<Replica.Type, Integer> countsPerReplicaType(AssignRequest a
* <p>If {@link PlacementPlugin} instance is null this call will return {@link LegacyAssignStrategy}, otherwise
* {@link PlacementPluginAssignStrategy} will be used.</p>
*/
public static AssignStrategy createAssignStrategy(CoreContainer coreContainer, ClusterState clusterState, DocCollection collection) {
public static AssignStrategy createAssignStrategy(CoreContainer coreContainer) {
PlacementPlugin placementPlugin = coreContainer.getPlacementPluginFactory().createPluginInstance();
if (placementPlugin != null) {
// If a cluster wide placement plugin is configured (and that's the only way to define a placement plugin)
return new PlacementPluginAssignStrategy(collection, placementPlugin);
return new PlacementPluginAssignStrategy(placementPlugin);
} else {
return new LegacyAssignStrategy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec

final List<ReplicaPosition> replicaPositions;
try {
replicaPositions = buildReplicaPositions(ccc.getCoreContainer(), ccc.getSolrCloudManager(), clusterState, newColl,
message, shardNames);
replicaPositions = buildReplicaPositions(ccc.getCoreContainer(), ccc.getSolrCloudManager(), clusterState, message, shardNames);
} catch (Assign.AssignmentException e) {
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
new DeleteCollectionCmd(ccc).call(clusterState, deleteMessage, results);
Expand Down Expand Up @@ -389,7 +388,6 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec
}

private static List<ReplicaPosition> buildReplicaPositions(CoreContainer coreContainer, SolrCloudManager cloudManager, ClusterState clusterState,
DocCollection docCollection,
ZkNodeProps message,
List<String> shardNames) throws IOException, InterruptedException, Assign.AssignmentException {
final String collectionName = message.getStr(NAME);
Expand Down Expand Up @@ -430,7 +428,7 @@ private static List<ReplicaPosition> buildReplicaPositions(CoreContainer coreCon
.assignPullReplicas(numPullReplicas)
.onNodes(nodeList)
.build();
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(coreContainer, clusterState, docCollection);
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(coreContainer);
replicaPositions = assignStrategy.assign(cloudManager, assignRequest);
}
return replicaPositions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu
// verify the placement modifications caused by the deletion are allowed
DocCollection coll = state.getCollectionOrNull(collection);
if (coll != null) {
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer(), state, coll);
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
assignStrategy.verifyDeleteCollection(ccc.getSolrCloudManager(), coll);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList<Obj
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid shard name : " + shard + " in collection : " + collectionName);
}
deleteCore(clusterState, coll, shard, replicaName, message, results, onComplete, parallel, true);
deleteCore(coll, shard, replicaName, message, results, onComplete, parallel, true);
}


Expand Down Expand Up @@ -145,7 +145,7 @@ void deleteReplicaBasedOnCount(ClusterState clusterState,
}

// verify that all replicas can be deleted
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer(), clusterState, coll);
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
for (Map.Entry<Slice, Set<String>> entry : shardToReplicasMapping.entrySet()) {
Slice shardSlice = entry.getKey();
String shardId = shardSlice.getName();
Expand All @@ -163,7 +163,7 @@ void deleteReplicaBasedOnCount(ClusterState clusterState,
for (String replica: replicas) {
log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count);
// don't verify with the placement plugin - we already did it
deleteCore(clusterState, coll, shardId, replica, message, results, onComplete, parallel, false);
deleteCore(coll, shardId, replica, message, results, onComplete, parallel, false);
}
results.add("shard_id", shardId);
results.add("replicas_deleted", replicas);
Expand Down Expand Up @@ -220,7 +220,7 @@ private void validateReplicaAvailability(Slice slice, String shard, String colle
}
}

void deleteCore(ClusterState clusterState, DocCollection coll,
void deleteCore(DocCollection coll,
String shardId,
String replicaName,
ZkNodeProps message,
Expand Down Expand Up @@ -249,7 +249,7 @@ void deleteCore(ClusterState clusterState, DocCollection coll,

// verify that we are allowed to delete this replica
if (verifyPlacement) {
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer(), clusterState, coll);
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
assignStrategy.verifyDeleteReplicas(ccc.getSolrCloudManager(), coll, shardId, Set.of(replica));
}

Expand Down
Loading