Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
title: SolrCloud admin commands should more reliably be fully completed when Solr returns a response.
The `waitForFinalState` param now defaults to true and is deprecated. (Abhishek Umarjikar)
type: changed
authors:
- name: Abhishek Umarjikar
- name: David Smiley
links:
- name: SOLR-17712
url: https://issues.apache.org/jira/browse/SOLR-17712
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ public class BalanceReplicasRequestBody {

public BalanceReplicasRequestBody() {}

public BalanceReplicasRequestBody(Set<String> nodes, Boolean waitForFinalState, String async) {
public BalanceReplicasRequestBody(Set<String> nodes, String async) {
this.nodes = nodes;
this.waitForFinalState = waitForFinalState;
this.async = async;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
Expand All @@ -45,6 +47,7 @@ public class ActiveReplicaWatcher implements CollectionStateWatcher {
private final List<Replica> activeReplicas = new ArrayList<>();

private int lastZkVersion = -1;
private PerReplicaStates lastPrs;

private SolrCloseableLatch latch;

Expand Down Expand Up @@ -150,11 +153,13 @@ public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection
log.debug("-- already done, exiting...");
return true;
}
if (collectionState.getZNodeVersion() == lastZkVersion) {
if (collectionState.getZNodeVersion() == lastZkVersion
&& Objects.equals(lastPrs, collectionState.getPerReplicaStates())) {
Comment on lines +156 to +157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PRS wasn't working with waitForFinalState until this

log.debug("-- spurious call with already seen zkVersion= {}, ignoring...", lastZkVersion);
return false;
}
lastZkVersion = collectionState.getZNodeVersion();
lastPrs = collectionState.getPerReplicaStates();

for (Slice slice : collectionState.getSlices()) {
for (Replica replica : slice.getReplicas()) {
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quite a few SolrCloud commands directly call AddReplicaCmd...

Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ List<ZkNodeProps> addReplica(
"Collection: " + collectionName + " shard: " + shard + " does not exist");
}

boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, true);
boolean skipCreateReplicaInClusterState =
message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
final String asyncId = message.getStr(ASYNC);
Expand Down Expand Up @@ -177,33 +177,41 @@ List<ZkNodeProps> addReplica(
shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
}

SolrCloseableLatch waitForFinalStateLatch =
new SolrCloseableLatch(totalReplicas, ccc.getCloseableToLatchOn());

Runnable runnable =
() -> {
shardRequestTracker.processResponses(
results, shardHandler, true, "ADDREPLICA failed to create replica");
for (CreateReplica replica : createReplicas) {
CollectionHandlingUtils.waitForCoreNodeName(
collectionName, replica.node, replica.coreName, ccc.getZkStateReader());
// if there were errors, don't do any more waiting
if (results.get("failure") != null) {
while (waitForFinalStateLatch.getCount() > 0) {
waitForFinalStateLatch.countDown();
}
} else {
for (CreateReplica replica : createReplicas) {
CollectionHandlingUtils.waitForCoreNodeName(
collectionName, replica.node, replica.coreName, ccc.getZkStateReader());
}
}
if (onComplete != null) onComplete.run();
};

if (!parallel || waitForFinalState) {
if (waitForFinalState) {
SolrCloseableLatch latch =
new SolrCloseableLatch(totalReplicas, ccc.getCloseableToLatchOn());
ActiveReplicaWatcher watcher =
new ActiveReplicaWatcher(
collectionName,
null,
createReplicas.stream()
.map(createReplica -> createReplica.coreName)
.collect(Collectors.toList()),
latch);
waitForFinalStateLatch);
try {
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
runnable.run();
if (!latch.await(timeout, TimeUnit.SECONDS)) {
if (!waitForFinalStateLatch.await(timeout, TimeUnit.SECONDS)) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Timeout waiting " + timeout + " seconds for replica to become active.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu
"'nodes' was not passed as a correct type (Set/List/String): "
+ nodesRaw.getClass().getName());
}
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
String async = message.getStr(ASYNC);
int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
boolean parallel = message.getBool("parallel", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,18 @@ public void sendShardRequest(
shardHandler.submit(sreq, replica, sreq.params);
}

/**
* Processes all responses (waiting, if necessary), populating "success" or "failure" keyed data
* into {@code results} per shard response in turn keyed by node. If {@code asyncId} mode, will
* wait for their completion.
*
* @param results will hold shard request results aggregated to "success" and "failure" keys.
* @param abortOnError if any shard request has an exception, throw it. This doesn't apply to
* "STATUS=failed" responses.
* @param msgOnError the exception message for {@code abortOnError}
*/
// TODO it's too confusing how we sometime abort on error yet if async or find STATUS=FAILED not
// Recommend not doing any abort. Have a separate utility method that looks for "failure".
void processResponses(
NamedList<Object> results,
ShardHandler shardHandler,
Expand Down Expand Up @@ -698,8 +710,8 @@ void processResponses(
} while (srsp != null);

// If request is async wait for the core admin to complete before returning
// note: ignore abortOnError when async
if (asyncId != null) {
// TODO: Shouldn't we abort with msgOnError exception when failure?
waitForAsyncCallsToComplete(results);
shardAsyncIdByNode.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec
}
final Aliases aliases = ccc.getZkStateReader().getAliases();
final String collectionName = message.getStr(NAME);
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, true);
final String alias = message.getStr(ALIAS, collectionName);
log.info("Create collection {}", collectionName);
boolean prsDefault = EnvUtils.getPropertyAsBool(PRS_DEFAULT_PROP, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec
throws Exception {
String extCollectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);

log.info("Create shard invoked: {}", message);
if (extCollectionName == null || sliceName == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu
ZkStateReader zkStateReader = ccc.getZkStateReader();
Set<String> sourceNodes = getNodesFromParam(message, CollectionParams.SOURCE_NODES);
Set<String> targetNodes = getNodesFromParam(message, CollectionParams.TARGET_NODES);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
if (sourceNodes.isEmpty()) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "sourceNodes is a required param");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private void moveReplica(
CollectionHandlingUtils.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE);
String extCollection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr(CollectionParams.TARGET_NODE);
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, true);
boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true);
int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu
ZkStateReader zkStateReader = ccc.getZkStateReader();
String source = message.getStr(CollectionParams.SOURCE_NODE);
String target = message.getStr(CollectionParams.TARGET_NODE);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
if (source == null) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "sourceNode is a required param");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList<O
throws Exception {
final String asyncId = message.getStr(ASYNC);

boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
String methodStr =
message.getStr(
CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,6 @@ public Map<String, Object> execute(
final RequiredSolrParams requiredParams = req.getParams().required();
final var requestBody = new ReplaceNodeRequestBody();
requestBody.targetNodeName = params.get(TARGET_NODE);
requestBody.waitForFinalState = params.getBool(WAIT_FOR_FINAL_STATE);
requestBody.async = params.get(ASYNC);
final ReplaceNode replaceNodeAPI = new ReplaceNode(h.coreContainer, req, rsp);
final SolrJerseyResponse replaceNodeResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
import static org.apache.solr.common.params.CollectionParams.NODES;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;

Expand Down Expand Up @@ -75,7 +74,6 @@ public ZkNodeProps createRemoteMessage(BalanceReplicasRequestBody requestBody) {
final Map<String, Object> remoteMessage = new HashMap<>();
if (requestBody != null) {
insertIfNotNull(remoteMessage, NODES, requestBody.nodes);
insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
insertIfNotNull(remoteMessage, ASYNC, requestBody.async);
}
remoteMessage.put(QUEUE_OPERATION, CollectionAction.BALANCE_REPLICAS.toLower());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.apache.solr.common.params.CollectionAdminParams.REPLICATION_FACTOR;
import static org.apache.solr.common.params.CollectionAdminParams.TLOG_REPLICAS;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CoreAdminParams.NAME;
import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
import static org.apache.solr.handler.admin.CollectionsHandler.waitForActiveCollection;
Expand Down Expand Up @@ -173,7 +172,6 @@ public static ZkNodeProps createRemoteMessage(CreateCollectionRequestBody reqBod
rawProperties.put(SHARDS_PROP, String.join(",", reqBody.shardNames));
rawProperties.put(PULL_REPLICAS, reqBody.pullReplicas);
rawProperties.put(TLOG_REPLICAS, reqBody.tlogReplicas);
rawProperties.put(WAIT_FOR_FINAL_STATE, reqBody.waitForFinalState);
rawProperties.put(PER_REPLICA_STATE, reqBody.perReplicaState);
rawProperties.put(ALIAS, reqBody.alias);
rawProperties.put(ASYNC, reqBody.async);
Expand Down Expand Up @@ -266,7 +264,6 @@ public static CreateCollectionRequestBody createRequestBodyFromV1Params(
requestBody.tlogReplicas = params.getInt(ZkStateReader.TLOG_REPLICAS);
requestBody.pullReplicas = params.getInt(ZkStateReader.PULL_REPLICAS);
requestBody.nrtReplicas = params.getInt(ZkStateReader.NRT_REPLICAS);
requestBody.waitForFinalState = params.getBool(WAIT_FOR_FINAL_STATE);
requestBody.perReplicaState = params.getBool(PER_REPLICA_STATE);
requestBody.alias = params.get(ALIAS);
requestBody.async = params.get(ASYNC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_PREFIX;
import static org.apache.solr.common.params.CollectionAdminParams.SKIP_NODE_ASSIGNMENT;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
import static org.apache.solr.common.params.CoreAdminParams.INSTANCE_DIR;
import static org.apache.solr.common.params.CoreAdminParams.NAME;
Expand Down Expand Up @@ -114,7 +113,6 @@ public static ZkNodeProps createRemoteMessage(
insertIfNotNull(remoteMessage, DATA_DIR, requestBody.dataDir);
insertIfNotNull(remoteMessage, ULOG_DIR, requestBody.ulogDir);
insertIfNotNull(remoteMessage, REPLICA_TYPE, requestBody.type);
insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
insertIfNotNull(remoteMessage, NRT_REPLICAS, requestBody.nrtReplicas);
insertIfNotNull(remoteMessage, TLOG_REPLICAS, requestBody.tlogReplicas);
insertIfNotNull(remoteMessage, PULL_REPLICAS, requestBody.pullReplicas);
Expand Down Expand Up @@ -146,7 +144,6 @@ public static CreateReplicaRequestBody createRequestBodyFromV1Params(SolrParams
requestBody.nrtReplicas = params.getInt(NRT_REPLICAS);
requestBody.tlogReplicas = params.getInt(TLOG_REPLICAS);
requestBody.pullReplicas = params.getInt(PULL_REPLICAS);
requestBody.waitForFinalState = params.getBool(WAIT_FOR_FINAL_STATE);
requestBody.followAliases = params.getBool(FOLLOW_ALIASES);
requestBody.async = params.get(ASYNC);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_PREFIX;
import static org.apache.solr.common.params.CollectionAdminParams.SHARD;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.handler.admin.api.CreateCollection.copyPrefixedPropertiesWithoutPrefix;
import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;
Expand Down Expand Up @@ -113,7 +112,6 @@ public static CreateShardRequestBody createRequestBodyFromV1Params(SolrParams pa
requestBody.nodeSet = Arrays.asList(nodeSetStr.split(","));
}
}
requestBody.waitForFinalState = params.getBool(WAIT_FOR_FINAL_STATE);
requestBody.followAliases = params.getBool(FOLLOW_ALIASES);
requestBody.async = params.get(ASYNC);
requestBody.properties =
Expand Down Expand Up @@ -152,7 +150,6 @@ public static ZkNodeProps createRemoteMessage(
insertIfNotNull(remoteMessage, NRT_REPLICAS, requestBody.nrtReplicas);
insertIfNotNull(remoteMessage, TLOG_REPLICAS, requestBody.tlogReplicas);
insertIfNotNull(remoteMessage, PULL_REPLICAS, requestBody.pullReplicas);
insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
insertIfNotNull(remoteMessage, FOLLOW_ALIASES, requestBody.followAliases);
insertIfNotNull(remoteMessage, ASYNC, requestBody.async);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.solr.common.params.CollectionParams.SOURCE_NODES;
import static org.apache.solr.common.params.CollectionParams.TARGET_NODES;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;

Expand Down Expand Up @@ -83,7 +82,6 @@ public ZkNodeProps createRemoteMessage(MigrateReplicasRequestBody requestBody) {
}
insertIfNotNull(remoteMessage, SOURCE_NODES, requestBody.sourceNodes);
insertIfNotNull(remoteMessage, TARGET_NODES, requestBody.targetNodes);
insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
insertIfNotNull(remoteMessage, ASYNC, requestBody.async);
} else {
throw new SolrException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.solr.common.params.CollectionParams.SOURCE_NODE;
import static org.apache.solr.common.params.CollectionParams.TARGET_NODE;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;

Expand Down Expand Up @@ -82,7 +81,6 @@ public ZkNodeProps createRemoteMessage(String nodeName, ReplaceNodeRequestBody r
remoteMessage.put(SOURCE_NODE, nodeName);
if (requestBody != null) {
insertIfValueNotNull(remoteMessage, TARGET_NODE, requestBody.targetNodeName);
insertIfValueNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
insertIfValueNotNull(remoteMessage, ASYNC, requestBody.async);
}
remoteMessage.put(QUEUE_OPERATION, CollectionAction.REPLACENODE.toLower());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testSomeNodes() throws Exception {
cluster.getSolrClient(),
"/api/cluster/replicas/balance",
Utils.getReflectWriter(
new BalanceReplicasRequestBody(new HashSet<>(l.subList(1, 4)), true, null)));
new BalanceReplicasRequestBody(new HashSet<>(l.subList(1, 4)), null)));

collection = cloudClient.getClusterState().getCollectionOrNull(coll, false);
log.debug("### After balancing: {}", collection);
Expand Down
Loading