diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java index 31be35cc752..f26819aa021 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java +++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java @@ -70,10 +70,6 @@ public CloudDescriptor(CoreDescriptor cd, String coreName, Properties props) { } } - public boolean requiresTransactionLog() { - return this.replicaType != Replica.Type.PULL; - } - public Replica.State getLastPublished() { return lastPublished; } diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index f26672b1960..9ea837378d7 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -336,14 +336,14 @@ public final void doRecovery(SolrCore core) throws Exception { // we can lose our core descriptor, so store it now this.coreDescriptor = core.getCoreDescriptor(); - if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) { + if (this.coreDescriptor.getCloudDescriptor().getReplicaType().requireTransactionLog) { doSyncOrReplicateRecovery(core); } else { doReplicateOnlyRecovery(core); } } - private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException { + private void doReplicateOnlyRecovery(SolrCore core) { final RTimer timer = new RTimer(); boolean successfulRecovery = false; @@ -396,8 +396,10 @@ private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedExce log.info("Starting Replication Recovery."); try { - log.info("Stopping background replicate from leader process"); - zkController.stopReplicationFromLeader(coreName); + if (replicaType.replicateFromLeader) { + log.info("Stopping background replicate from leader process"); + zkController.stopReplicationFromLeader(coreName); + } replicate(zkController.getNodeName(), core, leaderprops); if (isClosed()) { @@ -417,8 +419,10 @@ private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedExce log.error("Error while trying to recover. core={}", coreName, e); } finally { if (successfulRecovery) { - log.info("Restarting background replicate from leader process"); - zkController.startReplicationFromLeader(coreName, false); + if (replicaType.replicateFromLeader) { + log.info("Restarting background replicate from leader process"); + zkController.startReplicationFromLeader(coreName, false); + } log.info("Registering as Active after recovery."); try { zkController.publish(this.coreDescriptor, Replica.State.ACTIVE); @@ -583,7 +587,7 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception { } } - if (replicaType == Replica.Type.TLOG) { + if (replicaType.replicateFromLeader) { zkController.stopReplicationFromLeader(coreName); } @@ -735,7 +739,7 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception { if (successfulRecovery) { log.info("Registering as Active after recovery."); try { - if (replicaType == Replica.Type.TLOG) { + if (replicaType.replicateFromLeader) { zkController.startReplicationFromLeader(coreName, true); } zkController.publish(this.coreDescriptor, Replica.State.ACTIVE); @@ -770,8 +774,8 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception { * @param ourUrl if the leader url is the same as our url, we will skip trying to connect * @return the leader replica, or null if closed */ - private final Replica pingLeader( - String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception { + private Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) + throws Exception { int numTried = 0; while (true) { CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor(); diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java index 7abe46fe0c3..43390e63e9b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java +++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java @@ -20,7 +20,6 @@ import java.lang.invoke.MethodHandles; import org.apache.lucene.index.IndexCommit; import org.apache.solr.common.SolrException; -import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; @@ -88,31 +87,9 @@ public void startReplication(boolean switchTransactionLog) { log.info("Will start replication from leader with poll interval: {}", pollIntervalStr); NamedList followerConfig = new NamedList<>(); - followerConfig.add("fetchFromLeader", Boolean.TRUE); - - // don't commit on leader version zero for PULL replicas as PULL should only get its index - // state from leader - boolean skipCommitOnLeaderVersionZero = switchTransactionLog; - if (!skipCommitOnLeaderVersionZero) { - CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor(); - if (cloudDescriptor != null) { - Replica replica = - cc.getZkController() - .getZkStateReader() - .getCollection(cloudDescriptor.getCollectionName()) - .getSlice(cloudDescriptor.getShardId()) - .getReplica(cloudDescriptor.getCoreNodeName()); - if (replica != null && replica.getType() == Replica.Type.PULL) { - // only set this to true if we're a PULL replica, otherwise use value of - // switchTransactionLog - skipCommitOnLeaderVersionZero = true; - } - } - } - followerConfig.add( - ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, skipCommitOnLeaderVersionZero); - - followerConfig.add("pollInterval", pollIntervalStr); + followerConfig.add(ReplicationHandler.FETCH_FROM_LEADER, Boolean.TRUE); + followerConfig.add(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, Boolean.TRUE); + followerConfig.add(ReplicationHandler.POLL_INTERVAL, pollIntervalStr); NamedList replicationConfig = new NamedList<>(); replicationConfig.add("follower", followerConfig); diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java index e1768e0cd37..116aee57d3f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java @@ -270,9 +270,11 @@ void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) if (!isClosed) { try { - if (replicaType == Replica.Type.TLOG) { + if (replicaType.replicateFromLeader) { // stop replicate from old leader zkController.stopReplicationFromLeader(coreName); + } + if (replicaType == Replica.Type.TLOG) { if (weAreReplacement) { try (SolrCore core = cc.getCore(coreName)) { Future future = diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 51aa6e024c8..90442818718 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -78,7 +78,6 @@ import org.apache.solr.common.cloud.PerReplicaStates; import org.apache.solr.common.cloud.PerReplicaStatesOps; import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Replica.Type; import org.apache.solr.common.cloud.SecurityAwareZkACLProvider; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; @@ -1307,16 +1306,19 @@ public String register( boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false); if (replica.getType().leaderEligible) { joinElection(desc, afterExpiration, joinAtHead); - } else if (replica.getType() == Type.PULL) { + } else { if (joinAtHead) { log.warn( - "Replica {} was designated as preferred leader but it's type is {}, It won't join election", + "Replica {} was designated as preferred leader but its type is {}, It won't join election", coreZkNodeName, - Type.PULL); + replica.getType()); + } + if (log.isDebugEnabled()) { + log.debug( + "Replica {} skipping election because its type is {}", + coreZkNodeName, + replica.getType()); } - log.debug( - "Replica {} skipping election because it's type is {}", coreZkNodeName, Type.PULL); - startReplicationFromLeader(coreName, false); } } catch (InterruptedException e) { // Restore the interrupted status @@ -1391,8 +1393,8 @@ public String register( cc, afterExpiration); if (!didRecovery) { - if (isTlogReplicaAndNotLeader) { - startReplicationFromLeader(coreName, true); + if (replica.getType().replicateFromLeader && !isLeader) { + startReplicationFromLeader(coreName, replica.getType().requireTransactionLog); } publish(desc, Replica.State.ACTIVE); } @@ -2400,7 +2402,7 @@ public void rejoinShardLeaderElection(SolrParams params) { try (SolrCore core = cc.getCore(coreName)) { Replica.Type replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType(); - if (replicaType == Type.TLOG) { + if (replicaType.replicateFromLeader) { String leaderUrl = getLeader( core.getCoreDescriptor().getCloudDescriptor(), cloudConfig.getLeaderVoteWait()); @@ -2408,7 +2410,7 @@ public void rejoinShardLeaderElection(SolrParams params) { // restart the replication thread to ensure the replication is running in each new // replica especially if previous role is "leader" (i.e., no replication thread) stopReplicationFromLeader(coreName); - startReplicationFromLeader(coreName, true); + startReplicationFromLeader(coreName, replicaType.requireTransactionLog); } } } diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 8af5ecc272e..3d66853a781 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -2051,15 +2051,13 @@ public void reload(String name, UUID coreId) { if (docCollection != null) { Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName()); assert replica != null : cd.getCloudDescriptor().getCoreNodeName() + " had no replica"; - if (replica.getType() == Replica.Type.TLOG) { // TODO: needed here? + if (replica.getType().replicateFromLeader) { getZkController().stopReplicationFromLeader(core.getName()); if (!cd.getCloudDescriptor().isLeader()) { - getZkController().startReplicationFromLeader(newCore.getName(), true); + getZkController() + .startReplicationFromLeader( + newCore.getName(), replica.getType().requireTransactionLog); } - - } else if (replica.getType() == Replica.Type.PULL) { - getZkController().stopReplicationFromLeader(core.getName()); - getZkController().startReplicationFromLeader(newCore.getName(), false); } } success = true; @@ -2167,9 +2165,8 @@ public void unload( if (zkSys.getZkController() != null) { // cancel recovery in cloud mode core.getSolrCoreState().cancelRecovery(); - if (cd.getCloudDescriptor().getReplicaType() == Replica.Type.PULL - || cd.getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) { - // Stop replication if this is part of a pull/tlog replica before closing the core + if (cd.getCloudDescriptor().getReplicaType().replicateFromLeader) { + // Stop replication before closing the core zkSys.getZkController().stopReplicationFromLeader(name); } } diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index a5451dc1b2b..02d6745b234 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -1801,6 +1801,8 @@ private static Long readIntervalNs(String interval) { // in case of TLOG replica, if leaderVersion = zero, don't do commit // otherwise updates from current tlog won't copied over properly to the new tlog, leading to data // loss + // don't commit on leader version zero for PULL replicas as PULL should only get its index + // state from leader public static final String SKIP_COMMIT_ON_LEADER_VERSION_ZERO = "skipCommitOnLeaderVersionZero"; /** diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java index cfcd32fb4cf..f0ade81e090 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java @@ -116,10 +116,13 @@ public UpdateHandler(SolrCore core, UpdateLog updateLog) { parseEventListeners(); PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName()); - // If this is a replica of type PULL, don't create the update log + // If this replica doesn't require a transaction log, don't create it boolean skipUpdateLog = core.getCoreDescriptor().getCloudDescriptor() != null - && !core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog(); + && !core.getCoreDescriptor() + .getCloudDescriptor() + .getReplicaType() + .requireTransactionLog; if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled() diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index abeef4a9fc6..0d4cd3afffb 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -104,27 +104,48 @@ public enum Type { * support NRT (soft commits) and RTG. Any {@link Type#NRT} replica can become a leader. A shard * leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas. */ - NRT(true, CollectionAdminParams.NRT_REPLICAS), + NRT(true, true, false, CollectionAdminParams.NRT_REPLICAS), /** * Writes to transaction log, but not to index, uses replication. Any {@link Type#TLOG} replica * can become leader (by first applying all local transaction log elements). If a replica is of * type {@link Type#TLOG} but is also the leader, it will behave as a {@link Type#NRT}. A shard * leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas. */ - TLOG(true, CollectionAdminParams.TLOG_REPLICAS), + TLOG(true, true, true, CollectionAdminParams.TLOG_REPLICAS), /** * Doesn’t index or writes to transaction log. Just replicates from {@link Type#NRT} or {@link * Type#TLOG} replicas. {@link Type#PULL} replicas can’t become shard leaders (i.e., if there * are only pull replicas in the collection at some point, updates will fail same as if there is * no leaders, queries continue to work), so they don’t even participate in elections. */ - PULL(false, CollectionAdminParams.PULL_REPLICAS); + PULL(false, false, true, CollectionAdminParams.PULL_REPLICAS); + /** Whether replicas of this type join the leader election and can be elected. */ public final boolean leaderEligible; + + /** + * Whether replicas of this type require a transaction log. A transaction log will be created + * only if this is {@code true}. + */ + public final boolean requireTransactionLog; + + /** + * Whether replicas of this type continuously replicate from the leader, if they are not + * themselves the leader. + */ + public final boolean replicateFromLeader; + + /** Name of the property in messages that contains the number of replicas of this type. */ public final String numReplicasPropertyName; - Type(boolean leaderEligible, String numReplicasPropertyName) { + Type( + boolean leaderEligible, + boolean requireTransactionLog, + boolean replicateFromLeader, + String numReplicasPropertyName) { this.leaderEligible = leaderEligible; + this.requireTransactionLog = requireTransactionLog; + this.replicateFromLeader = replicateFromLeader; this.numReplicasPropertyName = numReplicasPropertyName; }