Skip to content

Commit

Permalink
Changed ReindexRequest to use Writeable.Reader (#32401)
Browse files Browse the repository at this point in the history
-- This is a pre-stage for adding the reindex API to the REST high-level-client
-- Follows the pattern set in #26315
  • Loading branch information
sohaibiftikhar authored and nik9000 committed Jul 31, 2018
1 parent adb93da commit 4fa92cb
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 31 deletions.
Expand Up @@ -27,13 +27,13 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.function.Supplier;

public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, BulkByScrollResponse> {

Expand All @@ -46,7 +46,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
super(settings, DeleteByQueryAction.NAME, transportService, actionFilters,
(Supplier<DeleteByQueryRequest>) DeleteByQueryRequest::new);
(Writeable.Reader<DeleteByQueryRequest>) DeleteByQueryRequest::new);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -104,7 +105,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService) {
super(settings, ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
super(settings, ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.scriptService = scriptService;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
Expand All @@ -43,7 +44,6 @@

import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkByScrollResponse> {

Expand All @@ -56,7 +56,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
public TransportUpdateByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
super(settings, UpdateByQueryAction.NAME, transportService, actionFilters,
(Supplier<UpdateByQueryRequest>) UpdateByQueryRequest::new);
(Writeable.Reader<UpdateByQueryRequest>) UpdateByQueryRequest::new);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
Expand Down
Expand Up @@ -67,19 +67,17 @@ public void testReindexRequest() throws IOException {
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, null,
query, username, password, headers, socketTimeout, connectTimeout));
}
ReindexRequest tripped = new ReindexRequest();
roundTrip(reindex, tripped);
ReindexRequest tripped = new ReindexRequest(toInputByteStream(reindex));
assertRequestEquals(reindex, tripped);

// Try slices=auto with a version that doesn't support it, which should fail
reindex.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, reindex, null));
Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, reindex));
assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());

// Try regular slices with a version that doesn't support slices=auto, which should succeed
tripped = new ReindexRequest();
reindex.setSlices(between(1, Integer.MAX_VALUE));
roundTrip(Version.V_6_0_0_alpha1, reindex, tripped);
tripped = new ReindexRequest(toInputByteStream(reindex));
assertRequestEquals(Version.V_6_0_0_alpha1, reindex, tripped);
}

Expand All @@ -89,40 +87,36 @@ public void testUpdateByQueryRequest() throws IOException {
if (randomBoolean()) {
update.setPipeline(randomAlphaOfLength(5));
}
UpdateByQueryRequest tripped = new UpdateByQueryRequest();
roundTrip(update, tripped);
UpdateByQueryRequest tripped = new UpdateByQueryRequest(toInputByteStream(update));
assertRequestEquals(update, tripped);
assertEquals(update.getPipeline(), tripped.getPipeline());

// Try slices=auto with a version that doesn't support it, which should fail
update.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, update, null));
Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, update));
assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());

// Try regular slices with a version that doesn't support slices=auto, which should succeed
tripped = new UpdateByQueryRequest();
update.setSlices(between(1, Integer.MAX_VALUE));
roundTrip(Version.V_6_0_0_alpha1, update, tripped);
tripped = new UpdateByQueryRequest(toInputByteStream(update));
assertRequestEquals(update, tripped);
assertEquals(update.getPipeline(), tripped.getPipeline());
}

public void testDeleteByQueryRequest() throws IOException {
DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest());
randomRequest(delete);
DeleteByQueryRequest tripped = new DeleteByQueryRequest();
roundTrip(delete, tripped);
DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete));
assertRequestEquals(delete, tripped);

// Try slices=auto with a version that doesn't support it, which should fail
delete.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, delete, null));
Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, delete));
assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());

// Try regular slices with a version that doesn't support slices=auto, which should succeed
tripped = new DeleteByQueryRequest();
delete.setSlices(between(1, Integer.MAX_VALUE));
roundTrip(Version.V_6_0_0_alpha1, delete, tripped);
tripped = new DeleteByQueryRequest(toInputByteStream(delete));
assertRequestEquals(delete, tripped);
}

Expand Down Expand Up @@ -198,23 +192,24 @@ public void testRethrottleRequest() throws IOException {
request.setTaskId(new TaskId(randomAlphaOfLength(5), randomLong()));
}
RethrottleRequest tripped = new RethrottleRequest();
roundTrip(request, tripped);
// We use readFrom here because Rethrottle does not support the Writeable.Reader interface
tripped.readFrom(toInputByteStream(request));
assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001);
assertArrayEquals(request.getActions(), tripped.getActions());
assertEquals(request.getTaskId(), tripped.getTaskId());
}

private void roundTrip(Streamable example, Streamable empty) throws IOException {
roundTrip(Version.CURRENT, example, empty);
private StreamInput toInputByteStream(Streamable example) throws IOException {
return toInputByteStream(Version.CURRENT, example);
}

private void roundTrip(Version version, Streamable example, Streamable empty) throws IOException {
private StreamInput toInputByteStream(Version version, Streamable example) throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(version);
example.writeTo(out);
StreamInput in = out.bytes().streamInput();
in.setVersion(version);
empty.readFrom(in);
return in;
}

private Script randomScript() {
Expand Down
Expand Up @@ -23,8 +23,11 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;

import static org.elasticsearch.action.ValidateActions.addValidationError;

/**
Expand Down Expand Up @@ -53,6 +56,10 @@ public DeleteByQueryRequest(SearchRequest search) {
this(search, true);
}

public DeleteByQueryRequest(StreamInput in) throws IOException {
super.readFrom(in);
}

private DeleteByQueryRequest(SearchRequest search, boolean setDefaults) {
super(search, setDefaults);
// Delete-By-Query does not require the source
Expand Down
Expand Up @@ -59,6 +59,13 @@ private ReindexRequest(SearchRequest search, IndexRequest destination, boolean s
this.destination = destination;
}

public ReindexRequest(StreamInput in) throws IOException {
super.readFrom(in);
destination = new IndexRequest();
destination.readFrom(in);
remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
}

@Override
protected ReindexRequest self() {
return this;
Expand Down Expand Up @@ -135,10 +142,7 @@ public ReindexRequest forSlice(TaskId slicingTask, SearchRequest slice, int tota

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
destination = new IndexRequest();
destination.readFrom(in);
remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Expand Up @@ -47,6 +47,11 @@ public UpdateByQueryRequest(SearchRequest search) {
this(search, true);
}

public UpdateByQueryRequest(StreamInput in) throws IOException {
super.readFrom(in);
pipeline = in.readOptionalString();
}

private UpdateByQueryRequest(SearchRequest search, boolean setDefaults) {
super(search, setDefaults);
}
Expand Down Expand Up @@ -108,8 +113,7 @@ public IndicesOptions indicesOptions() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
pipeline = in.readOptionalString();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down

0 comments on commit 4fa92cb

Please sign in to comment.