From 75dab94d3fa25654df149cb8fdbbd01ee9f4b39a Mon Sep 17 00:00:00 2001 From: Pavol Mederly Date: Wed, 14 Apr 2021 12:35:29 +0200 Subject: [PATCH] Improve bucket management efficiency Implemented experimental modifyObjectDynamically method in the repository API to avoid costly get -> compute delta -> modify-with-precondition -> retry cycles. This also changes the semantics of 'conflicts' in bucket management statistics. The conflicts now represent repo-level conflicts, that were not measured there previously. (This also required adding repo perf statistics into ModifyObjectResult.) Unrelated changes: - added aggregation of repo performance information in task trees. --- .../util/task/TaskOperationStatsUtil.java | 2 + .../midpoint/repo/api/ModifyObjectResult.java | 42 ++- .../midpoint/repo/api/RepositoryService.java | 38 ++ .../midpoint/repo/sql/ConcurrencyTest.java | 121 ++++++- .../midpoint/repo/sql/ModifyTest.java | 26 ++ .../midpoint/repo/sql/OperationLogger.java | 14 + .../repo/sql/SqlRepositoryServiceImpl.java | 99 ++++- .../repo/sql/helpers/ExplicitAccessLock.java | 50 +++ .../repo/sql/helpers/ObjectUpdater.java | 80 ++++- .../perfmon/SqlPerformanceMonitorImpl.java | 4 +- .../quartzimpl/work/WorkStateManager.java | 337 +++++++++--------- .../WorkSegmentationStrategy.java | 2 +- .../buckets/task-recompute-16-04.xml | 3 + .../buckets/task-recompute-256-30.xml | 3 + 14 files changed, 624 insertions(+), 197 deletions(-) create mode 100644 repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ExplicitAccessLock.java diff --git a/infra/schema/src/main/java/com/evolveum/midpoint/schema/util/task/TaskOperationStatsUtil.java b/infra/schema/src/main/java/com/evolveum/midpoint/schema/util/task/TaskOperationStatsUtil.java index a9cbf90c005..a7c9dabda66 100644 --- a/infra/schema/src/main/java/com/evolveum/midpoint/schema/util/task/TaskOperationStatsUtil.java +++ 
b/infra/schema/src/main/java/com/evolveum/midpoint/schema/util/task/TaskOperationStatsUtil.java @@ -111,6 +111,7 @@ public static OperationStatsType getOperationStatsFromTree(TaskType task, PrismC .synchronizationInformation(new SynchronizationInformationType(prismContext)) .actionsExecutedInformation(new ActionsExecutedInformationType()) .environmentalPerformanceInformation(new EnvironmentalPerformanceInformationType()) + .repositoryPerformanceInformation(new RepositoryPerformanceInformationType()) .workBucketManagementPerformanceInformation(new WorkBucketManagementPerformanceInformationType()); Stream subTasks = TaskTreeUtil.getAllTasksStream(task); @@ -121,6 +122,7 @@ public static OperationStatsType getOperationStatsFromTree(TaskType task, PrismC SynchronizationInformation.addTo(aggregate.getSynchronizationInformation(), operationStatsBean.getSynchronizationInformation()); ActionsExecutedInformation.addTo(aggregate.getActionsExecutedInformation(), operationStatsBean.getActionsExecutedInformation()); EnvironmentalPerformanceInformation.addTo(aggregate.getEnvironmentalPerformanceInformation(), operationStatsBean.getEnvironmentalPerformanceInformation()); + RepositoryPerformanceInformationUtil.addTo(aggregate.getRepositoryPerformanceInformation(), operationStatsBean.getRepositoryPerformanceInformation()); TaskWorkBucketManagementPerformanceInformationUtil.addTo(aggregate.getWorkBucketManagementPerformanceInformation(), operationStatsBean.getWorkBucketManagementPerformanceInformation()); } }); diff --git a/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/ModifyObjectResult.java b/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/ModifyObjectResult.java index 248ec6a6cb6..ec3cf8a413d 100644 --- a/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/ModifyObjectResult.java +++ b/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/ModifyObjectResult.java @@ -9,6 +9,8 @@ import com.evolveum.midpoint.prism.PrismObject; import 
com.evolveum.midpoint.prism.delta.ItemDelta; +import com.evolveum.midpoint.repo.api.perf.OperationRecord; +import com.evolveum.midpoint.util.annotation.Experimental; import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType; import java.util.Collection; @@ -24,17 +26,25 @@ * * EXPERIMENTAL. We will probably drop objectBefore and modifications. */ +@Experimental public class ModifyObjectResult { private final PrismObject objectBefore; private final PrismObject objectAfter; private final Collection modifications; + /** + * Performance record for the current operation. + * Very experimental. Probably should be present also for other repository operation result objects. + */ + private OperationRecord performanceRecord; + public ModifyObjectResult(Collection modifications) { this(null, null, modifications); } - public ModifyObjectResult(PrismObject objectBefore, PrismObject objectAfter, Collection modifications) { + public ModifyObjectResult(PrismObject objectBefore, PrismObject objectAfter, + Collection modifications) { this.objectBefore = objectBefore; this.objectAfter = objectAfter; this.modifications = modifications; @@ -48,21 +58,33 @@ public PrismObject getObjectAfter() { return objectAfter; } + public Collection getModifications() { + return modifications; + } + + public OperationRecord getPerformanceRecord() { + return performanceRecord; + } + + public void setPerformanceRecord(OperationRecord performanceRecord) { + this.performanceRecord = performanceRecord; + } + + public int getRetries() { + return performanceRecord != null ? performanceRecord.getAttempts() - 1 : 0; + } + + public long getWastedTime() { + return performanceRecord != null ? 
performanceRecord.getWastedTime() : 0; + } + @Override public String toString() { return "ModifyObjectResult{" + "objectBefore=" + objectBefore + ", objectAfter=" + objectAfter + ", modifications=" + modifications + + ", performanceRecord=" + performanceRecord + '}'; } - -// private String getDeltaDump() { -// if (objectBefore != null && objectAfter != null) { -// ObjectDelta diff = objectBefore.diff(objectAfter); -// return diff.debugDump(); -// } else { -// return ""; -// } -// } } diff --git a/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/RepositoryService.java b/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/RepositoryService.java index a9cb6c9e8b8..5dba30a169c 100644 --- a/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/RepositoryService.java +++ b/repo/repo-api/src/main/java/com/evolveum/midpoint/repo/api/RepositoryService.java @@ -8,6 +8,8 @@ import java.util.Collection; +import com.evolveum.midpoint.util.annotation.Experimental; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -115,6 +117,7 @@ public interface RepositoryService { String SEARCH_CONTAINERS = CLASS_NAME_WITH_DOT + "searchContainers"; String COUNT_CONTAINERS = CLASS_NAME_WITH_DOT + "countContainers"; String MODIFY_OBJECT = CLASS_NAME_WITH_DOT + "modifyObject"; + String MODIFY_OBJECT_DYNAMICALLY = CLASS_NAME_WITH_DOT + "modifyObjectDynamically"; String COUNT_OBJECTS = CLASS_NAME_WITH_DOT + "countObjects"; String GET_VERSION = CLASS_NAME_WITH_DOT + "getVersion"; String SEARCH_OBJECTS_ITERATIVE = CLASS_NAME_WITH_DOT + "searchObjectsIterative"; @@ -132,6 +135,7 @@ public interface RepositoryService { String OP_DELETE_OBJECT = "deleteObject"; String OP_COUNT_OBJECTS = "countObjects"; String OP_MODIFY_OBJECT = "modifyObject"; + String OP_MODIFY_OBJECT_DYNAMICALLY = "modifyObjectDynamically"; String OP_GET_VERSION = "getVersion"; String OP_IS_ANY_SUBORDINATE = "isAnySubordinate"; String OP_ADVANCE_SEQUENCE = 
"advanceSequence"; @@ -267,6 +271,40 @@ String getVersion(Class type, String oid, OperationRes @NotNull OperationResult parentResult) throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException, PreconditionViolationException; + /** + * Modifies an object dynamically. This means that the deltas are not provided by the caller, but computed by specified + * supplier, based on the current object state. + * + * This is to allow more complex atomic modifications with low overhead: Instead of calling getObject + compute deltas + + * modifyObject (with precondition that the object has not changed in the meanwhile) + repeating if the precondition fails, + * we now simply use modifyObjectDynamically that does all of this within a single DB transaction. + * + * BEWARE: Do not use unless really needed. Use modifyObject method instead. + * + * @param type Type of the object to modify + * @param oid OID of the object to modify + * @param getOptions Options to use when getting the original object state + * @param modificationsSupplier Supplier of the modifications (item deltas) to be applied on the object + * @param modifyOptions Options to be used when modifying the object + * @param parentResult Operation result into which we put our result + */ + @Experimental + @NotNull default ModifyObjectResult modifyObjectDynamically( + @NotNull Class type, + @NotNull String oid, + @Nullable Collection> getOptions, + @NotNull ModificationsSupplier modificationsSupplier, + @Nullable RepoModifyOptions modifyOptions, + @NotNull OperationResult parentResult) + throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException { + throw new UnsupportedOperationException(); + } + + @FunctionalInterface + interface ModificationsSupplier { + @NotNull Collection> get(T object) throws SchemaException; + } + /** *

Deletes object with specified OID.

*

diff --git a/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ConcurrencyTest.java b/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ConcurrencyTest.java index 0e55be614f0..6e08f03881b 100644 --- a/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ConcurrencyTest.java +++ b/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ConcurrencyTest.java @@ -6,6 +6,8 @@ */ package com.evolveum.midpoint.repo.sql; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; @@ -30,15 +32,20 @@ import com.evolveum.midpoint.prism.path.ItemPath; import com.evolveum.midpoint.prism.polystring.PolyString; import com.evolveum.midpoint.prism.xml.XmlTypeConverter; +import com.evolveum.midpoint.repo.api.RepoModifyOptions; +import com.evolveum.midpoint.repo.api.RepositoryService; +import com.evolveum.midpoint.repo.api.RepositoryService.ModificationsSupplier; import com.evolveum.midpoint.repo.sql.testing.SqlRepoTestUtil; import com.evolveum.midpoint.schema.SearchResultList; import com.evolveum.midpoint.schema.constants.ObjectTypes; import com.evolveum.midpoint.schema.result.OperationResult; import com.evolveum.midpoint.schema.util.ObjectTypeUtil; +import com.evolveum.midpoint.util.DebugUtil; import com.evolveum.midpoint.util.logging.LoggingUtils; import com.evolveum.midpoint.xml.ns._public.common.common_3.*; import com.evolveum.prism.xml.ns._public.types_3.PolyStringType; +@SuppressWarnings("BusyWait") @ContextConfiguration(locations = { "../../../../../ctx-test.xml" }) @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) public class ConcurrencyTest extends BaseSQLRepoTest { @@ -154,6 +161,7 @@ private void concurrencyUniversal(String name, long duration, long waitStep, Pro checker.check(readIteration, oid); } if (waitStep > 0L) { + //noinspection 
BusyWait Thread.sleep(waitStep); } for (PropertyModifierThread mt : modifierThreads) { @@ -189,8 +197,6 @@ private void concurrencyUniversal(String name, long duration, long waitStep, Pro abstract class WorkerThread extends Thread { int id; - Class objectClass; // object to modify - String oid; // object to modify String lastVersion = null; volatile Throwable threadResult; AtomicInteger counter = new AtomicInteger(0); @@ -224,18 +230,28 @@ public void run() { abstract void runOnce(OperationResult result) throws Exception; abstract String description(); - public void setObject(Class objectClass, String oid) { - this.objectClass = objectClass; - this.oid = oid; - } - @Override public String toString() { return description() + " @" + counter; } } - class PropertyModifierThread extends WorkerThread { + abstract class ObjectModificationThread extends WorkerThread { + + Class objectClass; + String oid; + + ObjectModificationThread(int id) { + super(id); + } + + public void setObject(Class objectClass, String oid) { + this.objectClass = objectClass; + this.oid = oid; + } + } + + class PropertyModifierThread extends ObjectModificationThread { final ItemPath attribute1; // attribute to modify final ItemPath attribute2; // attribute to modify @@ -434,7 +450,7 @@ void runOnce(OperationResult result) throws Exception { protected abstract String getOidToDelete(); } - abstract class DeltaExecutionThread extends WorkerThread { + abstract class DeltaExecutionThread extends ObjectModificationThread { String description; @@ -554,6 +570,8 @@ public void test110AddAssignments() throws Exception { int THREADS = 8; long DURATION = 30000L; + AtomicInteger globalCounter = new AtomicInteger(); + UserType user = new UserType(prismContext).name("alice"); OperationResult result = new OperationResult("test110AddAssignments"); @@ -570,6 +588,7 @@ public void test110AddAssignments() throws Exception { DeltaExecutionThread thread = new DeltaExecutionThread(i, UserType.class, oid, "assignment 
adder #" + i) { @Override Collection> getItemDeltas() throws Exception { + globalCounter.incrementAndGet(); return prismContext.deltaFor(UserType.class) .item(UserType.F_ASSIGNMENT).add( new AssignmentType(prismContext) @@ -584,6 +603,9 @@ public void test110AddAssignments() throws Exception { waitForThreads(threads, DURATION); PrismObject userAfter = repositoryService.getObject(UserType.class, oid, null, result); displayValue("user after", userAfter); + assertThat(userAfter.asObjectable().getAssignment().size()) + .as("# of assignments") + .isEqualTo(globalCounter.get()); } @Test @@ -695,7 +717,7 @@ protected PrismObject getObjectToAdd() { AtomicInteger objectsPointer = new AtomicInteger(0); List> deleteThreads = new ArrayList<>(); for (int i = 0; i < DELETE_THREADS; i++) { - DeleteObjectsThread thread = new DeleteObjectsThread(i, UserType.class, "deleter #" + i) { + DeleteObjectsThread thread = new DeleteObjectsThread<>(i, UserType.class, "deleter #" + i) { @Override protected String getOidToDelete() { int pointer = objectsPointer.getAndIncrement(); @@ -716,6 +738,85 @@ protected String getOidToDelete() { repositoryService.countObjects(UserType.class, null, null, result)); } + /** + * Here we test concurrent work bucket creation using {@link RepositoryService#modifyObjectDynamically(Class, String, Collection, RepositoryService.ModificationsSupplier, RepoModifyOptions, OperationResult)} + * method. + * + * Note that on H2 this test passes only due to explicit locking (see ExplicitAccessLock helper class). 
+ */ + @Test + public void test140WorkBuckets() throws Exception { + + int THREADS = 8; + long DURATION = 30000L; + + TaskType task = new TaskType(prismContext).name("test140"); + + OperationResult result = new OperationResult("test140WorkBuckets"); + String oid = repositoryService.addObject(task.asPrismObject(), null, result); + + displayValue("object added", oid); + + logger.info("Starting worker threads"); + + List threads = new ArrayList<>(); + for (int i = 0; i < THREADS; i++) { + final int threadIndex = i; + + WorkerThread thread = new WorkerThread(i) { + @Override + void runOnce(OperationResult result) throws Exception { + ModificationsSupplier modificationSupplier = + task -> prismContext.deltaFor(TaskType.class) + .item(TaskType.F_WORK_STATE, TaskWorkStateType.F_BUCKET) + .add(getNextBucket(task)) + .asItemDeltas(); + repositoryService.modifyObjectDynamically(TaskType.class, oid, null, modificationSupplier, null, result); + } + + private WorkBucketType getNextBucket(TaskType task) { + int lastBucketNumber = task.getWorkState() != null ? 
getLastBucketNumber(task.getWorkState().getBucket()) : 0; + return new WorkBucketType(prismContext) + .sequentialNumber(lastBucketNumber + 1) + .state(WorkBucketStateType.DELEGATED) + .workerRef(String.valueOf(threadIndex), TaskType.COMPLEX_TYPE); + } + + private int getLastBucketNumber(List buckets) { + return buckets.stream() + .mapToInt(WorkBucketType::getSequentialNumber) + .max().orElse(0); + } + + @Override + String description() { + return "Bucket computer thread #" + threadIndex; + } + }; + thread.start(); + threads.add(thread); + } + + waitForThreads(threads, DURATION); + PrismObject taskAfter = repositoryService.getObject(TaskType.class, oid, null, result); + displayValue("user after", taskAfter); + + assertCorrectBucketSequence(taskAfter.asObjectable().getWorkState().getBucket()); + } + + private void assertCorrectBucketSequence(List buckets) { + for (int i = 1; i <= buckets.size(); i++) { + int sequentialNumber = i; + List selected = buckets.stream() + .filter(b -> b.getSequentialNumber() == sequentialNumber) + .collect(Collectors.toList()); + if (selected.size() != 1) { + fail("Unexpected # of bucket with sequential number " + sequentialNumber + ":\n" + + DebugUtil.debugDump(selected, 1)); + } + } + } + private void waitForThreadsFinish(List threads, long timeout) throws InterruptedException { logger.info("*** Waiting until finish, at most {} ms ***", timeout); long startTime = System.currentTimeMillis(); diff --git a/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ModifyTest.java b/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ModifyTest.java index 1c4d2083cd2..206475e5760 100644 --- a/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ModifyTest.java +++ b/repo/repo-sql-impl-test/src/test/java/com/evolveum/midpoint/repo/sql/ModifyTest.java @@ -365,6 +365,32 @@ public void test050ModifyUserEmployeeNumber() throws Exception { assertUserEmployeeNumber(user.getOid(), "new"); } + @Test + public 
void test052ModifyUserEmployeeNumberDynamically() throws Exception { + OperationResult result = createOperationResult(); + UserType user = new UserType(prismContext) + .oid("052") + .name("user052") + .employeeNumber("old"); + + repositoryService.addObject(user.asPrismObject(), null, result); + assertUserEmployeeNumber(user.getOid(), "old"); + + repositoryService.modifyObjectDynamically(UserType.class, user.getOid(), null, userBefore -> { + String employeeNumber = userBefore.getEmployeeNumber(); + if ("old".equals(employeeNumber)) { + return prismContext.deltaFor(UserType.class) + .item(UserType.F_EMPLOYEE_NUMBER) + .add("new") + .delete("old") + .asItemDeltas(); + } else { + throw new IllegalStateException("employeeNumber is not 'old': " + employeeNumber); + } + }, getModifyOptions(), result); + assertUserEmployeeNumber(user.getOid(), "new"); + } + @Test // MID-4801 public void test055DeleteUserEmployeeNumberWrong() throws Exception { OperationResult result = createOperationResult(); diff --git a/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/OperationLogger.java b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/OperationLogger.java index 00626b1bb81..51fb6750e8c 100644 --- a/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/OperationLogger.java +++ b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/OperationLogger.java @@ -11,6 +11,7 @@ import com.evolveum.midpoint.prism.PrismObject; import com.evolveum.midpoint.prism.delta.ItemDelta; import com.evolveum.midpoint.repo.api.ModificationPrecondition; +import com.evolveum.midpoint.repo.api.ModifyObjectResult; import com.evolveum.midpoint.repo.api.RepoAddOptions; import com.evolveum.midpoint.repo.api.RepoModifyOptions; import com.evolveum.midpoint.schema.GetOperationOptions; @@ -51,6 +52,19 @@ public static void logModify(Class type, String oid, C DebugUtil.debugDump(modifications, 1, false)); } + public static void logModifyDynamically(Class type, String 
oid, + ModifyObjectResult modifyObjectResult, Collection> getOptions, + RepoModifyOptions modifyOptions, OperationResult result) { + if (!LOGGER_OP.isDebugEnabled()) { + return; + } + Collection modifications = modifyObjectResult != null ? modifyObjectResult.getModifications() : null; + LOGGER_OP.debug("{} modify dynamically {} {}{}: {}\n{}", PREFIX, type.getSimpleName(), oid, + shortDumpOptions(modifyOptions), + getStatus(result), + DebugUtil.debugDump(modifications, 1, false)); + } + public static void logDelete(Class type, String oid, OperationResult subResult) { if (!LOGGER_OP.isDebugEnabled()) { return; diff --git a/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/SqlRepositoryServiceImpl.java b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/SqlRepositoryServiceImpl.java index 600fc16510f..7559d56c7f5 100644 --- a/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/SqlRepositoryServiceImpl.java +++ b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/SqlRepositoryServiceImpl.java @@ -19,6 +19,8 @@ import javax.annotation.PreDestroy; import javax.xml.namespace.QName; +import com.evolveum.midpoint.util.annotation.Experimental; + import org.apache.commons.lang3.Validate; import org.hibernate.Session; import org.hibernate.SessionFactory; @@ -557,6 +559,54 @@ public ModifyObjectResult modifyObject( return new ModifyObjectResult<>(modifications); } + checkModifications(modifications); + logNameChange(modifications); + + // TODO executeAttempts? 
+ final String operation = "modifying"; + int attempt = 1; + int restarts = 0; + + boolean noFetchExtensionValueInsertionForbidden = false; + + SqlPerformanceMonitorImpl pm = getPerformanceMonitor(); + long opHandle = pm.registerOperationStart(OP_MODIFY_OBJECT, type); + + try { + while (true) { + try { + ModifyObjectResult rv = objectUpdater.modifyObjectAttempt(type, oid, modifications, precondition, options, + attempt, subResult, this, noFetchExtensionValueInsertionForbidden, null); + invokeConflictWatchers((w) -> w.afterModifyObject(oid)); + rv.setPerformanceRecord( + pm.registerOperationFinish(opHandle, attempt)); + return rv; + } catch (RestartOperationRequestedException ex) { + // special case: we want to restart but we do not want to count these + LOGGER.trace("Restarting because of {}", ex.getMessage()); + restarts++; + if (restarts > RESTART_LIMIT) { + throw new IllegalStateException("Too many operation restarts"); + } else if (ex.isForbidNoFetchExtensionValueAddition()) { + noFetchExtensionValueInsertionForbidden = true; + } + } catch (RuntimeException ex) { + attempt = baseHelper.logOperationAttempt(oid, operation, attempt, ex, subResult); + pm.registerOperationNewAttempt(opHandle, attempt); + } + } + } catch (Throwable t) { + LOGGER.debug("Got exception while processing modifications on {}:{}:\n{}", type.getSimpleName(), oid, + DebugUtil.debugDump(modifications), t); + pm.registerOperationFinish(opHandle, attempt); + throw t; + } finally { + OperationLogger.logModify(type, oid, modifications, precondition, options, subResult); + } + } + + private void checkModifications(@NotNull Collection> modifications) { + if (InternalsConfig.encryptionChecks) { CryptoUtil.checkEncrypted(modifications); } @@ -566,7 +616,9 @@ public ModifyObjectResult modifyObject( } else { ItemDeltaCollectionsUtil.checkConsistence(modifications, ConsistencyCheckScope.MANDATORY_CHECKS_ONLY); } + } + private void logNameChange(@NotNull Collection> modifications) { if 
(LOGGER.isTraceEnabled()) { for (ItemDelta modification : modifications) { if (modification instanceof PropertyDelta) { @@ -581,6 +633,24 @@ public ModifyObjectResult modifyObject( } } } + } + + @NotNull + @Override + @Experimental + public ModifyObjectResult modifyObjectDynamically( + @NotNull Class type, + @NotNull String oid, + @Nullable Collection> getOptions, + @NotNull ModificationsSupplier modificationsSupplier, + RepoModifyOptions modifyOptions, + @NotNull OperationResult parentResult) + throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException { + + Validate.notNull(type, "Object class in delta must not be null."); + Validate.notEmpty(oid, "Oid must not null or empty."); + Validate.notNull(modificationsSupplier, "Modifications supplier must not be null."); + Validate.notNull(parentResult, "Operation result must not be null."); // TODO executeAttempts? final String operation = "modifying"; @@ -590,14 +660,29 @@ public ModifyObjectResult modifyObject( boolean noFetchExtensionValueInsertionForbidden = false; SqlPerformanceMonitorImpl pm = getPerformanceMonitor(); - long opHandle = pm.registerOperationStart(OP_MODIFY_OBJECT, type); + long opHandle = pm.registerOperationStart(OP_MODIFY_OBJECT_DYNAMICALLY, type); + + OperationResult result = parentResult.subresult(MODIFY_OBJECT_DYNAMICALLY) + .addQualifier(type.getSimpleName()) + .addParam("type", type.getName()) + .addParam("oid", oid) + .build(); + ModifyObjectResult rv = null; try { while (true) { try { - ModifyObjectResult rv = objectUpdater.modifyObjectAttempt(type, oid, modifications, precondition, options, - attempt, subResult, this, noFetchExtensionValueInsertionForbidden); + ModificationsSupplier innerModificationsSupplier = object -> { + Collection> modifications = modificationsSupplier.get(object); + checkModifications(modifications); + logNameChange(modifications); + return modifications; + }; + rv = objectUpdater.modifyObjectDynamicallyAttempt(type, oid, getOptions, 
innerModificationsSupplier, + modifyOptions, attempt, result, this, noFetchExtensionValueInsertionForbidden); invokeConflictWatchers((w) -> w.afterModifyObject(oid)); + rv.setPerformanceRecord( + pm.registerOperationFinish(opHandle, attempt)); return rv; } catch (RestartOperationRequestedException ex) { // special case: we want to restart but we do not want to count these @@ -609,16 +694,16 @@ public ModifyObjectResult modifyObject( noFetchExtensionValueInsertionForbidden = true; } } catch (RuntimeException ex) { - attempt = baseHelper.logOperationAttempt(oid, operation, attempt, ex, subResult); + attempt = baseHelper.logOperationAttempt(oid, operation, attempt, ex, result); pm.registerOperationNewAttempt(opHandle, attempt); } } } catch (Throwable t) { - LOGGER.debug("Got exception while processing modifications on {}:{}:\n{}", type.getSimpleName(), oid, DebugUtil.debugDump(modifications), t); + LOGGER.debug("Got exception while processing dynamic modifications on {}:{}", type.getSimpleName(), oid, t); + pm.registerOperationFinish(opHandle, attempt); throw t; } finally { - pm.registerOperationFinish(opHandle, attempt); - OperationLogger.logModify(type, oid, modifications, precondition, options, subResult); + OperationLogger.logModifyDynamically(type, oid, rv, getOptions, modifyOptions, result); } } diff --git a/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ExplicitAccessLock.java b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ExplicitAccessLock.java new file mode 100644 index 00000000000..c318bbb4fe7 --- /dev/null +++ b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ExplicitAccessLock.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2010-2021 Evolveum and contributors + * + * This work is dual-licensed under the Apache License 2.0 + * and European Union Public License. See LICENSE file for details. 
+ */ + +package com.evolveum.midpoint.repo.sql.helpers; + +import com.evolveum.midpoint.util.annotation.Experimental; +import com.evolveum.midpoint.util.exception.SystemException; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +/** + * Works around the inability of H2 database to correctly serialize concurrent object modifications + * by providing explicit per-OID locking. + * + * Assumes that there is only a single midPoint instance running, therefore local locking is sufficient. + * + * NOT TO BE USED IN PRODUCTION. + * CALLERS: MAKE SURE YOU USE IT ONLY WITH CONNECTION TO H2 DATABASE. + */ +@Experimental +class ExplicitAccessLock { + + private static final Map SEMAPHORE_MAP = new ConcurrentHashMap<>(); + + private final Semaphore semaphore; + + private ExplicitAccessLock(Semaphore semaphore) { + this.semaphore = semaphore; + } + + static ExplicitAccessLock acquireFor(String oid) { + Semaphore semaphore = SEMAPHORE_MAP.computeIfAbsent(oid, s -> new Semaphore(1)); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + throw new SystemException("Unexpected InterruptedException", e); + } + return new ExplicitAccessLock(semaphore); + } + + void release() { + semaphore.release(); + } +} diff --git a/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ObjectUpdater.java b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ObjectUpdater.java index 8e92f2a121d..2e6a29863df 100644 --- a/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ObjectUpdater.java +++ b/repo/repo-sql-impl/src/main/java/com/evolveum/midpoint/repo/sql/helpers/ObjectUpdater.java @@ -13,6 +13,12 @@ import java.util.List; import javax.persistence.PersistenceException; +import com.evolveum.midpoint.schema.*; + +import com.evolveum.midpoint.schema.result.OperationResultStatus; + +import com.evolveum.midpoint.util.exception.SystemException; + import 
org.apache.commons.lang3.StringUtils; import org.hibernate.Session; import org.hibernate.exception.ConstraintViolationException; @@ -41,9 +47,6 @@ import com.evolveum.midpoint.repo.sql.data.common.dictionary.ExtItemDictionary; import com.evolveum.midpoint.repo.sql.helpers.delta.ObjectDeltaUpdater; import com.evolveum.midpoint.repo.sql.util.*; -import com.evolveum.midpoint.schema.GetOperationOptionsBuilder; -import com.evolveum.midpoint.schema.RelationRegistry; -import com.evolveum.midpoint.schema.SchemaService; import com.evolveum.midpoint.schema.result.OperationResult; import com.evolveum.midpoint.schema.util.ExceptionUtil; import com.evolveum.midpoint.schema.util.ObjectTypeUtil; @@ -341,11 +344,15 @@ public DeleteObjectResult deleteObjectAttempt(Class ty } } + /** + * @param externalSession If non-null, this session is used to execute the operation. Note that usual commit/rollback is + * issued even if external session is present. We assume we are the last element of the processing in the session. 
+ */ public ModifyObjectResult modifyObjectAttempt( Class type, String oid, Collection> originalModifications, ModificationPrecondition precondition, RepoModifyOptions originalModifyOptions, int attempt, OperationResult result, SqlRepositoryServiceImpl sqlRepositoryService, - boolean noFetchExtensionValueInsertionForbidden) + boolean noFetchExtensionValueInsertionForbidden, Session externalSession) throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException, SerializationRelatedException, PreconditionViolationException { @@ -363,10 +370,12 @@ public ModifyObjectResult modifyObjectAttempt( LOGGER.trace("Modifications:\n{}", DebugUtil.debugDumpLazily(modifications)); LOGGER.trace("noFetchExtensionValueInsertionForbidden: {}", noFetchExtensionValueInsertionForbidden); - Session session = null; + Session session = externalSession; OrgClosureManager.Context closureContext = null; try { - session = baseHelper.beginTransaction(); + if (session == null) { + session = baseHelper.beginTransaction(); + } closureContext = closureManager.onBeginTransactionModify(session, type, oid, modifications); @@ -552,6 +561,65 @@ private void cleanupClosureAndSessionAndResult(final OrgClosureManager.Context c baseHelper.cleanupSessionAndResult(session, result); } + public ModifyObjectResult modifyObjectDynamicallyAttempt(Class type, String oid, + Collection> getOptions, + RepositoryService.ModificationsSupplier modificationsSupplier, + RepoModifyOptions modifyOptions, int attempt, OperationResult result, + SqlRepositoryServiceImpl sqlRepositoryService, boolean noFetchExtensionValueInsertionForbidden) + throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException { + + LOGGER_PERFORMANCE.debug("> modify object dynamically {}, oid={}", type.getSimpleName(), oid); + + ExplicitAccessLock lock = getConfiguration().isUsingH2() ? 
ExplicitAccessLock.acquireFor(oid) : null; + + try { + + Session session = baseHelper.beginTransaction(); + + PrismObject objectBefore; + Collection> modifications; + try { + objectBefore = objectRetriever.getObjectInternal(session, type, oid, getOptions, false); + LOGGER.trace("Object retrieved:\n{}", objectBefore.debugDumpLazily(1)); + + // Intentionally within this try-catch block because this call must be covered by proper exception handling. + modifications = modificationsSupplier.get(objectBefore.asObjectable()); + LOGGER.trace("Modifications computed:\n{}", DebugUtil.debugDumpLazily(modifications, 1)); + } catch (ObjectNotFoundException ex) { + GetOperationOptions rootOptions = SelectorOptions.findRootOptions(getOptions); + baseHelper.rollbackTransaction(session, ex, result, !GetOperationOptions.isAllowNotFound(rootOptions)); + throw ex; + } catch (SchemaException ex) { + baseHelper.rollbackTransaction(session, ex, "Schema error while getting object with oid: " + + oid + ". Reason: " + ex.getMessage(), result, true); + throw ex; + } catch (DtoTranslationException | RuntimeException ex) { + baseHelper.handleGeneralException(ex, session, result); + throw new AssertionError("shouldn't be here"); + } + + if (modifications.isEmpty() && !RepoModifyOptions.isForceReindex(modifyOptions)) { + LOGGER.debug("Modification list is empty, nothing was modified."); + session.getTransaction().commit(); + result.recordStatus(OperationResultStatus.SUCCESS, "Computed modification list is empty"); + return new ModifyObjectResult<>(objectBefore, objectBefore, modifications); + } + + try { + // TODO: eliminate redundant getObjectInternal call in modifyObjectAttempt + return modifyObjectAttempt(type, oid, modifications, null, modifyOptions, attempt, result, sqlRepositoryService, + noFetchExtensionValueInsertionForbidden, session); + } catch (PreconditionViolationException e) { + throw new SystemException("Unexpected PreconditionViolationException: " + e.getMessage(), e); + } + + } 
finally { + if (lock != null) { + lock.release(); + } + } + } + /** * Handles serialization-related cases and no-fetch extension value insertion collisions. */ diff --git a/repo/repo-sqlbase/src/main/java/com/evolveum/midpoint/repo/sqlbase/perfmon/SqlPerformanceMonitorImpl.java b/repo/repo-sqlbase/src/main/java/com/evolveum/midpoint/repo/sqlbase/perfmon/SqlPerformanceMonitorImpl.java index 3af9d5d4e8f..e3889a1b007 100644 --- a/repo/repo-sqlbase/src/main/java/com/evolveum/midpoint/repo/sqlbase/perfmon/SqlPerformanceMonitorImpl.java +++ b/repo/repo-sqlbase/src/main/java/com/evolveum/midpoint/repo/sqlbase/perfmon/SqlPerformanceMonitorImpl.java @@ -131,13 +131,15 @@ public long registerOperationStart(String kind, Class objectType) { } } - public void registerOperationFinish(long opHandle, int attempt) { + public OperationRecord registerOperationFinish(long opHandle, int attempt) { if (level > LEVEL_NONE) { OperationRecord operation = outstandingOperations.get(opHandle); if (isOperationHandleOk(operation, opHandle)) { registerOperationFinishInternal(operation, attempt); + return operation; } } + return null; } private boolean isOperationHandleOk(OperationRecord operation, long opHandle) { diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/WorkStateManager.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/WorkStateManager.java index 29cd3969023..09bfc248351 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/WorkStateManager.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/WorkStateManager.java @@ -15,10 +15,7 @@ import com.evolveum.midpoint.prism.query.ObjectFilter; import com.evolveum.midpoint.prism.query.ObjectQuery; import com.evolveum.midpoint.prism.util.CloneUtil; -import com.evolveum.midpoint.repo.api.ModificationPrecondition; -import com.evolveum.midpoint.repo.api.PreconditionViolationException; -import 
com.evolveum.midpoint.repo.api.RepositoryService; -import com.evolveum.midpoint.repo.api.VersionPrecondition; +import com.evolveum.midpoint.repo.api.*; import com.evolveum.midpoint.schema.result.OperationResult; import com.evolveum.midpoint.schema.util.ObjectQueryUtil; import com.evolveum.midpoint.schema.util.task.TaskWorkStateUtil; @@ -36,12 +33,10 @@ import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategyFactory; import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandler; import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandlerRegistry; -import com.evolveum.midpoint.util.backoff.BackoffComputer; -import com.evolveum.midpoint.util.backoff.ExponentialBackoffComputer; +import com.evolveum.midpoint.util.Holder; import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException; import com.evolveum.midpoint.util.exception.ObjectNotFoundException; import com.evolveum.midpoint.util.exception.SchemaException; -import com.evolveum.midpoint.util.exception.SystemException; import com.evolveum.midpoint.util.logging.Trace; import com.evolveum.midpoint.util.logging.TraceManager; import com.evolveum.midpoint.xml.ns._public.common.common_3.*; @@ -52,6 +47,7 @@ import org.springframework.stereotype.Component; import java.util.*; +import java.util.Objects; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -62,10 +58,7 @@ /** * Responsible for managing task work state. 
- * - * @author mederly */ - @Component public class WorkStateManager { @@ -78,9 +71,6 @@ public class WorkStateManager { private static final String GET_WORK_BUCKET_NO_MORE_BUCKETS_DEFINITE = "getWorkBucket.noMoreBucketsDefinite"; private static final String GET_WORK_BUCKET_NO_MORE_BUCKETS_NOT_SCAVENGER = "getWorkBucket.noMoreBucketsNotScavenger"; private static final String GET_WORK_BUCKET_NO_MORE_BUCKETS_WAIT_TIME_ELAPSED = "getWorkBucket.NoMoreBucketsWaitTimeElapsed"; -// private static final String GET_WORK_BUCKET_RECLAIMED_NONE = "getWorkBucket.reclaimedNone"; -// private static final String GET_WORK_BUCKET_RECLAIMED_SOME = "getWorkBucket.reclaimedSome"; -// private static final String GET_WORK_BUCKET_RECLAIM_ABORTED = "getWorkBucket.reclaimAborted"; private static final String COMPLETE_WORK_BUCKET = "completeWorkBucket"; private static final String RELEASE_WORK_BUCKET = "releaseWorkBucket"; @@ -98,7 +88,7 @@ public class WorkStateManager { private class Context { private final long start = System.currentTimeMillis(); private Task workerTask; - private Task coordinatorTask; // null for standalone worker tasks + private Task coordinatorTask; // null for standalone worker tasks private final Supplier canRunSupplier; private final WorkBucketStatisticsCollector collector; private final boolean isGetOperation; @@ -141,11 +131,6 @@ void register(String situation) { } } - void registerConflictOccurred(long wastedTime) { - conflictCount++; - conflictWastedTime += wastedTime; - } - void registerWaitTime(long waitTime) { assert isGetOperation; bucketWaitCount++; @@ -156,6 +141,15 @@ void registerReclaim(int count) { bucketsReclaimed += count; } + void addToConflictCounts(ModifyObjectResult modifyObjectResult) { + conflictCount += modifyObjectResult.getRetries(); + conflictWastedTime += modifyObjectResult.getWastedTime(); + } + + void setConflictCounts(ModifyObjectResult modifyObjectResult) { + conflictCount = modifyObjectResult.getRetries(); + conflictWastedTime = 
modifyObjectResult.getWastedTime(); + } } public boolean canRun(Supplier canRunSupplier) { @@ -177,7 +171,7 @@ public WorkBucketType getWorkBucket(@NotNull String workerTaskOid, long freeBuck * * WE ASSUME THIS METHOD IS CALLED FROM THE WORKER TASK; SO IT IS NOT NECESSARY TO SYNCHRONIZE ACCESS TO THIS TASK WORK STATE. * - * @pre task is persistent and has work state management configured + * PRECONDITION: task is persistent and has work state management configured */ public WorkBucketType getWorkBucket(@NotNull String workerTaskOid, long freeBucketWaitTime, Supplier canRun, boolean executeInitialWait, @Nullable WorkBucketStatisticsCollector collector, @@ -223,128 +217,150 @@ private WorkBucketType findSelfAllocatedBucket(Context ctx) { private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTime, OperationResult result) throws SchemaException, ObjectAlreadyExistsException, ObjectNotFoundException, InterruptedException { TaskWorkManagementType workManagement = ctx.coordinatorTask.getWorkManagement(); - WorkSegmentationStrategy workStateStrategy = strategyFactory.createStrategy(workManagement); - setOrUpdateEstimatedNumberOfBuckets(ctx.coordinatorTask, workStateStrategy, result); + WorkSegmentationStrategy segmentationStrategy = strategyFactory.createStrategy(workManagement); + setOrUpdateEstimatedNumberOfBuckets(ctx.coordinatorTask, segmentationStrategy, result); -waitForAvailableBucket: // this cycle exits when something is found OR when a definite 'no more buckets' answer is received for (;;) { - BackoffComputer backoffComputer = createBackoffComputer(workManagement); - int retry = 0; -waitForConflictLessUpdate: // this cycle exits when coordinator task update succeeds - for (;;) { - long attemptStart = System.currentTimeMillis(); - TaskWorkStateType coordinatorWorkState = getWorkStateOrNew(ctx.coordinatorTask); - GetBucketResult response = workStateStrategy.getBucket(coordinatorWorkState); - LOGGER.trace("getWorkBucketMultiNode: 
workStateStrategy returned {} for worker task {}, coordinator {}", response, ctx.workerTask, ctx.coordinatorTask); - try { - if (response instanceof NewBuckets) { - NewBuckets newBucketsResponse = (NewBuckets) response; - int selected = newBucketsResponse.selected; - List newCoordinatorBuckets = new ArrayList<>(coordinatorWorkState.getBucket()); - for (int i = 0; i < newBucketsResponse.newBuckets.size(); i++) { - if (i == selected) { - newCoordinatorBuckets.add(newBucketsResponse.newBuckets.get(i).clone() - .state(WorkBucketStateType.DELEGATED) - .workerRef(ctx.workerTask.getOid(), TaskType.COMPLEX_TYPE)); - } else { - newCoordinatorBuckets.add(newBucketsResponse.newBuckets.get(i).clone()); - } - } - repositoryService.modifyObject(TaskType.class, ctx.coordinatorTask.getOid(), - bucketsReplaceDeltas(newCoordinatorBuckets), - bucketsReplacePrecondition(coordinatorWorkState.getBucket()), null, result); - repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(), - bucketsAddDeltas(newBucketsResponse.newBuckets.subList(selected, selected+1)), null, result); - CONTENTION_LOGGER.trace("New bucket(s) acquired after {} ms (conflicts: {}) in {}", System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); - ctx.register(GET_WORK_BUCKET_CREATED_NEW); - return newBucketsResponse.newBuckets.get(selected); - } else if (response instanceof FoundExisting) { - FoundExisting existingResponse = (FoundExisting) response; - repositoryService.modifyObject(TaskType.class, ctx.coordinatorTask.getOid(), - bucketStateChangeDeltas(existingResponse.bucket, WorkBucketStateType.DELEGATED, ctx.workerTask.getOid()), - bucketUnchangedPrecondition(existingResponse.bucket), null, result); - WorkBucketType foundBucket = existingResponse.bucket.clone(); - repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(), - bucketsAddDeltas(singletonList(foundBucket)), null, result); - CONTENTION_LOGGER.trace("Existing bucket acquired after {} ms (conflicts: {}) in 
{}", System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); - ctx.register(GET_WORK_BUCKET_DELEGATED); - return foundBucket; - } else if (response instanceof NothingFound) { - if (!ctx.workerTask.isScavenger()) { - CONTENTION_LOGGER.trace("'No bucket' found (and not a scavenger) after {} ms (conflicts: {}) in {}", System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); - ctx.register(GET_WORK_BUCKET_NO_MORE_BUCKETS_NOT_SCAVENGER); - return null; - } else if (((NothingFound) response).definite || freeBucketWaitTime == 0L) { - markWorkComplete(ctx.coordinatorTask, result); // TODO also if response is not definite? - CONTENTION_LOGGER.trace("'No bucket' found after {} ms (conflicts: {}) in {}", System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); - ctx.register(GET_WORK_BUCKET_NO_MORE_BUCKETS_DEFINITE); - return null; - } else { - long waitDeadline = freeBucketWaitTime >= 0 ? ctx.start + freeBucketWaitTime : Long.MAX_VALUE; - long toWait = waitDeadline - System.currentTimeMillis(); - if (toWait <= 0) { - markWorkComplete(ctx.coordinatorTask, result); // TODO also if response is not definite? 
- CONTENTION_LOGGER.trace("'No bucket' found (wait time elapsed) after {} ms (conflicts: {}) in {}", System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); - ctx.register(GET_WORK_BUCKET_NO_MORE_BUCKETS_WAIT_TIME_ELAPSED); - return null; - } - //System.out.println("*** No free work bucket -- waiting ***"); - long waitStart = System.currentTimeMillis(); - long sleepFor = Math.min(toWait, getFreeBucketWaitInterval(workManagement)); - CONTENTION_LOGGER.trace("Entering waiting for free bucket (waiting for {}) - after {} ms (conflicts: {}) in {}", - sleepFor, System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); - dynamicSleep(sleepFor, ctx); - ctx.registerWaitTime(System.currentTimeMillis() - waitStart); - ctx.reloadCoordinatorTask(result); - ctx.reloadWorkerTask(result); - reclaimWronglyAllocatedBuckets(ctx, result); - // we continue even if we could not find any wrongly allocated bucket -- maybe someone else found - // them before us - continue waitForAvailableBucket; - } + + Holder lastGetBucketResultHolder = new Holder<>(); + ModifyObjectResult modifyResult = repositoryService.modifyObjectDynamically(TaskType.class, + ctx.coordinatorTask.getOid(), null, + task -> computeWorkBucketModifications(task, ctx, segmentationStrategy, lastGetBucketResultHolder), + null, result); + + // We ignore conflicts encountered in previous iterations (scavenger hitting 'no more buckets' situation). 
+ ctx.setConflictCounts(modifyResult); + + GetBucketResult getBucketResult = + Objects.requireNonNull(lastGetBucketResultHolder.getValue(), "no last getBucket result"); + + if (getBucketResult instanceof NewBuckets) { + return recordNewBucketInWorkerTask(ctx, (NewBuckets) getBucketResult, result); + } if (getBucketResult instanceof FoundExisting) { + return recordExistingBucketInWorkerTask(ctx, (FoundExisting) getBucketResult, result); + } else if (getBucketResult instanceof NothingFound) { + if (!ctx.workerTask.isScavenger()) { + processNothingFoundForNonScavenger(ctx); + return null; + } else if (((NothingFound) getBucketResult).definite || freeBucketWaitTime == 0L) { + processNothingFoundDefinite(ctx, result); + return null; + } else { + long toWait = getRemainingTimeToWait(ctx, freeBucketWaitTime); + if (toWait <= 0) { + processNothingFoundButWaitTimeElapsed(ctx, result); + return null; } else { - throw new AssertionError(response); - } - } catch (PreconditionViolationException e) { - retry++; - long delay; - try { - delay = backoffComputer.computeDelay(retry); - } catch (BackoffComputer.NoMoreRetriesException e1) { - String message = - "Couldn't allocate work bucket because of repeated database conflicts (retry limit reached); coordinator task = " - + ctx.coordinatorTask; - CONTENTION_LOGGER.error(message, e1); - throw new SystemException(message, e1); + wait(toWait, ctx, workManagement); + reclaimWronglyAllocatedBuckets(ctx, result); + // We continue even if we could not find any wrongly allocated + // bucket -- maybe someone else found them before us, so we could use them. 
} - String message = "getWorkBucketMultiNode: conflict; continuing as retry #{}; waiting {} ms in {}, worker {}"; - Object[] objects = { retry, delay, ctx.coordinatorTask, ctx.workerTask, e }; - CONTENTION_LOGGER.debug(message, objects); - LOGGER.debug(message, objects); - dynamicSleep(delay, ctx); - ctx.reloadCoordinatorTask(result); - ctx.reloadWorkerTask(result); - ctx.registerConflictOccurred(System.currentTimeMillis() - attemptStart); - //noinspection UnnecessaryContinue,UnnecessaryLabelOnContinueStatement - continue waitForConflictLessUpdate; } + } else { + throw new AssertionError(getBucketResult); } } } - private BackoffComputer createBackoffComputer(TaskWorkManagementType workManagement) { - WorkAllocationConfigurationType ac = workManagement != null && workManagement.getBuckets() != null ? - workManagement.getBuckets().getAllocation() : null; - TaskManagerConfiguration c = configuration; - int workAllocationMaxRetries = ac != null && ac.getWorkAllocationMaxRetries() != null ? - ac.getWorkAllocationMaxRetries() : c.getWorkAllocationMaxRetries(); - long workAllocationRetryIntervalBase = ac != null && ac.getWorkAllocationRetryIntervalBase() != null ? - ac.getWorkAllocationRetryIntervalBase() : c.getWorkAllocationRetryIntervalBase(); - int workAllocationRetryExponentialThreshold = ac != null && ac.getWorkAllocationRetryExponentialThreshold() != null ? - ac.getWorkAllocationRetryExponentialThreshold() : c.getWorkAllocationRetryExponentialThreshold(); - Long workAllocationRetryIntervalLimit = ac != null && ac.getWorkAllocationRetryIntervalLimit() != null ? 
- ac.getWorkAllocationRetryIntervalLimit() : c.getWorkAllocationRetryIntervalLimit(); - return new ExponentialBackoffComputer(workAllocationMaxRetries, workAllocationRetryIntervalBase, workAllocationRetryExponentialThreshold, workAllocationRetryIntervalLimit); + private void wait(long toWait, Context ctx, TaskWorkManagementType workManagement) throws InterruptedException { + long waitStart = System.currentTimeMillis(); + long sleepFor = Math.min(toWait, getFreeBucketWaitInterval(workManagement)); + CONTENTION_LOGGER.trace("Entering waiting for free bucket (waiting for {}) - after {} ms (conflicts: {}) in {}", + sleepFor, System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); + dynamicSleep(sleepFor, ctx); + ctx.registerWaitTime(System.currentTimeMillis() - waitStart); + } + + private void processNothingFoundButWaitTimeElapsed(Context ctx, OperationResult result) + throws ObjectAlreadyExistsException, ObjectNotFoundException, SchemaException { + markWorkComplete(ctx.coordinatorTask, result); // TODO also if response is not definite? + CONTENTION_LOGGER.trace("'No bucket' found (wait time elapsed) after {} ms (conflicts: {}) in {}", + System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); + ctx.register(GET_WORK_BUCKET_NO_MORE_BUCKETS_WAIT_TIME_ELAPSED); + } + + private long getRemainingTimeToWait(Context ctx, long freeBucketWaitTime) { + long waitDeadline = freeBucketWaitTime >= 0 ? 
ctx.start + freeBucketWaitTime : Long.MAX_VALUE; + return waitDeadline - System.currentTimeMillis(); + } + + private void processNothingFoundDefinite(Context ctx, OperationResult result) + throws ObjectAlreadyExistsException, ObjectNotFoundException, SchemaException { + markWorkComplete(ctx.coordinatorTask, result); + CONTENTION_LOGGER.trace("'No bucket' found after {} ms (conflicts: {}) in {}", System.currentTimeMillis() - ctx.start, + ctx.conflictCount, ctx.workerTask); + ctx.register(GET_WORK_BUCKET_NO_MORE_BUCKETS_DEFINITE); + } + + private WorkBucketType recordNewBucketInWorkerTask(Context ctx, NewBuckets newBucketsResult, OperationResult result) + throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException { + WorkBucketType selectedBucket = newBucketsResult.newBuckets.get(newBucketsResult.selected).clone(); + repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(), + bucketsAddDeltas(singletonList(selectedBucket)), null, result); + CONTENTION_LOGGER.info("New bucket(s) acquired after {} ms (retries: {}) in {}", + System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); + ctx.register(GET_WORK_BUCKET_CREATED_NEW); + return selectedBucket; + } + + private WorkBucketType recordExistingBucketInWorkerTask(Context ctx, FoundExisting foundExistingResult, + OperationResult result) throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException { + WorkBucketType foundBucket = foundExistingResult.bucket.clone(); + repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(), + bucketsAddDeltas(singletonList(foundBucket)), null, result); + CONTENTION_LOGGER.trace("Existing bucket acquired after {} ms (conflicts: {}) in {}", + System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); + ctx.register(GET_WORK_BUCKET_DELEGATED); + return foundBucket; + } + + private void processNothingFoundForNonScavenger(Context ctx) { + CONTENTION_LOGGER.trace("'No bucket' found (and not a 
scavenger) after {} ms (conflicts: {}) in {}", + System.currentTimeMillis() - ctx.start, ctx.conflictCount, ctx.workerTask); + ctx.register(GET_WORK_BUCKET_NO_MORE_BUCKETS_NOT_SCAVENGER); + } + + /** + * Computes modifications to the current work state aimed to obtain new buckets, if possible. + */ + private Collection> computeWorkBucketModifications(TaskType coordinatorTask, Context ctx, + WorkSegmentationStrategy segmentationStrategy, Holder lastGetBucketResultHolder) throws SchemaException { + + TaskWorkStateType workState = getWorkStateOrNew(coordinatorTask); + GetBucketResult getBucketResult = segmentationStrategy.getBucket(workState); + lastGetBucketResultHolder.setValue(getBucketResult); + LOGGER.trace("computeWorkBucketModifications: segmentationStrategy returned {} for worker task {}, coordinator {}", + getBucketResult, ctx.workerTask, ctx.coordinatorTask); + if (getBucketResult instanceof NewBuckets) { + return computeModificationsForNewBuckets(ctx, workState, (NewBuckets) getBucketResult); + } else if (getBucketResult instanceof FoundExisting) { + return computeModificationsForExistingBucket(ctx, (FoundExisting) getBucketResult); + } else if (getBucketResult instanceof NothingFound) { + return emptyList(); // Nothing to do for now. 
+ } else { + throw new AssertionError(getBucketResult); + } + } + + private Collection> computeModificationsForNewBuckets(Context ctx, TaskWorkStateType workState, + NewBuckets response) throws SchemaException { + List newCoordinatorBuckets = new ArrayList<>(workState.getBucket()); + for (int i = 0; i < response.newBuckets.size(); i++) { + if (i == response.selected) { + newCoordinatorBuckets.add(response.newBuckets.get(i).clone() + .state(WorkBucketStateType.DELEGATED) + .workerRef(ctx.workerTask.getOid(), TaskType.COMPLEX_TYPE)); + } else { + newCoordinatorBuckets.add(response.newBuckets.get(i).clone()); + } + } + return bucketsReplaceDeltas(newCoordinatorBuckets); + } + + private Collection> computeModificationsForExistingBucket(Context ctx, FoundExisting response) + throws SchemaException { + return bucketStateChangeDeltas(response.bucket, WorkBucketStateType.DELEGATED, ctx.workerTask.getOid()); } private long getFreeBucketWaitInterval(TaskWorkManagementType workManagement) { @@ -395,7 +411,9 @@ private void dynamicSleep(long delay, Supplier canRunSupplier) throws I * Returns true if there was something to reclaim. */ private void reclaimWronglyAllocatedBuckets(Context ctx, OperationResult result) - throws SchemaException, PreconditionViolationException, ObjectNotFoundException, ObjectAlreadyExistsException { + throws SchemaException, ObjectNotFoundException, ObjectAlreadyExistsException { + ctx.reloadCoordinatorTask(result); + ctx.reloadWorkerTask(result); if (ctx.coordinatorTask.getWorkState() == null) { return; } @@ -407,7 +425,7 @@ private void reclaimWronglyAllocatedBuckets(Context ctx, OperationResult result) if (bucket.getState() == WorkBucketStateType.DELEGATED) { String workerOid = bucket.getWorkerRef() != null ? 
bucket.getWorkerRef().getOid() : null; if (isDead(workerOid, deadWorkers, liveWorkers, result)) { - LOGGER.info("Reclaiming wrongly allocated work bucket {} from worker task {}", bucket, workerOid); + LOGGER.info("Will reclaim wrongly allocated work bucket {} from worker task {}", bucket, workerOid); bucket.setState(WorkBucketStateType.READY); bucket.setWorkerRef(null); // TODO modify also the worker if it exists (maybe) @@ -418,17 +436,19 @@ private void reclaimWronglyAllocatedBuckets(Context ctx, OperationResult result) LOGGER.trace("Reclaiming wrongly allocated buckets found {} buckets to reclaim in {}", reclaiming, ctx.coordinatorTask); if (reclaiming > 0) { CONTENTION_LOGGER.debug("Reclaiming wrongly allocated buckets found {} buckets to reclaim in {}", reclaiming, ctx.coordinatorTask); - // As for the precondition we use the whole task state (reflected by version). The reason is that if the work - // state originally contains (wrongly) DELEGATED bucket plus e.g. last COMPLETE one, and this bucket is reclaimed - // by two subtasks at once, each of them see the same state afterwards: DELEGATED + COMPLETE. The solution would - // be either to enhance the delegated bucket with some information (like to whom it is delegated), or this one. - // In the future we might go the former way; as it would make reclaiming much efficient - not requiring to read - // the whole task tree. - repositoryService.modifyObject(TaskType.class, ctx.coordinatorTask.getOid(), - bucketsReplaceDeltas(newState.getBucket()), - new VersionPrecondition<>(ctx.coordinatorTask.getVersion()), null, result); + try { + // As for the precondition we use the whole task state (reflected by version). The reason is that if the work + // state originally contains (wrongly) DELEGATED bucket plus e.g. last COMPLETE one, and this bucket is reclaimed + // by two subtasks at once, each of them sees the same state afterwards: READY + COMPLETE. 
+ repositoryService.modifyObject(TaskType.class, ctx.coordinatorTask.getOid(), + bucketsReplaceDeltas(newState.getBucket()), + new VersionPrecondition<>(ctx.coordinatorTask.getVersion()), null, result); + ctx.registerReclaim(reclaiming); + } catch (PreconditionViolationException e) { + LOGGER.info("Concurrent modification of work state in {}. {} wrongly allocated bucket(s) will " + + "not be reclaimed in this run.", ctx.coordinatorTask, reclaiming); + } ctx.reloadCoordinatorTask(result); - ctx.registerReclaim(reclaiming); } } @@ -524,7 +544,7 @@ public void completeWorkBucket(String workerTaskOid, int sequentialNumber, WorkBucketStatisticsCollector workBucketStatisticsCollector, OperationResult result) throws ObjectAlreadyExistsException, ObjectNotFoundException, SchemaException { Context ctx = createContext(workerTaskOid, null, workBucketStatisticsCollector, false, result); - LOGGER.trace("Completing work bucket {} in {} (coordinator {})", workerTaskOid, ctx.workerTask, ctx.coordinatorTask); + LOGGER.trace("Completing work bucket #{} in {} (coordinator {})", sequentialNumber, ctx.workerTask, ctx.coordinatorTask); if (ctx.isStandalone()) { completeWorkBucketStandalone(ctx, sequentialNumber, result); } else { @@ -546,13 +566,14 @@ private void completeWorkBucketMultiNode(Context ctx, int sequentialNumber, Oper checkWorkerRefOnDelegatedBucket(ctx, bucket); Collection> modifications = bucketStateChangeDeltas(bucket, WorkBucketStateType.COMPLETE); try { - repositoryService.modifyObject(TaskType.class, ctx.coordinatorTask.getOid(), - modifications, bucketUnchangedPrecondition(bucket), null, result); + ModifyObjectResult modifyObjectResult = repositoryService.modifyObject(TaskType.class, + ctx.coordinatorTask.getOid(), modifications, bucketUnchangedPrecondition(bucket), null, result); + ctx.addToConflictCounts(modifyObjectResult); } catch (PreconditionViolationException e) { throw new IllegalStateException("Unexpected concurrent modification of work bucket " + bucket + " 
in " + ctx.coordinatorTask, e); } ((TaskQuartzImpl) ctx.coordinatorTask).applyModificationsTransient(modifications); - compressCompletedBuckets(ctx.coordinatorTask, result); + compressCompletedBuckets(ctx.coordinatorTask, ctx, result); deleteBucketFromWorker(ctx, sequentialNumber, result); ctx.register(COMPLETE_WORK_BUCKET); } @@ -585,7 +606,7 @@ private void completeWorkBucketStandalone(Context ctx, int sequentialNumber, Ope repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(), modifications, null, result); ((TaskQuartzImpl) ctx.workerTask).applyModificationsTransient(modifications); ((TaskQuartzImpl) ctx.workerTask).applyDeltasImmediate(modifications, result); - compressCompletedBuckets(ctx.workerTask, result); + compressCompletedBuckets(ctx.workerTask, ctx, result); ctx.register(COMPLETE_WORK_BUCKET); } @@ -637,7 +658,7 @@ private void checkWorkerRefOnDelegatedBucket(Context ctx, WorkBucketType bucket) } } - private void compressCompletedBuckets(Task task, OperationResult result) + private void compressCompletedBuckets(Task task, Context ctx, OperationResult result) throws SchemaException, ObjectAlreadyExistsException, ObjectNotFoundException { List buckets = new ArrayList<>(getWorkState(task).getBucket()); TaskWorkStateUtil.sortBucketsBySequentialNumber(buckets); @@ -657,7 +678,9 @@ private void compressCompletedBuckets(Task task, OperationResult result) LOGGER.trace("Compression of completed buckets: deleting {} buckets before last completed one in {}", deleteItemDeltas.size(), task); // these buckets should not be touched by anyone (as they are already completed); so we can execute without preconditions if (!deleteItemDeltas.isEmpty()) { - repositoryService.modifyObject(TaskType.class, task.getOid(), deleteItemDeltas, null, result); + ModifyObjectResult modifyObjectResult = + repositoryService.modifyObject(TaskType.class, task.getOid(), deleteItemDeltas, null, result); + ctx.addToConflictCounts(modifyObjectResult); } } @@ -673,11 +696,6 @@ 
private void compressCompletedBuckets(Task task, OperationResult result) .addRealValues(CloneUtil.cloneCollectionMembers(buckets)).asItemDeltas(); } - private ModificationPrecondition bucketsReplacePrecondition(List originalBuckets) { - // performance is not optimal but OK for precondition checking - return taskObject -> cloneNoId(originalBuckets).equals(cloneNoId(getWorkStateOrNew(taskObject.asObjectable()).getBucket())); - } - @SuppressWarnings("SameParameterValue") private Collection> bucketStateChangeDeltas(WorkBucketType bucket, WorkBucketStateType newState) throws SchemaException { return prismContext.deltaFor(TaskType.class) @@ -714,11 +732,6 @@ private WorkBucketType cloneNoId(WorkBucketType bucket) { return bucket.clone().id(null); } - private List cloneNoId(List buckets) { - return buckets.stream().map(this::cloneNoId) - .collect(Collectors.toCollection(() -> new ArrayList<>(buckets.size()))); - } - @NotNull private TaskWorkStateType getWorkStateOrNew(Task task) { if (task.getWorkState() != null) { diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/segmentation/WorkSegmentationStrategy.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/segmentation/WorkSegmentationStrategy.java index f1fb9ef05da..5fe4ae38581 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/segmentation/WorkSegmentationStrategy.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/work/segmentation/WorkSegmentationStrategy.java @@ -49,7 +49,7 @@ public NothingFound(boolean definite) { */ public static class FoundExisting extends GetBucketResult { /** - * Free bucket that is provided as a result of the operation; or null if no bucket could be obtained. + * Free bucket that is provided as a result of the operation. 
*/ @NotNull public final WorkBucketType bucket; diff --git a/testing/story/src/test/resources/buckets/task-recompute-16-04.xml b/testing/story/src/test/resources/buckets/task-recompute-16-04.xml index 490e2d569fa..de2321edc7d 100644 --- a/testing/story/src/test/resources/buckets/task-recompute-16-04.xml +++ b/testing/story/src/test/resources/buckets/task-recompute-16-04.xml @@ -37,6 +37,9 @@ 1 + + 0 + http://midpoint.evolveum.com/xml/ns/public/model/synchronization/task/recompute/handler-3 diff --git a/testing/story/src/test/resources/buckets/task-recompute-256-30.xml b/testing/story/src/test/resources/buckets/task-recompute-256-30.xml index d21643592e4..b643d5ca891 100644 --- a/testing/story/src/test/resources/buckets/task-recompute-256-30.xml +++ b/testing/story/src/test/resources/buckets/task-recompute-256-30.xml @@ -37,6 +37,9 @@ 2 + + 0 + http://midpoint.evolveum.com/xml/ns/public/model/synchronization/task/recompute/handler-3