Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Evolveum/midpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
KaterynaHonchar committed May 2, 2018
2 parents d8ca617 + d4f4261 commit 12e50a9
Show file tree
Hide file tree
Showing 17 changed files with 422 additions and 128 deletions.
Expand Up @@ -1299,6 +1299,13 @@
</xsd:annotation>
</xsd:element>
</xsd:choice>
<xsd:element name="allocation" type="tns:WorkAllocationConfigurationType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Parameters related to buckets allocation process.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<!--<xsd:element name="progressMarking" type="tns:WorkBucketsProgressMarkingType" minOccurs="0">-->
<!--<xsd:annotation>-->
<!--<xsd:documentation>-->
Expand All @@ -1309,6 +1316,88 @@
</xsd:sequence>
</xsd:complexType>

<xsd:complexType name="WorkAllocationConfigurationType">
<xsd:annotation>
<xsd:documentation>
Parameters related to buckets allocation process.
</xsd:documentation>
<xsd:appinfo>
<a:since>3.8</a:since>
</xsd:appinfo>
</xsd:annotation>
<xsd:sequence>
<xsd:element name="bucketCreationBatch" type="xsd:int" minOccurs="0" default="1">
<xsd:annotation>
<xsd:documentation>
How many buckets are to be created at once (when needed)?
EXPERIMENTAL
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<!-- TODO bucket allocation batch -->
<xsd:element name="allocateFirst" type="xsd:boolean" minOccurs="0" default="true">
<xsd:annotation>
<xsd:documentation>
Should the first available bucket be always allocated?
EXPERIMENTAL
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="workAllocationMaxRetries" type="xsd:int" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
How many retries for allocation of a work bucket should be attempted? The default is specified
in the system configuration.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="workAllocationRetryIntervalBase" type="xsd:long" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The base for exponential growth of the retry interval window. The actual delay is computed as

random() * retryIntervalBase * 2 ^ (min(retryNumber, exponentialThreshold) - 1)

Where retryNumber goes from 1 upwards.

The default is specified in the system configuration.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="workAllocationRetryExponentialThreshold" type="xsd:int" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The retry number from which the retry delay window starts to be constant (instead of exponentially
dependent on retry number). The default is specified in the system configuration.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="workAllocationRetryIntervalLimit" type="xsd:long" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Alternative way of limiting the retry delay growth. If specified, this is an absolute limit on the
delay window size (i.e. the factor that multiplies the random number of 0..1).
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="workAllocationInitialDelay" type="xsd:long" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Size of random interval for the initial delay. The default is specified in the system configuration.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="workAllocationFreeBucketWaitInterval" type="xsd:long" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
If specified, overrides the time used to wait for free bucket(s) reclamation. This is applied when
no free buckets are available but the work is not completely done.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>

<xsd:complexType name="AbstractWorkSegmentationType">
<xsd:annotation>
<xsd:documentation>
Expand Down
Expand Up @@ -21,17 +21,24 @@
*/
public class ExponentialBackoffComputer extends RetryLimitedBackoffComputer {

private long delayStep;
private long baseDelayInterval;
private int exponentialThreshold;
private Long delayIntervalLimit;

public ExponentialBackoffComputer(int maxRetries, long delayStep, int exponentialThreshold) {
public ExponentialBackoffComputer(int maxRetries, long baseDelayInterval, int exponentialThreshold, Long delayIntervalLimit) {
super(maxRetries);
this.delayStep = delayStep;
this.baseDelayInterval = baseDelayInterval;
this.exponentialThreshold = exponentialThreshold;
this.delayIntervalLimit = delayIntervalLimit;
}

@Override
public long computeDelayWithinLimits(int retryNumber) {
return Math.round(Math.random() * delayStep * Math.pow(2, Math.min(retryNumber, exponentialThreshold) - 1));
//System.out.println("baseDelayInterval = " + baseDelayInterval + ", limits: " + exponentialThreshold + "/" + delayIntervalLimit + " (retry " + retryNumber + ")");
double delayInterval = baseDelayInterval * Math.pow(2, Math.min(retryNumber, exponentialThreshold) - 1);
if (delayIntervalLimit != null && delayInterval > delayIntervalLimit) {
delayInterval = delayIntervalLimit;
}
return Math.round(Math.random() * delayInterval);
}
}
Expand Up @@ -100,6 +100,8 @@ public synchronized PrismObject<ResourceType> get(String oid, String version, Ge
cachedResource.checkImmutability();
} catch (IllegalStateException ex) {
LOGGER.error("Failed immutability test", ex);
cache.remove(oid);

return null;
}
return cachedResource;
Expand Down
Expand Up @@ -32,8 +32,8 @@ public class SqlBaseService {
// pessimistic, optimistic exception
public static final int LOCKING_MAX_RETRIES = 40;

// timeout will be a random number between 0 and LOCKING_TIMEOUT_STEP * 2^exp where exp is either real attempt # minus 1, or LOCKING_EXP_THRESHOLD (whatever is lesser)
public static final long LOCKING_TIMEOUT_STEP = 50;
// timeout will be a random number between 0 and LOCKING_DELAY_INTERVAL_BASE * 2^exp where exp is either real attempt # minus 1, or LOCKING_EXP_THRESHOLD (whatever is lesser)
public static final long LOCKING_DELAY_INTERVAL_BASE = 50;
public static final int LOCKING_EXP_THRESHOLD = 7; // i.e. up to 6400 msec wait time

@Autowired
Expand Down
Expand Up @@ -39,7 +39,7 @@

import static com.evolveum.midpoint.repo.sql.SqlBaseService.LOCKING_EXP_THRESHOLD;
import static com.evolveum.midpoint.repo.sql.SqlBaseService.LOCKING_MAX_RETRIES;
import static com.evolveum.midpoint.repo.sql.SqlBaseService.LOCKING_TIMEOUT_STEP;
import static com.evolveum.midpoint.repo.sql.SqlBaseService.LOCKING_DELAY_INTERVAL_BASE;

/**
* Core functionality needed in all members of SQL service family.
Expand Down Expand Up @@ -198,7 +198,7 @@ public int logOperationAttempt(String oid, String operation, int attempt, @NotNu
throw ex;
}

BackoffComputer backoffComputer = new ExponentialBackoffComputer(LOCKING_MAX_RETRIES, LOCKING_TIMEOUT_STEP, LOCKING_EXP_THRESHOLD);
BackoffComputer backoffComputer = new ExponentialBackoffComputer(LOCKING_MAX_RETRIES, LOCKING_DELAY_INTERVAL_BASE, LOCKING_EXP_THRESHOLD, null);
long waitTime;
try {
waitTime = backoffComputer.computeDelay(attempt);
Expand Down
Expand Up @@ -78,7 +78,8 @@ public class TaskManagerConfiguration {
private static final String SCHEDULER_INITIALLY_STOPPED_CONFIG_ENTRY = "schedulerInitiallyStopped";

private static final String WORK_ALLOCATION_MAX_RETRIES_ENTRY = "workAllocationMaxRetries";
private static final String WORK_ALLOCATION_RETRY_INTERVAL_ENTRY = "workAllocationRetryInterval";
private static final String WORK_ALLOCATION_RETRY_INTERVAL_BASE_ENTRY = "workAllocationRetryIntervalBase";
private static final String WORK_ALLOCATION_RETRY_INTERVAL_LIMIT_ENTRY = "workAllocationRetryIntervalLimit";
private static final String WORK_ALLOCATION_RETRY_EXPONENTIAL_THRESHOLD_ENTRY = "workAllocationRetryExponentialThreshold";
private static final String WORK_ALLOCATION_INITIAL_DELAY_ENTRY = "workAllocationInitialDelay";
private static final String WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_ENTRY = "workAllocationDefaultFreeBucketWaitInterval";
Expand Down Expand Up @@ -132,7 +133,8 @@ public class TaskManagerConfiguration {
private boolean schedulerInitiallyStopped;

private int workAllocationMaxRetries;
private long workAllocationRetryInterval;
private long workAllocationRetryIntervalBase;
private Long workAllocationRetryIntervalLimit;
private int workAllocationRetryExponentialThreshold;
private long workAllocationInitialDelay;
private long workAllocationDefaultFreeBucketWaitInterval;
Expand Down Expand Up @@ -200,7 +202,8 @@ public class TaskManagerConfiguration {
RUN_NOW_KEEPS_ORIGINAL_SCHEDULE_CONFIG_ENTRY,
SCHEDULER_INITIALLY_STOPPED_CONFIG_ENTRY,
WORK_ALLOCATION_MAX_RETRIES_ENTRY,
WORK_ALLOCATION_RETRY_INTERVAL_ENTRY,
WORK_ALLOCATION_RETRY_INTERVAL_BASE_ENTRY,
WORK_ALLOCATION_RETRY_INTERVAL_LIMIT_ENTRY,
WORK_ALLOCATION_INITIAL_DELAY_ENTRY,
WORK_ALLOCATION_RETRY_EXPONENTIAL_THRESHOLD_ENTRY,
WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_ENTRY
Expand Down Expand Up @@ -296,7 +299,8 @@ void setBasicInformation(MidpointConfiguration masterConfig) throws TaskManagerC
schedulerInitiallyStopped = c.getBoolean(SCHEDULER_INITIALLY_STOPPED_CONFIG_ENTRY, false);

workAllocationMaxRetries = c.getInt(WORK_ALLOCATION_MAX_RETRIES_ENTRY, WORK_ALLOCATION_MAX_RETRIES_DEFAULT);
workAllocationRetryInterval = c.getLong(WORK_ALLOCATION_RETRY_INTERVAL_ENTRY, WORK_ALLOCATION_RETRY_INTERVAL_DEFAULT);
workAllocationRetryIntervalBase = c.getLong(WORK_ALLOCATION_RETRY_INTERVAL_BASE_ENTRY, WORK_ALLOCATION_RETRY_INTERVAL_DEFAULT);
workAllocationRetryIntervalLimit = c.getLong(WORK_ALLOCATION_RETRY_INTERVAL_LIMIT_ENTRY, null);
workAllocationRetryExponentialThreshold = c.getInt(WORK_ALLOCATION_RETRY_EXPONENTIAL_THRESHOLD_ENTRY, WORK_ALLOCATION_RETRY_EXPONENTIAL_THRESHOLD_DEFAULT);
workAllocationInitialDelay = c.getLong(WORK_ALLOCATION_INITIAL_DELAY_ENTRY, WORK_ALLOCATION_INITIAL_DELAY_DEFAULT);
workAllocationDefaultFreeBucketWaitInterval = c.getLong(WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_ENTRY,
Expand Down Expand Up @@ -560,11 +564,15 @@ public int getWorkAllocationMaxRetries() {
return workAllocationMaxRetries;
}

public long getWorkAllocationRetryInterval() {
return workAllocationRetryInterval;
public long getWorkAllocationRetryIntervalBase() {
return workAllocationRetryIntervalBase;
}

public int getWorkAllocationRetryExponentialThreshold() {
public Long getWorkAllocationRetryIntervalLimit() {
return workAllocationRetryIntervalLimit;
}

public int getWorkAllocationRetryExponentialThreshold() {
return workAllocationRetryExponentialThreshold;
}

Expand Down
Expand Up @@ -689,30 +689,26 @@ private TaskRunResult executeWorkBucketAwareTaskHandler(WorkBucketAwareTaskHandl
}
}

try {
workStateManager.executeInitialDelay(task);
} catch (InterruptedException e) {
return createInterruptedTaskRunResult();
}

boolean initialBucket = true;
TaskWorkBucketProcessingResult runResult = null;
for (;;) {
WorkBucketType bucket;
try {
try {
bucket = workStateManager.getWorkBucket(task.getOid(), FREE_BUCKET_WAIT_TIME, () -> task.canRun(), executionResult);
bucket = workStateManager.getWorkBucket(task.getOid(), FREE_BUCKET_WAIT_TIME, () -> task.canRun(), initialBucket, executionResult);
} catch (InterruptedException e) {
LOGGER.trace("InterruptedExecution in getWorkBucket for {}", task);
if (task.canRun()) {
throw new IllegalStateException("Unexpected InterruptedException", e);
throw new IllegalStateException("Unexpected InterruptedException: " + e.getMessage(), e);
} else {
return createInterruptedTaskRunResult();
}
}
} catch (Throwable t) {
LoggingUtils.logUnexpectedException(LOGGER, "Couldn't allocate a work bucket for task {} (coordinator {})", t, task, null);
return createFailureTaskRunResult("Couldn't allocate a work bucket for task", t);
return createFailureTaskRunResult("Couldn't allocate a work bucket for task: " + t.getMessage(), t);
}
initialBucket = false;
if (bucket == null) {
LOGGER.trace("No (next) work bucket within {}, exiting", task);
runResult = handler.onNoMoreBuckets(task, runResult);
Expand Down
Expand Up @@ -35,9 +35,11 @@
// <CNT extends AbstractWorkBucketContentType, CFG extends AbstractTaskWorkBucketsConfigurationType>
public abstract class BaseWorkSegmentationStrategy implements WorkSegmentationStrategy {

private final TaskWorkManagementType configuration;
protected final PrismContext prismContext;

protected BaseWorkSegmentationStrategy(PrismContext prismContext) {
protected BaseWorkSegmentationStrategy(TaskWorkManagementType configuration, PrismContext prismContext) {
this.configuration = configuration;
this.prismContext = prismContext;
}

Expand All @@ -48,13 +50,17 @@ protected BaseWorkSegmentationStrategy(PrismContext prismContext) {
@Override
public GetBucketResult getBucket(@NotNull TaskWorkStateType workState) throws SchemaException {
boolean somethingDelegated = false;
List<WorkBucketType> ready = new ArrayList<>();
for (WorkBucketType bucket : workState.getBucket()) {
if (bucket.getState() == WorkBucketStateType.READY) {
return new GetBucketResult.FoundExisting(bucket);
ready.add(bucket);
} else if (bucket.getState() == WorkBucketStateType.DELEGATED) {
somethingDelegated = true;
}
}
if (!ready.isEmpty()) {
return new GetBucketResult.FoundExisting(ready.get(selectReadyBucket(ready.size())));
}
List<? extends AbstractWorkBucketContentType> newBucketsContent = createAdditionalBuckets(workState);
if (!newBucketsContent.isEmpty()) {
List<WorkBucketType> newBuckets = new ArrayList<>(newBucketsContent.size());
Expand All @@ -66,12 +72,61 @@ public GetBucketResult getBucket(@NotNull TaskWorkStateType workState) throws Sc
.content(newBucketContent)
.state(WorkBucketStateType.READY));
}
return new GetBucketResult.NewBuckets(newBuckets, 0);
return new GetBucketResult.NewBuckets(newBuckets, selectReadyBucket(newBuckets.size()));
} else {
return new NothingFound(!somethingDelegated);
}
}

private int selectReadyBucket(int size) {
if (isAllocateFirst()) {
return 0;
} else {
return (int) (Math.random() * size);
}
}

@NotNull
protected abstract List<? extends AbstractWorkBucketContentType> createAdditionalBuckets(TaskWorkStateType workState) throws SchemaException;
protected List<? extends AbstractWorkBucketContentType> createAdditionalBuckets(TaskWorkStateType workState) throws SchemaException {
WorkBucketType lastBucket = TaskWorkStateTypeUtil.getLastBucket(workState.getBucket());
AbstractWorkBucketContentType lastContent = lastBucket != null ? lastBucket.getContent() : null;
Integer lastSequentialNumber = lastBucket != null ? lastBucket.getSequentialNumber() : null;
int count = getBucketCreationBatch();
List<AbstractWorkBucketContentType> rv = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
AbstractWorkBucketContentType newContent = createAdditionalBucket(lastContent, lastSequentialNumber);
if (newContent == null) {
break;
}
rv.add(newContent);
lastContent = newContent;
lastSequentialNumber = lastSequentialNumber != null ? lastSequentialNumber + 1 : 1;
}
return rv;
}

private WorkAllocationConfigurationType getAllocationConfiguration() {
return configuration != null && configuration.getBuckets() != null ? configuration.getBuckets().getAllocation() : null;
}

private int getBucketCreationBatch() {
WorkAllocationConfigurationType ac = getAllocationConfiguration();
if (ac != null && ac.getBucketCreationBatch() != null) {
return ac.getBucketCreationBatch();
} else {
return 1;
}
}

private boolean isAllocateFirst() {
WorkAllocationConfigurationType ac = getAllocationConfiguration();
if (ac != null && ac.isAllocateFirst() != null) {
return ac.isAllocateFirst();
} else {
return true;
}
}

// the issue with this method is that we cannot distinguish between returning null content and returning no content (no more buckets)
protected abstract AbstractWorkBucketContentType createAdditionalBucket(AbstractWorkBucketContentType lastBucketContent, Integer lastBucketSequentialNumber) throws SchemaException;
}

0 comments on commit 12e50a9

Please sign in to comment.