Commit 19c2dcb

Merge branch 'apache:main' into solr-17263
andywebb1975 committed May 14, 2024
2 parents 362c595 + 1b582e9 commit 19c2dcb
Showing 16 changed files with 194 additions and 135 deletions.
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
@@ -127,6 +127,9 @@ Bug Fixes
 
 * SOLR-17261: Remove unintended timeout of 60 seconds for core loading. (Houston Putman)
 
+* SOLR-17049: Actually mark all replicas down at startup and truly wait for them.
+  This includes replicas that might not exist anymore locally. (Houston Putman, Vincent Primault)
+
 Dependency Upgrades
 ---------------------
 (No changes)
66 changes: 27 additions & 39 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -39,6 +39,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
@@ -1098,41 +1100,30 @@ public void publishAndWaitForDownStates() throws KeeperException, InterruptedExc
     publishAndWaitForDownStates(WAIT_DOWN_STATES_TIMEOUT_SECONDS);
   }
 
-  public void publishAndWaitForDownStates(int timeoutSeconds)
-      throws KeeperException, InterruptedException {
-
-    publishNodeAsDown(getNodeName());
+  public void publishAndWaitForDownStates(int timeoutSeconds) throws InterruptedException {
+    final String nodeName = getNodeName();
 
-    Set<String> collectionsWithLocalReplica = ConcurrentHashMap.newKeySet();
-    for (CoreDescriptor descriptor : cc.getCoreDescriptors()) {
-      collectionsWithLocalReplica.add(descriptor.getCloudDescriptor().getCollectionName());
-    }
+    Collection<String> collectionsWithLocalReplica = publishNodeAsDown(nodeName);
+    Map<String, Boolean> collectionsAlreadyVerified =
+        new ConcurrentHashMap<>(collectionsWithLocalReplica.size());
 
     CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
     for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
       zkStateReader.registerDocCollectionWatcher(
           collectionWithLocalReplica,
           (collectionState) -> {
             if (collectionState == null) return false;
-            boolean foundStates = true;
-            for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
-              if (coreDescriptor
-                  .getCloudDescriptor()
-                  .getCollectionName()
-                  .equals(collectionWithLocalReplica)) {
-                Replica replica =
-                    collectionState.getReplica(
-                        coreDescriptor.getCloudDescriptor().getCoreNodeName());
-                if (replica == null || replica.getState() != Replica.State.DOWN) {
-                  foundStates = false;
-                }
-              }
-            }
-
-            if (foundStates && collectionsWithLocalReplica.remove(collectionWithLocalReplica)) {
+            boolean allStatesCorrect =
+                Optional.ofNullable(collectionState.getReplicas(nodeName)).stream()
+                    .flatMap(List::stream)
+                    .allMatch(replica -> replica.getState() == Replica.State.DOWN);
+
+            if (allStatesCorrect
+                && collectionsAlreadyVerified.putIfAbsent(collectionWithLocalReplica, true)
+                    == null) {
              latch.countDown();
            }
-            return foundStates;
+            return allStatesCorrect;
          });
    }
 
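At its core, the rewritten wait logic asks one question per collection: is every replica that this node hosts in that collection already DOWN? A minimal, self-contained sketch of that predicate, using the same DocCollection.getReplicas(nodeName) and Optional pattern as the hunk above (the DownStateCheck class and helper name are illustrative, not part of the commit):

import java.util.List;
import java.util.Optional;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;

final class DownStateCheck {
  private DownStateCheck() {}

  /**
   * True when every replica of the collection hosted on the given node is DOWN. An absent or
   * empty replica list (the node hosts nothing in this collection) also returns true, since
   * allMatch over an empty stream is true; that is what lets the check cope with replicas that
   * no longer exist locally.
   */
  static boolean allReplicasDownOnNode(DocCollection docCollection, String nodeName) {
    return Optional.ofNullable(docCollection.getReplicas(nodeName)).stream()
        .flatMap(List::stream)
        .allMatch(replica -> replica.getState() == Replica.State.DOWN);
  }
}

Compared with the old per-CoreDescriptor loop, this checks the cluster-state side directly, so replicas that exist in ZooKeeper but are no longer present on local disk are still waited for.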
@@ -2849,9 +2840,14 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
    * Best effort to set DOWN state for all replicas on node.
    *
    * @param nodeName to operate on
+   * @return the names of the collections that have replicas on the given node
    */
-  public void publishNodeAsDown(String nodeName) {
+  public Collection<String> publishNodeAsDown(String nodeName) {
     log.info("Publish node={} as DOWN", nodeName);
+
+    ClusterState clusterState = getClusterState();
+    Map<String, List<Replica>> replicasPerCollectionOnNode =
+        clusterState.getReplicaNamesPerCollectionOnNode(nodeName);
     if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
       // Note that with the current implementation, when distributed cluster state updates are
       // enabled, we mark the node down synchronously from this thread, whereas the Overseer cluster
@@ -2862,24 +2858,15 @@ public void publishNodeAsDown(String nodeName) {
       distributedClusterStateUpdater.executeNodeDownStateUpdate(nodeName, zkStateReader);
     } else {
       try {
-        // Create a concurrently accessible set to avoid repeating collections
-        Set<String> processedCollections = new HashSet<>();
-        for (CoreDescriptor cd : cc.getCoreDescriptors()) {
-          String collName = cd.getCollectionName();
+        for (String collName : replicasPerCollectionOnNode.keySet()) {
           DocCollection coll;
           if (collName != null
-              && processedCollections.add(collName)
               && (coll = zkStateReader.getCollection(collName)) != null
               && coll.isPerReplicaState()) {
-            final List<String> replicasToDown = new ArrayList<>(coll.getSlicesMap().size());
-            coll.forEachReplica(
-                (s, replica) -> {
-                  if (replica.getNodeName().equals(nodeName)) {
-                    replicasToDown.add(replica.getName());
-                  }
-                });
             PerReplicaStatesOps.downReplicas(
-                    replicasToDown,
+                    replicasPerCollectionOnNode.get(collName).stream()
+                        .map(Replica::getName)
+                        .collect(Collectors.toList()),
                     PerReplicaStatesOps.fetch(
                         coll.getZNode(), zkClient, coll.getPerReplicaStates()))
                 .persist(coll.getZNode(), zkClient);
@@ -2904,6 +2891,7 @@ public void publishNodeAsDown(String nodeName) {
         log.warn("Could not publish node as down: ", e);
       }
     }
+    return replicasPerCollectionOnNode.keySet();
   }
 
   /**
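For per-replica-state (PRS) collections, publishNodeAsDown marks replicas DOWN with a fetch-then-persist on the collection's znode instead of an Overseer message. A condensed sketch of that pattern, lifted from the hunk above and wrapped in a hypothetical helper (the PrsDownExample class, the helper name, and the broad throws clause are assumptions; the PerReplicaStatesOps calls are the ones the patch itself uses):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;

final class PrsDownExample {
  private PrsDownExample() {}

  /** Marks the given replicas of a PRS collection DOWN by rewriting their per-replica states. */
  static void markReplicasDown(DocCollection coll, List<Replica> replicas, SolrZkClient zkClient)
      throws Exception {
    if (!coll.isPerReplicaState()) {
      return; // non-PRS collections are handled via the Overseer or distributed state updates
    }
    PerReplicaStatesOps.downReplicas(
            replicas.stream().map(Replica::getName).collect(Collectors.toList()),
            PerReplicaStatesOps.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates()))
        .persist(coll.getZNode(), zkClient);
  }
}

The practical change in the patch is only where the replica list comes from: it is now derived from the cluster state via replicasPerCollectionOnNode rather than from the locally loaded core descriptors.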
46 changes: 13 additions & 33 deletions solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -18,11 +18,9 @@
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.common.cloud.ClusterState;
@@ -82,39 +80,21 @@ public static Optional<ZkWriteCommand> computeCollectionUpdate(
       String nodeName, String collectionName, DocCollection docCollection, SolrZkClient client) {
     boolean needToUpdateCollection = false;
     List<String> downedReplicas = new ArrayList<>();
-    Map<String, Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
+    final Map<String, Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
 
-    for (Entry<String, Slice> sliceEntry : slicesCopy.entrySet()) {
-      Slice slice = sliceEntry.getValue();
-      Map<String, Replica> newReplicas = slice.getReplicasCopy();
-
-      Collection<Replica> replicas = slice.getReplicas();
-      for (Replica replica : replicas) {
-        String rNodeName = replica.getNodeName();
-        if (rNodeName == null) {
-          throw new RuntimeException("Replica without node name! " + replica);
-        }
-        if (rNodeName.equals(nodeName)) {
-          log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
-          Map<String, Object> props = replica.shallowCopy();
-          Replica newReplica =
-              new Replica(
-                  replica.getName(),
-                  replica.node,
-                  replica.collection,
-                  slice.getName(),
-                  replica.core,
-                  Replica.State.DOWN,
-                  replica.type,
-                  props);
-          newReplicas.put(replica.getName(), newReplica);
-          needToUpdateCollection = true;
-          downedReplicas.add(replica.getName());
-        }
+    List<Replica> replicasOnNode = docCollection.getReplicas(nodeName);
+    if (replicasOnNode == null || replicasOnNode.isEmpty()) {
+      return Optional.empty();
+    }
+    for (Replica replica : replicasOnNode) {
+      if (replica.getState() != Replica.State.DOWN) {
+        log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
+        needToUpdateCollection = true;
+        downedReplicas.add(replica.getName());
+        slicesCopy.computeIfPresent(
+            replica.getShard(),
+            (name, slice) -> slice.copyWith(replica.copyWith(Replica.State.DOWN)));
       }
-
-      Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy(), collectionName);
-      sliceEntry.setValue(newSlice);
     }
 
     if (needToUpdateCollection) {
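The rewritten mutator no longer rebuilds Slice and Replica objects field by field; it leans on the immutable copyWith helpers visible in the hunk above. A small, self-contained sketch of that update pattern (the DownReplicaSketch wrapper is illustrative only):

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;

final class DownReplicaSketch {
  private DownReplicaSketch() {}

  /** Returns a copy of the collection's slice map with one replica flipped to DOWN. */
  static Map<String, Slice> withReplicaDown(DocCollection docCollection, Replica replica) {
    Map<String, Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
    // copyWith produces fresh immutable Slice and Replica instances; nothing is mutated in
    // place, so the original DocCollection remains a consistent snapshot.
    slicesCopy.computeIfPresent(
        replica.getShard(),
        (name, slice) -> slice.copyWith(replica.copyWith(Replica.State.DOWN)));
    return slicesCopy;
  }
}

Starting from docCollection.getReplicas(nodeName) also lets computeCollectionUpdate return Optional.empty() early when the node hosts nothing, instead of walking every slice of every collection.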
@@ -27,7 +27,7 @@
 import org.apache.solr.core.backup.Checksum;
 
 /** Delegates to another {@link BackupRepository}. */
-public class DelegatingBackupRepository implements BackupRepository {
+public class DelegatingBackupRepository extends AbstractBackupRepository {
 
   public static final String PARAM_DELEGATE_REPOSITORY_NAME = "delegateRepoName";
 
@@ -163,8 +163,12 @@ public static SolrCore getCore(
         10,
         TimeUnit.SECONDS,
         docCollection -> {
-          for (Replica nodeNameSyntheticReplica :
-              docCollection.getReplicas(solrCall.cores.getZkController().getNodeName())) {
+          List<Replica> replicas =
+              docCollection.getReplicas(solrCall.cores.getZkController().getNodeName());
+          if (replicas == null || replicas.isEmpty()) {
+            return false;
+          }
+          for (Replica nodeNameSyntheticReplica : replicas) {
             if (nodeNameSyntheticReplica.getState() == Replica.State.ACTIVE) {
               return true;
             }
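The added guard matters because DocCollection.getReplicas(nodeName) returns null when the node hosts no replica of the collection, and the old enhanced-for loop would then fail with a NullPointerException inside the wait predicate. A null-safe version of the same "is any replica on this node ACTIVE" check, as a standalone sketch (the ActiveReplicaCheck wrapper is illustrative):

import java.util.List;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;

final class ActiveReplicaCheck {
  private ActiveReplicaCheck() {}

  /** True when the node hosts at least one ACTIVE replica of the collection. */
  static boolean hasActiveReplicaOnNode(DocCollection docCollection, String nodeName) {
    List<Replica> replicas = docCollection.getReplicas(nodeName);
    if (replicas == null || replicas.isEmpty()) {
      return false; // the node no longer hosts any replica of this collection, keep waiting
    }
    for (Replica replica : replicas) {
      if (replica.getState() == Replica.State.ACTIVE) {
        return true;
      }
    }
    return false;
  }
}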
@@ -111,7 +111,6 @@ public static ZkStateReader buildClusterState(String clusterDescription, String.
   public static ZkStateReader buildClusterState(
       String clusterDescription, int replicationFactor, String... liveNodes) {
     Map<String, Slice> slices = null;
-    Map<String, Replica> replicas = null;
     Map<String, Object> collectionProps = new HashMap<>();
     collectionProps.put(ZkStateReader.REPLICATION_FACTOR, Integer.toString(replicationFactor));
     Map<String, DocCollection> collectionStates = new HashMap<>();
@@ -138,9 +137,9 @@ public static ZkStateReader buildClusterState(
           collectionStates.put(docCollection.getName(), docCollection);
           break;
         case "s":
-          replicas = new HashMap<>();
           if (collName == null) collName = "collection" + (collectionStates.size() + 1);
-          slice = new Slice(sliceName = "slice" + (slices.size() + 1), replicas, null, collName);
+          slice =
+              new Slice(sliceName = "slice" + (slices.size() + 1), new HashMap<>(), null, collName);
           slices.put(slice.getName(), slice);
 
           // hack alert: the DocCollection constructor copies over active slices to its active slice
@@ -168,7 +167,7 @@ public static ZkStateReader buildClusterState(
           // O(n^2) alert! but this is for mocks and testing so shouldn't be used for very large
           // cluster states
           boolean leaderFound = false;
-          for (Map.Entry<String, Replica> entry : replicas.entrySet()) {
+          for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
             Replica value = entry.getValue();
             if ("true".equals(value.get(ReplicaStateProps.LEADER))) {
               leaderFound = true;
@@ -178,15 +177,13 @@ public static ZkStateReader buildClusterState(
           if (!leaderFound && !m.group(1).equals("p")) {
             replicaPropMap.put(ReplicaStateProps.LEADER, "true");
           }
-          replica = new Replica(replicaName, replicaPropMap, collName, sliceName);
-          replicas.put(replica.getName(), replica);
 
-          // hack alert: re-create slice with existing data and new replicas map so that it updates
-          // its internal leader attribute
-          slice = new Slice(slice.getName(), replicas, null, collName);
+          slice = slice.copyWith(new Replica(replicaName, replicaPropMap, collName, sliceName));
           slices.put(slice.getName(), slice);
+          // we don't need to update doc collection again because we aren't adding a new slice or
+          // changing its state
           docCollection = docCollection.copyWithSlices(slices);
           collectionStates.put(docCollection.getName(), docCollection);
           break;
         default:
           break;
(Diff truncated: the remaining changed files were not loaded in this view.)