Skip to content

Commit

Permalink
Persistent Tasks: force writeable name of params and status to be the…
Browse files Browse the repository at this point in the history
… same as their task (#1072)

Changes persistent task serialization and forces params and status to have the same writeable name as the task itself.
  • Loading branch information
imotov authored and martijnvg committed Feb 5, 2018
1 parent c36ddbe commit dd363a1
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 75 deletions.
Expand Up @@ -82,16 +82,21 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, Persiste
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));

private static final NamedObjectParser<PersistentTaskParams, Void> PARAMS_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskParams.class, name, null);
private static final NamedObjectParser<Status, Void> STATUS_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(Status.class, name, null);
private static final NamedObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, Void> TASK_DESCRIPTION_PARSER;

static {
// Tasks parser initialization
PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id"));
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));

// Task description parser initialization
ObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, String> parser = new ObjectParser<>("named");
parser.declareObject(TaskDescriptionBuilder::setParams,
(p, c) -> p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params"));
parser.declareObject(TaskDescriptionBuilder::setStatus,
(p, c) -> p.namedObject(Status.class, c, null), new ParseField("status"));
TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name);

// Assignment parser
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
Expand All @@ -100,28 +105,46 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, Persiste
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskParams> taskBuilder, List<PersistentTaskParams> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one params per task is allowed");
}
taskBuilder.setParams(objects.get(0));
}, PARAMS_PARSER, new ParseField("params"));

PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskParams> taskBuilder, List<Status> objects) -> {
(TaskBuilder<PersistentTaskParams> taskBuilder, List<TaskDescriptionBuilder<PersistentTaskParams>> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one status per task is allowed");
throw new IllegalArgumentException("only one task description per task is allowed");
}
taskBuilder.setStatus(objects.get(0));
}, STATUS_PARSER, new ParseField("status"));


TaskDescriptionBuilder<PersistentTaskParams> builder = objects.get(0);
taskBuilder.setTaskName(builder.taskName);
taskBuilder.setParams(builder.params);
taskBuilder.setStatus(builder.status);
}, TASK_DESCRIPTION_PARSER, new ParseField("task"));
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update"));
}

/**
* Private builder used in XContent parser to build task-specific portion (params and status)
*/
private static class TaskDescriptionBuilder<Params extends PersistentTaskParams> {
private final String taskName;
private Params params;
private Status status;

private TaskDescriptionBuilder(String taskName) {
this.taskName = taskName;
}

private TaskDescriptionBuilder setParams(Params params) {
this.params = params;
return this;
}

private TaskDescriptionBuilder setStatus(Status status) {
this.status = status;
return this;
}
}


public Collection<PersistentTask<?>> tasks() {
return this.tasks.values();
}
Expand Down Expand Up @@ -279,6 +302,18 @@ private PersistentTask(String id, long allocationId, String taskName, Params par
this.status = status;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
if (params != null) {
if (params.getWriteableName().equals(taskName) == false) {
throw new IllegalArgumentException("params have to have the same writeable name as task. params: " +
params.getWriteableName() + " task: " + taskName);
}
}
if (status != null) {
if (status.getWriteableName().equals(taskName) == false) {
throw new IllegalArgumentException("status has to have the same writeable name as task. status: " +
status.getWriteableName() + " task: " + taskName);
}
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -384,21 +419,20 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params xPa
builder.startObject();
{
builder.field("id", id);
builder.field("name", taskName);
if (params != null) {
builder.startObject("params");
{
builder.field(params.getWriteableName(), params, xParams);
}
builder.endObject();
}
if (status != null) {
builder.startObject("status");
builder.startObject("task");
{
builder.startObject(taskName);
{
builder.field(status.getWriteableName(), status, xParams);
if (params != null) {
builder.field("params", params, xParams);
}
if (status != null) {
builder.field("status", status, xParams);
}
}
builder.endObject();
}
builder.endObject();

if (API_CONTEXT.equals(xParams.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
Expand Down
Expand Up @@ -74,33 +74,33 @@ public static class Request extends MasterNodeRequest<Request> {
private String taskId;

@Nullable
private String action;
private String taskName;

private PersistentTaskParams params;

public Request() {

}

public Request(String taskId, String action, PersistentTaskParams params) {
public Request(String taskId, String taskName, PersistentTaskParams params) {
this.taskId = taskId;
this.action = action;
this.taskName = taskName;
this.params = params;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readString();
action = in.readString();
taskName = in.readString();
params = in.readOptionalNamedWriteable(PersistentTaskParams.class);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(taskId);
out.writeString(action);
out.writeString(taskName);
out.writeOptionalNamedWriteable(params);
}

Expand All @@ -110,9 +110,15 @@ public ActionRequestValidationException validate() {
if (this.taskId == null) {
validationException = addValidationError("task id must be specified", validationException);
}
if (this.action == null) {
if (this.taskName == null) {
validationException = addValidationError("action must be specified", validationException);
}
if (params != null) {
if (params.getWriteableName().equals(taskName) == false) {
validationException = addValidationError("params have to have the same writeable name as task. params: " +
params.getWriteableName() + " task: " + taskName, validationException);
}
}
return validationException;
}

Expand All @@ -121,21 +127,21 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(taskId, request1.taskId) && Objects.equals(action, request1.action) &&
return Objects.equals(taskId, request1.taskId) && Objects.equals(taskName, request1.taskName) &&
Objects.equals(params, request1.params);
}

@Override
public int hashCode() {
return Objects.hash(taskId, action, params);
return Objects.hash(taskId, taskName, params);
}

public String getAction() {
return action;
public String getTaskName() {
return taskName;
}

public void setAction(String action) {
this.action = action;
public void setTaskName(String taskName) {
this.taskName = taskName;
}

public String getTaskId() {
Expand Down Expand Up @@ -170,7 +176,7 @@ public RequestBuilder setTaskId(String taskId) {
}

public RequestBuilder setAction(String action) {
request.setAction(action);
request.setTaskName(action);
return this;
}

Expand Down Expand Up @@ -219,7 +225,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.params,
persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params,
new ActionListener<PersistentTask<?>>() {

@Override
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
Expand All @@ -38,13 +37,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request,
PersistentTaskResponse,
UpdatePersistentTaskStatusAction.RequestBuilder> {
Expand All @@ -69,7 +69,7 @@ public PersistentTaskResponse newResponse() {
public static class Request extends MasterNodeRequest<Request> {

private String taskId;
private long allocationId;
private long allocationId = -1L;
private Task.Status status;

public Request() {
Expand Down Expand Up @@ -112,7 +112,16 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (this.taskId == null) {
validationException = addValidationError("task id must be specified", validationException);
}
if (this.allocationId == -1L) {
validationException = addValidationError("allocationId must be specified", validationException);
}
// We cannot really check if status has the same type as task because we don't have access
// to the task here. We will check it when we try to update the task
return validationException;
}

@Override
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -93,7 +94,7 @@ public void testReassignConsidersClusterStateUpdates() {
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(2, 40);
for (int i = 0; i < numberOfTasks; i++) {
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits");
addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits");
}

MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
Expand All @@ -117,14 +118,14 @@ public void testReassignTasks() {
switch (randomInt(2)) {
case 0:
// add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned
addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits");
addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exits");
break;
case 1:
// add a task assigned to non-existing node that should not get assigned
addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits");
addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exits");
break;
case 2:
addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits");
addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits");
break;

}
Expand All @@ -143,8 +144,8 @@ public void testReassignTasks() {

for (PersistentTask<?> task : tasksInProgress.tasks()) {
// explanation should correspond to the action name
switch (task.getTaskName()) {
case "should_assign":
switch (((TestParams) task.getParams()).getTestParam()) {
case "assign_me":
assertThat(task.getExecutorNode(), notNullValue());
assertThat(task.isAssigned(), equalTo(true));
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
Expand All @@ -154,7 +155,7 @@ public void testReassignTasks() {
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
break;
case "should_not_assign":
case "dont_assign_me":
assertThat(task.getExecutorNode(), nullValue());
assertThat(task.isAssigned(), equalTo(false));
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
Expand Down Expand Up @@ -210,7 +211,9 @@ public <Params extends PersistentTaskParams> Assignment getAssignment(
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
DiscoveryNodes nodes = clusterState.nodes();
PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasksInProgress.findTasks("assign_one", task -> nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
if (tasksInProgress.findTasks(TestPersistentTasksExecutor.NAME, task ->
"assign_one".equals(((TestParams) task.getParams()).getTestParam()) &&
nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
return randomNodeAssignment(clusterState.nodes());
} else {
return new Assignment(null, "only one task can be assigned at a time");
Expand Down Expand Up @@ -404,11 +407,12 @@ private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuil
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
Assignment assignment, String param) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestParams(param), assignment).build()));
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), assignment).build()));
}

private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
tasks.addTask(UUIDs.base64UUID(), action, new TestParams(param), new Assignment(node, "explanation: " + action));
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param),
new Assignment(node, "explanation: " + param));
}

private DiscoveryNode newNode(String nodeId) {
Expand Down

0 comments on commit dd363a1

Please sign in to comment.