Skip to content

Commit

Permalink
Fix using same TE for similar workers (#599)
Browse files Browse the repository at this point in the history
  • Loading branch information
fdc-ntflx committed Dec 12, 2023
1 parent 6025b7b commit d6aca28
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,14 @@ public Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request)
}

boolean noResourcesAvailable = false;
final Map<TaskExecutorAllocationRequest, Pair<TaskExecutorID, TaskExecutorState>> bestFit = new HashMap<>();
final BestFit bestFit = new BestFit();
final boolean isJobIdAlreadyPending = pendingJobRequests.getIfPresent(request.getJobId()) != null;

for (Entry<MachineDefinition, List<TaskExecutorAllocationRequest>> entry : request.getGroupedByMachineDef().entrySet()) {
final MachineDefinition machineDefinition = entry.getKey();
final List<TaskExecutorAllocationRequest> allocationRequests = entry.getValue();

Optional<Map<TaskExecutorID, TaskExecutorState>> taskExecutors = findTaskExecutorsFor(request, machineDefinition, allocationRequests, isJobIdAlreadyPending);
Optional<Map<TaskExecutorID, TaskExecutorState>> taskExecutors = findTaskExecutorsFor(request, machineDefinition, allocationRequests, isJobIdAlreadyPending, bestFit);

// Mark noResourcesAvailable if we can't find enough TEs for a given machine def
if (!taskExecutors.isPresent()) {
Expand All @@ -277,7 +277,7 @@ public Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request)
// Map each TE to a given allocation request
int index = 0;
for (Entry<TaskExecutorID, TaskExecutorState> taskToStateEntry : taskExecutors.get().entrySet()) {
bestFit.put(allocationRequests.get(index), Pair.of(taskToStateEntry.getKey(), taskToStateEntry.getValue()));
bestFit.add(allocationRequests.get(index), Pair.of(taskToStateEntry.getKey(), taskToStateEntry.getValue()));
index++;
}
}
Expand All @@ -287,12 +287,12 @@ public Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request)
return Optional.empty();
} else {
// Return best fit only if there are enough available TEs for all machine def
return Optional.of(new BestFit(bestFit));
return Optional.of(bestFit);
}

}

private Optional<Map<TaskExecutorID, TaskExecutorState>> findBestFitFor(TaskExecutorBatchAssignmentRequest request, MachineDefinition machineDefinition, Integer numWorkers) {
private Optional<Map<TaskExecutorID, TaskExecutorState>> findBestFitFor(TaskExecutorBatchAssignmentRequest request, MachineDefinition machineDefinition, Integer numWorkers, BestFit currentBestFit) {
// only allow allocation in the lowest CPU cores matching group.
SortedMap<Double, NavigableSet<TaskExecutorHolder>> targetMap =
this.executorByCores.tailMap(machineDefinition.getCpuCores());
Expand Down Expand Up @@ -326,6 +326,10 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findBestFitFor(TaskExec
return false;
}

if (currentBestFit.contains(teHolder.getId())) {
return false;
}

TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId());
return st.isAvailable() &&
st.getRegistration() != null &&
Expand Down Expand Up @@ -433,10 +437,10 @@ private int getPendingCountyByCores(Double cores) {
.orElse(0);
}

private Optional<Map<TaskExecutorID, TaskExecutorState>> findTaskExecutorsFor(TaskExecutorBatchAssignmentRequest request, MachineDefinition machineDefinition, List<TaskExecutorAllocationRequest> allocationRequests, boolean isJobIdAlreadyPending) {
private Optional<Map<TaskExecutorID, TaskExecutorState>> findTaskExecutorsFor(TaskExecutorBatchAssignmentRequest request, MachineDefinition machineDefinition, List<TaskExecutorAllocationRequest> allocationRequests, boolean isJobIdAlreadyPending, BestFit currentBestFit) {
// Finds best fit for N workers of the same machine def
final Optional<Map<TaskExecutorID, TaskExecutorState>> taskExecutors = findBestFitFor(
request, machineDefinition, allocationRequests.size());
request, machineDefinition, allocationRequests.size(), currentBestFit);

// Verify that the number of task executors returned matches the asked
if (taskExecutors.isPresent() && taskExecutors.get().size() == allocationRequests.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,9 +1042,23 @@ static class GetJobArtifactsToCacheRequest {


@Value
@Builder
static class BestFit {
Map<TaskExecutorAllocationRequest, Pair<TaskExecutorID, TaskExecutorState>> bestFit;
Set<TaskExecutorID> taskExecutorIDSet;

public BestFit() {
this.bestFit = new HashMap<>();
this.taskExecutorIDSet = new HashSet<>();
}

public void add(TaskExecutorAllocationRequest request, Pair<TaskExecutorID, TaskExecutorState> taskExecutorStatePair) {
bestFit.put(request, taskExecutorStatePair);
taskExecutorIDSet.add(taskExecutorStatePair.getLeft());
}

public boolean contains(TaskExecutorID taskExecutorID) {
return taskExecutorIDSet.contains(taskExecutorID);
}

public Map<TaskExecutorAllocationRequest, TaskExecutorID> getRequestToTaskExecutorMap() {
return bestFit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -83,6 +85,10 @@ public class ExecutorStateManagerTests {

private static final MachineDefinition MACHINE_DEFINITION_2 =
new MachineDefinition(4.0, 2.0, 3.0, 4.0, 5);

private static final MachineDefinition MACHINE_DEFINITION_3 =
new MachineDefinition(2.0, 2.0, 3.0, 4.0, 5);

private static final Map<String, String> ATTRIBUTES =
ImmutableMap.of("attr1", "attr2");

Expand Down Expand Up @@ -353,4 +359,39 @@ private TaskExecutorState registerNewTaskExecutor(TaskExecutorID id, MachineDefi

return state;
}

@Test
public void testGetBestFit_WithDifferentResourcesSameSku() {
registerNewTaskExecutor(TASK_EXECUTOR_ID_1,
MACHINE_DEFINITION_2,
ATTRIBUTES_WITH_SCALE_GROUP_2,
stateManager);

// should get te1 with group2
Optional<BestFit> bestFitO =
stateManager.findBestFit(
new TaskExecutorBatchAssignmentRequest(
new HashSet<>(Arrays.asList(
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0),
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 1))),
CLUSTER_ID));

assertFalse(bestFitO.isPresent());

registerNewTaskExecutor(TASK_EXECUTOR_ID_2,
MACHINE_DEFINITION_2,
ATTRIBUTES_WITH_SCALE_GROUP_1,
stateManager);

bestFitO =
stateManager.findBestFit(
new TaskExecutorBatchAssignmentRequest(
new HashSet<>(Arrays.asList(
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0),
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 1))),
CLUSTER_ID));

assertTrue(bestFitO.isPresent());
assertEquals(new HashSet<>(Arrays.asList(TASK_EXECUTOR_ID_1, TASK_EXECUTOR_ID_2)), bestFitO.get().getTaskExecutorIDSet());
}
}

0 comments on commit d6aca28

Please sign in to comment.