Merge branch 'feature/optimizing-work-allocation'
mederly committed Apr 30, 2018
2 parents 8759a80 + e63c0ec commit 424fde5
Showing 9 changed files with 217 additions and 71 deletions.
@@ -1299,6 +1299,13 @@
</xsd:annotation>
</xsd:element>
</xsd:choice>
<xsd:element name="allocation" type="tns:WorkAllocationConfigurationType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Parameters related to the bucket allocation process.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<!--<xsd:element name="progressMarking" type="tns:WorkBucketsProgressMarkingType" minOccurs="0">-->
<!--<xsd:annotation>-->
<!--<xsd:documentation>-->
@@ -1309,6 +1316,37 @@
</xsd:sequence>
</xsd:complexType>

<xsd:complexType name="WorkAllocationConfigurationType">
<xsd:annotation>
<xsd:documentation>
Parameters related to the bucket allocation process.
</xsd:documentation>
<xsd:appinfo>
<a:since>3.8</a:since>
</xsd:appinfo>
</xsd:annotation>
<xsd:sequence>
<xsd:element name="bucketCreationBatch" type="xsd:int" minOccurs="0" default="1">
<xsd:annotation>
<xsd:documentation>
How many buckets are to be created at once (when needed)?
EXPERIMENTAL
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<!-- TODO bucket allocation batch -->
<xsd:element name="allocateFirst" type="xsd:boolean" minOccurs="0" default="true">
<xsd:annotation>
<xsd:documentation>
Should the first available bucket always be allocated?
EXPERIMENTAL
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
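
These two settings are consumed by the segmentation strategies further down via configuration.getBuckets().getAllocation(). A rough usage sketch follows; it assumes the fluent setters that midPoint's generated schema beans normally provide (the same pattern as the .from()/.to() calls later in this diff), and the wrapper class and method names are illustrative, not part of the commit:

import com.evolveum.midpoint.xml.ns._public.common.common_3.WorkAllocationConfigurationType;

class AllocationConfigExample {
    // Illustrative values, not the defaults (defaults: bucketCreationBatch = 1, allocateFirst = true).
    static WorkAllocationConfigurationType largerBatchRandomPick() {
        return new WorkAllocationConfigurationType()
                .bucketCreationBatch(10)    // create up to 10 buckets per segmentation step
                .allocateFirst(false);      // hand out a random READY bucket, not always the first
    }
}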


<xsd:complexType name="AbstractWorkSegmentationType">
<xsd:annotation>
<xsd:documentation>
@@ -35,9 +35,11 @@
// <CNT extends AbstractWorkBucketContentType, CFG extends AbstractTaskWorkBucketsConfigurationType>
public abstract class BaseWorkSegmentationStrategy implements WorkSegmentationStrategy {

private final TaskWorkManagementType configuration;
protected final PrismContext prismContext;

protected BaseWorkSegmentationStrategy(PrismContext prismContext) {
protected BaseWorkSegmentationStrategy(TaskWorkManagementType configuration, PrismContext prismContext) {
this.configuration = configuration;
this.prismContext = prismContext;
}

@@ -48,13 +50,17 @@ protected BaseWorkSegmentationStrategy(PrismContext prismContext) {
@Override
public GetBucketResult getBucket(@NotNull TaskWorkStateType workState) throws SchemaException {
boolean somethingDelegated = false;
List<WorkBucketType> ready = new ArrayList<>();
for (WorkBucketType bucket : workState.getBucket()) {
if (bucket.getState() == WorkBucketStateType.READY) {
return new GetBucketResult.FoundExisting(bucket);
ready.add(bucket);
} else if (bucket.getState() == WorkBucketStateType.DELEGATED) {
somethingDelegated = true;
}
}
if (!ready.isEmpty()) {
return new GetBucketResult.FoundExisting(ready.get(selectReadyBucket(ready.size())));
}
List<? extends AbstractWorkBucketContentType> newBucketsContent = createAdditionalBuckets(workState);
if (!newBucketsContent.isEmpty()) {
List<WorkBucketType> newBuckets = new ArrayList<>(newBucketsContent.size());
@@ -66,12 +72,61 @@ public GetBucketResult getBucket(@NotNull TaskWorkStateType workState) throws Sc
.content(newBucketContent)
.state(WorkBucketStateType.READY));
}
return new GetBucketResult.NewBuckets(newBuckets, 0);
return new GetBucketResult.NewBuckets(newBuckets, selectReadyBucket(newBuckets.size()));
} else {
return new NothingFound(!somethingDelegated);
}
}

private int selectReadyBucket(int size) {
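// With allocateFirst (the default) the first READY bucket is always taken; otherwise a random
// one is chosen, presumably so that concurrent workers spread across different buckets.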
if (isAllocateFirst()) {
return 0;
} else {
return (int) (Math.random() * size);
}
}

@NotNull
protected abstract List<? extends AbstractWorkBucketContentType> createAdditionalBuckets(TaskWorkStateType workState) throws SchemaException;
protected List<? extends AbstractWorkBucketContentType> createAdditionalBuckets(TaskWorkStateType workState) throws SchemaException {
WorkBucketType lastBucket = TaskWorkStateTypeUtil.getLastBucket(workState.getBucket());
AbstractWorkBucketContentType lastContent = lastBucket != null ? lastBucket.getContent() : null;
Integer lastSequentialNumber = lastBucket != null ? lastBucket.getSequentialNumber() : null;
int count = getBucketCreationBatch();
List<AbstractWorkBucketContentType> rv = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
AbstractWorkBucketContentType newContent = createAdditionalBucket(lastContent, lastSequentialNumber);
if (newContent == null) {
break;
}
rv.add(newContent);
lastContent = newContent;
lastSequentialNumber = lastSequentialNumber != null ? lastSequentialNumber + 1 : 1;
}
return rv;
}

private WorkAllocationConfigurationType getAllocationConfiguration() {
return configuration != null && configuration.getBuckets() != null ? configuration.getBuckets().getAllocation() : null;
}

private int getBucketCreationBatch() {
WorkAllocationConfigurationType ac = getAllocationConfiguration();
if (ac != null && ac.getBucketCreationBatch() != null) {
return ac.getBucketCreationBatch();
} else {
return 1;
}
}

private boolean isAllocateFirst() {
WorkAllocationConfigurationType ac = getAllocationConfiguration();
if (ac != null && ac.isAllocateFirst() != null) {
return ac.isAllocateFirst();
} else {
return true;
}
}

// the issue with this method is that we cannot distinguish between returning null content and returning no content (no more buckets)
protected abstract AbstractWorkBucketContentType createAdditionalBucket(AbstractWorkBucketContentType lastBucketContent, Integer lastBucketSequentialNumber) throws SchemaException;
}
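
Taken together, getBucket() now creates new buckets in configurable batches and then hands out one of the READY buckets, either the first one or a randomly chosen one. The following standalone sketch (plain Java, with integers standing in for buckets; all names are illustrative and none of this is midPoint API) mirrors that batching-plus-selection behaviour:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class BucketAllocationSketch {

    private final int bucketCreationBatch;   // analogue of bucketCreationBatch (default 1)
    private final boolean allocateFirst;     // analogue of allocateFirst (default true)
    private final Random random = new Random();

    BucketAllocationSketch(int bucketCreationBatch, boolean allocateFirst) {
        this.bucketCreationBatch = bucketCreationBatch;
        this.allocateFirst = allocateFirst;
    }

    // Create up to bucketCreationBatch new bucket numbers in one go, stopping early when
    // no more can be created; this mirrors the generalized createAdditionalBuckets() loop
    // that calls createAdditionalBucket() until it returns null.
    List<Integer> createBatch(int lastSequentialNumber, int lastPossibleNumber) {
        List<Integer> created = new ArrayList<>(bucketCreationBatch);
        for (int i = 0; i < bucketCreationBatch; i++) {
            int next = lastSequentialNumber + i + 1;
            if (next > lastPossibleNumber) {
                break;      // segmentation exhausted, no more buckets
            }
            created.add(next);
        }
        return created;
    }

    // Pick the bucket to hand out: the first READY one, or a random one.
    <T> T select(List<T> readyBuckets) {
        int index = allocateFirst ? 0 : random.nextInt(readyBuckets.size());
        return readyBuckets.get(index);
    }

    public static void main(String[] args) {
        BucketAllocationSketch sketch = new BucketAllocationSketch(3, false);
        List<Integer> batch = sketch.createBatch(0, 10);            // -> [1, 2, 3]
        System.out.println("created " + batch + ", allocated " + sketch.select(batch));
    }
}

Random selection presumably matters when several worker tasks compete for the buckets of one coordinator task, making it less likely that they all contend for the very same READY bucket.
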
@@ -23,38 +23,28 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.List;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

/**
* TODO
*
* @author mederly
*/
public class ExplicitWorkSegmentationStrategy extends BaseWorkSegmentationStrategy {

@NotNull private final TaskWorkManagementType configuration;
@NotNull private final ExplicitWorkSegmentationType bucketsConfiguration;

public ExplicitWorkSegmentationStrategy(@NotNull TaskWorkManagementType configuration,
PrismContext prismContext) {
super(prismContext);
this.configuration = configuration;
public ExplicitWorkSegmentationStrategy(@NotNull TaskWorkManagementType configuration, PrismContext prismContext) {
super(configuration, prismContext);
this.bucketsConfiguration = (ExplicitWorkSegmentationType)
TaskWorkStateTypeUtil.getWorkSegmentationConfiguration(configuration);
}

@NotNull
@Override
protected List<AbstractWorkBucketContentType> createAdditionalBuckets(TaskWorkStateType workState) {
WorkBucketType lastBucket = TaskWorkStateTypeUtil.getLastBucket(workState.getBucket());
int nextSequentialNumber = lastBucket != null ? lastBucket.getSequentialNumber() + 1 : 1;
if (nextSequentialNumber > bucketsConfiguration.getContent().size()) {
return emptyList();
protected AbstractWorkBucketContentType createAdditionalBucket(AbstractWorkBucketContentType lastBucketContent, Integer lastBucketSequentialNumber) {
int currentBucketNumber = lastBucketSequentialNumber != null ? lastBucketSequentialNumber : 0;
if (currentBucketNumber < bucketsConfiguration.getContent().size()) {
return bucketsConfiguration.getContent().get(currentBucketNumber);
} else {
return singletonList(bucketsConfiguration.getContent().get(nextSequentialNumber-1));
return null;
}
}

@@ -24,10 +24,6 @@
import org.jetbrains.annotations.Nullable;

import java.math.BigInteger;
import java.util.List;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

/**
* Implements work state management strategy based on numeric identifier intervals.
@@ -42,42 +38,39 @@ public class NumericWorkSegmentationStrategy extends BaseWorkSegmentationStrateg

public NumericWorkSegmentationStrategy(@NotNull TaskWorkManagementType configuration,
PrismContext prismContext) {
super(prismContext);
super(configuration, prismContext);
this.configuration = configuration;
this.bucketsConfiguration = (NumericWorkSegmentationType)
TaskWorkStateTypeUtil.getWorkSegmentationConfiguration(configuration);
}

@NotNull
@Override
protected List<NumericIntervalWorkBucketContentType> createAdditionalBuckets(TaskWorkStateType workState) {
protected NumericIntervalWorkBucketContentType createAdditionalBucket(AbstractWorkBucketContentType lastBucketContent,
Integer lastBucketSequentialNumber) {
BigInteger bucketSize = getOrComputeBucketSize();
BigInteger from = getFrom();
BigInteger to = getOrComputeTo();

WorkBucketType lastBucket = TaskWorkStateTypeUtil.getLastBucket(workState.getBucket());
NumericIntervalWorkBucketContentType newContent;
if (lastBucket != null) {
if (!(lastBucket.getContent() instanceof NumericIntervalWorkBucketContentType)) {
throw new IllegalStateException("Null or unsupported bucket content: " + lastBucket.getContent());
if (lastBucketSequentialNumber != null) {
if (!(lastBucketContent instanceof NumericIntervalWorkBucketContentType)) {
throw new IllegalStateException("Null or unsupported bucket content: " + lastBucketContent);
}
NumericIntervalWorkBucketContentType lastContent = (NumericIntervalWorkBucketContentType) lastBucket.getContent();
NumericIntervalWorkBucketContentType lastContent = (NumericIntervalWorkBucketContentType) lastBucketContent;
if (lastContent.getTo() == null || lastContent.getTo().compareTo(to) >= 0) {
return emptyList(); // no more buckets
return null; // no more buckets
}
BigInteger newEnd = lastContent.getTo().add(bucketSize);
if (newEnd.compareTo(to) > 0) {
newEnd = to;
}
newContent = new NumericIntervalWorkBucketContentType()
return new NumericIntervalWorkBucketContentType()
.from(lastContent.getTo())
.to(newEnd);
} else {
newContent = new NumericIntervalWorkBucketContentType()
return new NumericIntervalWorkBucketContentType()
.from(from)
.to(from.add(bucketSize));
}
return singletonList(newContent);
}

@NotNull
@@ -18,6 +18,7 @@

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.task.quartzimpl.work.BaseWorkSegmentationStrategy;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.xml.ns._public.common.common_3.AbstractWorkBucketContentType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskWorkManagementType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskWorkStateType;
@@ -39,7 +40,7 @@ public class SingleNullWorkSegmentationStrategy extends BaseWorkSegmentationStra
@SuppressWarnings("unused")
public SingleNullWorkSegmentationStrategy(TaskWorkManagementType configuration,
PrismContext prismContext) {
super(prismContext);
super(configuration, prismContext);
}

@NotNull
@@ -52,6 +53,12 @@ protected List<AbstractWorkBucketContentType> createAdditionalBuckets(TaskWorkSt
}
}

@Override
protected AbstractWorkBucketContentType createAdditionalBucket(AbstractWorkBucketContentType lastBucketContent,
Integer lastBucketSequentialNumber) throws SchemaException {
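// Not called in practice: this strategy still overrides createAdditionalBuckets(workState)
// directly, so the base class never reaches this per-bucket method.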
throw new UnsupportedOperationException();
}

@Override
public Integer estimateNumberOfBuckets(@Nullable TaskWorkStateType workState) {
return 1;
@@ -31,7 +31,6 @@

import static com.evolveum.midpoint.xml.ns._public.common.common_3.StringWorkBucketsBoundaryMarkingType.INTERVAL;
import static com.evolveum.midpoint.xml.ns._public.common.common_3.StringWorkBucketsBoundaryMarkingType.PREFIX;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;

@@ -49,59 +48,54 @@ public class StringWorkSegmentationStrategy extends BaseWorkSegmentationStrategy
private static final String OID_BOUNDARIES = "0-9a-f";

public StringWorkSegmentationStrategy(@NotNull TaskWorkManagementType configuration, PrismContext prismContext) {
super(prismContext);
super(configuration, prismContext);
this.bucketsConfiguration = (StringWorkSegmentationType)
TaskWorkStateTypeUtil.getWorkSegmentationConfiguration(configuration);
this.marking = defaultIfNull(bucketsConfiguration.getComparisonMethod(), INTERVAL);
this.boundaries = processBoundaries();
}

@NotNull
@Override
protected List<? extends AbstractWorkBucketContentType> createAdditionalBuckets(TaskWorkStateType workState) {
protected AbstractWorkBucketContentType createAdditionalBucket(AbstractWorkBucketContentType lastBucketContent, Integer lastBucketSequentialNumber) {
if (marking == INTERVAL) {
return createAdditionalIntervalBuckets(workState);
return createAdditionalIntervalBucket(lastBucketContent, lastBucketSequentialNumber);
} else if (marking == PREFIX) {
return createAdditionalPrefixBuckets(workState);
return createAdditionalPrefixBucket(lastBucketContent, lastBucketSequentialNumber);
} else {
throw new AssertionError("unsupported marking: " + marking);
}
}

private List<? extends AbstractWorkBucketContentType> createAdditionalIntervalBuckets(TaskWorkStateType workState) {
WorkBucketType lastBucket = TaskWorkStateTypeUtil.getLastBucket(workState.getBucket());
private AbstractWorkBucketContentType createAdditionalIntervalBucket(AbstractWorkBucketContentType lastBucketContent, Integer lastBucketSequentialNumber) {
String lastBoundary;
if (lastBucket != null) {
if (!(lastBucket.getContent() instanceof StringIntervalWorkBucketContentType)) {
throw new IllegalStateException("Null or unsupported bucket content: " + lastBucket.getContent());
if (lastBucketSequentialNumber != null) {
if (!(lastBucketContent instanceof StringIntervalWorkBucketContentType)) {
throw new IllegalStateException("Null or unsupported bucket content: " + lastBucketContent);
}
StringIntervalWorkBucketContentType lastContent = (StringIntervalWorkBucketContentType) lastBucket.getContent();
StringIntervalWorkBucketContentType lastContent = (StringIntervalWorkBucketContentType) lastBucketContent;
if (lastContent.getTo() == null) {
return emptyList();
return null;
}
lastBoundary = lastContent.getTo();
} else {
lastBoundary = null;
}
StringIntervalWorkBucketContentType nextBucket =
new StringIntervalWorkBucketContentType()
.from(lastBoundary)
.to(computeNextBoundary(lastBoundary));
return singletonList(nextBucket);
return new StringIntervalWorkBucketContentType()
.from(lastBoundary)
.to(computeNextBoundary(lastBoundary));
}

private List<? extends AbstractWorkBucketContentType> createAdditionalPrefixBuckets(TaskWorkStateType workState) {
WorkBucketType lastBucket = TaskWorkStateTypeUtil.getLastBucket(workState.getBucket());
private AbstractWorkBucketContentType createAdditionalPrefixBucket(AbstractWorkBucketContentType lastBucketContent, Integer lastBucketSequentialNumber) {
String lastBoundary;
if (lastBucket != null) {
if (!(lastBucket.getContent() instanceof StringPrefixWorkBucketContentType)) {
throw new IllegalStateException("Null or unsupported bucket content: " + lastBucket.getContent());
if (lastBucketSequentialNumber != null) {
if (!(lastBucketContent instanceof StringPrefixWorkBucketContentType)) {
throw new IllegalStateException("Null or unsupported bucket content: " + lastBucketContent);
}
StringPrefixWorkBucketContentType lastContent = (StringPrefixWorkBucketContentType) lastBucket.getContent();
StringPrefixWorkBucketContentType lastContent = (StringPrefixWorkBucketContentType) lastBucketContent;
if (lastContent.getPrefix().size() > 1) {
throw new IllegalStateException("Multiple prefixes are not supported now: " + lastContent);
} else if (lastContent.getPrefix().isEmpty()) {
return emptyList();
return null;
} else {
lastBoundary = lastContent.getPrefix().get(0);
}
@@ -110,12 +104,10 @@ private List<AbstractWorkBucketContentType> createAdditionalPrefixBuck
}
String nextBoundary = computeNextBoundary(lastBoundary);
if (nextBoundary != null) {
StringPrefixWorkBucketContentType nextBucket =
new StringPrefixWorkBucketContentType()
.prefix(nextBoundary);
return singletonList(nextBucket);
return new StringPrefixWorkBucketContentType()
.prefix(nextBoundary);
} else {
return emptyList();
return null;
}
}

