Skip to content

Commit

Permalink
Move primary term from ReplicationRequest to ConcreteShardRequest (#2…
Browse files Browse the repository at this point in the history
…5824)

Removes the primary term from the replication request and pushes it into the transport envelope. This makes it possible to remove the term from the ReplicationOperation universe. The primary term that is to be used for a replication operation is now determined in the reroute phase when the node decides to execute a primary action (and validated once the primary action gets to execute). This makes it possible to validate that the primary action was sent to the correct primary shard instance that it was meant to be sent to (currently we only validate primary actions using the allocation id, which can be reused for failed and reallocated primaries).

5.x. backport of #25822
  • Loading branch information
ywelsch committed Jul 21, 2017
1 parent 5da1c53 commit 5fc3f02
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 91 deletions.
Expand Up @@ -113,7 +113,6 @@ public void execute() throws Exception {
primaryResult = primary.perform(request);
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
Expand Down Expand Up @@ -146,7 +145,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<Str
for (String allocationId : Sets.difference(inSyncAllocationIds, availableAllocationIds)) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStale(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
replicasProxy.markShardCopyAsStale(replicaRequest.shardId(), allocationId,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
Expand Down Expand Up @@ -210,7 +209,7 @@ public void onFailure(Exception replicaException) {
logger.warn(
(org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] {}", shard.shardId(), message), replicaException);
replicasProxy.failShard(shard, replicaRequest.primaryTerm(), message, replicaException,
replicasProxy.failShard(shard, message, replicaException,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
Expand Down Expand Up @@ -337,7 +336,6 @@ public interface Primary<
* @return the request to send to the replicas
*/
PrimaryResultT perform(Request request) throws Exception;

}

public interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
Expand All @@ -349,34 +347,33 @@ public interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaReque
* @param replicaRequest operation to perform
* @param listener a callback to call once the operation has been completed, either successfully or with an error.
*/
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<TransportResponse.Empty> listener);
void performOn(ShardRouting replica, ReplicaRequest replicaRequest,
ActionListener<TransportResponse.Empty> listener);

/**
* Fail the specified shard, removing it from the current set of active shards
* @param replica shard to fail
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but that failure can be safely ignored.
*/
void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
void failShard(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);

/**
* Marks shard copy as stale, removing its allocation id from the set of in-sync allocation ids.
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
*/
void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
void markShardCopyAsStale(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
Expand Down Expand Up @@ -55,7 +56,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;

long primaryTerm;
private long primaryTerm;

protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;
Expand Down Expand Up @@ -170,13 +171,25 @@ long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

/** returns the primary term active at the time the operation was performed on the primary shard */
public long primaryTerm() {
/**
* Returns the primary term active at the time the operation was performed on the primary shard.
* This method should not be used anymore, as its functionality has moved to ConcreteShardRequest.
* It is purely here for interoperability with ES v5.5 and below.
*/
@Deprecated
long primaryTerm() {
return primaryTerm;
}

/** marks the primary term in which the operation was performed */
public void primaryTerm(long term) {
/**
* Marks the primary term in which the operation was performed.
* This method should not be used anymore, as its functionality has moved to ConcreteShardRequest.
* It is purely here for interoperability with ES v5.5 and below.
*/
@Deprecated
void primaryTerm(long term) {
assert term == 0L || primaryTerm == 0L || primaryTerm == term :
"primary term should only be set once on a request, was " + primaryTerm + " now set to " + term;
primaryTerm = term;
}

Expand All @@ -201,7 +214,11 @@ public void readFrom(StreamInput in) throws IOException {
timeout = new TimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
primaryTerm = in.readVLong();
if (in.getVersion().before(Version.V_5_6_0_UNRELEASED)) {
primaryTerm = in.readVLong();
} else {
primaryTerm = 0L;
}
}

@Override
Expand All @@ -217,7 +234,9 @@ public void writeTo(StreamOutput out) throws IOException {
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
out.writeVLong(primaryTerm);
if (out.getVersion().before(Version.V_5_6_0_UNRELEASED)) {
out.writeVLong(primaryTerm);
}
}

@Override
Expand Down

0 comments on commit 5fc3f02

Please sign in to comment.