Skip to content

Commit

Permalink
Make reindexing managed by a persistent task (#43382)
Browse files Browse the repository at this point in the history
This is related to #42612. Currently the reindexing transport action
creates a task on the local coordinator node. Unfortunately this is not
resilient to coordinator node failures. This commit adds a new action
that creates a reindexing job as a persistent task.
  • Loading branch information
Tim-Brooks committed Jul 18, 2019
1 parent 4fd8e34 commit 480c545
Show file tree
Hide file tree
Showing 19 changed files with 1,294 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public void testGetValidTask() throws Exception {
}
TaskInfo info = taskResponse.getTaskInfo();
assertTrue(info.isCancellable());
assertEquals("reindex from [source1] to [dest][_doc]", info.getDescription());
assertEquals("indices:data/write/reindex", info.getAction());
assertEquals("persistent reindex from [source1] to [dest][_doc]", info.getDescription());
assertEquals("reindex/job[c]", info.getAction());
if (taskResponse.isCompleted() == false) {
assertBusy(ReindexIT.checkCompletionStatus(client(), taskId.toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,22 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand All @@ -50,23 +56,36 @@
import java.util.List;
import java.util.function.Supplier;

import static java.util.Collections.singletonList;
public class ReindexPlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {

public class ReindexPlugin extends Plugin implements ActionPlugin {
public static final String NAME = "reindex";

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(new ActionHandler<>(ReindexAction.INSTANCE, TransportReindexAction.class),
new ActionHandler<>(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class),
new ActionHandler<>(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class),
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class));
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class),
new ActionHandler<>(StartReindexJobAction.INSTANCE, TransportStartReindexJobAction.class)
);
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return singletonList(
new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new));
return Arrays.asList(
new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ReindexJob.NAME, ReindexJob::new),
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexJobState.NAME, ReindexJobState::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, ReindexJobState.NAME, ReindexJobState::new));
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ReindexJob.NAME), ReindexJob::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ReindexJobState.NAME), ReindexJobState::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(ReindexJobState.NAME),
ReindexJobState::fromXContent));
}

@Override
Expand Down Expand Up @@ -95,4 +114,10 @@ public List<Setting<?>> getSettings() {
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client,
SettingsModule settingsModule) {
return Collections.singletonList(new ReindexTask.ReindexPersistentTasksExecutor(clusterService, client));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand All @@ -47,7 +56,55 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
return doPrepareRequest(request, client, true, true);
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", true);

// Build the internal request
StartReindexJobAction.Request internal = new StartReindexJobAction.Request(setCommonOptions(request, buildRequest(request)),
waitForCompletion);
/*
* Let's try and validate before forking so the user gets some error. The
* task can't totally validate until it starts but this is better than
* nothing.
*/
ActionRequestValidationException validationException = internal.getReindexRequest().validate();
if (validationException != null) {
throw validationException;
}

// Executes the request and waits for completion
if (waitForCompletion) {
Map<String, String> params = new HashMap<>();
params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(true));
params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(true));

return channel -> client.execute(StartReindexJobAction.INSTANCE, internal, new ActionListener<>() {

private BulkIndexByScrollResponseContentListener listener = new BulkIndexByScrollResponseContentListener(channel, params);

@Override
public void onResponse(StartReindexJobAction.Response response) {
listener.onResponse(response.getReindexResponse());
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
return channel -> client.execute(StartReindexJobAction.INSTANCE, internal, new RestBuilderListener<>(channel) {
@Override
public RestResponse buildResponse(StartReindexJobAction.Response response, XContentBuilder builder) throws Exception {
builder.startObject();
// This is the ephemeral task-id from the first node that is assigned the task (for BWC).
builder.field("task", response.getTaskId());

// TODO: Are there error conditions for the non-wait case?
return new BytesRestResponse(RestStatus.OK, builder.endObject());
}
});
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

public class StartReindexJobAction extends ActionType<StartReindexJobAction.Response> {

public static final StartReindexJobAction INSTANCE = new StartReindexJobAction();
// TODO: Name
public static final String NAME = "indices:data/write/start_reindex";

private StartReindexJobAction() {
super(NAME, Response::new);
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject, CompositeIndicesRequest {

private final ReindexRequest reindexRequest;
private final boolean waitForCompletion;


public Request(ReindexRequest reindexRequest) {
this(reindexRequest, false);
}

public Request(ReindexRequest reindexRequest, boolean waitForCompletion) {
this.reindexRequest = reindexRequest;
this.waitForCompletion = waitForCompletion;
}

public Request(StreamInput in) throws IOException {
super(in);
reindexRequest = new ReindexRequest(in);
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
reindexRequest.writeTo(out);
out.writeBoolean(waitForCompletion);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
return builder;
}

public ReindexRequest getReindexRequest() {
return reindexRequest;
}

public boolean getWaitForCompletion() {
return waitForCompletion;
}
}

public static class Response extends ActionResponse {

static final ParseField TASK_ID = new ParseField("task_id");
static final ParseField REINDEX_RESPONSE = new ParseField("reindex_response");

private static final ConstructingObjectParser<Response, Void> PARSER = new ConstructingObjectParser<>(
"start_reindex_response", true, args -> new Response((String) args[0], (BulkByScrollResponse) args[1]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_ID);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> BulkByScrollResponse.fromXContent(parser), REINDEX_RESPONSE);
}

private final String taskId;
@Nullable private final BulkByScrollResponse reindexResponse;

public Response(String taskId) {
this(taskId, null);
}

public Response(String taskId, BulkByScrollResponse reindexResponse) {
this.taskId = taskId;
this.reindexResponse = reindexResponse;
}

public Response(StreamInput in) throws IOException {
super(in);
taskId = in.readString();
reindexResponse = in.readOptionalWriteable(BulkByScrollResponse::new);
}

@Override
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(taskId);
out.writeOptionalWriteable(reindexResponse);
}

public String getTaskId() {
return taskId;
}

public BulkByScrollResponse getReindexResponse() {
return reindexResponse;
}

public static Response fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.util.automaton.Operations;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -107,7 +108,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>) ReindexRequest::new);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.scriptService = scriptService;
Expand All @@ -120,22 +121,30 @@ public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFi

@Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
ClusterState state = clusterService.state();
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
indexNameExpressionResolver, autoCreateIndex, state);

BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;

BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
listener).start();
// We dispatch here because the new ReindexTask uses this action. When an action is executed locally,
// it is not dispatched from the ctor argument.
threadPool.generic().execute(new ActionRunnable<>(listener) {

@Override
protected void doRun() {
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
ClusterState state = clusterService.state();
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
indexNameExpressionResolver, autoCreateIndex, state);

BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;

BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, TransportReindexAction.this,
request, state, listener).start();
}
);
}
);
});
}

static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
Expand Down
Loading

0 comments on commit 480c545

Please sign in to comment.