Skip to content

Commit

Permalink
Use Writeable for TransportReplAction derivatives (elastic#40894)
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Apr 5, 2019
1 parent 665f0d8 commit d0aa5a0
Show file tree
Hide file tree
Showing 32 changed files with 297 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,9 @@ static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException
byte type = in.readByte();
DocWriteRequest<?> docWriteRequest;
if (type == 0) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.readFrom(in);
docWriteRequest = indexRequest;
docWriteRequest = new IndexRequest(in);
} else if (type == 1) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.readFrom(in);
docWriteRequest = deleteRequest;
docWriteRequest = new DeleteRequest(in);
} else if (type == 2) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.readFrom(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,11 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

private ClusterBlock clusterBlock;
private final ClusterBlock clusterBlock;

ShardRequest(){
ShardRequest(StreamInput in) throws IOException {
super(in);
clusterBlock = new ClusterBlock(in);
}

public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
Expand All @@ -153,9 +155,8 @@ public String toString() {
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
clusterBlock = new ClusterBlock(in);
public void readFrom(final StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public FlushRequest(String... indices) {
super(indices);
}

public FlushRequest(StreamInput in) throws IOException {
super(in);
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
}

/**
* Returns {@code true} iff a flush should block
* if a another flush operation is already running. Otherwise {@code false}
Expand Down Expand Up @@ -103,9 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@

public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {

private FlushRequest request = new FlushRequest();
private final FlushRequest request;

public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(shardId);
this.request = request;
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
}

public ShardFlushRequest() {
public ShardFlushRequest(StreamInput in) throws IOException {
super(in);
request = new FlushRequest(in);
}

FlushRequest getRequest() {
Expand All @@ -46,8 +48,7 @@ FlushRequest getRequest() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request.readFrom(in);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected PrimaryResult<ShardFlushRequest, ReplicationResponse> shardOperationOn
IndexShard primary) {
primary.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", primary.shardId());
return new PrimaryResult<ShardFlushRequest, ReplicationResponse>(shardRequest, new ReplicationResponse());
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* A refresh request making all operations performed since the last refresh available for search. The (near) real-time
Expand All @@ -35,4 +38,8 @@ public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
public RefreshRequest(String... indices) {
super(indices);
}

public RefreshRequest(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {

private BulkItemRequest[] items;

public BulkShardRequest() {
public BulkShardRequest(StreamInput in) throws IOException {
super(in);
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
Expand All @@ -60,7 +67,7 @@ public String[] indices() {
indices.add(item.index());
}
}
return indices.toArray(new String[indices.size()]);
return indices.toArray(new String[0]);
}

@Override
Expand All @@ -78,14 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.util.function.Supplier;

/** use transport bulk action directly */
@Deprecated
public abstract class TransportSingleItemBulkWriteAction<
Expand All @@ -43,8 +42,8 @@ public abstract class TransportSingleItemBulkWriteAction<
private final TransportBulkAction bulkAction;

protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters,
Supplier<Request> request, TransportBulkAction bulkAction) {
super(actionName, transportService, actionFilters, request);
Writeable.Reader<Request> requestReader, TransportBulkAction bulkAction) {
super(actionName, transportService, actionFilters, requestReader);
this.bulkAction = bulkAction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {

private static final ShardId NO_SHARD_ID = null;

// Set to null initially so we can know to override in bulk requests that have a default type.
private String type;
private String id;
Expand All @@ -63,14 +65,35 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;

public DeleteRequest(StreamInput in) throws IOException {
super(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
}

public DeleteRequest() {
super(NO_SHARD_ID);
}

/**
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
* must be set.
*/
public DeleteRequest(String index) {
super(NO_SHARD_ID);
this.index = index;
}

Expand All @@ -85,6 +108,7 @@ public DeleteRequest(String index) {
*/
@Deprecated
public DeleteRequest(String index, String type, String id) {
super(NO_SHARD_ID);
this.index = index;
this.type = type;
this.id = id;
Expand All @@ -97,6 +121,7 @@ public DeleteRequest(String index, String type, String id) {
* @param id The id of the document
*/
public DeleteRequest(String index, String id) {
super(NO_SHARD_ID);
this.index = index;
this.id = id;
}
Expand Down Expand Up @@ -274,23 +299,8 @@ public OpType opType() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down Expand Up @@ -321,14 +331,4 @@ public void writeTo(StreamOutput out) throws IOException {
public String toString() {
return "delete {[" + index + "][" + type() + "][" + id + "]}";
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public DeleteRequest setShardId(ShardId shardId) {
throw new UnsupportedOperationException("shard id should never be set on DeleteRequest");
}
}
Loading

0 comments on commit d0aa5a0

Please sign in to comment.