Skip to content

Commit

Permalink
Persistent Tasks: PersistentTaskRequest -> PersistTaskParams (#1057)
Browse files Browse the repository at this point in the history
Removes the last pieces of ActionRequest from PersistentTaskRequest and renames it into PersistTaskParams, which is now just an interface that extends NamedWriteable and ToXContent.
  • Loading branch information
imotov authored and martijnvg committed Feb 5, 2018
1 parent 8db353b commit a093188
Show file tree
Hide file tree
Showing 17 changed files with 300 additions and 251 deletions.
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.persistent;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -33,10 +34,10 @@ public NodePersistentTasksExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public <Request extends PersistentTaskRequest> void executeTask(Request request,
AllocatedPersistentTask task,
PersistentTasksExecutor<Request> action) {
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
Expand All @@ -46,7 +47,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() throws Exception {
try {
action.nodeOperation(task, request);
executor.nodeOperation(task, params);
} catch (Exception ex) {
task.markAsFailed(ex);
}
Expand Down
Expand Up @@ -16,20 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.persistent;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

/**
* Base class for a request for a persistent task
* Parameters used to start persistent task
*/
public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId);
}
public interface PersistentTaskParams extends NamedWriteable, ToXContent {

}
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -58,22 +59,22 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
* Creates a new persistent task on master node
*
* @param action the action name
* @param request request
* @param params params
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String taskId, String action, Request request,
ActionListener<PersistentTask<?>> listener) {
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder builder = builder(currentState);
if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
}
validate(action, clusterService.state(), request);
validate(action, clusterService.state(), params);
final Assignment assignment;
assignment = getAssignement(action, currentState, request);
return update(currentState, builder.addTask(taskId, action, request, assignment));
assignment = getAssignement(action, currentState, params);
return update(currentState, builder.addTask(taskId, action, params, assignment));
}

@Override
Expand Down Expand Up @@ -205,14 +206,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

private <Request extends PersistentTaskRequest> Assignment getAssignement(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(request, currentState);
private <Params extends PersistentTaskParams> Assignment getAssignement(String taskName, ClusterState currentState,
@Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(params, currentState);
}

private <Request extends PersistentTaskRequest> void validate(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(request, currentState);
private <Params extends PersistentTaskParams> void validate(String taskName, ClusterState currentState, @Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(params, currentState);
}

@Override
Expand All @@ -229,7 +231,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}

interface ExecutorNodeDecider {
<Request extends PersistentTaskRequest> Assignment getAssignment(String action, ClusterState currentState, Request request);
<Params extends PersistentTaskParams> Assignment getAssignment(String action, ClusterState currentState, Params params);
}

static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
Expand All @@ -245,7 +247,7 @@ static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecid
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getAssignment(),
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getRequest())) == false) {
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
Expand Down Expand Up @@ -290,7 +292,7 @@ static ClusterState reassignTasks(ClusterState currentState, Logger logger, Exec
for (PersistentTask<?> task : tasks.tasks()) {
if (task.needsReassignment(nodes)) {
// there is an unassigned task - we need to try assigning it
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getRequest());
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams());
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getAssignment().getExecutorNode(), assignment.getExecutorNode());
Expand Down

0 comments on commit a093188

Please sign in to comment.