Miscellaneous PRS fixes, backported from 9.x
Co-authored-by: Noble Paul <noble@apache.org>
Ishan Chattopadhyaya and noblepaul committed Oct 23, 2023
1 parent 32b6d4c commit 962c926
Showing 7 changed files with 303 additions and 87 deletions.
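Background for the hunks below (illustrative, not part of the commit): with perReplicaState=true, each replica's state lives in its own child znode under the collection's state.json node rather than inside state.json itself, so routine state flips avoid rewriting the whole collection state. A minimal, self-contained sketch of reading one such entry, assuming the name:version:state[:L] encoding used by PerReplicaStates:

import java.util.Locale;

public class PrsEntrySketch {
  public static void main(String[] args) {
    // Hypothetical PRS znode name: "A"=ACTIVE, "D"=DOWN, "R"=RECOVERING; trailing ":L" marks the leader
    String znodeName = "core_node4:2:A:L";
    String[] parts = znodeName.split(":");
    String replica = parts[0];                // "core_node4"
    int version = Integer.parseInt(parts[1]); // bumped on every state change
    String state = parts[2];                  // "A"
    boolean leader = parts.length > 3 && "L".equals(parts[3]);
    System.out.printf(Locale.ROOT, "%s v%d state=%s leader=%b%n", replica, version, state, leader);
  }
}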
@@ -185,11 +185,21 @@ void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
       assert zkController != null;
       assert zkController.getOverseer() != null;
       DocCollection coll = zkStateReader.getCollection(this.collection);
-      if (coll == null || coll.getStateFormat() < 2 || ZkController.sendToOverseer(coll, id)) {
+
+      if (coll == null || ZkController.updateStateDotJson(coll, id)) {
         zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
-      } else {
-        PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
-        PerReplicaStatesOps.flipLeader(zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicaNames(), id, prs)
+      }
+      if (coll != null && coll.isPerReplicaState()) {
+        PerReplicaStates prs =
+            PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+        PerReplicaStatesOps.flipLeader(
+                zkStateReader
+                    .getClusterState()
+                    .getCollection(collection)
+                    .getSlice(shardId)
+                    .getReplicaNames(),
+                id,
+                prs)
             .persist(coll.getZNode(), zkStateReader.getZkClient());
       }
     }
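The rewritten branch above now flips the PRS leader markers unconditionally for PRS collections after an election. A self-contained sketch of what such a flip amounts to, under the same assumed name:version:state[:L] encoding (illustration only; the real logic is PerReplicaStatesOps.flipLeader):

import java.util.List;
import java.util.stream.Collectors;

public class FlipLeaderSketch {
  // Rewrite entries so that only newLeader carries ":L", bumping the version of changed entries
  static List<String> flipLeader(List<String> entries, String newLeader) {
    return entries.stream()
        .map(e -> {
          String[] p = e.split(":");
          boolean wasLeader = p.length > 3;
          boolean isLeader = p[0].equals(newLeader);
          if (wasLeader == isLeader) return e; // untouched entries keep their version
          String base = p[0] + ":" + (Integer.parseInt(p[1]) + 1) + ":" + p[2];
          return isLeader ? base + ":L" : base;
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(flipLeader(List.of("core_node2:3:A:L", "core_node4:5:A"), "core_node4"));
    // -> [core_node2:4:A, core_node4:6:A:L]
  }
}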
65 changes: 50 additions & 15 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1657,32 +1657,32 @@ public void publish(final CoreDescriptor cd, final Replica.State state, boolean
         cd.getCloudDescriptor().setLastPublished(state);
       }
       DocCollection coll = zkStateReader.getCollection(collection);
-      if (forcePublish || sendToOverseer(coll, coreNodeName)) {
-        overseerJobQueue.offer(Utils.toJSON(m));
-      } else {
-        if (log.isDebugEnabled()) {
-          log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
-        }
-        PerReplicaStates perReplicaStates = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+      if (forcePublish || updateStateDotJson(coll, coreNodeName)) {
+        overseerJobQueue.offer(Utils.toJSON(m));
+      }
+      // extra handling for PRS, we need to write the PRS entries from this node directly,
+      // as overseer does not and should not handle those entries
+      if (coll != null && coll.isPerReplicaState() && coreNodeName != null) {
+        PerReplicaStates perReplicaStates =
+            PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
         PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
             .persist(coll.getZNode(), zkClient);
       }
     } finally {
       MDCLoggingContext.clear();
     }
   }

   /**
-   * Whether a message needs to be sent to overseer or not
+   * Returns {@code true} if a message needs to be sent to overseer (or done in a distributed way)
+   * to update state.json for the collection
    */
-  static boolean sendToOverseer(DocCollection coll, String replicaName) {
+  static boolean updateStateDotJson(DocCollection coll, String replicaName) {
     if (coll == null) return true;
-    if (coll.getStateFormat() < 2 || !coll.isPerReplicaState()) return true;
+    if (!coll.isPerReplicaState()) return true;
     Replica r = coll.getReplica(replicaName);
     if (r == null) return true;
-    Slice shard = coll.getSlice(r.slice);
-    if (shard == null) return true; //very unlikely
-    if (shard.getState() == Slice.State.RECOVERY) return true;
+    Slice shard = coll.getSlice(r.getSlice());
+    if (shard == null) return true; // very unlikely
     if (shard.getParent() != null) return true;
     for (Slice slice : coll.getSlices()) {
       if (Objects.equals(shard.getName(), slice.getParent())) return true;
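Condensed restatement of the renamed predicate above (hypothetical helper mirroring only the checks visible in this hunk): state.json still has to be updated unless the collection is PRS, the replica and its shard are known, and no shard split is in flight.

public class UpdateStateDotJsonSketch {
  static boolean updateStateDotJson(
      boolean isPrs, boolean replicaKnown, boolean shardKnown, boolean shardInSplit) {
    if (!isPrs) return true;        // non-PRS collections always go through state.json
    if (!replicaKnown) return true; // unknown replica: fall back to the overseer path
    if (!shardKnown) return true;   // very unlikely
    if (shardInSplit) return true;  // parent/sub-shard bookkeeping lives in state.json
    return false;                   // plain PRS state flip: no state.json update needed
  }

  public static void main(String[] args) {
    // A healthy PRS replica publishing a routine state change bypasses state.json entirely
    System.out.println(updateStateDotJson(true, true, true, false)); // false
  }
}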
@@ -2765,9 +2765,44 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
    */
   public void publishNodeAsDown(String nodeName) {
     log.info("Publish node={} as DOWN", nodeName);
-    try {
+    boolean sendToOverseer = false;
+    // Track the collections we have already processed so each one is handled only once
+    Set<String> processedCollections = new HashSet<>();
+    for (CoreDescriptor cd : cc.getCoreDescriptors()) {
+      String collName = cd.getCollectionName();
+      DocCollection coll = null;
+      if (collName != null
+          && processedCollections.add(collName)
+          && (coll = zkStateReader.getCollection(collName)) != null
+          && coll.isPerReplicaState()) {
+        final List<String> replicasToDown = new ArrayList<>(coll.getSlicesMap().size());
+        coll.forEachReplica(
+            (s, replica) -> {
+              if (replica.getNodeName().equals(nodeName)) {
+                replicasToDown.add(replica.getName());
+              }
+            });
+        PerReplicaStatesOps.downReplicas(
+                replicasToDown,
+                PerReplicaStates.fetch(
+                    coll.getZNode(), zkClient, coll.getPerReplicaStates()))
+            .persist(coll.getZNode(), zkClient);
+      }
+      if (coll != null && !coll.isPerReplicaState()) {
+        sendToOverseer = true;
+      }
+    }
+
+    // Only send the downnode message to the overseer if we have to. We are trying to avoid the
+    // overhead for PRS collections, as processing a downnode message takes a while: it loads
+    // the DocCollection even when it does no further processing.
+    // In the future, we should optimize the handling on the Solr side to speed up the PRS
+    // DocCollection read on operations that do not require actual replica information.
+    if (!sendToOverseer) return;
+
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
         ZkStateReader.NODE_NAME_PROP, nodeName);
+    try {
       overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
     } catch (AlreadyClosedException e) {
       log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
@@ -38,9 +38,12 @@
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
 import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DelegatingCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.DelegatingClusterStateProvider;
 import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.RefreshCollectionMessage;
 import org.apache.solr.cloud.ZkController;
@@ -180,12 +183,23 @@ public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnin
     createCollectionZkNode(stateManager, collectionName, collectionParams);

     if (isPRS) {
-      // In case of a PRS collection, create the collection structure directly instead of resubmitting
-      // to the overseer queue.
+      // In case of a PRS collection, create the collection structure directly instead of
+      // resubmitting to the overseer queue.
-      // TODO: Consider doing this for all collections, not just the PRS collections.
-      ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
+      // TODO comment above achieved by switching the cluster to distributed state updates
+
+      // This code directly updates Zookeeper by creating the collection state.json. It is
+      // compatible with both distributed cluster state updates and Overseer based cluster state
+      // updates.
+
+      // TODO: Consider doing this for all collections, not just the PRS collections.
+      ZkWriteCommand command =
+          new ClusterStateMutator(ocmh.overseer.getSolrCloudManager())
+              .createCollection(clusterState, message);
       byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
-      ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
+      ocmh.overseer.getZkStateReader()
+          .getZkClient()
+          .create(collectionPath, data, CreateMode.PERSISTENT, true);
       clusterState = clusterState.copyWith(collectionName, command.collection);
       newColl = command.collection;
       ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
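The direct create above writes the collection's state.json znode in one operation, bypassing the overseer queue. A sketch of the payload shape (field names assumed for illustration; the real bytes come from Utils.toJSON(Collections.singletonMap(collectionName, command.collection))):

import java.util.LinkedHashMap;
import java.util.Map;

public class StateJsonShapeSketch {
  public static void main(String[] args) {
    Map<String, Object> shard1 = new LinkedHashMap<>();
    shard1.put("range", "80000000-7fffffff");
    shard1.put("state", "active");
    shard1.put("replicas", Map.of()); // replica states live in the PRS child znodes
    Map<String, Object> coll = new LinkedHashMap<>();
    coll.put("perReplicaState", "true");
    coll.put("shards", Map.of("shard1", shard1));
    // state.json is a one-key map: collection name -> collection definition
    System.out.println(Map.of("myColl", coll));
  }
}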
@@ -279,8 +293,8 @@ public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnin
       // to the overseer queue.
       // TODO: Consider doing this for all collections, not just the PRS collections.
       ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
-      byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
-      ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
+      // byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+      // ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
       clusterState = clusterState.copyWith(collectionName, command.collection);
       newColl = command.collection;
     } else {
@@ -322,6 +336,11 @@ public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnin
         }
       }
       if(isPRS) {
+        byte[] data =
+            Utils.toJSON(
+                Collections.singletonMap(
+                    collectionName, clusterState.getCollection(collectionName)));
+        zkStateReader.getZkClient().setData(collectionPath, data, true);
         ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
       }

@@ -347,21 +366,26 @@ public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnin
       shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
       @SuppressWarnings({"rawtypes"})
       boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
-      final Map<String, Replica> replicas;
       if (isPRS) {
-        TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
-        PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+        TimeOut timeout =
+            new TimeOut(
+                Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120),
+                TimeUnit.SECONDS,
+                ocmh.timeSource); // could be a big cluster
+        PerReplicaStates prs =
+            PerReplicaStates.fetch(collectionPath, ocmh.overseer.getZkStateReader().getZkClient(), null);
         while (!timeout.hasTimedOut()) {
-          if(prs.allActive()) break;
+          if (prs.allActive()) break;
           Thread.sleep(100);
-          prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+          prs =
+              PerReplicaStates.fetch(collectionPath, ocmh.overseer.getZkStateReader().getZkClient(), null);
         }
-        if (!prs.allActive()) {
+        if (prs.allActive()) {
+          // we have successfully found all replicas to be ACTIVE
+        } else {
           failure = true;
-        } // we have successfully found all replicas to be ACTIVE
-
-        // Now ask Overseer to fetch the latest state of collection
-        // from ZK
-        ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+        }
       }
       if (failure) {
         // Let's cleanup as we hit an exception
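Stripped of Solr types, the PRS wait a few lines up is a plain bounded poll; an equivalent sketch (the system property name is taken from the hunk, everything else is assumed):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class WaitAllActiveSketch {
  static boolean waitAllActive(BooleanSupplier allActive) throws InterruptedException {
    long deadline =
        System.nanoTime()
            + TimeUnit.SECONDS.toNanos(
                Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120));
    while (System.nanoTime() < deadline) {
      if (allActive.getAsBoolean()) return true; // every replica reports ACTIVE
      Thread.sleep(100);                         // the real loop re-fetches the PRS entries here
    }
    return false; // timed out: the caller marks the create as a failure
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(waitAllActive(() -> true)); // trivially true
  }
}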
@@ -428,6 +452,7 @@ public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloud
     int numSlices = shardNames.size();
     int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
     if (maxShardsPerNode == -1) maxShardsPerNode = Integer.MAX_VALUE;
+    cloudManager = wrapCloudManager(clusterState, cloudManager);

     // we need to look at every node and see how many cores it serves
     // add our new cores to existing nodes serving the least number of cores
@@ -481,6 +506,34 @@ public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloud
     }
     return replicaPositions;
   }
+  // the cloud manager should reflect the latest internal cluster state
+  private static SolrCloudManager wrapCloudManager(
+      ClusterState clusterState, SolrCloudManager solrCloudManager) {
+    final ClusterStateProvider csp =
+        new DelegatingClusterStateProvider(solrCloudManager.getClusterStateProvider()) {
+          @Override
+          public ClusterState.CollectionRef getState(String collection) {
+            return clusterState.getCollectionRef(collection);
+          }
+
+          @Override
+          public ClusterState getClusterState() {
+            return clusterState;
+          }
+
+          @Override
+          public DocCollection getCollection(String name) throws IOException {
+            return clusterState.getCollection(name);
+          }
+        };
+
+    return new DelegatingCloudManager(solrCloudManager) {
+      @Override
+      public ClusterStateProvider getClusterStateProvider() {
+        return csp;
+      }
+    };
+  }

   public static void checkReplicaTypes(ZkNodeProps message) {
     int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
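wrapCloudManager above is a read-through override: placement code keeps talking to the SolrCloudManager interface, but cluster-state reads are answered from the in-flight clusterState (which already contains the new collection) instead of the possibly stale ZooKeeper-backed view. The generic shape of that pattern, in plain Java with assumed types:

public class DelegatingReadsSketch {
  interface StateProvider {
    String collectionState(String name);
  }

  // Wrap a provider so reads reflect an in-memory snapshot instead of the backing store
  static StateProvider withSnapshot(StateProvider backing, String name, String snapshot) {
    return n -> n.equals(name) ? snapshot : backing.collectionState(n);
  }

  public static void main(String[] args) {
    StateProvider zkBacked = n -> "stale view from ZK";
    StateProvider wrapped = withSnapshot(zkBacked, "myColl", "fresh in-flight state");
    System.out.println(wrapped.collectionState("myColl")); // fresh in-flight state
    System.out.println(wrapped.collectionState("other"));  // stale view from ZK
  }
}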
@@ -24,6 +24,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

@@ -39,9 +40,7 @@
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.util.Utils;
@@ -252,8 +251,6 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes
       log.info("Failed to update state because the replica does not exist, {}", message);
       return ZkStateWriter.NO_OP;
     }
-    boolean persistCollectionState = collection != null && collection.isPerReplicaState();
-
     if (coreNodeName == null) {
       coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
           message.getStr(ZkStateReader.NODE_NAME_PROP), message.getStr(ZkStateReader.CORE_NAME_PROP));
@@ -264,7 +261,6 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes
         log.info("Failed to update state because the replica does not exist, {}", message);
         return ZkStateWriter.NO_OP;
       }
-      persistCollectionState = true;
       // if coreNodeName is null, auto assign one
       coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
     }
@@ -279,7 +275,6 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes
         if (sliceName != null) {
           log.debug("shard={} is already registered", sliceName);
         }
-        persistCollectionState = true;
       }
       if (sliceName == null) {
         //request new shardId
@@ -290,14 +285,14 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes
       }
       sliceName = Assign.assignShard(collection, numShards);
       log.info("Assigning new node to shard shard={}", sliceName);
-      persistCollectionState = true;
     }

     Slice slice = collection != null ? collection.getSlice(sliceName) : null;

+    Replica oldReplica = null;
     Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
     if (slice != null) {
-      Replica oldReplica = slice.getReplica(coreNodeName);
+      oldReplica = slice.getReplica(coreNodeName);
       if (oldReplica != null) {
         if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
           replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -348,10 +343,13 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes
     Map<String, Object> sliceProps = null;
     Map<String, Replica> replicas;

+    boolean sliceChanged = true;
     if (slice != null) {
+      Slice.State originalState = slice.getState();
       collection = checkAndCompleteShardSplit(prevState, collection, coreNodeName, sliceName, replica);
       // get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
       slice = collection.getSlice(sliceName);
+      sliceChanged = originalState != slice.getState();
       sliceProps = slice.getProperties();
       replicas = slice.getReplicasCopy();
     } else {
@@ -366,12 +364,55 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes

     DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
     log.debug("Collection is now: {}", newCollection);
-    if (collection != null && collection.isPerReplicaState()) {
-      PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
-      return new ZkWriteCommand(collectionName, newCollection, PerReplicaStatesOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
-    } else{
-      return new ZkWriteCommand(collectionName, newCollection);
+    if (collection != null && collection.isPerReplicaState() && oldReplica != null) {
+      if (!sliceChanged && !persistStateJson(replica, oldReplica, collection)) {
+        if (log.isDebugEnabled()) {
+          log.debug(
+              "state.json is not persisted slice/replica : {}/{} \n , old : {}, \n new {}",
+              replica.getSlice(),
+              replica.getName(),
+              Utils.toJSONString(oldReplica.getProperties()),
+              Utils.toJSONString(replica.getProperties()));
+        }
+        return ZkWriteCommand.NO_OP;
+      }
     }
+    return new ZkWriteCommand(collectionName, newCollection);
   }
+
+  /** Whether it is required to persist the state.json */
+  private boolean persistStateJson(Replica newReplica, Replica oldReplica, DocCollection coll) {
+    if (!Objects.equals(newReplica.getBaseUrl(), oldReplica.getBaseUrl())) return true;
+    if (!Objects.equals(newReplica.getCoreName(), oldReplica.getCoreName())) return true;
+    if (!Objects.equals(newReplica.getNodeName(), oldReplica.getNodeName())) return true;
+    if (!Objects.equals(newReplica.getState(), oldReplica.getState())) return true;
+    if (!Objects.equals(
+        newReplica.getProperties().get(ZkStateReader.FORCE_SET_STATE_PROP),
+        oldReplica.getProperties().get(ZkStateReader.FORCE_SET_STATE_PROP))) {
+      if (log.isInfoEnabled()) {
+        log.info(
+            "{} force_set_state is changed from {} -> {}",
+            newReplica.getName(),
+            oldReplica.getProperties().get(ZkStateReader.FORCE_SET_STATE_PROP),
+            newReplica.getProperties().get(ZkStateReader.FORCE_SET_STATE_PROP));
+      }
+      return true;
+    }
+    Slice slice = coll.getSlice(newReplica.getSlice());
+    // the slice may be in recovery
+    if (slice.getState() == Slice.State.RECOVERY) {
+      if (log.isInfoEnabled()) {
+        log.info("{} slice state_is_recovery", slice.getName());
+      }
+      return true;
+    }
+    if (Objects.equals(oldReplica.getProperties().get("state"), "recovering")) {
+      if (log.isInfoEnabled()) {
+        log.info("{} state_is_recovering", newReplica.getName());
+      }
+      return true;
+    }
+    return false;
+  }
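A worked example of the skip condition introduced above (a self-contained restatement over plain maps with hypothetical values; not Solr's API): a replica republishing an unchanged state yields false, so updateState returns ZkWriteCommand.NO_OP and only the PRS znodes are touched.

import java.util.Map;
import java.util.Objects;

public class PersistStateJsonSketch {
  static boolean persistStateJson(
      Map<String, String> oldR, Map<String, String> newR, String sliceState) {
    for (String k : new String[] {"base_url", "core", "node_name", "state", "force_set_state"}) {
      if (!Objects.equals(oldR.get(k), newR.get(k))) return true; // something state.json tracks changed
    }
    if ("recovery".equals(sliceState)) return true;          // slice is mid shard-split
    if ("recovering".equals(oldR.get("state"))) return true; // replica was recovering
    return false;                                            // nothing to write to state.json
  }

  public static void main(String[] args) {
    Map<String, String> r =
        Map.of(
            "base_url", "http://node1:8983/solr",
            "core", "coll1_shard1_replica_n1",
            "node_name", "node1:8983_solr",
            "state", "active");
    System.out.println(persistStateJson(r, r, "active")); // false -> state.json write skipped
  }
}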

