Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Added the 'reason' to the /pendingTasks endpoint
Browse files Browse the repository at this point in the history
Bugs closed: AURORA-1762

Reviewed at https://reviews.apache.org/r/51993/
  • Loading branch information
pradykaushik authored and jcohen committed Nov 28, 2016
1 parent d56f8c6 commit 8e07b04
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 11 deletions.
1 change: 0 additions & 1 deletion config/legacy_untested_classes.txt
Expand Up @@ -25,7 +25,6 @@ org/apache/aurora/scheduler/http/Offers$2
org/apache/aurora/scheduler/http/Offers$3
org/apache/aurora/scheduler/http/Offers$4
org/apache/aurora/scheduler/http/Offers$5
org/apache/aurora/scheduler/http/PendingTasks
org/apache/aurora/scheduler/http/Quotas
org/apache/aurora/scheduler/http/Quotas$1
org/apache/aurora/scheduler/http/Quotas$2
Expand Down
33 changes: 30 additions & 3 deletions src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
Expand Up @@ -13,6 +13,10 @@
*/
package org.apache.aurora.scheduler.http;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import javax.inject.Inject;
Expand All @@ -22,7 +26,11 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.aurora.scheduler.metadata.NearestFit;
import org.apache.aurora.scheduler.scheduling.TaskGroups;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;

/**
* Servlet that exposes detailed information about tasks that are pending.
Expand All @@ -31,10 +39,12 @@
public class PendingTasks {

private final TaskGroups taskGroups;
private final NearestFit nearestFit;

@Inject
PendingTasks(TaskGroups taskGroups) {
PendingTasks(TaskGroups taskGroups, NearestFit nearestFit) {
this.taskGroups = Objects.requireNonNull(taskGroups);
this.nearestFit = Objects.requireNonNull(nearestFit);
}

/**
Expand All @@ -44,7 +54,24 @@ public class PendingTasks {
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getOffers() {
return Response.ok(taskGroups.getGroups()).build();
public Response getOffers() throws IOException {
// Adding reason, received from NearestFit#getPendingReasons() to the JSON Object.
Map<String, List<String>> taskGroupReasonMap =
nearestFit.getPendingReasons(taskGroups.getGroups());
// Adding the attribute "reason" to each of the JSON Objects in the JsonNode.
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.valueToTree(taskGroups.getGroups());
Iterator<JsonNode> jsonNodeIterator = jsonNode.iterator();

while (jsonNodeIterator.hasNext()) {
JsonNode pendingTask = jsonNodeIterator.next();

// Retrieving the reasons corresponding to this pendingTask.
List<String> reasons = taskGroupReasonMap.get(pendingTask.get("name").asText());
// Adding the reasons corresponding to the pendingTask.
((ObjectNode) pendingTask).put("reason", reasons.toString());
}
return Response.ok(jsonNode).build();
}

}
24 changes: 22 additions & 2 deletions src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
Expand Up @@ -13,8 +13,13 @@
*/
package org.apache.aurora.scheduler.metadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.inject.Inject;

Expand All @@ -28,6 +33,7 @@
import com.google.common.collect.Iterables;
import com.google.common.eventbus.Subscribe;

import org.apache.aurora.GuavaUtils;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.gen.ScheduleStatus;
Expand All @@ -38,6 +44,7 @@
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.scheduling.TaskGroup;

/**
* Tracks vetoes against scheduling decisions and maintains the closest fit among all the vetoes
Expand All @@ -52,8 +59,7 @@ public class NearestFit implements EventSubscriber {

private final LoadingCache<TaskGroupKey, Fit> fitByGroupKey;

@VisibleForTesting
NearestFit(Ticker ticker) {
public NearestFit(Ticker ticker) {
fitByGroupKey = CacheBuilder.newBuilder()
.expireAfterWrite(EXPIRATION.getValue(), EXPIRATION.getUnit().getTimeUnit())
.ticker(ticker)
Expand Down Expand Up @@ -118,6 +124,20 @@ public synchronized void vetoed(Vetoed vetoEvent) {
fitByGroupKey.getUnchecked(vetoEvent.getGroupKey()).maybeUpdate(vetoEvent.getVetoes());
}

/**
* Determine the pending reason, for each of the given tasks in taskGroups.
*
* @param taskGroups Group of pending tasks.
* @return A map with key=String (the taskgroup key) and value=List of reasons.
*/
public synchronized Map<String, List<String>> getPendingReasons(Iterable<TaskGroup> taskGroups) {
return StreamSupport.stream(taskGroups.spliterator(), false).map(t -> {
List<String> reasons = getNearestFit(t.getKey()).stream()
.map(Veto::getReason).collect(Collectors.toList());
return new HashMap.SimpleEntry<>(t.getKey().toString(), reasons);
}).collect(GuavaUtils.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

private static class Fit {
private ImmutableSet<Veto> vetoes;

Expand Down
Expand Up @@ -17,29 +17,34 @@
import java.util.Queue;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;

import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.codehaus.jackson.annotate.JsonIgnore;

import static org.apache.aurora.GuavaUtils.toImmutableSet;

/**
* A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
*/
class TaskGroup {
public class TaskGroup {
private final TaskGroupKey key;
private long penaltyMs;
private final Queue<String> tasks;

TaskGroup(TaskGroupKey key, String initialTaskId) {
@VisibleForTesting
public TaskGroup(TaskGroupKey key, String initialTaskId) {
this.key = key;
this.penaltyMs = 0;
this.tasks = Lists.newLinkedList();
this.tasks.add(initialTaskId);
}

synchronized TaskGroupKey getKey() {
// This class is serialized by the PendingTasks endpoint, but the key is exposed via getName().
@JsonIgnore
public synchronized TaskGroupKey getKey() {
return key;
}

Expand Down Expand Up @@ -76,4 +81,5 @@ public synchronized Set<String> getTaskIds() {
public synchronized long getPenaltyMs() {
return penaltyMs;
}

}
Expand Up @@ -126,8 +126,9 @@ public TaskGroupsSettings(
}
}

@VisibleForTesting
@Inject
TaskGroups(
public TaskGroups(
@AsyncExecutor DelayExecutor executor,
TaskGroupsSettings settings,
TaskScheduler taskScheduler,
Expand Down
Expand Up @@ -152,5 +152,4 @@ public void testOneOffer() throws Exception {
private static List toList(String json) {
return new Gson().fromJson(json, List.class);
}

}
172 changes: 172 additions & 0 deletions src/test/java/org/apache/aurora/scheduler/http/PendingTasksTest.java
@@ -0,0 +1,172 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.http;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableSet;

import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.testing.FakeTicker;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.metadata.NearestFit;
import org.apache.aurora.scheduler.scheduling.TaskGroup;
import org.apache.aurora.scheduler.scheduling.TaskGroups;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;
import org.junit.Before;
import org.junit.Test;

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;

public class PendingTasksTest extends EasyMockTest {

private TaskGroups pendingTaskGroups;
private NearestFit nearestFit;

@Before
public void setUp() {
pendingTaskGroups = createMock(TaskGroups.class);
nearestFit = new NearestFit(new FakeTicker());
}

/**
* Create a {@link JsonNode} object to mimic the response.
*
* @param penaltyMs
* @param taskIds
* @param name
* @param reasons
* @return Json node for pending tasks whose values are initialized to the provided values.
* @throws IOException
*/
private JsonNode getMimicResponseJson(
long penaltyMs, String[] taskIds, String name, List<String> reasons) throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode mutablePendingTaskJson = mapper.createObjectNode();
// Adding the key=value pairs to mutablePendingTaskJson.
mutablePendingTaskJson.put("penaltyMs", penaltyMs);
mutablePendingTaskJson.putArray("taskIds");
for (String taskId : taskIds) {
((ArrayNode) mutablePendingTaskJson.get("taskIds")).add(taskId);
}
mutablePendingTaskJson.put("name", name);
mutablePendingTaskJson.put("reason", reasons.toString());
return mutablePendingTaskJson;
}

@Test
public void testNoOffers() throws IOException {
// Making a task that is not in PENDING state.
IJobKey jobKey = IJobKey.build(new JobKey("role", "test", "nonPendingJob"));
IScheduledTask task = TestUtils.makeTask(jobKey, "task0", 0,
ScheduleStatus.ASSIGNED, 10, 10, 10);

PubsubEvent.TaskStateChange taskStateChange = PubsubEvent.TaskStateChange.transition(
task, ScheduleStatus.INIT);

pendingTaskGroups.taskChangedState(taskStateChange);
expectLastCall();

// Recording the return value of pendingTaskGroups.getGroups().
List<TaskGroup> taskGroupList = new ArrayList<>();
expect(pendingTaskGroups.getGroups()).andReturn(taskGroupList).anyTimes();

replay(pendingTaskGroups);

// Testing.
pendingTaskGroups.taskChangedState(taskStateChange);
PendingTasks pendingTasks = new PendingTasks(pendingTaskGroups, nearestFit);
JsonNode mimicResponseNoPendingTaskJson = new ObjectMapper().createArrayNode();
JsonNode actualResponseJson = new ObjectMapper().valueToTree(
pendingTasks.getOffers().getEntity());
assertEquals(mimicResponseNoPendingTaskJson, actualResponseJson);
}

@Test
public void testOffers() throws IOException {
// Making pending tasks.
IJobKey jobKey0 = IJobKey.build(new JobKey("role", "test", "jobA"));
IJobKey jobKey1 = IJobKey.build(new JobKey("role", "test", "jobB"));
IScheduledTask task0 = TestUtils.makeTask(jobKey0, "task0", 0,
ScheduleStatus.PENDING, 1000, 1000000, 10);
IScheduledTask task1 = TestUtils.makeTask(jobKey1, "task1", 0,
ScheduleStatus.PENDING, 1000, 10, 1000000);

PubsubEvent.TaskStateChange taskStateChange0 = PubsubEvent.TaskStateChange.transition(
task0, ScheduleStatus.INIT);
PubsubEvent.TaskStateChange taskStateChange1 = PubsubEvent.TaskStateChange.transition(
task1, ScheduleStatus.INIT);

pendingTaskGroups.taskChangedState(taskStateChange0);
pendingTaskGroups.taskChangedState(taskStateChange1);
expectLastCall();

// Recording the return value of pendingTaskGroups.getGroups().
TaskGroupKey taskGroupKey0 = TaskGroupKey.from(task0.getAssignedTask().getTask());
TaskGroupKey taskGroupKey1 = TaskGroupKey.from(task1.getAssignedTask().getTask());
TaskGroup taskGroup0 = new TaskGroup(taskGroupKey0, "task0");
TaskGroup taskGroup1 = new TaskGroup(taskGroupKey1, "task1");
List<TaskGroup> taskGroupList = new ArrayList<>();
taskGroupList.add(taskGroup0);
taskGroupList.add(taskGroup1);
expect(pendingTaskGroups.getGroups()).andReturn(taskGroupList).anyTimes();

// Creating vetoes for CPU and RAM, corresponding to task0.
ImmutableSet<Veto> vetoes = ImmutableSet.<Veto>builder()
.add(Veto.insufficientResources("CPU", 1))
.add(Veto.insufficientResources("RAM", 1)).build();
nearestFit.vetoed(new PubsubEvent.Vetoed(taskGroupKey0, vetoes));
// Creating vetoes for CPU and DISK, corresponding to task1.
ImmutableSet<Veto> vetoes1 = ImmutableSet.<Veto>builder()
.add(Veto.insufficientResources("CPU", 1))
.add(Veto.insufficientResources("DISK", 1)).build();
nearestFit.vetoed(new PubsubEvent.Vetoed(taskGroupKey1, vetoes1));
replay(pendingTaskGroups);

// Testing.
pendingTaskGroups.taskChangedState(taskStateChange0);
pendingTaskGroups.taskChangedState(taskStateChange1);
PendingTasks pendingTasks0 = new PendingTasks(pendingTaskGroups, nearestFit);
String[] taskIds0 = {"task0"};
String[] taskIds1 = {"task1"};
String[] reasonsArr0 = {"Insufficient: CPU", "Insufficient: RAM"};
String[] reasonsArr1 = {"Insufficient: CPU", "Insufficient: DISK"};
List<String> reasons0 = Arrays.stream(reasonsArr0).collect(Collectors.toList());
List<String> reasons1 = Arrays.stream(reasonsArr1).collect(Collectors.toList());
JsonNode mimicResponseTwoPendingTasksJson = new ObjectMapper().createArrayNode();
JsonNode mimicJson0 = getMimicResponseJson(0, taskIds0, "role/test/jobA", reasons0);
JsonNode mimicJson1 = getMimicResponseJson(0, taskIds1, "role/test/jobB", reasons1);
((ArrayNode) mimicResponseTwoPendingTasksJson).add(mimicJson0);
((ArrayNode) mimicResponseTwoPendingTasksJson).add(mimicJson1);
JsonNode actualResponseJson = new ObjectMapper().valueToTree(
pendingTasks0.getOffers().getEntity());
assertEquals(mimicResponseTwoPendingTasksJson, actualResponseJson);
}
}

0 comments on commit 8e07b04

Please sign in to comment.