From 2ff19bc1b7adc2e886ebf88f18f227f7e0e33562 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Apr 2019 19:10:10 +0100 Subject: [PATCH] Use Writeable for TransportReplAction derivatives (#40905) Relates #34389, backport of #40894. --- .../elasticsearch/action/DocWriteRequest.java | 8 +- ...TransportVerifyShardBeforeCloseAction.java | 11 +-- .../admin/indices/flush/FlushRequest.java | 10 ++- .../indices/flush/ShardFlushRequest.java | 9 +- .../flush/TransportShardFlushAction.java | 2 +- .../admin/indices/refresh/RefreshRequest.java | 7 ++ .../action/bulk/BulkShardRequest.java | 21 ++--- .../TransportSingleItemBulkWriteAction.java | 7 +- .../action/delete/DeleteRequest.java | 54 ++++++------ .../action/index/IndexRequest.java | 82 +++++++++---------- .../resync/ResyncReplicationRequest.java | 54 ++++++------ .../support/broadcast/BroadcastRequest.java | 6 ++ .../replication/BasicReplicationRequest.java | 10 ++- .../replication/ReplicatedWriteRequest.java | 12 +-- .../replication/ReplicationRequest.java | 50 ++++------- .../TransportBroadcastReplicationAction.java | 7 +- .../TransportReplicationAction.java | 78 ++++++++---------- .../replication/TransportWriteAction.java | 6 +- .../action/update/UpdateRequest.java | 6 +- .../index/reindex/ReindexRequest.java | 3 +- .../seqno/GlobalCheckpointSyncAction.java | 5 +- .../RetentionLeaseBackgroundSyncAction.java | 10 +-- .../index/seqno/RetentionLeaseSyncAction.java | 10 +-- .../action/index/IndexRequestTests.java | 13 ++- .../resync/ResyncReplicationRequestTests.java | 3 +- .../BroadcastReplicationTests.java | 17 ++-- .../ReplicationOperationTests.java | 6 +- .../TransportReplicationActionTests.java | 35 ++++---- ...ReplicationAllPermitsAcquisitionTests.java | 10 ++- .../TransportWriteActionTests.java | 8 +- .../ESIndexLevelReplicationTestCase.java | 6 +- .../bulk/BulkShardOperationsRequest.java | 19 +++-- 32 files changed, 297 insertions(+), 288 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 373dfaa5c7416..61328a78df69c 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -222,13 +222,9 @@ static DocWriteRequest readDocumentRequest(StreamInput in) throws IOException byte type = in.readByte(); DocWriteRequest docWriteRequest; if (type == 0) { - IndexRequest indexRequest = new IndexRequest(); - indexRequest.readFrom(in); - docWriteRequest = indexRequest; + docWriteRequest = new IndexRequest(in); } else if (type == 1) { - DeleteRequest deleteRequest = new DeleteRequest(); - deleteRequest.readFrom(in); - docWriteRequest = deleteRequest; + docWriteRequest = new DeleteRequest(in); } else if (type == 2) { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.readFrom(in); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index cba01a3e9f827..e0cddcb0acf2b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -136,9 +136,11 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all public static class ShardRequest extends ReplicationRequest { - private ClusterBlock clusterBlock; + private final ClusterBlock clusterBlock; - ShardRequest(){ + ShardRequest(StreamInput in) throws IOException { + super(in); + clusterBlock = new ClusterBlock(in); } public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) { @@ -153,9 +155,8 @@ public String toString() { } @Override - public void readFrom(final StreamInput in) throws IOException { - super.readFrom(in); - clusterBlock = new ClusterBlock(in); + public void readFrom(final StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java index 7f31890339c71..a6a72b92ce75b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java @@ -52,6 +52,12 @@ public FlushRequest(String... indices) { super(indices); } + public FlushRequest(StreamInput in) throws IOException { + super(in); + force = in.readBoolean(); + waitIfOngoing = in.readBoolean(); + } + /** * Returns {@code true} iff a flush should block * if a another flush operation is already running. Otherwise {@code false} @@ -103,9 +109,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - force = in.readBoolean(); - waitIfOngoing = in.readBoolean(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index ac32b16eb5711..8bd3597eba9bc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -29,7 +29,7 @@ public class ShardFlushRequest extends ReplicationRequest { - private FlushRequest request = new FlushRequest(); + private final FlushRequest request; public ShardFlushRequest(FlushRequest request, ShardId shardId) { super(shardId); @@ -37,7 +37,9 @@ public ShardFlushRequest(FlushRequest request, ShardId shardId) { this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default } - public ShardFlushRequest() { + public ShardFlushRequest(StreamInput in) throws IOException { + super(in); + request = new FlushRequest(in); } FlushRequest getRequest() { @@ -46,8 +48,7 @@ FlushRequest getRequest() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - request.readFrom(in); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 344a817fa8b83..63424844d7d76 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -55,7 +55,7 @@ protected PrimaryResult shardOperationOn IndexShard primary) { primary.flush(shardRequest.getRequest()); logger.trace("{} flush request executed on primary", primary.shardId()); - return new PrimaryResult(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java index 20687b8e53418..991184508fa55 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java @@ -20,6 +20,9 @@ package org.elasticsearch.action.admin.indices.refresh; import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; /** * A refresh request making all operations performed since the last refresh available for search. The (near) real-time @@ -35,4 +38,8 @@ public class RefreshRequest extends BroadcastRequest { public RefreshRequest(String... indices) { super(indices); } + + public RefreshRequest(StreamInput in) throws IOException { + super(in); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 2f9a130eb82fd..1fe763b48fdde 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -33,7 +33,14 @@ public class BulkShardRequest extends ReplicatedWriteRequest { private BulkItemRequest[] items; - public BulkShardRequest() { + public BulkShardRequest(StreamInput in) throws IOException { + super(in); + items = new BulkItemRequest[in.readVInt()]; + for (int i = 0; i < items.length; i++) { + if (in.readBoolean()) { + items[i] = BulkItemRequest.readBulkItem(in); + } + } } public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { @@ -60,7 +67,7 @@ public String[] indices() { indices.add(item.index()); } } - return indices.toArray(new String[indices.size()]); + return indices.toArray(new String[0]); } @Override @@ -78,14 +85,8 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - items = new BulkItemRequest[in.readVInt()]; - for (int i = 0; i < items.length; i++) { - if (in.readBoolean()) { - items[i] = BulkItemRequest.readBulkItem(in); - } - } + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java index cc97b6237e30c..c080006b19d77 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java @@ -28,11 +28,10 @@ import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -import java.util.function.Supplier; - /** use transport bulk action directly */ @Deprecated public abstract class TransportSingleItemBulkWriteAction< @@ -43,8 +42,8 @@ public abstract class TransportSingleItemBulkWriteAction< private final TransportBulkAction bulkAction; protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters, - Supplier request, TransportBulkAction bulkAction) { - super(actionName, transportService, actionFilters, request); + Writeable.Reader requestReader, TransportBulkAction bulkAction) { + super(actionName, transportService, actionFilters, requestReader); this.bulkAction = bulkAction; } diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index a033bf3cb000f..115a7bdd7428e 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -53,6 +53,8 @@ public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { + private static final ShardId NO_SHARD_ID = null; + // Set to null initially so we can know to override in bulk requests that have a default type. private String type; private String id; @@ -63,7 +65,27 @@ public class DeleteRequest extends ReplicatedWriteRequest private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + public DeleteRequest(StreamInput in) throws IOException { + super(in); + type = in.readString(); + id = in.readString(); + routing = in.readOptionalString(); + if (in.getVersion().before(Version.V_7_0_0)) { + in.readOptionalString(); // _parent + } + version = in.readLong(); + versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); + } else { + ifSeqNo = UNASSIGNED_SEQ_NO; + ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + } + } + public DeleteRequest() { + super(NO_SHARD_ID); } /** @@ -71,6 +93,7 @@ public DeleteRequest() { * must be set. */ public DeleteRequest(String index) { + super(NO_SHARD_ID); this.index = index; } @@ -85,6 +108,7 @@ public DeleteRequest(String index) { */ @Deprecated public DeleteRequest(String index, String type, String id) { + super(NO_SHARD_ID); this.index = index; this.type = type; this.id = id; @@ -97,6 +121,7 @@ public DeleteRequest(String index, String type, String id) { * @param id The id of the document */ public DeleteRequest(String index, String id) { + super(NO_SHARD_ID); this.index = index; this.id = id; } @@ -274,23 +299,8 @@ public OpType opType() { } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - type = in.readString(); - id = in.readString(); - routing = in.readOptionalString(); - if (in.getVersion().before(Version.V_7_0_0)) { - in.readOptionalString(); // _parent - } - version = in.readLong(); - versionType = VersionType.fromValue(in.readByte()); - if (in.getVersion().onOrAfter(Version.V_6_6_0)) { - ifSeqNo = in.readZLong(); - ifPrimaryTerm = in.readVLong(); - } else { - ifSeqNo = UNASSIGNED_SEQ_NO; - ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - } + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -321,14 +331,4 @@ public void writeTo(StreamOutput out) throws IOException { public String toString() { return "delete {[" + index + "][" + type() + "][" + id + "]}"; } - - /** - * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't - * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or - * use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. - */ - @Override - public DeleteRequest setShardId(ShardId shardId) { - throw new UnsupportedOperationException("shard id should never be set on DeleteRequest"); - } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 37d960831776d..6d26eccca67bf 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -83,6 +83,8 @@ public class IndexRequest extends ReplicatedWriteRequest implement */ static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048; + private static final ShardId NO_SHARD_ID = null; + // Set to null initially so we can know to override in bulk requests that have a default type. private String type; private String id; @@ -112,8 +114,41 @@ public class IndexRequest extends ReplicatedWriteRequest implement private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + public IndexRequest(StreamInput in) throws IOException { + super(in); + type = in.readOptionalString(); + id = in.readOptionalString(); + routing = in.readOptionalString(); + if (in.getVersion().before(Version.V_7_0_0)) { + in.readOptionalString(); // _parent + } + if (in.getVersion().before(Version.V_6_0_0_alpha1)) { + in.readOptionalString(); // timestamp + in.readOptionalTimeValue(); // ttl + } + source = in.readBytesReference(); + opType = OpType.fromId(in.readByte()); + version = in.readLong(); + versionType = VersionType.fromValue(in.readByte()); + pipeline = in.readOptionalString(); + isRetry = in.readBoolean(); + autoGeneratedTimestamp = in.readLong(); + if (in.readBoolean()) { + contentType = in.readEnum(XContentType.class); + } else { + contentType = null; + } + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); + } else { + ifSeqNo = UNASSIGNED_SEQ_NO; + ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + } + } public IndexRequest() { + super(NO_SHARD_ID); } /** @@ -121,6 +156,7 @@ public IndexRequest() { * {@link #source(byte[], XContentType)} must be set. */ public IndexRequest(String index) { + super(NO_SHARD_ID); this.index = index; } @@ -131,6 +167,7 @@ public IndexRequest(String index) { */ @Deprecated public IndexRequest(String index, String type) { + super(NO_SHARD_ID); this.index = index; this.type = type; } @@ -146,6 +183,7 @@ public IndexRequest(String index, String type) { */ @Deprecated public IndexRequest(String index, String type, String id) { + super(NO_SHARD_ID); this.index = index; this.type = type; this.id = id; @@ -593,37 +631,8 @@ public void resolveRouting(MetaData metaData) { } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - type = in.readOptionalString(); - id = in.readOptionalString(); - routing = in.readOptionalString(); - if (in.getVersion().before(Version.V_7_0_0)) { - in.readOptionalString(); // _parent - } - if (in.getVersion().before(Version.V_6_0_0_alpha1)) { - in.readOptionalString(); // timestamp - in.readOptionalTimeValue(); // ttl - } - source = in.readBytesReference(); - opType = OpType.fromId(in.readByte()); - version = in.readLong(); - versionType = VersionType.fromValue(in.readByte()); - pipeline = in.readOptionalString(); - isRetry = in.readBoolean(); - autoGeneratedTimestamp = in.readLong(); - if (in.readBoolean()) { - contentType = in.readEnum(XContentType.class); - } else { - contentType = null; - } - if (in.getVersion().onOrAfter(Version.V_6_6_0)) { - ifSeqNo = in.readZLong(); - ifPrimaryTerm = in.readVLong(); - } else { - ifSeqNo = UNASSIGNED_SEQ_NO; - ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - } + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -704,15 +713,4 @@ public void onRetry() { public long getAutoGeneratedTimestamp() { return autoGeneratedTimestamp; } - - /** - * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't - * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or - * use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. - */ - @Override - public IndexRequest setShardId(ShardId shardId) { - throw new UnsupportedOperationException("shard id should never be set on IndexRequest"); - } - } diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index a53766af7cf52..de964a40ca4ca 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -36,12 +36,32 @@ */ public final class ResyncReplicationRequest extends ReplicatedWriteRequest { - private long trimAboveSeqNo; - private Translog.Operation[] operations; - private long maxSeenAutoIdTimestampOnPrimary; + private final long trimAboveSeqNo; + private final Translog.Operation[] operations; + private final long maxSeenAutoIdTimestampOnPrimary; - ResyncReplicationRequest() { - super(); + ResyncReplicationRequest(StreamInput in) throws IOException { + super(in); + assert Version.CURRENT.major <= 7; + if (in.getVersion().equals(Version.V_6_0_0)) { + /* + * Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a + * byte indicating the type of the operation. + */ + // TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x. + throw new IllegalStateException("resync replication request serialization is broken in 6.0.0"); + } + if (in.getVersion().onOrAfter(Version.V_6_4_0)) { + trimAboveSeqNo = in.readZLong(); + } else { + trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + } + if (in.getVersion().onOrAfter(Version.V_6_5_0)) { + maxSeenAutoIdTimestampOnPrimary = in.readZLong(); + } else { + maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } + operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, final long maxSeenAutoIdTimestampOnPrimary, @@ -65,28 +85,8 @@ public Translog.Operation[] getOperations() { } @Override - public void readFrom(final StreamInput in) throws IOException { - assert Version.CURRENT.major <= 7; - if (in.getVersion().equals(Version.V_6_0_0)) { - /* - * Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a - * byte indicating the type of the operation. - */ - // TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x. - throw new IllegalStateException("resync replication request serialization is broken in 6.0.0"); - } - super.readFrom(in); - if (in.getVersion().onOrAfter(Version.V_6_4_0)) { - trimAboveSeqNo = in.readZLong(); - } else { - trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; - } - if (in.getVersion().onOrAfter(Version.V_6_5_0)) { - maxSeenAutoIdTimestampOnPrimary = in.readZLong(); - } else { - maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; - } - operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); + public void readFrom(final StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java index 6cf42e7ad3f14..d19b28036b92b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java @@ -36,6 +36,12 @@ public class BroadcastRequest> extends public BroadcastRequest() { } + public BroadcastRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + } + protected BroadcastRequest(String[] indices) { this.indices = indices; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java index b4731d19e29e4..b70b6b2566d10 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java @@ -19,8 +19,11 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.shard.ShardId; +import java.io.IOException; + /** * A replication request that has no more information than ReplicationRequest. * Unfortunately ReplicationRequest can't be declared as a type parameter @@ -28,9 +31,6 @@ * instead. */ public class BasicReplicationRequest extends ReplicationRequest { - public BasicReplicationRequest() { - } - /** * Creates a new request with resolved shard id */ @@ -38,6 +38,10 @@ public BasicReplicationRequest(ShardId shardId) { super(shardId); } + public BasicReplicationRequest(StreamInput in) throws IOException { + super(in); + } + @Override public String toString() { return "BasicReplicationRequest{" + shardId + "}"; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index fa02dac9e1e2d..25089d02b799f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -39,10 +40,12 @@ public abstract class ReplicatedWriteRequest /** * Constructor for deserialization. */ - public ReplicatedWriteRequest() { + public ReplicatedWriteRequest(StreamInput in) throws IOException { + super(in); + refreshPolicy = RefreshPolicy.readFrom(in); } - public ReplicatedWriteRequest(ShardId shardId) { + public ReplicatedWriteRequest(@Nullable ShardId shardId) { super(shardId); } @@ -59,9 +62,8 @@ public RefreshPolicy getRefreshPolicy() { } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - refreshPolicy = RefreshPolicy.readFrom(in); + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index db043238feb3e..31d18d4dc0537 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -54,9 +54,9 @@ public abstract class ReplicationRequest request, ClusterService clusterService, + public TransportBroadcastReplicationAction(String name, Writeable.Reader requestReader, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) { - super(name, transportService, actionFilters, request); + super(name, transportService, actionFilters, requestReader); this.replicatedBroadcastShardAction = replicatedBroadcastShardAction; this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index ac6298c2c8691..6df98fbf1498c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -83,9 +83,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; - -import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; /** * Base class for requests that should be executed on a primary copy followed by replica copies. @@ -120,10 +117,10 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, - Supplier replicaRequest, String executor) { + IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader requestReader, + Writeable.Reader replicaRequestReader, String executor) { this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor, false, false); + indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false); } @@ -131,8 +128,8 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, - Supplier replicaRequest, String executor, + IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader requestReader, + Writeable.Reader replicaRequestReader, String executor, boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) { super(actionName, actionFilters, transportService.getTaskManager()); this.threadPool = threadPool; @@ -146,14 +143,14 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); + transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - transportService.registerRequestHandler(transportPrimaryAction, - () -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest); + transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true, + in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, this::handleReplicaRequest); + transportService.registerRequestHandler(transportReplicaAction, executor, true, true, + in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest); this.transportOptions = transportOptions(settings); @@ -1089,17 +1086,14 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, A public static class ConcreteShardRequest extends TransportRequest { /** {@link AllocationId#getId()} of the shard this request is sent to **/ - private String targetAllocationID; - - private long primaryTerm; + private final String targetAllocationID; + private final long primaryTerm; + private final R request; - private R request; - - public ConcreteShardRequest(Supplier requestSupplier) { - request = requestSupplier.get(); - // null now, but will be populated by reading from the streams - targetAllocationID = null; - primaryTerm = UNASSIGNED_PRIMARY_TERM; + public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { + targetAllocationID = in.readString(); + primaryTerm = in.readVLong(); + request = requestReader.read(in); } public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) { @@ -1135,10 +1129,8 @@ public String getDescription() { } @Override - public void readFrom(StreamInput in) throws IOException { - targetAllocationID = in.readString(); - primaryTerm = in.readVLong(); - request.readFrom(in); + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1168,23 +1160,11 @@ public String toString() { protected static final class ConcreteReplicaRequest extends ConcreteShardRequest { - private long globalCheckpoint; - private long maxSeqNoOfUpdatesOrDeletes; + private final long globalCheckpoint; + private final long maxSeqNoOfUpdatesOrDeletes; - public ConcreteReplicaRequest(final Supplier requestSupplier) { - super(requestSupplier); - } - - public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm, - final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) { - super(request, targetAllocationID, primaryTerm); - this.globalCheckpoint = globalCheckpoint; - this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public ConcreteReplicaRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { + super(requestReader, in); if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { globalCheckpoint = in.readZLong(); } else { @@ -1199,6 +1179,18 @@ public void readFrom(StreamInput in) throws IOException { } } + public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm, + final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) { + super(request, targetAllocationID, primaryTerm); + this.globalCheckpoint = globalCheckpoint; + this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; + } + + @Override + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 619beab57932c..cb3f67aa99ea2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperParsingException; @@ -47,7 +48,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; /** * Base class for transport actions that modify data in some shard like index, delete, and shardBulk. @@ -62,8 +62,8 @@ public abstract class TransportWriteAction< protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, - Supplier replicaRequest, String executor, boolean forceExecutionOnPrimary) { + IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader request, + Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 3693975ddab08..bbd17ab4a72e9 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -845,8 +845,7 @@ public void readFrom(StreamInput in) throws IOException { retryOnConflict = in.readVInt(); refreshPolicy = RefreshPolicy.readFrom(in); if (in.readBoolean()) { - doc = new IndexRequest(); - doc.readFrom(in); + doc = new IndexRequest(in); } if (in.getVersion().before(Version.V_7_0_0)) { String[] fields = in.readOptionalStringArray(); @@ -856,8 +855,7 @@ public void readFrom(StreamInput in) throws IOException { } fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); if (in.readBoolean()) { - upsertRequest = new IndexRequest(); - upsertRequest.readFrom(in); + upsertRequest = new IndexRequest(in); } docAsUpsert = in.readBoolean(); if (in.getVersion().before(Version.V_7_0_0)) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index cd93356bb3968..de171e88fbca1 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -69,8 +69,7 @@ private ReindexRequest(SearchRequest search, IndexRequest destination, boolean s public ReindexRequest(StreamInput in) throws IOException { super.readFrom(in); - destination = new IndexRequest(); - destination.readFrom(in); + destination = new IndexRequest(in); remoteInfo = in.readOptionalWriteable(RemoteInfo::new); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 4d3d0123fe6c9..b7f08a36ac06f 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.IndexShard; @@ -122,8 +123,8 @@ private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { public static final class Request extends ReplicationRequest { - private Request() { - super(); + private Request(StreamInput in) throws IOException { + super(in); } public Request(final ShardId shardId) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index d454c2de75b28..918ce664aea61 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -148,8 +148,9 @@ public RetentionLeases getRetentionLeases() { return retentionLeases; } - public Request() { - + public Request(StreamInput in) throws IOException { + super(in); + retentionLeases = new RetentionLeases(in); } public Request(final ShardId shardId, final RetentionLeases retentionLeases) { @@ -159,9 +160,8 @@ public Request(final ShardId shardId, final RetentionLeases retentionLeases) { } @Override - public void readFrom(final StreamInput in) throws IOException { - super.readFrom(in); - retentionLeases = new RetentionLeases(in); + public void readFrom(final StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 26eb32a9f1860..a8aa7fe6f8ec2 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -157,8 +157,9 @@ public RetentionLeases getRetentionLeases() { return retentionLeases; } - public Request() { - + public Request(StreamInput in) throws IOException { + super(in); + retentionLeases = new RetentionLeases(in); } public Request(final ShardId shardId, final RetentionLeases retentionLeases) { @@ -168,9 +169,8 @@ public Request(final ShardId shardId, final RetentionLeases retentionLeases) { } @Override - public void readFrom(final StreamInput in) throws IOException { - super.readFrom(in); - retentionLeases = new RetentionLeases(in); + public void readFrom(final StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index 3ab31b7d725cd..e01cc511ba136 100644 --- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -163,8 +163,7 @@ public void testIndexRequestXContentSerialization() throws IOException { BytesStreamOutput out = new BytesStreamOutput(); indexRequest.writeTo(out); StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); - IndexRequest serialized = new IndexRequest(); - serialized.readFrom(in); + IndexRequest serialized = new IndexRequest(in); assertEquals(XContentType.JSON, serialized.getContentType()); assertEquals(new BytesArray("{}"), serialized.source()); } @@ -173,14 +172,14 @@ public void testIndexRequestXContentSerialization() throws IOException { public void testSerializationOfEmptyRequestWorks() throws IOException { IndexRequest request = new IndexRequest("index"); assertNull(request.getContentType()); + assertEquals("index", request.index()); + try (BytesStreamOutput out = new BytesStreamOutput()) { request.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - IndexRequest serialized = new IndexRequest(); - serialized.readFrom(in); - assertNull(request.getContentType()); - assertEquals("index", request.index()); + IndexRequest serialized = new IndexRequest(in); + assertNull(serialized.getContentType()); + assertEquals("index", serialized.index()); } } } diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java index 230eccb057844..6aab44b722d9d 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -44,8 +44,7 @@ public void testSerialization() throws IOException { before.writeTo(out); final StreamInput in = out.bytes().streamInput(); - final ResyncReplicationRequest after = new ResyncReplicationRequest(); - after.readFrom(in); + final ResyncReplicationRequest after = new ResyncReplicationRequest(in); assertThat(after, equalTo(before)); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 4c91bfaa420bd..383b6ed304db0 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.PageCacheRecycler; @@ -118,13 +119,13 @@ public static void afterClass() { threadPool = null; } - public void testNotStartedPrimary() throws InterruptedException, ExecutionException, IOException { + public void testNotStartedPrimary() throws InterruptedException, ExecutionException { final String index = "test"; setState(clusterService, state(index, randomBoolean(), randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); logger.debug("--> using initial state:\n{}", clusterService.state()); PlainActionFuture response = PlainActionFuture.newFuture(); - broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response); + broadcastReplicationAction.execute(new DummyBroadcastRequest(index), response); for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { if (randomBoolean()) { shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1())); @@ -138,13 +139,13 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept assertBroadcastResponse(2, 0, 0, response.get(), null); } - public void testStartedPrimary() throws InterruptedException, ExecutionException, IOException { + public void testStartedPrimary() throws InterruptedException, ExecutionException { final String index = "test"; setState(clusterService, state(index, randomBoolean(), ShardRoutingState.STARTED)); logger.debug("--> using initial state:\n{}", clusterService.state()); PlainActionFuture response = PlainActionFuture.newFuture(); - broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response); + broadcastReplicationAction.execute(new DummyBroadcastRequest(index), response); for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { ReplicationResponse replicationResponse = new ReplicationResponse(); replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1)); @@ -225,7 +226,7 @@ protected ReplicationResponse newShardResponse() { @Override protected BasicReplicationRequest newShardRequest(DummyBroadcastRequest request, ShardId shardId) { - return new BasicReplicationRequest().setShardId(shardId); + return new BasicReplicationRequest(shardId); } @Override @@ -269,6 +270,12 @@ private void assertBroadcastResponse(int total, int successful, int failed, Broa } public static class DummyBroadcastRequest extends BroadcastRequest { + DummyBroadcastRequest(String... indices) { + super(indices); + } + DummyBroadcastRequest(StreamInput in) throws IOException { + super(in); + } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 02988e7981a29..3af5047fe22e5 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -440,12 +440,8 @@ public static class Request extends ReplicationRequest { public AtomicBoolean processedOnPrimary = new AtomicBoolean(); public Set processedOnReplicas = ConcurrentCollections.newConcurrentSet(); - public Request() { - } - Request(ShardId shardId) { - this(); - this.shardId = shardId; + super(shardId); this.index = shardId.getIndexName(); this.waitForActiveShards = ActiveShardCount.NONE; // keep things simple diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 02e9ff3146cf6..a663841ac6a47 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -130,6 +130,8 @@ public class TransportReplicationActionTests extends ESTestCase { + private static final ShardId NO_SHARD_ID = null; + /** * takes a request that was sent by a {@link TransportReplicationAction} and captured * and returns the underlying request if it's wrapped or the original (cast to the expected type). @@ -231,7 +233,7 @@ public ClusterBlockLevel indexBlockLevel() { { setStateWithBlock(clusterService, nonRetryableBlock, globalBlock); - Request request = globalBlock ? new Request() : new Request().index("index"); + Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -246,7 +248,7 @@ public ClusterBlockLevel indexBlockLevel() { { setStateWithBlock(clusterService, retryableBlock, globalBlock); - Request requestWithTimeout = (globalBlock ? new Request() : new Request().index("index")).timeout("5ms"); + Request requestWithTimeout = (globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index")).timeout("5ms"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -262,7 +264,7 @@ public ClusterBlockLevel indexBlockLevel() { { setStateWithBlock(clusterService, retryableBlock, globalBlock); - Request request = globalBlock ? new Request() : new Request().index("index"); + Request request = globalBlock ? new Request(NO_SHARD_ID) : new Request(NO_SHARD_ID).index("index"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -281,7 +283,7 @@ public ClusterBlockLevel indexBlockLevel() { assertIndexShardUninitialized(); } { - Request requestWithTimeout = new Request().index("unknown").setShardId(new ShardId("unknown", "_na_", 0)).timeout("5ms"); + Request requestWithTimeout = new Request(new ShardId("unknown", "_na_", 0)).index("unknown").timeout("5ms"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -688,8 +690,8 @@ public void testPrimaryReference() throws Exception { } }; TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); - final Request request = new Request(); - Request replicaRequest = (Request) primary.perform(request).replicaRequest; + final Request request = new Request(NO_SHARD_ID); + primary.perform(request); final ElasticsearchException exception = new ElasticsearchException("testing"); primary.failShard("test", exception); @@ -716,7 +718,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { proxy.performOn( TestShardRouting.newShardRouting(shardId, "NOT THERE", routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState), - new Request(), + new Request(NO_SHARD_ID), randomNonNegativeLong(), randomNonNegativeLong(), listener); @@ -727,7 +729,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream() .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); listener = new PlainActionFuture<>(); - proxy.performOn(replica, new Request(), randomNonNegativeLong(), randomNonNegativeLong(), listener); + proxy.performOn(replica, new Request(NO_SHARD_ID), randomNonNegativeLong(), randomNonNegativeLong(), listener); assertFalse(listener.isDone()); CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); @@ -888,7 +890,7 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl try { action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>( - new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), + new Request(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), createTransportChannel(new PlainActionFuture<>()), task); } catch (ElasticsearchException e) { @@ -1020,7 +1022,7 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl } }; final PlainActionFuture listener = new PlainActionFuture<>(); - final Request request = new Request().setShardId(shardId); + final Request request = new Request(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong(); action.handleReplicaRequest( @@ -1088,7 +1090,7 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl } }; final PlainActionFuture listener = new PlainActionFuture<>(); - final Request request = new Request().setShardId(shardId); + final Request request = new Request(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdates = randomNonNegativeLong(); action.handleReplicaRequest( @@ -1166,13 +1168,12 @@ public static class Request extends ReplicationRequest { public AtomicInteger processedOnReplicas = new AtomicInteger(); public AtomicBoolean isRetrySet = new AtomicBoolean(false); - public Request() { + Request(StreamInput in) throws IOException { + super(in); } - Request(ShardId shardId) { - this(); - this.shardId = shardId; - this.index = shardId.getIndexName(); + Request(@Nullable ShardId shardId) { + super(shardId); this.waitForActiveShards = ActiveShardCount.NONE; // keep things simple } @@ -1184,7 +1185,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index b8c87acb56dd6..15886a517d380 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -400,7 +400,7 @@ private String allocationId() { } private Request request() { - return new Request().setShardId(primary.shardId()); + return new Request(primary.shardId()); } /** @@ -558,6 +558,14 @@ protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard } static class Request extends ReplicationRequest { + Request(StreamInput in) throws IOException { + super(in); + } + + Request(ShardId shardId) { + super(shardId); + } + @Override public String toString() { return getTestClass().getName() + ".Request"; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 3c2df0b59b24e..5a35202506d39 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -62,6 +63,7 @@ import org.junit.BeforeClass; import org.mockito.ArgumentCaptor; +import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Locale; @@ -522,8 +524,12 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService } private static class TestRequest extends ReplicatedWriteRequest { + TestRequest(StreamInput in) throws IOException { + super(in); + } + TestRequest() { - setShardId(new ShardId("test", "test", 1)); + super(new ShardId("test", "test", 1)); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index e9edae6468ddd..9b70aefa56b61 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -145,15 +145,13 @@ protected IndexMetaData buildIndexMetaData(int replicas, Settings indexSettings, return metaData.build(); } - protected IndexRequest copyIndexRequest(IndexRequest inRequest) throws IOException { - final IndexRequest outRequest = new IndexRequest(); + IndexRequest copyIndexRequest(IndexRequest inRequest) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { inRequest.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - outRequest.readFrom(in); + return new IndexRequest(in); } } - return outRequest; } protected DiscoveryNode getDiscoveryNode(String id) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java index cf9239af740c4..f05a616c956b8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java @@ -16,11 +16,15 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest { - private String historyUUID; - private List operations; - private long maxSeqNoOfUpdatesOrDeletes; + private final String historyUUID; + private final List operations; + private final long maxSeqNoOfUpdatesOrDeletes; - public BulkShardOperationsRequest() { + public BulkShardOperationsRequest(StreamInput in) throws IOException { + super(in); + historyUUID = in.readString(); + maxSeqNoOfUpdatesOrDeletes = in.readZLong(); + operations = in.readList(Translog.Operation::readOperation); } public BulkShardOperationsRequest(final ShardId shardId, @@ -47,11 +51,8 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { } @Override - public void readFrom(final StreamInput in) throws IOException { - super.readFrom(in); - historyUUID = in.readString(); - maxSeqNoOfUpdatesOrDeletes = in.readZLong(); - operations = in.readList(Translog.Operation::readOperation); + public void readFrom(final StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override