Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move primary term from ReplicationRequest to ConcreteShardRequest #25822

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle
Expand Up @@ -159,7 +159,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
// disabled in preparation of merging #25822 & #25824
ext.bwc_tests_enabled = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only do not forget to reenable (although we do have a daily CI check that will fail if it's not enabled).

}

task verifyBwcTestsEnabled {
Expand Down
Expand Up @@ -464,7 +464,6 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
final long primaryTerm = request.primaryTerm();
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
Expand All @@ -473,7 +472,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, primaryTerm, replica);
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
Expand All @@ -482,7 +481,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), primaryTerm, failure.getMessage());
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
Expand All @@ -501,7 +500,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
}

private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest docWriteRequest,
long primaryTerm, IndexShard replica) throws Exception {
IndexShard replica) throws Exception {
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
Expand All @@ -511,15 +510,15 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
SourceToParse.source(shardId.getIndexName(),
indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
.routing(indexRequest.routing()).parent(indexRequest.parent());
return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
return replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(), sourceToParse, update -> {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + update);
});
case DELETE:
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
return replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryTerm, primaryResponse.getVersion(),
return replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery(),
update -> {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
Expand Down
Expand Up @@ -223,26 +223,6 @@ public String toString() {
return "delete {[" + index + "][" + type + "][" + id + "]}";
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public long primaryTerm() {
throw new UnsupportedOperationException("primary term should never be set on DeleteRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public void primaryTerm(long term) {
throw new UnsupportedOperationException("primary term should never be set on DeleteRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
Expand Down
Expand Up @@ -542,11 +542,7 @@ public void readFrom(StreamInput in) throws IOException {
pipeline = in.readOptionalString();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
contentType = in.readOptionalWriteable(XContentType::readFrom);
} else {
contentType = XContentFactory.xContentType(source);
}
contentType = in.readOptionalWriteable(XContentType::readFrom);
}

@Override
Expand All @@ -565,19 +561,12 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeBytesReference(source);
out.writeByte(opType.getId());
// ES versions below 5.1.2 don't know about resolveVersionDefaults but resolve the version eagerly (which messes with validation).
if (out.getVersion().before(Version.V_5_1_2)) {
out.writeLong(resolveVersionDefaults());
} else {
out.writeLong(version);
}
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
out.writeBoolean(isRetry);
out.writeLong(autoGeneratedTimestamp);
if (out.getVersion().onOrAfter(Version.V_5_3_0)) {
out.writeOptionalWriteable(contentType);
}
out.writeOptionalWriteable(contentType);
}

@Override
Expand Down Expand Up @@ -617,26 +606,6 @@ public long getAutoGeneratedTimestamp() {
return autoGeneratedTimestamp;
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public long primaryTerm() {
throw new UnsupportedOperationException("primary term should never be set on IndexRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public void primaryTerm(long term) {
throw new UnsupportedOperationException("primary term should never be set on IndexRequest");
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
Expand Down
Expand Up @@ -80,9 +80,9 @@ protected ResyncReplicationResponse newResponseInstance() {
}

@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
// We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
return new ReplicasProxy();
return new ReplicasProxy(primaryTerm);
}

@Override
Expand Down Expand Up @@ -135,13 +135,13 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques
}

@Override
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId, long primaryTerm,
ActionListener<ResyncReplicationResponse> listener) {
// skip reroute phase
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId),
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
parentTask,
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
Expand All @@ -42,13 +41,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
Expand Down Expand Up @@ -108,7 +104,6 @@ public void execute() throws Exception {
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
Expand Down Expand Up @@ -136,7 +131,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<Str
for (String allocationId : Sets.difference(inSyncAllocationIds, indexShardRoutingTable.getAllAllocationIds())) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
Expand Down Expand Up @@ -205,7 +200,7 @@ public void onFailure(Exception replicaException) {
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, replicaRequest.primaryTerm(), message,
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
}
Expand Down Expand Up @@ -363,15 +358,14 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
* implementation.
*
* @param replica shard to fail
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
*/
void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);

/**
Expand All @@ -381,13 +375,12 @@ void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, E
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
*/
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}

Expand Down
Expand Up @@ -55,8 +55,6 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;

long primaryTerm;

protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;

Expand Down Expand Up @@ -170,16 +168,6 @@ long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

/** returns the primary term active at the time the operation was performed on the primary shard */
public long primaryTerm() {
return primaryTerm;
}

/** marks the primary term in which the operation was performed */
public void primaryTerm(long term) {
primaryTerm = term;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -201,7 +189,6 @@ public void readFrom(StreamInput in) throws IOException {
timeout = new TimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
primaryTerm = in.readVLong();
}

@Override
Expand All @@ -217,7 +204,6 @@ public void writeTo(StreamOutput out) throws IOException {
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
out.writeVLong(primaryTerm);
}

@Override
Expand Down