Skip to content

Commit

Permalink
Improve bucket management efficiency
Browse files Browse the repository at this point in the history
Implemented experimental modifyObjectDynamically method
in the repository API to avoid costly get -> compute delta ->
modify-with-precondition -> retry cycles.

This also changes the semantics of 'conflicts' in bucket management
statistics. The conflicts now represent repo-level conflicts, that
were not measured there previously. (This also required adding repo
perf statistics into ModifyObjectResult.)

Unrelated changes:
- added aggregation of repo performance information in task trees.
  • Loading branch information
mederly committed Apr 14, 2021
1 parent ada0538 commit 75dab94
Show file tree
Hide file tree
Showing 14 changed files with 624 additions and 197 deletions.
Expand Up @@ -111,6 +111,7 @@ public static OperationStatsType getOperationStatsFromTree(TaskType task, PrismC
.synchronizationInformation(new SynchronizationInformationType(prismContext))
.actionsExecutedInformation(new ActionsExecutedInformationType())
.environmentalPerformanceInformation(new EnvironmentalPerformanceInformationType())
.repositoryPerformanceInformation(new RepositoryPerformanceInformationType())
.workBucketManagementPerformanceInformation(new WorkBucketManagementPerformanceInformationType());

Stream<TaskType> subTasks = TaskTreeUtil.getAllTasksStream(task);
Expand All @@ -121,6 +122,7 @@ public static OperationStatsType getOperationStatsFromTree(TaskType task, PrismC
SynchronizationInformation.addTo(aggregate.getSynchronizationInformation(), operationStatsBean.getSynchronizationInformation());
ActionsExecutedInformation.addTo(aggregate.getActionsExecutedInformation(), operationStatsBean.getActionsExecutedInformation());
EnvironmentalPerformanceInformation.addTo(aggregate.getEnvironmentalPerformanceInformation(), operationStatsBean.getEnvironmentalPerformanceInformation());
RepositoryPerformanceInformationUtil.addTo(aggregate.getRepositoryPerformanceInformation(), operationStatsBean.getRepositoryPerformanceInformation());
TaskWorkBucketManagementPerformanceInformationUtil.addTo(aggregate.getWorkBucketManagementPerformanceInformation(), operationStatsBean.getWorkBucketManagementPerformanceInformation());
}
});
Expand Down
Expand Up @@ -9,6 +9,8 @@

import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.repo.api.perf.OperationRecord;
import com.evolveum.midpoint.util.annotation.Experimental;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;

import java.util.Collection;
Expand All @@ -24,17 +26,25 @@
*
* EXPERIMENTAL. We will probably drop objectBefore and modifications.
*/
@Experimental
public class ModifyObjectResult<T extends ObjectType> {

private final PrismObject<T> objectBefore;
private final PrismObject<T> objectAfter;
private final Collection<? extends ItemDelta> modifications;

/**
* Performance record for the current operation.
* Very experimental. Probably should be present also for other repository operation result objects.
*/
private OperationRecord performanceRecord;

public ModifyObjectResult(Collection<? extends ItemDelta> modifications) {
this(null, null, modifications);
}

public ModifyObjectResult(PrismObject<T> objectBefore, PrismObject<T> objectAfter, Collection<? extends ItemDelta> modifications) {
public ModifyObjectResult(PrismObject<T> objectBefore, PrismObject<T> objectAfter,
Collection<? extends ItemDelta> modifications) {
this.objectBefore = objectBefore;
this.objectAfter = objectAfter;
this.modifications = modifications;
Expand All @@ -48,21 +58,33 @@ public PrismObject<T> getObjectAfter() {
return objectAfter;
}

public Collection<? extends ItemDelta> getModifications() {
return modifications;
}

public OperationRecord getPerformanceRecord() {
return performanceRecord;
}

public void setPerformanceRecord(OperationRecord performanceRecord) {
this.performanceRecord = performanceRecord;
}

public int getRetries() {
return performanceRecord != null ? performanceRecord.getAttempts() - 1 : 0;
}

public long getWastedTime() {
return performanceRecord != null ? performanceRecord.getWastedTime() : 0;
}

@Override
public String toString() {
return "ModifyObjectResult{" +
"objectBefore=" + objectBefore +
", objectAfter=" + objectAfter +
", modifications=" + modifications +
", performanceRecord=" + performanceRecord +
'}';
}

// private String getDeltaDump() {
// if (objectBefore != null && objectAfter != null) {
// ObjectDelta<T> diff = objectBefore.diff(objectAfter);
// return diff.debugDump();
// } else {
// return "";
// }
// }
}
Expand Up @@ -8,6 +8,8 @@

import java.util.Collection;

import com.evolveum.midpoint.util.annotation.Experimental;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -115,6 +117,7 @@ public interface RepositoryService {
String SEARCH_CONTAINERS = CLASS_NAME_WITH_DOT + "searchContainers";
String COUNT_CONTAINERS = CLASS_NAME_WITH_DOT + "countContainers";
String MODIFY_OBJECT = CLASS_NAME_WITH_DOT + "modifyObject";
String MODIFY_OBJECT_DYNAMICALLY = CLASS_NAME_WITH_DOT + "modifyObjectDynamically";
String COUNT_OBJECTS = CLASS_NAME_WITH_DOT + "countObjects";
String GET_VERSION = CLASS_NAME_WITH_DOT + "getVersion";
String SEARCH_OBJECTS_ITERATIVE = CLASS_NAME_WITH_DOT + "searchObjectsIterative";
Expand All @@ -132,6 +135,7 @@ public interface RepositoryService {
String OP_DELETE_OBJECT = "deleteObject";
String OP_COUNT_OBJECTS = "countObjects";
String OP_MODIFY_OBJECT = "modifyObject";
String OP_MODIFY_OBJECT_DYNAMICALLY = "modifyObjectDynamically";
String OP_GET_VERSION = "getVersion";
String OP_IS_ANY_SUBORDINATE = "isAnySubordinate";
String OP_ADVANCE_SEQUENCE = "advanceSequence";
Expand Down Expand Up @@ -267,6 +271,40 @@ <T extends ObjectType> String getVersion(Class<T> type, String oid, OperationRes
@NotNull OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException, PreconditionViolationException;

/**
* Modifies an object dynamically. This means that the deltas are not provided by the caller, but computed by specified
* supplier, based on the current object state.
*
* This is to allow more complex atomic modifications with low overhead: Instead of calling getObject + compute deltas +
* modifyObject (with precondition that the object has not changed in the meanwhile) + repeating if the precondition fails,
* we now simply use modifyObjectDynamically that does all of this within a single DB transaction.
*
* BEWARE: Do not use unless really needed. Use modifyObject method instead.
*
* @param type Type of the object to modify
* @param oid OID of the object to modify
* @param getOptions Options to use when getting the original object state
* @param modificationsSupplier Supplier of the modifications (item deltas) to be applied on the object
* @param modifyOptions Options to be used when modifying the object
* @param parentResult Operation result into which we put our result
*/
@Experimental
@NotNull default <T extends ObjectType> ModifyObjectResult<T> modifyObjectDynamically(
@NotNull Class<T> type,
@NotNull String oid,
@Nullable Collection<SelectorOptions<GetOperationOptions>> getOptions,
@NotNull ModificationsSupplier<T> modificationsSupplier,
@Nullable RepoModifyOptions modifyOptions,
@NotNull OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException {
throw new UnsupportedOperationException();
}

@FunctionalInterface
interface ModificationsSupplier<T extends ObjectType> {
@NotNull Collection<? extends ItemDelta<?, ?>> get(T object) throws SchemaException;
}

/**
* <p>Deletes object with specified OID.</p>
* <p>
Expand Down
Expand Up @@ -6,6 +6,8 @@
*/
package com.evolveum.midpoint.repo.sql;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

Expand All @@ -30,15 +32,20 @@
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.prism.polystring.PolyString;
import com.evolveum.midpoint.prism.xml.XmlTypeConverter;
import com.evolveum.midpoint.repo.api.RepoModifyOptions;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.repo.api.RepositoryService.ModificationsSupplier;
import com.evolveum.midpoint.repo.sql.testing.SqlRepoTestUtil;
import com.evolveum.midpoint.schema.SearchResultList;
import com.evolveum.midpoint.schema.constants.ObjectTypes;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.util.DebugUtil;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import com.evolveum.prism.xml.ns._public.types_3.PolyStringType;

@SuppressWarnings("BusyWait")
@ContextConfiguration(locations = { "../../../../../ctx-test.xml" })
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class ConcurrencyTest extends BaseSQLRepoTest {
Expand Down Expand Up @@ -154,6 +161,7 @@ private void concurrencyUniversal(String name, long duration, long waitStep, Pro
checker.check(readIteration, oid);
}
if (waitStep > 0L) {
//noinspection BusyWait
Thread.sleep(waitStep);
}
for (PropertyModifierThread mt : modifierThreads) {
Expand Down Expand Up @@ -189,8 +197,6 @@ private void concurrencyUniversal(String name, long duration, long waitStep, Pro
abstract class WorkerThread extends Thread {

int id;
Class<? extends ObjectType> objectClass; // object to modify
String oid; // object to modify
String lastVersion = null;
volatile Throwable threadResult;
AtomicInteger counter = new AtomicInteger(0);
Expand Down Expand Up @@ -224,18 +230,28 @@ public void run() {
abstract void runOnce(OperationResult result) throws Exception;
abstract String description();

public void setObject(Class<? extends ObjectType> objectClass, String oid) {
this.objectClass = objectClass;
this.oid = oid;
}

@Override
public String toString() {
return description() + " @" + counter;
}
}

class PropertyModifierThread extends WorkerThread {
abstract class ObjectModificationThread extends WorkerThread {

Class<? extends ObjectType> objectClass;
String oid;

ObjectModificationThread(int id) {
super(id);
}

public void setObject(Class<? extends ObjectType> objectClass, String oid) {
this.objectClass = objectClass;
this.oid = oid;
}
}

class PropertyModifierThread extends ObjectModificationThread {

final ItemPath attribute1; // attribute to modify
final ItemPath attribute2; // attribute to modify
Expand Down Expand Up @@ -434,7 +450,7 @@ void runOnce(OperationResult result) throws Exception {
protected abstract String getOidToDelete();
}

abstract class DeltaExecutionThread extends WorkerThread {
abstract class DeltaExecutionThread extends ObjectModificationThread {

String description;

Expand Down Expand Up @@ -554,6 +570,8 @@ public void test110AddAssignments() throws Exception {
int THREADS = 8;
long DURATION = 30000L;

AtomicInteger globalCounter = new AtomicInteger();

UserType user = new UserType(prismContext).name("alice");

OperationResult result = new OperationResult("test110AddAssignments");
Expand All @@ -570,6 +588,7 @@ public void test110AddAssignments() throws Exception {
DeltaExecutionThread thread = new DeltaExecutionThread(i, UserType.class, oid, "assignment adder #" + i) {
@Override
Collection<ItemDelta<?, ?>> getItemDeltas() throws Exception {
globalCounter.incrementAndGet();
return prismContext.deltaFor(UserType.class)
.item(UserType.F_ASSIGNMENT).add(
new AssignmentType(prismContext)
Expand All @@ -584,6 +603,9 @@ public void test110AddAssignments() throws Exception {
waitForThreads(threads, DURATION);
PrismObject<UserType> userAfter = repositoryService.getObject(UserType.class, oid, null, result);
displayValue("user after", userAfter);
assertThat(userAfter.asObjectable().getAssignment().size())
.as("# of assignments")
.isEqualTo(globalCounter.get());
}

@Test
Expand Down Expand Up @@ -695,7 +717,7 @@ protected PrismObject<UserType> getObjectToAdd() {
AtomicInteger objectsPointer = new AtomicInteger(0);
List<DeleteObjectsThread<UserType>> deleteThreads = new ArrayList<>();
for (int i = 0; i < DELETE_THREADS; i++) {
DeleteObjectsThread<UserType> thread = new DeleteObjectsThread<UserType>(i, UserType.class, "deleter #" + i) {
DeleteObjectsThread<UserType> thread = new DeleteObjectsThread<>(i, UserType.class, "deleter #" + i) {
@Override
protected String getOidToDelete() {
int pointer = objectsPointer.getAndIncrement();
Expand All @@ -716,6 +738,85 @@ protected String getOidToDelete() {
repositoryService.countObjects(UserType.class, null, null, result));
}

/**
* Here we test concurrent work bucket creation using {@link RepositoryService#modifyObjectDynamically(Class, String, Collection, RepositoryService.ModificationsSupplier, RepoModifyOptions, OperationResult)}
* method.
*
* Note that on H2 this test passes only due to explicit locking (see ExplicitAccessLock helper class).
*/
@Test
public void test140WorkBuckets() throws Exception {

int THREADS = 8;
long DURATION = 30000L;

TaskType task = new TaskType(prismContext).name("test140");

OperationResult result = new OperationResult("test140WorkBuckets");
String oid = repositoryService.addObject(task.asPrismObject(), null, result);

displayValue("object added", oid);

logger.info("Starting worker threads");

List<WorkerThread> threads = new ArrayList<>();
for (int i = 0; i < THREADS; i++) {
final int threadIndex = i;

WorkerThread thread = new WorkerThread(i) {
@Override
void runOnce(OperationResult result) throws Exception {
ModificationsSupplier<TaskType> modificationSupplier =
task -> prismContext.deltaFor(TaskType.class)
.item(TaskType.F_WORK_STATE, TaskWorkStateType.F_BUCKET)
.add(getNextBucket(task))
.asItemDeltas();
repositoryService.modifyObjectDynamically(TaskType.class, oid, null, modificationSupplier, null, result);
}

private WorkBucketType getNextBucket(TaskType task) {
int lastBucketNumber = task.getWorkState() != null ? getLastBucketNumber(task.getWorkState().getBucket()) : 0;
return new WorkBucketType(prismContext)
.sequentialNumber(lastBucketNumber + 1)
.state(WorkBucketStateType.DELEGATED)
.workerRef(String.valueOf(threadIndex), TaskType.COMPLEX_TYPE);
}

private int getLastBucketNumber(List<WorkBucketType> buckets) {
return buckets.stream()
.mapToInt(WorkBucketType::getSequentialNumber)
.max().orElse(0);
}

@Override
String description() {
return "Bucket computer thread #" + threadIndex;
}
};
thread.start();
threads.add(thread);
}

waitForThreads(threads, DURATION);
PrismObject<TaskType> taskAfter = repositoryService.getObject(TaskType.class, oid, null, result);
displayValue("user after", taskAfter);

assertCorrectBucketSequence(taskAfter.asObjectable().getWorkState().getBucket());
}

private void assertCorrectBucketSequence(List<WorkBucketType> buckets) {
for (int i = 1; i <= buckets.size(); i++) {
int sequentialNumber = i;
List<WorkBucketType> selected = buckets.stream()
.filter(b -> b.getSequentialNumber() == sequentialNumber)
.collect(Collectors.toList());
if (selected.size() != 1) {
fail("Unexpected # of bucket with sequential number " + sequentialNumber + ":\n" +
DebugUtil.debugDump(selected, 1));
}
}
}

private void waitForThreadsFinish(List<? extends WorkerThread> threads, long timeout) throws InterruptedException {
logger.info("*** Waiting until finish, at most {} ms ***", timeout);
long startTime = System.currentTimeMillis();
Expand Down

0 comments on commit 75dab94

Please sign in to comment.