Skip to content

Commit

Permalink
Stop maintaining buckets in worker tasks
Browse files Browse the repository at this point in the history
It seems that careful and systematic use of modifyObjectDynamically
obsoletes the original design of "borrowing" buckets from coordinator
and storing them in workers.

So this commit:

1) Stops storing buckets in the worker tasks.
2) Migrates all bucket management operations to modifyObjectDynamically
mechanism.

This simplifies the bucketing code, makes it more robust, and reduces
the number of repository operations (e.g. it eliminates all getObject
operations when manipulating the buckets).

Related changes:
- RunningTask has now a reference to (resolved) parent task.
  • Loading branch information
mederly committed Aug 23, 2021
1 parent 9bb7d31 commit 7f04fd4
Show file tree
Hide file tree
Showing 35 changed files with 1,103 additions and 1,060 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public static WorkBucketType findBucketByNumber(List<WorkBucketType> buckets, in
.findFirst().orElse(null);
}

public static @NotNull WorkBucketType findBucketByNumberRequired(List<WorkBucketType> buckets, int sequentialNumber) {
return buckets.stream()
.filter(b -> b.getSequentialNumber() == sequentialNumber)
.findFirst().orElseThrow(
() -> new IllegalStateException("No bucket #" + sequentialNumber + " found"));
}

// beware: do not call this on prism structure directly (it does not support setting values)
public static void sortBucketsBySequentialNumber(List<WorkBucketType> buckets) {
buckets.sort(Comparator.comparingInt(WorkBucketType::getSequentialNumber));
Expand Down Expand Up @@ -156,11 +163,16 @@ public static boolean isScavenger(TaskActivityStateType taskState, ActivityPath
return bucketing != null && Boolean.TRUE.equals(bucketing.isScavenger());
}

public static boolean isInScavengingPhase(TaskActivityStateType taskState, ActivityPath activityPath) {
ActivityBucketingStateType bucketing = ActivityStateUtil.getActivityStateRequired(taskState, activityPath).getBucketing();
return bucketing != null && Boolean.TRUE.equals(bucketing.isScavenging());
}

public static boolean isWorkComplete(ActivityStateType state) {
return state != null && state.getBucketing() != null && Boolean.TRUE.equals(state.getBucketing().isWorkComplete());
}

public static String getWorkerOid(@NotNull WorkBucketType bucket) {
public static @Nullable String getWorkerOid(@NotNull WorkBucketType bucket) {
return bucket.getWorkerRef() != null ? bucket.getWorkerRef().getOid() : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
*/
public class BucketingConstants {

public static final String GET_WORK_BUCKET_FOUND_SELF_ALLOCATED = "getWorkBucket.foundSelfAllocated";
public static final String GET_WORK_BUCKET_FOUND_ALREADY_DELEGATED = "getWorkBucket.foundAlreadyDelegated";
public static final String GET_WORK_BUCKET_CREATED_NEW = "getWorkBucket.createdNew";
public static final String GET_WORK_BUCKET_DELEGATED = "getWorkBucket.delegated";
public static final String GET_WORK_BUCKET_FOUND_EXISTING = "getWorkBucket.foundExisting";
public static final String GET_WORK_BUCKET_NO_MORE_BUCKETS_DEFINITE = "getWorkBucket.noMoreBucketsDefinite";
public static final String GET_WORK_BUCKET_NO_MORE_BUCKETS_NOT_SCAVENGER = "getWorkBucket.noMoreBucketsNotScavenger";
public static final String GET_WORK_BUCKET_NO_MORE_BUCKETS_WAIT_TIME_ELAPSED = "getWorkBucket.NoMoreBucketsWaitTimeElapsed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3207,7 +3207,8 @@
<xsd:complexType name="ActivityBucketingStateType">
<xsd:annotation>
<xsd:documentation>
Information about the bucket processing within an activity.
Information about the bucket processing within an activity. It is present in both coordinator/standalone tasks
("buckets-holding" for short) and worker tasks.
</xsd:documentation>
<xsd:appinfo>
<a:container>true</a:container>
Expand All @@ -3219,20 +3220,22 @@
<xsd:element name="bucket" type="tns:WorkBucketType" minOccurs="0" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Current buckets of work. Some buckets might be present implicitly: if a bucket with
sequence number of N is present, it is expected that all unlisted buckets with numbers lower than N
are either complete (for coordinator and standalone tasks) or not relevant (for worker tasks).
Current buckets of work.

Some buckets might be present implicitly: if a bucket with sequence number of N is present,
it is expected that all unlisted buckets with numbers lower than N are complete.
In a similar way, if no free bucket is available and we are not at the end it is expected that some
free buckets are yet to be created (for coordinator tasks).
free buckets are yet to be created.

Present only in buckets-holding tasks (coordinator or standalone).
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="numberOfBuckets" type="xsd:int" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
How many buckets are there? This is an indicative information, e.g. to be used when
estimating total progress. Relevant only for coordinator and standalone tasks.
estimating total progress. Present only in buckets-holding tasks.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
Expand All @@ -3248,22 +3251,24 @@
<xsd:element name="scavenging" type="xsd:boolean" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Are we in the scavenging phase? Relevant only for coordinator tasks.
Are we in the scavenging phase? Present only in buckets-holding tasks,
relevant only in coordinator-workers scenarios.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="scavenger" type="xsd:boolean" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Is this task a scavenger i.e. will it wait until the work is done, looking
for buckets that could be reclaimed? Applicable only to worker tasks.
for buckets that could be reclaimed? Present only in worker tasks.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="workComplete" type="xsd:boolean" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
True if all the work is complete: there are no more buckets to be processed.
Present only in buckets-holding tasks.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import java.io.FileNotFoundException;

import com.evolveum.midpoint.repo.common.task.work.BucketingConfigurationOverrides;

import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -40,7 +42,7 @@ public class TestSyncStoryUsingReconciliationMultithreaded extends TestSyncStory
@Override
public void initSystem(Task initTask, OperationResult initResult) throws Exception {
super.initSystem(initTask, initResult);
bucketingManager.setFreeBucketWaitIntervalOverride(100L);
BucketingConfigurationOverrides.setFreeBucketWaitIntervalOverride(100L);
}

@SuppressWarnings("Duplicates")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import java.io.FileNotFoundException;

import com.evolveum.midpoint.repo.common.task.work.BucketingConfigurationOverrides;

import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -41,7 +43,7 @@ public class TestSyncStoryUsingReconciliationPartitioned extends TestSyncStoryUs
@Override
public void initSystem(Task initTask, OperationResult initResult) throws Exception {
super.initSystem(initTask, initResult);
bucketingManager.setFreeBucketWaitIntervalOverride(100L);
BucketingConfigurationOverrides.setFreeBucketWaitIntervalOverride(100L);
}

@SuppressWarnings("Duplicates")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import javax.xml.datatype.XMLGregorianCalendar;
import javax.xml.namespace.QName;

import com.evolveum.midpoint.repo.common.task.work.BucketingConfigurationOverrides;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -226,7 +228,7 @@ public void initSystem(Task initTask, OperationResult initResult) throws Excepti
}

// This is generally useful in tests, to avoid long waiting for bucketed tasks.
bucketingManager.setFreeBucketWaitIntervalOverride(100L);
BucketingConfigurationOverrides.setFreeBucketWaitIntervalOverride(100L);

// We generally do not import all the archetypes for all kinds of tasks (at least not now).
genericTaskHandler.setAvoidAutoAssigningArchetypes(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import static com.evolveum.midpoint.task.api.TaskRunResult.TaskRunResultStatus.PERMANENT_ERROR;

import com.evolveum.midpoint.repo.common.activity.state.ActivityBucketManagementStatistics;
import com.evolveum.midpoint.repo.common.task.work.GetBucketOperationOptions;
import com.evolveum.midpoint.repo.common.task.work.GetBucketOperationOptions.GetBucketOperationOptionsBuilder;
import com.evolveum.midpoint.schema.util.task.BucketingUtil;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.xml.ns._public.common.common_3.WorkBucketType;

Expand Down Expand Up @@ -41,6 +44,10 @@
import com.evolveum.midpoint.xml.ns._public.common.common_3.AbstractActivityWorkStateType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ExecutionModeType;

import org.jetbrains.annotations.Nullable;

import java.util.Objects;

/**
* Represents an execution of an iterative activity: either plain iterative one or search-based one.
*
Expand Down Expand Up @@ -94,6 +101,13 @@ public abstract class IterativeActivityExecution<
*/
protected WorkBucketType bucket;

/**
* Information needed to manage buckets.
*
* Determined on the execution start.
*/
private BucketingSituation bucketingSituation;

/**
* Schedules individual items for processing by worker threads (if running in multiple threads).
* Re-created for each individual bucket.
Expand Down Expand Up @@ -172,12 +186,14 @@ public IterativeActivityExecution(@NotNull ExecutionInstantiationContext<WD, AH>
/**
* Bucketed version of the execution.
*/
protected void doExecute(OperationResult result)
private void doExecute(OperationResult result)
throws ActivityExecutionException, CommonException {

RunningTask task = taskExecution.getRunningTask();
boolean initialExecution = true;

bucketingSituation = determineBucketingSituation();

// resetWorkStateAndStatisticsIfWorkComplete(result);
// startCollectingStatistics(task, handler);

Expand Down Expand Up @@ -208,9 +224,16 @@ private WorkBucketType getWorkBucket(boolean initialExecution, OperationResult r

WorkBucketType bucket;
try {
bucket = beans.bucketingManager.getWorkBucket(task, activity.getPath(),
activity.getDefinition().getDistributionDefinition(), FREE_BUCKET_WAIT_TIME, initialExecution,
getLiveBucketManagementStatistics(), executionSpecifics, result);
GetBucketOperationOptions options = GetBucketOperationOptionsBuilder.anOptions()
.withDistributionDefinition(activity.getDefinition().getDistributionDefinition())
.withFreeBucketWaitTime(FREE_BUCKET_WAIT_TIME)
.withCanRun(task::canRun)
.withExecuteInitialWait(initialExecution)
.withImplicitSegmentationResolver(executionSpecifics)
.withIsScavenger(isScavenger(task))
.build();
bucket = beans.bucketingManager.getWorkBucket(bucketingSituation.coordinatorTaskOid,
bucketingSituation.workerTaskOid, activity.getPath(), options, getLiveBucketManagementStatistics(), result);
task.refresh(result); // We want to have the most current state of the running task.
} catch (InterruptedException e) {
LOGGER.trace("InterruptedExecution in getWorkBucket for {}", task);
Expand All @@ -227,11 +250,15 @@ private WorkBucketType getWorkBucket(boolean initialExecution, OperationResult r
return bucket;
}

private boolean isScavenger(RunningTask task) {
return BucketingUtil.isScavenger(task.getActivitiesStateOrClone(), getActivityPath());
}

private void completeWorkBucketAndCommitProgress(OperationResult result) throws ActivityExecutionException {
RunningTask task = taskExecution.getRunningTask();
try {
beans.bucketingManager.completeWorkBucket(task, getActivityPath(), bucket,
getLiveBucketManagementStatistics(), result);

beans.bucketingManager.completeWorkBucket(bucketingSituation.coordinatorTaskOid, bucketingSituation.workerTaskOid,
getActivityPath(), bucket.getSequentialNumber(), getLiveBucketManagementStatistics(), result);

activityState.getLiveProgress().onCommitPoint();
activityState.updateProgressAndStatisticsNoCommit();
Expand Down Expand Up @@ -468,4 +495,35 @@ public interface SpecificsSupplier<AE extends IterativeActivityExecution<?, ?, ?
public WorkBucketType getBucket() {
return bucket;
}

private @NotNull BucketingSituation determineBucketingSituation() {
if (getActivityState().isWorker()) {
return BucketingSituation.worker(getRunningTask());
} else {
return BucketingSituation.standalone(getRunningTask());
}
}

private static class BucketingSituation {
@NotNull private final String coordinatorTaskOid;
@Nullable private final String workerTaskOid;

private BucketingSituation(@NotNull String coordinatorTaskOid, @Nullable String workerTaskOid) {
this.coordinatorTaskOid = coordinatorTaskOid;
this.workerTaskOid = workerTaskOid;
}

public static BucketingSituation worker(RunningTask worker) {
return new BucketingSituation(
Objects.requireNonNull(
worker.getParentTask(),
"No parent task for worker " + worker)
.getOid(),
worker.getOid());
}

public static BucketingSituation standalone(RunningTask task) {
return new BucketingSituation(task.getOid(), null);
}
}
}

0 comments on commit 7f04fd4

Please sign in to comment.