Skip to content

Commit

Permalink
[7.x] Only start re-assigning persistent tasks if they are not alread…
Browse files Browse the repository at this point in the history
…y being reassigned (#76258) (#76314)

* Only start re-assigning persistent tasks if they are not already being reassigned (#76258)

* Only start re-assigning persistent tasks if they are not already being reassigned

* adding tests addressing PR comments

* addressing Pr COmments

* addressing PR comments + style"

* improving test rigor

* test improvement
  • Loading branch information
benwtrent committed Aug 10, 2021
1 parent 7a36725 commit dea66ad
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
Expand All @@ -55,6 +56,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
private final EnableAssignmentDecider enableDecider;
private final ThreadPool threadPool;
private final PeriodicRechecker periodicRechecker;
private final AtomicBoolean reassigningTasks = new AtomicBoolean(false);

public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService,
ThreadPool threadPool) {
Expand Down Expand Up @@ -353,7 +355,10 @@ public void clusterChanged(ClusterChangedEvent event) {
/**
* Submit a cluster state update to reassign any persistent tasks that need reassigning
*/
private void reassignPersistentTasks() {
void reassignPersistentTasks() {
if (this.reassigningTasks.compareAndSet(false, true) == false) {
return;
}
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -362,6 +367,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
reassigningTasks.set(false);
logger.warn("failed to reassign persistent tasks", e);
if (e instanceof NotMasterException == false) {
// There must be a task that's worth rechecking because there was one
Expand All @@ -373,6 +379,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
reassigningTasks.set(false);
if (isAnyTaskUnassigned(newState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE))) {
periodicRechecker.rescheduleIfNecessary();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -71,8 +72,13 @@
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class PersistentTasksClusterServiceTests extends ESTestCase {
Expand Down Expand Up @@ -431,7 +437,7 @@ public void testPeriodicRecheck() throws Exception {
nonClusterStateCondition = false;

boolean shouldSimulateFailure = randomBoolean();
ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, shouldSimulateFailure);
ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure);
PersistentTasksClusterService service = createService(recheckTestClusterService,
(params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes));

Expand Down Expand Up @@ -479,7 +485,7 @@ public void testPeriodicRecheckOffMaster() {
ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build();
nonClusterStateCondition = false;

ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, false);
ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, false);
PersistentTasksClusterService service = createService(recheckTestClusterService,
(params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes));

Expand Down Expand Up @@ -641,7 +647,53 @@ public void testTasksNotAssignedToShuttingDownNodes() {
Sets.haveEmptyIntersection(shutdownNodes, nodesWithTasks));
}

private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) {
public void testReassignOnlyOnce() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ClusterState initialState = initialState();
ClusterState.Builder builder = ClusterState.builder(initialState);
PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder(
initialState.metadata().custom(PersistentTasksCustomMetadata.TYPE)
);
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(initialState.nodes());
addTestNodes(nodes, randomIntBetween(1, 3));
addTask(tasks, "assign_based_on_non_cluster_state_condition", null);
Metadata.Builder metadata = Metadata.builder(initialState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build());
ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build();

boolean shouldSimulateFailure = randomBoolean();
ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure, latch);
PersistentTasksClusterService service = createService(
recheckTestClusterService,
(params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes)
);
verify(recheckTestClusterService, atLeastOnce()).getClusterSettings();
verify(recheckTestClusterService, atLeastOnce()).addListener(any());
Thread t1 = new Thread(service::reassignPersistentTasks);
Thread t2 = new Thread(service::reassignPersistentTasks);
try {
t1.start();
// Make sure we have at least one reassign check before we count down the latch
assertBusy(
() -> verify(recheckTestClusterService, atLeastOnce()).submitStateUpdateTask(eq("reassign persistent tasks"), any())
);
t2.start();
} finally {
t2.join();
latch.countDown();
t1.join();
service.reassignPersistentTasks();
}
// verify that our reassignment is possible again, here we have once from the previous reassignment in the `try` block
// And one from the line above once the other threads have joined
assertBusy(() -> verify(recheckTestClusterService, times(2)).submitStateUpdateTask(eq("reassign persistent tasks"), any()));
verifyNoMoreInteractions(recheckTestClusterService);
}

private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) {
return createStateUpdateClusterState(initialState, shouldSimulateFailure, null);
}

private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure, CountDownLatch await) {
AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure);
AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
ClusterService recheckTestClusterService = mock(ClusterService.class);
Expand All @@ -653,6 +705,9 @@ private ClusterService createRecheckTestClusterService(ClusterState initialState
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1];
ClusterState before = state.get();
ClusterState after = task.execute(before);
if (await != null) {
await.await();
}
if (testFailureNextTime.compareAndSet(true, false)) {
task.onFailure("testing failure", new RuntimeException("foo"));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -144,7 +144,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
logger.debug(reason);
return new PersistentTasksCustomMetadata.Assignment(null, reason);
}
List<String> reasons = new LinkedList<>();
Map<String, String> reasons = new TreeMap<>();
long maxAvailableMemory = Long.MIN_VALUE;
DiscoveryNode minLoadedNodeByMemory = null;
for (DiscoveryNode node : candidateNodes) {
Expand All @@ -153,7 +153,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
String reason = nodeFilter.apply(node);
if (reason != null) {
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}
NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
Expand All @@ -167,7 +167,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
if (currentLoad.getError() != null) {
reason = createReason(jobId, nodeNameAndMlAttributes(node), currentLoad.getError());
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}
// Assuming the node is eligible at all, check loading
Expand All @@ -181,7 +181,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
currentLoad.getNumAllocatingJobs(),
maxConcurrentJobAllocations);
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}

Expand All @@ -193,7 +193,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
MAX_OPEN_JOBS_PER_NODE.getKey(),
maxNumberOfOpenJobs);
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}

Expand All @@ -202,7 +202,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
nodeNameAndMlAttributes(node),
"This node is not providing accurate information to determine is load by memory.");
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(),reason);
continue;
}

Expand All @@ -211,7 +211,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
nodeNameAndMlAttributes(node),
"This node is indicating that it has no native memory for machine learning.");
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(),reason);
continue;
}

Expand All @@ -235,7 +235,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
requiredMemoryForJob,
ByteSizeValue.ofBytes(requiredMemoryForJob).toString());
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(),reason);
continue;
}

Expand All @@ -247,14 +247,14 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob

return createAssignment(
minLoadedNodeByMemory,
reasons,
reasons.values(),
maxNodeSize > 0L ?
NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage) :
Long.MAX_VALUE);
}

PersistentTasksCustomMetadata.Assignment createAssignment(DiscoveryNode minLoadedNode,
List<String> reasons,
Collection<String> reasons,
long biggestPossibleJob) {
if (minLoadedNode == null) {
String explanation = String.join("|", reasons);
Expand Down

0 comments on commit dea66ad

Please sign in to comment.