Skip to content

Commit

Permalink
Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)
Browse files Browse the repository at this point in the history
Reviewers: Colin McCabe <cmccabe@apache.org>
  • Loading branch information
stanislavkozlovski authored and cmccabe committed Nov 27, 2018
1 parent d0ed389 commit 7fadf0a
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 65 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Expand Up @@ -57,7 +57,7 @@
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>

<suppress checks="NPathComplexity"
files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster).java"/>
files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/>

<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
Expand Down
Expand Up @@ -42,6 +42,8 @@
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.UriBuilder;

import java.util.Optional;

import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;

Expand Down Expand Up @@ -244,7 +246,7 @@ public static void main(String[] args) throws Exception {
} else if (res.getBoolean("show_tasks")) {
System.out.println("Got coordinator tasks: " +
JsonUtil.toPrettyJsonString(client.tasks(
new TasksRequest(null, 0, 0, 0, 0))));
new TasksRequest(null, 0, 0, 0, 0, Optional.empty()))));
} else if (res.getString("show_task") != null) {
String taskId = res.getString("show_task");
TaskRequest req = new TaskRequest(res.getString("show_task"));
Expand Down
Expand Up @@ -23,24 +23,27 @@
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;

import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.PathParam;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -96,13 +99,25 @@ public Empty destroyTask(@DefaultValue("") @QueryParam("taskId") String taskId)
}

@GET
@Path("/tasks")
public TasksResponse tasks(@QueryParam("taskId") List<String> taskId,
@Path("/tasks/")
public Response tasks(@QueryParam("taskId") List<String> taskId,
@DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs,
@DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs,
@DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs,
@DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs) throws Throwable {
return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
@DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs,
@DefaultValue("") @QueryParam("state") String state) throws Throwable {
boolean isEmptyState = state.equals("");
if (!isEmptyState && !TaskStateType.Constants.VALUES.contains(state)) {
return Response.status(400).entity(
String.format("State %s is invalid. Must be one of %s",
state, TaskStateType.Constants.VALUES)
).build();
}

Optional<TaskStateType> givenState = Optional.ofNullable(isEmptyState ? null : TaskStateType.valueOf(state));
TasksResponse resp = coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs, givenState));

return Response.status(200).entity(resp).build();
}

@GET
Expand Down
Expand Up @@ -32,11 +32,12 @@
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
Expand Down Expand Up @@ -142,13 +143,6 @@ public final class TaskManager {
Utils.join(nodeManagers.keySet(), ", "));
}

enum ManagedTaskState {
PENDING,
RUNNING,
STOPPING,
DONE;
}

class ManagedTask {
/**
* The task id.
Expand All @@ -168,7 +162,7 @@ class ManagedTask {
/**
* The task state.
*/
private ManagedTaskState state;
private TaskStateType state;

/**
* The time when the task was started, or -1 if the task has not been started.
Expand Down Expand Up @@ -201,7 +195,7 @@ class ManagedTask {
*/
private String error = "";

ManagedTask(String id, TaskSpec spec, TaskController controller, ManagedTaskState state) {
ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state) {
this.id = id;
this.spec = spec;
this.controller = controller;
Expand Down Expand Up @@ -345,13 +339,13 @@ public Void call() throws Exception {
if (failure != null) {
log.info("Failed to create a new task {} with spec {}: {}",
id, spec, failure);
task = new ManagedTask(id, spec, null, ManagedTaskState.DONE);
task = new ManagedTask(id, spec, null, TaskStateType.DONE);
task.doneMs = time.milliseconds();
task.maybeSetError(failure);
tasks.put(id, task);
return null;
}
task = new ManagedTask(id, spec, controller, ManagedTaskState.PENDING);
task = new ManagedTask(id, spec, controller, TaskStateType.PENDING);
tasks.put(id, task);
long delayMs = task.startDelayMs(time.milliseconds());
task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs);
Expand All @@ -374,7 +368,7 @@ class RunTask implements Callable<Void> {
@Override
public Void call() throws Exception {
task.clearStartFuture();
if (task.state != ManagedTaskState.PENDING) {
if (task.state != TaskStateType.PENDING) {
log.info("Can't start task {}, because it is already in state {}.",
task.id, task.state);
return null;
Expand All @@ -385,12 +379,12 @@ public Void call() throws Exception {
} catch (Exception e) {
log.error("Unable to find nodes for task {}", task.id, e);
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
task.state = TaskStateType.DONE;
task.maybeSetError("Unable to find nodes for task: " + e.getMessage());
return null;
}
log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
task.state = ManagedTaskState.RUNNING;
task.state = TaskStateType.RUNNING;
task.startedMs = time.milliseconds();
for (String workerName : nodeNames) {
long workerId = nextWorkerId++;
Expand Down Expand Up @@ -441,7 +435,7 @@ public Void call() throws Exception {
task.cancelled = true;
task.clearStartFuture();
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
task.state = TaskStateType.DONE;
log.info("Stopped pending task {}.", id);
break;
case RUNNING:
Expand All @@ -454,14 +448,14 @@ public Void call() throws Exception {
log.info("Task {} is now complete with error: {}", id, task.error);
}
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
task.state = TaskStateType.DONE;
} else {
for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
}
log.info("Cancelling task {} with worker(s) {}",
id, Utils.mkString(activeWorkerIds, "", "", " = ", ", "));
task.state = ManagedTaskState.STOPPING;
task.state = TaskStateType.STOPPING;
}
break;
case STOPPING:
Expand Down Expand Up @@ -586,14 +580,14 @@ private void handleWorkerCompletion(ManagedTask task, String nodeName, WorkerDon
TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
if (activeWorkerIds.isEmpty()) {
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
task.state = TaskStateType.DONE;
log.info("{}: Task {} is now complete on {} with error: {}",
nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "),
task.error.isEmpty() ? "(none)" : task.error);
} else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
} else if ((task.state == TaskStateType.RUNNING) && (!task.error.isEmpty())) {
log.info("{}: task {} stopped with error {}. Stopping worker(s): {}",
nodeName, task.id, task.error, Utils.mkString(activeWorkerIds, "{", "}", ": ", ", "));
task.state = ManagedTaskState.STOPPING;
task.state = TaskStateType.STOPPING;
for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
}
Expand Down Expand Up @@ -621,7 +615,7 @@ class GetTasksResponse implements Callable<TasksResponse> {
public TasksResponse call() throws Exception {
TreeMap<String, TaskState> states = new TreeMap<>();
for (ManagedTask task : tasks.values()) {
if (request.matches(task.id, task.startedMs, task.doneMs)) {
if (request.matches(task.id, task.startedMs, task.doneMs, task.state)) {
states.put(task.id, task.taskState());
}
}
Expand Down
Expand Up @@ -31,10 +31,10 @@
include = JsonTypeInfo.As.PROPERTY,
property = "state")
@JsonSubTypes({
@JsonSubTypes.Type(value = TaskPending.class, name = "PENDING"),
@JsonSubTypes.Type(value = TaskRunning.class, name = "RUNNING"),
@JsonSubTypes.Type(value = TaskStopping.class, name = "STOPPING"),
@JsonSubTypes.Type(value = TaskDone.class, name = "DONE")
@JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE),
@JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE),
@JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE),
@JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE)
})
public abstract class TaskState extends Message {
private final TaskSpec spec;
Expand Down
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* The types of states a single Task can be in
*/
public enum TaskStateType {
PENDING(Constants.PENDING_VALUE),
RUNNING(Constants.RUNNING_VALUE),
STOPPING(Constants.STOPPING_VALUE),
DONE(Constants.DONE_VALUE);

TaskStateType(String stateType) {}

public static class Constants {
static final String PENDING_VALUE = "PENDING";
static final String RUNNING_VALUE = "RUNNING";
static final String STOPPING_VALUE = "STOPPING";
static final String DONE_VALUE = "DONE";
public static final List<String> VALUES = Collections.unmodifiableList(
Arrays.asList(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE));
}
}
Expand Up @@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -55,18 +56,26 @@ public class TasksRequest extends Message {
*/
private final long lastEndMs;

/**
* The desired state of the tasks.
* An empty string will match all states.
*/
private final Optional<TaskStateType> state;

@JsonCreator
public TasksRequest(@JsonProperty("taskIds") Collection<String> taskIds,
@JsonProperty("firstStartMs") long firstStartMs,
@JsonProperty("lastStartMs") long lastStartMs,
@JsonProperty("firstEndMs") long firstEndMs,
@JsonProperty("lastEndMs") long lastEndMs) {
@JsonProperty("lastEndMs") long lastEndMs,
@JsonProperty("state") Optional<TaskStateType> state) {
this.taskIds = Collections.unmodifiableSet((taskIds == null) ?
new HashSet<String>() : new HashSet<>(taskIds));
this.firstStartMs = Math.max(0, firstStartMs);
this.lastStartMs = Math.max(0, lastStartMs);
this.firstEndMs = Math.max(0, firstEndMs);
this.lastEndMs = Math.max(0, lastEndMs);
this.state = state == null ? Optional.empty() : state;
}

@JsonProperty
Expand Down Expand Up @@ -94,6 +103,11 @@ public long lastEndMs() {
return lastEndMs;
}

@JsonProperty
public Optional<TaskStateType> state() {
return state;
}

/**
* Determine if this TaskRequest should return a particular task.
*
Expand All @@ -102,7 +116,7 @@ public long lastEndMs() {
* @param endMs The task end time, or -1 if the task hasn't ended.
* @return True if information about the task should be returned.
*/
public boolean matches(String taskId, long startMs, long endMs) {
public boolean matches(String taskId, long startMs, long endMs, TaskStateType state) {
if ((!taskIds.isEmpty()) && (!taskIds.contains(taskId))) {
return false;
}
Expand All @@ -118,6 +132,11 @@ public boolean matches(String taskId, long startMs, long endMs) {
if ((lastEndMs > 0) && ((endMs < 0) || (endMs > lastEndMs))) {
return false;
}

if (this.state.isPresent() && !this.state.get().equals(state)) {
return false;
}

return true;
}
}
Expand Up @@ -34,6 +34,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class ExpectedTasks {
Expand Down Expand Up @@ -146,7 +147,7 @@ public ExpectedTasks waitFor(final CoordinatorClient client) throws InterruptedE
public boolean conditionMet() {
TasksResponse tasks = null;
try {
tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0));
tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
} catch (Exception e) {
log.info("Unable to get coordinator tasks", e);
throw new RuntimeException(e);
Expand Down

0 comments on commit 7fadf0a

Please sign in to comment.