Skip to content

Commit

Permalink
SOLR-16995: Configure replication behaviour for each replica type (#2207)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvcnt committed Feb 19, 2024
1 parent bb0675b commit 2ec0cbf
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 67 deletions.
4 changes: 0 additions & 4 deletions solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ public CloudDescriptor(CoreDescriptor cd, String coreName, Properties props) {
}
}

public boolean requiresTransactionLog() {
return this.replicaType != Replica.Type.PULL;
}

public Replica.State getLastPublished() {
return lastPublished;
}
Expand Down
24 changes: 14 additions & 10 deletions solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,14 @@ public final void doRecovery(SolrCore core) throws Exception {
// we can lose our core descriptor, so store it now
this.coreDescriptor = core.getCoreDescriptor();

if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
if (this.coreDescriptor.getCloudDescriptor().getReplicaType().requireTransactionLog) {
doSyncOrReplicateRecovery(core);
} else {
doReplicateOnlyRecovery(core);
}
}

private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
private void doReplicateOnlyRecovery(SolrCore core) {
final RTimer timer = new RTimer();
boolean successfulRecovery = false;

Expand Down Expand Up @@ -396,8 +396,10 @@ private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedExce
log.info("Starting Replication Recovery.");

try {
log.info("Stopping background replicate from leader process");
zkController.stopReplicationFromLeader(coreName);
if (replicaType.replicateFromLeader) {
log.info("Stopping background replicate from leader process");
zkController.stopReplicationFromLeader(coreName);
}
replicate(zkController.getNodeName(), core, leaderprops);

if (isClosed()) {
Expand All @@ -417,8 +419,10 @@ private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedExce
log.error("Error while trying to recover. core={}", coreName, e);
} finally {
if (successfulRecovery) {
log.info("Restarting background replicate from leader process");
zkController.startReplicationFromLeader(coreName, false);
if (replicaType.replicateFromLeader) {
log.info("Restarting background replicate from leader process");
zkController.startReplicationFromLeader(coreName, false);
}
log.info("Registering as Active after recovery.");
try {
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
Expand Down Expand Up @@ -583,7 +587,7 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
}
}

if (replicaType == Replica.Type.TLOG) {
if (replicaType.replicateFromLeader) {
zkController.stopReplicationFromLeader(coreName);
}

Expand Down Expand Up @@ -735,7 +739,7 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
if (successfulRecovery) {
log.info("Registering as Active after recovery.");
try {
if (replicaType == Replica.Type.TLOG) {
if (replicaType.replicateFromLeader) {
zkController.startReplicationFromLeader(coreName, true);
}
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
Expand Down Expand Up @@ -770,8 +774,8 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
* @param ourUrl if the leader url is the same as our url, we will skip trying to connect
* @return the leader replica, or null if closed
*/
private final Replica pingLeader(
String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
private Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
throws Exception {
int numTried = 0;
while (true) {
CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
Expand Down
29 changes: 3 additions & 26 deletions solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.lang.invoke.MethodHandles;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
Expand Down Expand Up @@ -88,31 +87,9 @@ public void startReplication(boolean switchTransactionLog) {
log.info("Will start replication from leader with poll interval: {}", pollIntervalStr);

NamedList<Object> followerConfig = new NamedList<>();
followerConfig.add("fetchFromLeader", Boolean.TRUE);

// don't commit on leader version zero for PULL replicas as PULL should only get its index
// state from leader
boolean skipCommitOnLeaderVersionZero = switchTransactionLog;
if (!skipCommitOnLeaderVersionZero) {
CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
if (cloudDescriptor != null) {
Replica replica =
cc.getZkController()
.getZkStateReader()
.getCollection(cloudDescriptor.getCollectionName())
.getSlice(cloudDescriptor.getShardId())
.getReplica(cloudDescriptor.getCoreNodeName());
if (replica != null && replica.getType() == Replica.Type.PULL) {
// only set this to true if we're a PULL replica, otherwise use value of
// switchTransactionLog
skipCommitOnLeaderVersionZero = true;
}
}
}
followerConfig.add(
ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, skipCommitOnLeaderVersionZero);

followerConfig.add("pollInterval", pollIntervalStr);
followerConfig.add(ReplicationHandler.FETCH_FROM_LEADER, Boolean.TRUE);
followerConfig.add(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, Boolean.TRUE);
followerConfig.add(ReplicationHandler.POLL_INTERVAL, pollIntervalStr);
NamedList<Object> replicationConfig = new NamedList<>();
replicationConfig.add("follower", followerConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,11 @@ void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart)

if (!isClosed) {
try {
if (replicaType == Replica.Type.TLOG) {
if (replicaType.replicateFromLeader) {
// stop replicate from old leader
zkController.stopReplicationFromLeader(coreName);
}
if (replicaType == Replica.Type.TLOG) {
if (weAreReplacement) {
try (SolrCore core = cc.getCore(coreName)) {
Future<UpdateLog.RecoveryInfo> future =
Expand Down
24 changes: 13 additions & 11 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.Type;
import org.apache.solr.common.cloud.SecurityAwareZkACLProvider;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
Expand Down Expand Up @@ -1307,16 +1306,19 @@ public String register(
boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
if (replica.getType().leaderEligible) {
joinElection(desc, afterExpiration, joinAtHead);
} else if (replica.getType() == Type.PULL) {
} else {
if (joinAtHead) {
log.warn(
"Replica {} was designated as preferred leader but it's type is {}, It won't join election",
"Replica {} was designated as preferred leader but its type is {}, It won't join election",
coreZkNodeName,
Type.PULL);
replica.getType());
}
if (log.isDebugEnabled()) {
log.debug(
"Replica {} skipping election because its type is {}",
coreZkNodeName,
replica.getType());
}
log.debug(
"Replica {} skipping election because it's type is {}", coreZkNodeName, Type.PULL);
startReplicationFromLeader(coreName, false);
}
} catch (InterruptedException e) {
// Restore the interrupted status
Expand Down Expand Up @@ -1391,8 +1393,8 @@ public String register(
cc,
afterExpiration);
if (!didRecovery) {
if (isTlogReplicaAndNotLeader) {
startReplicationFromLeader(coreName, true);
if (replica.getType().replicateFromLeader && !isLeader) {
startReplicationFromLeader(coreName, replica.getType().requireTransactionLog);
}
publish(desc, Replica.State.ACTIVE);
}
Expand Down Expand Up @@ -2400,15 +2402,15 @@ public void rejoinShardLeaderElection(SolrParams params) {

try (SolrCore core = cc.getCore(coreName)) {
Replica.Type replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
if (replicaType == Type.TLOG) {
if (replicaType.replicateFromLeader) {
String leaderUrl =
getLeader(
core.getCoreDescriptor().getCloudDescriptor(), cloudConfig.getLeaderVoteWait());
if (!leaderUrl.equals(ourUrl)) {
// restart the replication thread to ensure the replication is running in each new
// replica especially if previous role is "leader" (i.e., no replication thread)
stopReplicationFromLeader(coreName);
startReplicationFromLeader(coreName, true);
startReplicationFromLeader(coreName, replicaType.requireTransactionLog);
}
}
}
Expand Down
15 changes: 6 additions & 9 deletions solr/core/src/java/org/apache/solr/core/CoreContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2051,15 +2051,13 @@ public void reload(String name, UUID coreId) {
if (docCollection != null) {
Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
assert replica != null : cd.getCloudDescriptor().getCoreNodeName() + " had no replica";
if (replica.getType() == Replica.Type.TLOG) { // TODO: needed here?
if (replica.getType().replicateFromLeader) {
getZkController().stopReplicationFromLeader(core.getName());
if (!cd.getCloudDescriptor().isLeader()) {
getZkController().startReplicationFromLeader(newCore.getName(), true);
getZkController()
.startReplicationFromLeader(
newCore.getName(), replica.getType().requireTransactionLog);
}

} else if (replica.getType() == Replica.Type.PULL) {
getZkController().stopReplicationFromLeader(core.getName());
getZkController().startReplicationFromLeader(newCore.getName(), false);
}
}
success = true;
Expand Down Expand Up @@ -2167,9 +2165,8 @@ public void unload(
if (zkSys.getZkController() != null) {
// cancel recovery in cloud mode
core.getSolrCoreState().cancelRecovery();
if (cd.getCloudDescriptor().getReplicaType() == Replica.Type.PULL
|| cd.getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
// Stop replication if this is part of a pull/tlog replica before closing the core
if (cd.getCloudDescriptor().getReplicaType().replicateFromLeader) {
// Stop replication before closing the core
zkSys.getZkController().stopReplicationFromLeader(name);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,8 @@ private static Long readIntervalNs(String interval) {
// In case of a TLOG replica, if leaderVersion = zero, don't commit; otherwise updates from the
// current tlog won't be copied over properly to the new tlog, leading to data loss.
// Also don't commit on leader version zero for PULL replicas, as a PULL replica should only get
// its index state from the leader.
public static final String SKIP_COMMIT_ON_LEADER_VERSION_ZERO = "skipCommitOnLeaderVersionZero";

/**
Expand Down
7 changes: 5 additions & 2 deletions solr/core/src/java/org/apache/solr/update/UpdateHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ public UpdateHandler(SolrCore core, UpdateLog updateLog) {
parseEventListeners();
PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());

// If this is a replica of type PULL, don't create the update log
// If this replica doesn't require a transaction log, don't create it
boolean skipUpdateLog =
core.getCoreDescriptor().getCloudDescriptor() != null
&& !core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog();
&& !core.getCoreDescriptor()
.getCloudDescriptor()
.getReplicaType()
.requireTransactionLog;
if (updateLog == null
&& ulogPluginInfo != null
&& ulogPluginInfo.isEnabled()
Expand Down
29 changes: 25 additions & 4 deletions solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,48 @@ public enum Type {
* support NRT (soft commits) and RTG. Any {@link Type#NRT} replica can become a leader. A shard
* leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas.
*/
NRT(true, CollectionAdminParams.NRT_REPLICAS),
NRT(true, true, false, CollectionAdminParams.NRT_REPLICAS),
/**
* Writes to transaction log, but not to index, uses replication. Any {@link Type#TLOG} replica
* can become leader (by first applying all local transaction log elements). If a replica is of
* type {@link Type#TLOG} but is also the leader, it will behave as a {@link Type#NRT}. A shard
* leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas.
*/
TLOG(true, CollectionAdminParams.TLOG_REPLICAS),
TLOG(true, true, true, CollectionAdminParams.TLOG_REPLICAS),
/**
* Doesn’t index or write to the transaction log. Just replicates from {@link Type#NRT} or {@link
* Type#TLOG} replicas. {@link Type#PULL} replicas can’t become shard leaders (i.e., if there
* are only pull replicas in the collection at some point, updates will fail the same as if there
* were no leader, while queries continue to work), so they don’t even participate in elections.
*/
PULL(false, CollectionAdminParams.PULL_REPLICAS);
PULL(false, false, true, CollectionAdminParams.PULL_REPLICAS);

/** Whether replicas of this type join the leader election and can be elected. */
public final boolean leaderEligible;

/**
* Whether replicas of this type require a transaction log. A transaction log will be created
* only if this is {@code true}.
*/
public final boolean requireTransactionLog;

/**
* Whether replicas of this type continuously replicate from the leader, if they are not
* themselves the leader.
*/
public final boolean replicateFromLeader;

/** Name of the property in messages that contains the number of replicas of this type. */
public final String numReplicasPropertyName;

Type(boolean leaderEligible, String numReplicasPropertyName) {
Type(
boolean leaderEligible,
boolean requireTransactionLog,
boolean replicateFromLeader,
String numReplicasPropertyName) {
this.leaderEligible = leaderEligible;
this.requireTransactionLog = requireTransactionLog;
this.replicateFromLeader = replicateFromLeader;
this.numReplicasPropertyName = numReplicasPropertyName;
}

Expand Down

0 comments on commit 2ec0cbf

Please sign in to comment.