Skip to content

Commit

Permalink
Improve auto-scaling mechanism
Browse files Browse the repository at this point in the history
1. A task releases all buckets as soon as it exits processing
of a current bucket in a way other than regular completion. This
ensures e.g. that tasks that are suspended in a standard way hold
no buckets. (And this ensures correct processing of all buckets by
remaining workers.)

2. When a node is detected to be down by ClusterManager, all buckets
held by all tasks that were executing on it are automatically released.
The reason is exactly the same as above: to prevent buckets from being held
by ghost tasks indefinitely.

3. When reconciling workers, we suspend only the superfluous tasks that
run on nodes that are still up and alive. (It is not sufficient
that they are not down.) This is because we rely on the tasks
themselves to release their buckets. This is not a perfect solution,
though - see MID-7180.
  • Loading branch information
mederly committed Aug 23, 2021
1 parent 6195d91 commit 40469e9
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7295,6 +7295,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="nodeUpAndAlive" type="xsd:string" minOccurs="0" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Nodes that are up and alive (according to operational state + last check-in time in repo).
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@

import static com.evolveum.midpoint.xml.ns._public.common.common_3.TaskSchedulingStateType.READY;

import static com.evolveum.midpoint.xml.ns._public.common.common_3.TaskSchedulingStateType.SUSPENDED;

import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -349,11 +347,13 @@ public void test150NodeRemovedAfter3Hours() throws Exception {
.end();
// @formatter:on

assertWorkerTaskFourSubtasksHalfSuspended();
// Note that tasks on Node 1 are NOT suspended. The reason is that they were not actually started (Node1 did not
// exist in reality). So the reconciliator just lets them be.
assertWorkerTaskFourSubtasks();
}

/**
* No change for two days. Reconciliation should be trigger after 1 day.
* No change for two days. Reconciliation should be triggered after 1 day.
*/
@Test
public void test160NoChangeForTwoDays() throws Exception {
Expand Down Expand Up @@ -394,7 +394,7 @@ public void test160NoChangeForTwoDays() throws Exception {
.end();
// @formatter:on

assertWorkerTaskFourSubtasksHalfSuspended();
assertWorkerTaskFourSubtasks();
}

/**
Expand Down Expand Up @@ -438,7 +438,7 @@ public void test900AutoScalingSkipInitial() throws Exception {
.end();
// @formatter:on

assertWorkerTaskFourSubtasksHalfSuspended();
assertWorkerTaskFourSubtasks();
}

private void setLastReconciliationTimestamp(XMLGregorianCalendar lastReconciliationTimestamp) {
Expand Down Expand Up @@ -480,23 +480,4 @@ private void assertWorkerTaskFourSubtasks() throws SchemaException, ObjectNotFou
.end();
// @formatter:on
}

/**
 * Asserts the expected worker-task tree state: the root task has exactly four worker
 * subtasks, of which the two DefaultNode workers are READY while the two Node1 workers
 * are SUSPENDED (i.e. "half suspended").
 *
 * NOTE(review): assumes worker subtask names follow the
 * "Worker &lt;node&gt;:&lt;index&gt; for root activity in ..." convention — confirm against
 * the worker-creation code if names change.
 */
private void assertWorkerTaskFourSubtasksHalfSuspended() throws SchemaException, ObjectNotFoundException {
    // @formatter:off
    assertTaskTree(TASK_TWO_WORKERS_PER_NODE.oid, "working task after")
            .assertSubtasks(4)
            .subtask("Worker DefaultNode:1 for root activity in task-two-workers-per-node")
                .assertSchedulingState(READY)
            .end()
            .subtask("Worker DefaultNode:2 for root activity in task-two-workers-per-node")
                .assertSchedulingState(READY)
            .end()
            .subtask("Worker Node1:1 for root activity in task-two-workers-per-node")
                .assertSchedulingState(SUSPENDED)
            .end()
            .subtask("Worker Node1:2 for root activity in task-two-workers-per-node")
                .assertSchedulingState(SUSPENDED)
            .end();
    // @formatter:on
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,32 @@ private void doExecute(OperationResult result)
for (; task.canRun(); initialExecution = false) {

bucket = getWorkBucket(initialExecution, result);
if (!task.canRun()) {
break;
}

if (bucket == null) {
LOGGER.trace("No (next) work bucket within {}, exiting", task);
break;
}

executeSingleBucket(result);

if (!task.canRun() || errorState.wasStoppingExceptionEncountered()) {
break;
boolean complete = false;
try {
if (!task.canRun()) {
break;
}

executeSingleBucket(result);
if (!task.canRun() || errorState.wasStoppingExceptionEncountered()) {
break;
}

complete = true;
} finally {
if (!complete) {
// This is either when the task was stopped (canRun is false or there's an stopping exception)
// or an unhandled exception occurred.
//
// This most probably means that the task is going to be suspended. So let us release the buckets
// to allow their processing by other workers.
releaseAllBucketsIfWorker(result);
}
}

completeWorkBucketAndCommitProgress(result);
Expand Down Expand Up @@ -253,6 +266,13 @@ private boolean isScavenger(RunningTask task) {
return BucketingUtil.isScavenger(task.getActivitiesStateOrClone(), getActivityPath());
}

/**
 * If the current task acts as a worker (the bucketing situation carries a worker task OID),
 * returns all buckets it holds back to the coordinator, so other workers may process them.
 * For standalone tasks this is a no-op.
 */
private void releaseAllBucketsIfWorker(OperationResult result) throws SchemaException, ObjectNotFoundException {
    String workerOid = bucketingSituation.workerTaskOid;
    if (workerOid == null) {
        return; // standalone task: there are no delegated buckets to release
    }
    beans.bucketingManager.releaseAllWorkBucketsFromWorker(
            bucketingSituation.coordinatorTaskOid,
            workerOid,
            getActivityPath(),
            getLiveBucketManagementStatistics(),
            result);
}

private void completeWorkBucketAndCommitProgress(OperationResult result) throws ActivityExecutionException {
try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,26 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import com.evolveum.midpoint.repo.common.task.CommonTaskBeans;
import com.evolveum.midpoint.repo.common.task.TaskExceptionHandlingUtil;
import com.evolveum.midpoint.repo.common.activity.definition.ActivityDefinition;

import com.evolveum.midpoint.repo.common.task.work.BucketingManager;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.task.ActivityPath;
import com.evolveum.midpoint.task.api.*;

import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ActivityExecutionRoleType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskActivityStateType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -33,6 +44,8 @@
@Component
public class GenericTaskHandler implements TaskHandler {

private static final Trace LOGGER = TraceManager.getTrace(GenericTaskHandler.class);

public static final String HANDLER_URI = "http://midpoint.evolveum.com/xml/ns/public/task/generic/handler-3";

/**
Expand All @@ -50,6 +63,7 @@ public class GenericTaskHandler implements TaskHandler {
/** Common beans */
@Autowired private CommonTaskBeans beans;
@Autowired private TaskManager taskManager;
@Autowired private BucketingManager bucketingManager;

@PostConstruct
public void initialize() {
Expand Down Expand Up @@ -137,11 +151,31 @@ public void unregisterLegacyHandlerUri(String handlerUri) {
beans.taskManager.unregisterHandler(handlerUri);
}

public boolean isAvoidAutoAssigningArchetypes() {
boolean isAvoidAutoAssigningArchetypes() {
return avoidAutoAssigningArchetypes;
}

public void setAvoidAutoAssigningArchetypes(boolean avoidAutoAssigningArchetypes) {
this.avoidAutoAssigningArchetypes = avoidAutoAssigningArchetypes;
}

/**
 * Called when the cluster manager detects that a node went down.
 *
 * If the given task is a worker (its local root activity has the {@code WORKER} execution
 * role), all buckets delegated to it are released back to its coordinator, so the remaining
 * workers can take them over. Without this, buckets held by ghost tasks on the dead node
 * would stay allocated indefinitely.
 *
 * @param taskBean task that was executing on the node that went down
 * @throws SchemaException from task instantiation or bucket release
 * @throws ObjectNotFoundException if the task or its coordinator no longer exists
 */
@Override
public void cleanupOnNodeDown(@NotNull TaskType taskBean, @NotNull OperationResult result)
        throws SchemaException, ObjectNotFoundException {
    TaskActivityStateType state = taskBean.getActivityState();
    if (state == null || state.getLocalRootActivityExecutionRole() != ActivityExecutionRoleType.WORKER) {
        return; // not a worker task, so it holds no delegated buckets
    }

    Task task = taskManager.createTaskInstance(taskBean.asPrismObject(), result);
    Task parentTask = Objects.requireNonNull(
            task.getParentTask(result), () -> "No parent for worker task " + task);
    ActivityPath activityPath =
            ActivityPath.fromBean(
                    Objects.requireNonNull(
                            // lazy message supplier, consistent with the parent-task check above
                            state.getLocalRoot(), () -> "No local root in " + task));

    LOGGER.info("Returning all buckets from {} (coordinator {})", task, parentTask);
    // statistics == null: there is no live task execution on this (down) node to account them to
    bucketingManager.releaseAllWorkBucketsFromWorker(parentTask.getOid(), task.getOid(), activityPath, null, result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ public WorkBucketType getWorkBucket(@NotNull String coordinatorTaskOid, @Nullabl
/**
* Marks a work bucket as complete.
*/
public void completeWorkBucket(String coordinatorTaskOid, String workerTaskOid, ActivityPath activityPath,
int sequentialNumber, ActivityBucketManagementStatistics statistics, OperationResult result)
public void completeWorkBucket(@NotNull String coordinatorTaskOid, @Nullable String workerTaskOid,
@NotNull ActivityPath activityPath, int sequentialNumber,
ActivityBucketManagementStatistics statistics, OperationResult result)
throws ObjectAlreadyExistsException, ObjectNotFoundException, SchemaException {
new CompleteBucketOperation(coordinatorTaskOid, workerTaskOid, activityPath, statistics, beans, sequentialNumber)
.execute(result);
Expand All @@ -75,8 +76,9 @@ public void completeWorkBucket(String coordinatorTaskOid, String workerTaskOid,
/**
* Releases work bucket.
*/
public void releaseWorkBucket(String coordinatorTaskOid, String workerTaskOid, ActivityPath activityPath,
int sequentialNumber, ActivityBucketManagementStatistics statistics, OperationResult result)
public void releaseWorkBucket(@NotNull String coordinatorTaskOid, @NotNull String workerTaskOid,
@NotNull ActivityPath activityPath, int sequentialNumber,
ActivityBucketManagementStatistics statistics, OperationResult result)
throws ObjectNotFoundException, SchemaException {
new ReleaseBucketsOperation(coordinatorTaskOid, workerTaskOid, activityPath, statistics, beans, sequentialNumber)
.execute(result);
Expand All @@ -88,8 +90,8 @@ public void releaseWorkBucket(String coordinatorTaskOid, String workerTaskOid, A
* Will change in the future - there are some preconditions to be checked within the modification operation.
*/
@Experimental
public void releaseAllWorkBucketsFromSuspendedWorker(String coordinatorTaskOid, String workerTaskOid,
ActivityPath activityPath, ActivityBucketManagementStatistics statistics, OperationResult result)
public void releaseAllWorkBucketsFromWorker(@NotNull String coordinatorTaskOid, @NotNull String workerTaskOid,
@NotNull ActivityPath activityPath, ActivityBucketManagementStatistics statistics, OperationResult result)
throws ObjectNotFoundException, SchemaException {
new ReleaseBucketsOperation(coordinatorTaskOid, workerTaskOid, activityPath, statistics, beans, null)
.execute(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@

package com.evolveum.midpoint.repo.common.task.work;

import static com.evolveum.midpoint.util.MiscUtil.argCheck;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import com.evolveum.midpoint.util.exception.SystemException;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.prism.path.ItemPath;
Expand All @@ -32,6 +27,7 @@
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
Expand All @@ -45,7 +41,7 @@ public class ReleaseBucketsOperation extends BucketOperation {
/** null means all delegated buckets */
private final Integer sequentialNumber;

ReleaseBucketsOperation(@NotNull String coordinatorTaskOid, @Nullable String workerTaskOid,
ReleaseBucketsOperation(@NotNull String coordinatorTaskOid, @NotNull String workerTaskOid,
@NotNull ActivityPath activityPath, ActivityBucketManagementStatistics collector, CommonTaskBeans beans,
Integer sequentialNumber) {
super(coordinatorTaskOid, workerTaskOid, activityPath, collector, beans);
Expand All @@ -55,9 +51,6 @@ public class ReleaseBucketsOperation extends BucketOperation {
public void execute(OperationResult result)
throws ObjectNotFoundException, SchemaException {

argCheck(workerTaskOid != null,
"Cannot release work bucket(s) from standalone task %s", coordinatorTaskOid);

ModifyObjectResult<TaskType> modifyObjectResult;
try {
modifyObjectResult = plainRepositoryService.modifyObjectDynamically(TaskType.class, coordinatorTaskOid, null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

package com.evolveum.midpoint.repo.common.task.work.workers;

import com.evolveum.axiom.concepts.Lazy;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.repo.common.activity.Activity;
import com.evolveum.midpoint.repo.common.task.CommonTaskBeans;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.util.MiscUtil;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SystemException;
Expand All @@ -38,6 +38,12 @@ class ExpectedSetup {
@NotNull private final Task coordinatorTask;
@NotNull private final Task rootTask;

/** Nodes that are technically "up" i.e. marked in repository as such. */
@NotNull private final Set<String> nodesUp = new HashSet<>();

/** Nodes that are "up" and alive, i.e. regularly checking in. See {@link TaskManager#isUpAndAlive(NodeType)}. */
@NotNull private final Set<String> nodesUpAndAlive = new HashSet<>();

/**
* A collection of expected workers, characterized by group + name + scavenger flag.
*/
Expand Down Expand Up @@ -67,9 +73,10 @@ static ExpectedSetup create(@NotNull Activity<?, ?> activity, @NotNull WorkersMa
}

private void initialize(OperationResult result) {
Lazy<Collection<String>> allRunningNodesLazy = Lazy.from(() -> getAllRunningNodes(result));
determineClusterState(result);

for (WorkerTasksPerNodeConfigurationType perNodeConfig : getWorkersPerNode()) {
for (String nodeIdentifier : getNodeIdentifiers(perNodeConfig, allRunningNodesLazy)) {
for (String nodeIdentifier : getNodeIdentifiers(perNodeConfig)) {
int count = defaultIfNull(perNodeConfig.getCount(), 1);
int scavengers = defaultIfNull(perNodeConfig.getScavengers(), 1);
for (int index = 1; index <= count; index++) {
Expand All @@ -82,10 +89,11 @@ private void initialize(OperationResult result) {
}
}

private Collection<String> getAllRunningNodes(OperationResult result) {
private void determineClusterState(OperationResult result) {
try {
return beans.taskManager.determineClusterState(result)
.getNodeUp();
ClusterStateType state = beans.taskManager.determineClusterState(result);
nodesUp.addAll(state.getNodeUp());
nodesUpAndAlive.addAll(state.getNodeUpAndAlive());
} catch (SchemaException e) {
throw new SystemException(e);
}
Expand Down Expand Up @@ -127,12 +135,11 @@ private List<WorkerTasksPerNodeConfigurationType> getWorkersPerNode() {
}
}

private Collection<String> getNodeIdentifiers(WorkerTasksPerNodeConfigurationType perNodeConfig,
Lazy<Collection<String>> allNodesLazy) {
private Collection<String> getNodeIdentifiers(WorkerTasksPerNodeConfigurationType perNodeConfig) {
if (!perNodeConfig.getNodeIdentifier().isEmpty()) {
return perNodeConfig.getNodeIdentifier();
} else {
return allNodesLazy.get();
return nodesUp;
}
}

Expand All @@ -143,4 +150,12 @@ private Collection<String> getNodeIdentifiers(WorkerTasksPerNodeConfigurationTyp
@NotNull Map<WorkerCharacterization, WorkerTasksPerNodeConfigurationType> getWorkersConfiguration() {
return workersConfiguration;
}

/** Identifiers of nodes that are technically "up", i.e. marked so in the repository. */
@NotNull Set<String> getNodesUp() {
    return nodesUp;
}

/** Identifiers of nodes that are "up" and alive, i.e. regularly checking in. */
@NotNull Set<String> getNodesUpAndAlive() {
    return nodesUpAndAlive;
}
}

0 comments on commit 40469e9

Please sign in to comment.