From 2bd4df574a1984904c1eb68349818a9a8174b715 Mon Sep 17 00:00:00 2001
From: Bruno Roustant <33934988+bruno-roustant@users.noreply.github.com>
Date: Mon, 13 May 2024 13:18:15 +0200
Subject: [PATCH 1/3] DelegatingBackupRepository extends AbstractBackupRepository. (#2444)

---
 .../solr/core/backup/repository/DelegatingBackupRepository.java | 2 +-
 .../cloud/api/collections/AbstractBackupRepositoryTest.java     | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/DelegatingBackupRepository.java b/solr/core/src/java/org/apache/solr/core/backup/repository/DelegatingBackupRepository.java
index a603c208cb8..e3b27cb073c 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/repository/DelegatingBackupRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/repository/DelegatingBackupRepository.java
@@ -27,7 +27,7 @@
 import org.apache.solr.core.backup.Checksum;
 
 /** Delegates to another {@link BackupRepository}. */
-public class DelegatingBackupRepository implements BackupRepository {
+public class DelegatingBackupRepository extends AbstractBackupRepository {
 
   public static final String PARAM_DELEGATE_REPOSITORY_NAME = "delegateRepoName";
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractBackupRepositoryTest.java b/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractBackupRepositoryTest.java
index d988c5662bc..34b0d4a2b77 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractBackupRepositoryTest.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractBackupRepositoryTest.java
@@ -20,7 +20,6 @@
 import static org.apache.lucene.codecs.CodecUtil.FOOTER_MAGIC;
 import static org.apache.lucene.codecs.CodecUtil.writeBEInt;
 import static org.apache.lucene.codecs.CodecUtil.writeBELong;
-import static org.apache.solr.core.backup.repository.AbstractBackupRepository.PARAM_VERIFY_CHECKSUM;
 import static org.apache.solr.core.backup.repository.DelegatingBackupRepository.PARAM_DELEGATE_REPOSITORY_NAME;
 
 import java.io.File;

From d141875f9b29c0b4a0e39dd0b53f1895809df3e1 Mon Sep 17 00:00:00 2001
From: David Smiley
Date: Mon, 13 May 2024 22:34:38 -0400
Subject: [PATCH 2/3] SOLR-17066 Fix ShardSplitTest & StressHdfsTest (#2457)

Clarify getBaseURL
---
 .../solr/cloud/api/collections/ShardSplitTest.java     |  4 +---
 .../org/apache/solr/hdfs/cloud/StressHdfsTest.java     |  4 ++++
 .../apache/solr/client/solrj/impl/HttpSolrClient.java  |  1 +
 .../solr/cloud/AbstractBasicDistributedZkTestBase.java | 10 ----------
 .../solr/cloud/AbstractFullDistribZkTestBase.java      |  3 +++
 .../java/org/apache/solr/embedded/JettySolrRunner.java |  5 +----
 6 files changed, 10 insertions(+), 17 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 517a718416c..59bad33ba96 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -1277,9 +1277,7 @@ protected void splitShard(
     QueryRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 
-    String baseUrl =
-        ((HttpSolrClient) shardToJetty.get(SHARD1).get(0).client.getSolrClient()).getBaseURL();
-    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+    String baseUrl = shardToJetty.get(SHARD1).get(0).jetty.getBaseUrl().toString();
 
     try (SolrClient baseServer =
         new HttpSolrClient.Builder(baseUrl)
diff --git a/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/StressHdfsTest.java b/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/StressHdfsTest.java
index 72acd6f5343..7bb98b8d97b 100644
--- a/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/StressHdfsTest.java
+++ b/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/StressHdfsTest.java
@@ -251,4 +251,8 @@ private void createAndDeleteCollection() throws Exception {
       }
     }
   }
+
+  protected String getBaseUrl(SolrClient client) {
+    return ((HttpSolrClient) client).getBaseURL();
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 805bbd5f6f9..612d95b28ad 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -801,6 +801,7 @@ public ModifiableSolrParams getInvariantParams() {
     return invariantParams;
   }
 
+  /** Typically looks like {@code http://localhost:8983/solr} (no core or collection) */
   public String getBaseURL() {
     return baseUrl;
   }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractBasicDistributedZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractBasicDistributedZkTestBase.java
index 96d0c9952a3..91fe79b6c73 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractBasicDistributedZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractBasicDistributedZkTestBase.java
@@ -1115,16 +1115,6 @@ public static void createCollectionInOneInstance(
     }
   }
 
-  protected String getBaseUrl(SolrClient client) {
-    String url2 =
-        ((HttpSolrClient) client)
-            .getBaseURL()
-            .substring(
-                0,
-                ((HttpSolrClient) client).getBaseURL().length() - DEFAULT_COLLECTION.length() - 1);
-    return url2;
-  }
-
   @Override
   protected CollectionAdminResponse createCollection(
       Map<String, List<Integer>> collectionInfos,
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 8641c4a2fab..e7aafb2ded5 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -159,7 +159,10 @@ public static class CloudJettyRunner {
     public JettySolrRunner jetty;
     public String nodeName;
     public String coreNodeName;
+
+    /** Core or Collection URL */
     public String url;
+
     public CloudSolrServerClient client;
     public ZkNodeProps info;
diff --git a/solr/test-framework/src/java/org/apache/solr/embedded/JettySolrRunner.java b/solr/test-framework/src/java/org/apache/solr/embedded/JettySolrRunner.java
index edb7694abf1..cbfb23ffed9 100644
--- a/solr/test-framework/src/java/org/apache/solr/embedded/JettySolrRunner.java
+++ b/solr/test-framework/src/java/org/apache/solr/embedded/JettySolrRunner.java
@@ -834,10 +834,7 @@ public void setProxyPort(int proxyPort) {
     this.proxyPort = proxyPort;
   }
 
-  /**
-   * Returns a base URL consisting of the protocol, host, and port for a Connector in use by the
-   * Jetty Server contained in this runner.
-   */
+  /** Returns a base URL like {@code http://localhost:8983/solr} */
   public URL getBaseUrl() {
     try {
       return new URL(protocol, host, jettyPort, "/solr");
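The two URL shapes that patch 2 disambiguates are easy to conflate, so here is a minimal sketch, not part of the patches, of a root base URL versus a core-scoped URL and of the suffix trimming the deleted test helper used to perform. The demo method and the collection name "collection1" are hypothetical stand-ins:

    import org.apache.solr.embedded.JettySolrRunner;

    public class BaseUrlSketch {
      static void demo(JettySolrRunner jetty) {
        // Root base URL, as now documented on HttpSolrClient.getBaseURL() and
        // JettySolrRunner.getBaseUrl(): protocol + host + port + "/solr".
        String rootUrl = jetty.getBaseUrl().toString(); // e.g. http://localhost:8983/solr

        // A core- or collection-scoped URL, as stored in CloudJettyRunner.url.
        String coreUrl = rootUrl + "/collection1";

        // The removed AbstractBasicDistributedZkTestBase.getBaseUrl(SolrClient) helper
        // recovered the root URL by trimming the collection suffix and its slash:
        String trimmed = coreUrl.substring(0, coreUrl.length() - "collection1".length() - 1);
        assert trimmed.equals(rootUrl);
      }
    }

ShardSplitTest no longer needs any trimming because it asks the Jetty runner for the root URL directly.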
From 1b582e90de228bd5db3f0e884f0bcb7ec3de2868 Mon Sep 17 00:00:00 2001
From: Houston Putman
Date: Tue, 14 May 2024 15:16:46 -0400
Subject: [PATCH 3/3] SOLR-17049: Fix Replica Down on startup logic (#2432)

---
 solr/CHANGES.txt                                      |   3 +
 .../org/apache/solr/cloud/ZkController.java           |  66 +++++------
 .../solr/cloud/overseer/NodeMutator.java              |  46 +++-----
 .../solr/servlet/CoordinatorHttpSolrCall.java         |   8 +-
 .../solr/cloud/ClusterStateMockUtil.java              |  15 +--
 .../apache/solr/cloud/ZkControllerTest.java           | 107 ++++++++++++++----
 .../solr/common/cloud/ClusterState.java               |  46 ++++++--
 .../org/apache/solr/common/cloud/Replica.java         |   8 +-
 8 files changed, 183 insertions(+), 116 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d231a3d3475..30d340d11a4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -127,6 +127,9 @@ Bug Fixes
 
 * SOLR-17261: Remove unintended timeout of 60 seconds for core loading. (Houston Putman)
 
+* SOLR-17049: Actually mark all replicas down at startup and truly wait for them.
+  This includes replicas that might not exist anymore locally. (Houston Putman, Vincent Primault)
+
 Dependency Upgrades
 ---------------------
 (No changes)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a2377ceedd9..26e659d4db6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -39,6 +39,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
@@ -1098,15 +1100,12 @@ public void publishAndWaitForDownStates() throws KeeperException, InterruptedExc
     publishAndWaitForDownStates(WAIT_DOWN_STATES_TIMEOUT_SECONDS);
   }
 
-  public void publishAndWaitForDownStates(int timeoutSeconds)
-      throws KeeperException, InterruptedException {
-
-    publishNodeAsDown(getNodeName());
+  public void publishAndWaitForDownStates(int timeoutSeconds) throws InterruptedException {
+    final String nodeName = getNodeName();
 
-    Set<String> collectionsWithLocalReplica = ConcurrentHashMap.newKeySet();
-    for (CoreDescriptor descriptor : cc.getCoreDescriptors()) {
-      collectionsWithLocalReplica.add(descriptor.getCloudDescriptor().getCollectionName());
-    }
+    Collection<String> collectionsWithLocalReplica = publishNodeAsDown(nodeName);
+    Map<String, Boolean> collectionsAlreadyVerified =
+        new ConcurrentHashMap<>(collectionsWithLocalReplica.size());
 
     CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
     for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
@@ -1114,25 +1113,17 @@ public void publishAndWaitForDownStates(int timeoutSeconds)
           collectionWithLocalReplica,
           (collectionState) -> {
             if (collectionState == null) return false;
-            boolean foundStates = true;
-            for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
-              if (coreDescriptor
-                  .getCloudDescriptor()
-                  .getCollectionName()
-                  .equals(collectionWithLocalReplica)) {
-                Replica replica =
-                    collectionState.getReplica(
-                        coreDescriptor.getCloudDescriptor().getCoreNodeName());
-                if (replica == null || replica.getState() != Replica.State.DOWN) {
-                  foundStates = false;
-                }
-              }
-            }
-
-            if (foundStates && collectionsWithLocalReplica.remove(collectionWithLocalReplica)) {
+            boolean allStatesCorrect =
+                Optional.ofNullable(collectionState.getReplicas(nodeName)).stream()
+                    .flatMap(List::stream)
+                    .allMatch(replica -> replica.getState() == Replica.State.DOWN);
+
+            if (allStatesCorrect
+                && collectionsAlreadyVerified.putIfAbsent(collectionWithLocalReplica, true)
+                    == null) {
               latch.countDown();
             }
-            return foundStates;
+            return allStatesCorrect;
           });
     }
 
@@ -2849,9 +2840,14 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
    * Best effort to set DOWN state for all replicas on node.
    *
    * @param nodeName to operate on
+   * @return the names of the collections that have replicas on the given node
    */
-  public void publishNodeAsDown(String nodeName) {
+  public Collection<String> publishNodeAsDown(String nodeName) {
     log.info("Publish node={} as DOWN", nodeName);
+
+    ClusterState clusterState = getClusterState();
+    Map<String, List<Replica>> replicasPerCollectionOnNode =
+        clusterState.getReplicaNamesPerCollectionOnNode(nodeName);
     if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
       // Note that with the current implementation, when distributed cluster state updates are
       // enabled, we mark the node down synchronously from this thread, whereas the Overseer cluster
@@ -2862,24 +2858,15 @@ public void publishNodeAsDown(String nodeName) {
       distributedClusterStateUpdater.executeNodeDownStateUpdate(nodeName, zkStateReader);
     } else {
       try {
-        // Create a concurrently accessible set to avoid repeating collections
-        Set<String> processedCollections = new HashSet<>();
-        for (CoreDescriptor cd : cc.getCoreDescriptors()) {
-          String collName = cd.getCollectionName();
+        for (String collName : replicasPerCollectionOnNode.keySet()) {
           DocCollection coll;
           if (collName != null
-              && processedCollections.add(collName)
               && (coll = zkStateReader.getCollection(collName)) != null
              && coll.isPerReplicaState()) {
-            final List<String> replicasToDown = new ArrayList<>(coll.getSlicesMap().size());
-            coll.forEachReplica(
-                (s, replica) -> {
-                  if (replica.getNodeName().equals(nodeName)) {
-                    replicasToDown.add(replica.getName());
-                  }
-                });
             PerReplicaStatesOps.downReplicas(
-                    replicasToDown,
+                    replicasPerCollectionOnNode.get(collName).stream()
+                        .map(Replica::getName)
+                        .collect(Collectors.toList()),
                     PerReplicaStatesOps.fetch(
                         coll.getZNode(), zkClient, coll.getPerReplicaStates()))
                 .persist(coll.getZNode(), zkClient);
@@ -2904,6 +2891,7 @@ public void publishNodeAsDown(String nodeName) {
         log.warn("Could not publish node as down: ", e);
       }
     }
+    return replicasPerCollectionOnNode.keySet();
   }
 
   /**
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 8afcd805233..f54d2569ef5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -18,11 +18,9 @@
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.common.cloud.ClusterState;
@@ -82,39 +80,21 @@ public static Optional<ZkWriteCommand> computeCollectionUpdate(
       String nodeName, String collectionName, DocCollection docCollection, SolrZkClient client) {
     boolean needToUpdateCollection = false;
     List<String> downedReplicas = new ArrayList<>();
-    Map<String, Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
+    final Map<String, Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
 
-    for (Entry<String, Slice> sliceEntry : slicesCopy.entrySet()) {
-      Slice slice = sliceEntry.getValue();
-      Map<String, Replica> newReplicas = slice.getReplicasCopy();
-
-      Collection<Replica> replicas = slice.getReplicas();
-      for (Replica replica : replicas) {
-        String rNodeName = replica.getNodeName();
-        if (rNodeName == null) {
-          throw new RuntimeException("Replica without node name! " + replica);
-        }
-        if (rNodeName.equals(nodeName)) {
-          log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
-          Map<String, Object> props = replica.shallowCopy();
-          Replica newReplica =
-              new Replica(
-                  replica.getName(),
-                  replica.node,
-                  replica.collection,
-                  slice.getName(),
-                  replica.core,
-                  Replica.State.DOWN,
-                  replica.type,
-                  props);
-          newReplicas.put(replica.getName(), newReplica);
-          needToUpdateCollection = true;
-          downedReplicas.add(replica.getName());
-        }
+    List<Replica> replicasOnNode = docCollection.getReplicas(nodeName);
+    if (replicasOnNode == null || replicasOnNode.isEmpty()) {
+      return Optional.empty();
+    }
+    for (Replica replica : replicasOnNode) {
+      if (replica.getState() != Replica.State.DOWN) {
+        log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
+        needToUpdateCollection = true;
+        downedReplicas.add(replica.getName());
+        slicesCopy.computeIfPresent(
+            replica.getShard(),
+            (name, slice) -> slice.copyWith(replica.copyWith(Replica.State.DOWN)));
       }
-
-      Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy(), collectionName);
-      sliceEntry.setValue(newSlice);
     }
 
     if (needToUpdateCollection) {
diff --git a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
index aa4a0e2fd75..ec00107b717 100644
--- a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
@@ -163,8 +163,12 @@ public static SolrCore getCore(
             10,
             TimeUnit.SECONDS,
             docCollection -> {
-              for (Replica nodeNameSyntheticReplica :
-                  docCollection.getReplicas(solrCall.cores.getZkController().getNodeName())) {
+              List<Replica> replicas =
+                  docCollection.getReplicas(solrCall.cores.getZkController().getNodeName());
+              if (replicas == null || replicas.isEmpty()) {
+                return false;
+              }
+              for (Replica nodeNameSyntheticReplica : replicas) {
                 if (nodeNameSyntheticReplica.getState() == Replica.State.ACTIVE) {
                   return true;
                 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
index 674fe60b8a6..51a3c3263f5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
@@ -111,7 +111,6 @@ public static ZkStateReader buildClusterState(String clusterDescription, String.
   public static ZkStateReader buildClusterState(
       String clusterDescription, int replicationFactor, String... liveNodes) {
     Map<String, Slice> slices = null;
-    Map<String, Replica> replicas = null;
     Map<String, Object> collectionProps = new HashMap<>();
     collectionProps.put(ZkStateReader.REPLICATION_FACTOR, Integer.toString(replicationFactor));
     Map<String, DocCollection> collectionStates = new HashMap<>();
@@ -138,9 +137,9 @@ public static ZkStateReader buildClusterState(
           collectionStates.put(docCollection.getName(), docCollection);
           break;
         case "s":
-          replicas = new HashMap<>();
           if (collName == null) collName = "collection" + (collectionStates.size() + 1);
-          slice = new Slice(sliceName = "slice" + (slices.size() + 1), replicas, null, collName);
+          slice =
+              new Slice(sliceName = "slice" + (slices.size() + 1), new HashMap<>(), null, collName);
           slices.put(slice.getName(), slice);
 
           // hack alert: the DocCollection constructor copies over active slices to its active slice
@@ -168,7 +167,7 @@ public static ZkStateReader buildClusterState(
           // O(n^2) alert! but this is for mocks and testing so shouldn't be used for very large
           // cluster states
           boolean leaderFound = false;
-          for (Map.Entry<String, Replica> entry : replicas.entrySet()) {
+          for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
             Replica value = entry.getValue();
             if ("true".equals(value.get(ReplicaStateProps.LEADER))) {
               leaderFound = true;
@@ -178,15 +177,13 @@ public static ZkStateReader buildClusterState(
           if (!leaderFound && !m.group(1).equals("p")) {
             replicaPropMap.put(ReplicaStateProps.LEADER, "true");
           }
-          replica = new Replica(replicaName, replicaPropMap, collName, sliceName);
-          replicas.put(replica.getName(), replica);
 
           // hack alert: re-create slice with existing data and new replicas map so that it updates
           // its internal leader attribute
-          slice = new Slice(slice.getName(), replicas, null, collName);
+          slice = slice.copyWith(new Replica(replicaName, replicaPropMap, collName, sliceName));
           slices.put(slice.getName(), slice);
-          // we don't need to update doc collection again because we aren't adding a new slice or
-          // changing its state
+          docCollection = docCollection.copyWithSlices(slices);
+          collectionStates.put(docCollection.getName(), docCollection);
           break;
         default:
           break;
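NodeMutator and ClusterStateMockUtil now share the same copy-on-write idiom instead of rebuilding replica maps by hand. A condensed sketch of that idiom follows; the markDown method is hypothetical, Slice.copyWith(Replica) is the pre-existing helper both call sites use, and Replica.copyWith(State) is the overload this patch adds in Replica.java further below:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.solr.common.cloud.DocCollection;
    import org.apache.solr.common.cloud.Replica;
    import org.apache.solr.common.cloud.Slice;

    class DownReplicaSketch {
      // Returns a copy of the collection's slices in which every replica hosted on
      // nodeName is DOWN; existing Slice/Replica instances are never mutated.
      static Map<String, Slice> markDown(DocCollection docCollection, String nodeName) {
        final Map<String, Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
        List<Replica> replicasOnNode = docCollection.getReplicas(nodeName);
        if (replicasOnNode != null) {
          for (Replica replica : replicasOnNode) {
            if (replica.getState() != Replica.State.DOWN) {
              slicesCopy.computeIfPresent(
                  replica.getShard(),
                  (name, slice) -> slice.copyWith(replica.copyWith(Replica.State.DOWN)));
            }
          }
        }
        return slicesCopy;
      }
    }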
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index f1df3febab4..ff892a74f00 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -23,8 +23,12 @@
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -34,7 +38,9 @@
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.cloud.ClusterProperties;
+import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -55,21 +61,15 @@
 import org.apache.solr.util.LogLevel;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 @SolrTestCaseJ4.SuppressSSL
-public class ZkControllerTest extends SolrTestCaseJ4 {
+public class ZkControllerTest extends SolrCloudTestCase {
 
   static final int TIMEOUT = 10000;
 
-  @BeforeClass
-  public static void beforeClass() {}
-
-  @AfterClass
-  public static void afterClass() {}
-
+  @Test
   public void testNodeNameUrlConversion() throws Exception {
 
     // nodeName from parts
@@ -152,6 +152,7 @@ public void testNodeNameUrlConversion() throws Exception {
     }
   }
 
+  @Test
   public void testGetHostName() throws Exception {
     Path zkDir = createTempDir("zkData");
 
@@ -180,6 +181,7 @@ public void testGetHostName() throws Exception {
   }
 
   @LogLevel(value = "org.apache.solr.cloud=DEBUG;org.apache.solr.cloud.overseer=DEBUG")
+  @Test
  public void testPublishAndWaitForDownStates() throws Exception {
 
    /*
@@ -197,9 +199,8 @@ cores are down then the method will return immediately but if it uses coreNodeNa
 
     String nodeName = "127.0.0.1:8983_solr";
 
-    ZkTestServer server = new ZkTestServer(zkDir);
     try {
-      server.run();
+      cluster = configureCluster(1).configure();
 
       AtomicReference<ZkController> zkControllerRef = new AtomicReference<>();
       CoreContainer cc =
@@ -223,9 +224,16 @@ public List<CoreDescriptor> getCoreDescriptors() {
       ZkController zkController = null;
       try {
-        CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983).build();
+        CloudConfig cloudConfig =
+            new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983)
+                .setUseDistributedClusterStateUpdates(
+                    Boolean.getBoolean("solr.distributedClusterStateUpdates"))
+                .setUseDistributedCollectionConfigSetExecution(
+                    Boolean.getBoolean("solr.distributedCollectionConfigSetExecution"))
+                .build();
         zkController =
-            new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null);
+            new ZkController(
+                cc, cluster.getZkServer().getZkAddress(), TIMEOUT, cloudConfig, () -> null);
         zkControllerRef.set(zkController);
 
         zkController
@@ -258,6 +266,7 @@ public List<CoreDescriptor> getCoreDescriptors() {
           zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
         }
 
+        // Add an active replica that shares the same core name, but on a non-existent host
         MapWriter propMap =
             ew ->
                 ew.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower())
@@ -279,6 +288,7 @@ public List<CoreDescriptor> getCoreDescriptors() {
           zkController.getOverseerJobQueue().offer(propMap);
         }
 
+        // Add a down replica that shares the same core name, also on a non-existent host
         propMap =
             ew ->
                 ew.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower())
@@ -299,20 +309,77 @@ public List<CoreDescriptor> getCoreDescriptors() {
           zkController.getOverseerJobQueue().offer(propMap);
         }
 
-        zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow();
+        // Add an active replica on the existing host. This replica will exist in the cluster
+        // state but not on disk. We are testing that this replica is also put to "DOWN" even
+        // though it doesn't exist locally.
+        propMap =
+            ew ->
+                ew.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower())
+                    .put(COLLECTION_PROP, collectionName)
+                    .put(SHARD_ID_PROP, "shard1")
+                    .put(ZkStateReader.NODE_NAME_PROP, nodeName)
+                    .put(ZkStateReader.CORE_NAME_PROP, collectionName + "-not-on-disk")
+                    .put(ZkStateReader.STATE_PROP, "active");
+        if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          zkController
+              .getDistributedClusterStateUpdater()
+              .doSingleStateUpdate(
+                  DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica,
+                  new ZkNodeProps(propMap),
+                  zkController.getSolrCloudManager(),
+                  zkController.getZkStateReader());
+        } else {
+          zkController.getOverseerJobQueue().offer(propMap);
+        }
 
-        long now = System.nanoTime();
-        long timeout = now + TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS);
+        // Wait for the overseer to process all the replica additions
+        if (!zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          zkController
+              .getZkStateReader()
+              .waitForState(
+                  collectionName,
+                  10,
+                  TimeUnit.SECONDS,
+                  ((liveNodes, collectionState) ->
+                      Optional.ofNullable(collectionState)
+                          .map(DocCollection::getReplicas)
+                          .map(List::size)
+                          .orElse(0)
+                          == 3));
+        }
+
+        Instant now = Instant.now();
         zkController.publishAndWaitForDownStates(5);
-        assertTrue(
-            "The ZkController.publishAndWaitForDownStates should have timed out but it didn't",
-            System.nanoTime() >= timeout);
+        assertThat(
+            "The ZkController.publishAndWaitForDownStates should not have timed out but it did",
+            Duration.between(now, Instant.now()),
+            Matchers.lessThanOrEqualTo(Duration.ofSeconds(5)));
+
+        zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow();
+        ClusterState clusterState = zkController.getClusterState();
+
+        Map<String, List<Replica>> replicasOnNode =
+            clusterState.getReplicaNamesPerCollectionOnNode(nodeName);
+        assertNotNull("There should be replicas on the existing node", replicasOnNode);
+        List<Replica> replicas = replicasOnNode.get(collectionName);
+        assertNotNull("There should be replicas for the collection on the existing node", replicas);
+        assertEquals(
+            "Wrong number of replicas for the collection on the existing node", 1, replicas.size());
+        for (Replica replica : replicas) {
+          assertEquals(
+              "Replica "
                  + replica.getName()
+                  + " is not DOWN, even though it is on the node that should be DOWN",
+              Replica.State.DOWN,
+              replica.getState());
+        }
       } finally {
         if (zkController != null) zkController.close();
         cc.shutdown();
       }
     } finally {
-      server.shutdown();
+      cluster.shutdown();
     }
   }
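The test above exercises the reworked wait loop in publishAndWaitForDownStates, whose core is a once-per-collection count-down: a concurrent map's putIfAbsent guarantees that each collection decrements the latch exactly once, even when its state watcher fires repeatedly. A self-contained sketch of just that pattern, where the registrar parameter is a hypothetical stand-in for ZkStateReader's per-collection watcher registration:

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;

    class DownStateWaitSketch {
      // Waits until every collection reports "all replicas down". The registrar must
      // invoke the supplied callback with true whenever all of the collection's
      // replicas on this node are DOWN; it may invoke it any number of times.
      static boolean awaitAllDown(
          List<String> collections,
          BiConsumer<String, Consumer<Boolean>> registrar,
          int timeoutSeconds)
          throws InterruptedException {
        Map<String, Boolean> verified = new ConcurrentHashMap<>(collections.size());
        CountDownLatch latch = new CountDownLatch(collections.size());
        for (String collection : collections) {
          registrar.accept(
              collection,
              allDown -> {
                // putIfAbsent returns null only for the first caller, so each
                // collection counts down exactly once however often its watcher fires.
                if (allDown && verified.putIfAbsent(collection, Boolean.TRUE) == null) {
                  latch.countDown();
                }
              });
        }
        return latch.await(timeoutSeconds, TimeUnit.SECONDS); // false on timeout
      }
    }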
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 83898f57631..5f7a23f2ed0 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -25,8 +25,11 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -176,34 +179,53 @@ public Set<String> getLiveNodes() {
     return Collections.unmodifiableSet(liveNodes);
   }
 
+  @Deprecated
   public String getShardId(String nodeName, String coreName) {
     return getShardId(null, nodeName, coreName);
   }
 
+  @Deprecated
   public String getShardId(String collectionName, String nodeName, String coreName) {
-    Collection<CollectionRef> states = collectionStates.values();
+    if (coreName == null || nodeName == null) {
+      return null;
+    }
+    Collection<CollectionRef> states = Collections.emptyList();
     if (collectionName != null) {
       CollectionRef c = collectionStates.get(collectionName);
       if (c != null) states = Collections.singletonList(c);
+    } else {
+      states = collectionStates.values();
     }
 
     for (CollectionRef ref : states) {
       DocCollection coll = ref.get();
-      if (coll == null) continue; // this collection go tremoved in between, skip
-      for (Slice slice : coll.getSlices()) {
-        for (Replica replica : slice.getReplicas()) {
-          // TODO: for really large clusters, we could 'index' on this
-          String rnodeName = replica.getStr(ReplicaStateProps.NODE_NAME);
-          String rcore = replica.getStr(ReplicaStateProps.CORE_NAME);
-          if (nodeName.equals(rnodeName) && coreName.equals(rcore)) {
-            return slice.getName();
-          }
-        }
-      }
+      if (coll == null) continue; // this collection got removed in between, skip
+      // TODO: for really large clusters, we could 'index' on this
+      return Optional.ofNullable(coll.getReplicas(nodeName)).stream()
+          .flatMap(List::stream)
+          .filter(r -> coreName.equals(r.getStr(ReplicaStateProps.CORE_NAME)))
+          .map(Replica::getShard)
+          .findAny()
+          .orElse(null);
     }
     return null;
   }
 
+  public Map<String, List<Replica>> getReplicaNamesPerCollectionOnNode(final String nodeName) {
+    Map<String, List<Replica>> replicaNamesPerCollectionOnNode = new HashMap<>();
+    collectionStates.values().stream()
+        .map(CollectionRef::get)
+        .filter(Objects::nonNull)
+        .forEach(
+            col -> {
+              List<Replica> replicas = col.getReplicas(nodeName);
+              if (replicas != null && !replicas.isEmpty()) {
+                replicaNamesPerCollectionOnNode.put(col.getName(), replicas);
+              }
+            });
+    return replicaNamesPerCollectionOnNode;
+  }
+
   /** Check if node is alive. */
   public boolean liveNodesContain(String name) {
     return liveNodes.contains(name);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 0d4cd3afffb..e9c41df8c51 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -369,7 +369,7 @@ public String getProperty(String propertyName) {
   }
 
   public Replica copyWith(PerReplicaStates.State state) {
-    log.debug("A replica is updated with new state : {}", state);
+    log.debug("A replica is updated with new PRS state : {}", state);
     Map<String, Object> props = new LinkedHashMap<>(propMap);
     if (state == null) {
       props.put(ReplicaStateProps.STATE, State.DOWN.toString());
@@ -382,6 +382,12 @@ public Replica copyWith(PerReplicaStates.State state) {
     return r;
   }
 
+  public Replica copyWith(State state) {
+    Replica r = new Replica(name, propMap, collection, shard);
+    r.setState(state);
+    return r;
+  }
+
   public PerReplicaStates.State getReplicaState() {
     if (perReplicaStatesRef != null) {
       return perReplicaStatesRef.get().get(name);
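Taken together, patch 3 routes every "which replicas live on this node" question through the new ClusterState.getReplicaNamesPerCollectionOnNode. A short usage sketch to close; the helper is hypothetical and simply mirrors how publishNodeAsDown derives the per-collection replica names it hands to PerReplicaStatesOps.downReplicas:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.solr.common.cloud.ClusterState;
    import org.apache.solr.common.cloud.Replica;

    class PublishDownSketch {
      // Collection name -> names of that collection's replicas hosted on nodeName.
      static Map<String, List<String>> replicaNamesToDown(ClusterState state, String nodeName) {
        return state.getReplicaNamesPerCollectionOnNode(nodeName).entrySet().stream()
            .collect(
                Collectors.toMap(
                    Map.Entry::getKey,
                    e ->
                        e.getValue().stream()
                            .map(Replica::getName)
                            .collect(Collectors.toList())));
      }
    }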