Skip to content

Commit

Permalink
Move primary term from ReplicationRequest to ConcreteShardRequest (#2…
Browse files Browse the repository at this point in the history
…5822)

Removes the primary term from the replication request and pushes it into the transport envelope. This makes it possible to remove the term from the ReplicationOperation universe. The primary term that is to be used for a replication operation is now determined in the reroute phase when the node decides to execute a primary action (and validated once the primary action gets to execute). This makes it possible to validate that the primary action was sent to the correct primary shard instance that it was meant to be sent to (currently we only validate primary actions using the allocation id, which can be reused for failed and reallocated primaries).
  • Loading branch information
ywelsch committed Jul 21, 2017
1 parent 4935bce commit a2624df
Show file tree
Hide file tree
Showing 22 changed files with 170 additions and 219 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Expand Up @@ -159,7 +159,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
// disabled in preparation of merging #25822 & #25824
ext.bwc_tests_enabled = false
}

task verifyBwcTestsEnabled {
Expand Down
Expand Up @@ -464,7 +464,6 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
final long primaryTerm = request.primaryTerm();
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
Expand All @@ -473,7 +472,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, primaryTerm, replica);
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
Expand All @@ -482,7 +481,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), primaryTerm, failure.getMessage());
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
Expand All @@ -501,7 +500,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
}

private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest docWriteRequest,
long primaryTerm, IndexShard replica) throws Exception {
IndexShard replica) throws Exception {
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
Expand All @@ -511,15 +510,15 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
SourceToParse.source(shardId.getIndexName(),
indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
.routing(indexRequest.routing()).parent(indexRequest.parent());
return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(), sourceToParse, update -> {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + update);
});
case DELETE:
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
return replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
return replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery(),
update -> {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
Expand Down
Expand Up @@ -223,26 +223,6 @@ public String toString() {
return "delete {[" + index + "][" + type + "][" + id + "]}";
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public long primaryTerm() {
throw new UnsupportedOperationException("primary term should never be set on DeleteRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public void primaryTerm(long term) {
throw new UnsupportedOperationException("primary term should never be set on DeleteRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
Expand Down
Expand Up @@ -542,11 +542,7 @@ public void readFrom(StreamInput in) throws IOException {
pipeline = in.readOptionalString();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
contentType = in.readOptionalWriteable(XContentType::readFrom);
} else {
contentType = XContentFactory.xContentType(source);
}
contentType = in.readOptionalWriteable(XContentType::readFrom);
}

@Override
Expand All @@ -565,19 +561,12 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeBytesReference(source);
out.writeByte(opType.getId());
// ES versions below 5.1.2 don't know about resolveVersionDefaults but resolve the version eagerly (which messes with validation).
if (out.getVersion().before(Version.V_5_1_2)) {
out.writeLong(resolveVersionDefaults());
} else {
out.writeLong(version);
}
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
out.writeBoolean(isRetry);
out.writeLong(autoGeneratedTimestamp);
if (out.getVersion().onOrAfter(Version.V_5_3_0)) {
out.writeOptionalWriteable(contentType);
}
out.writeOptionalWriteable(contentType);
}

@Override
Expand Down Expand Up @@ -617,26 +606,6 @@ public long getAutoGeneratedTimestamp() {
return autoGeneratedTimestamp;
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public long primaryTerm() {
throw new UnsupportedOperationException("primary term should never be set on IndexRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public void primaryTerm(long term) {
throw new UnsupportedOperationException("primary term should never be set on IndexRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
Expand Down
Expand Up @@ -80,9 +80,9 @@ protected ResyncReplicationResponse newResponseInstance() {
}

@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
// We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
return new ReplicasProxy();
return new ReplicasProxy(primaryTerm);
}

@Override
Expand Down Expand Up @@ -135,13 +135,13 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques
}

@Override
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId, long primaryTerm,
ActionListener<ResyncReplicationResponse> listener) {
// skip reroute phase
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId),
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
parentTask,
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
Expand All @@ -42,13 +41,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
Expand Down Expand Up @@ -108,7 +104,6 @@ public void execute() throws Exception {
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
Expand Down Expand Up @@ -136,7 +131,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<Str
for (String allocationId : Sets.difference(inSyncAllocationIds, indexShardRoutingTable.getAllAllocationIds())) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
Expand Down Expand Up @@ -205,7 +200,7 @@ public void onFailure(Exception replicaException) {
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, replicaRequest.primaryTerm(), message,
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
}
Expand Down Expand Up @@ -363,15 +358,14 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
* implementation.
*
* @param replica shard to fail
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
*/
void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);

/**
Expand All @@ -381,13 +375,12 @@ void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, E
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
*/
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}

Expand Down
Expand Up @@ -55,8 +55,6 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;

long primaryTerm;

protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;

Expand Down Expand Up @@ -170,16 +168,6 @@ long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

/** returns the primary term active at the time the operation was performed on the primary shard */
public long primaryTerm() {
return primaryTerm;
}

/** marks the primary term in which the operation was performed */
public void primaryTerm(long term) {
primaryTerm = term;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -201,7 +189,6 @@ public void readFrom(StreamInput in) throws IOException {
timeout = new TimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
primaryTerm = in.readVLong();
}

@Override
Expand All @@ -217,7 +204,6 @@ public void writeTo(StreamOutput out) throws IOException {
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
out.writeVLong(primaryTerm);
}

@Override
Expand Down

0 comments on commit a2624df

Please sign in to comment.