Skip to content

Commit

Permalink
sqale: Retry support for add, modify + dynamicly, delete, searchObjects
Browse files Browse the repository at this point in the history
  • Loading branch information
tonydamage committed Mar 4, 2024
1 parent 4ac7467 commit aa49a89
Showing 1 changed file with 92 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import com.evolveum.midpoint.util.backoff.BackoffComputer;
import com.evolveum.midpoint.util.backoff.ExponentialBackoffComputer;

import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ObjectArrays;
import com.querydsl.core.Tuple;
import com.querydsl.core.types.Path;
Expand Down Expand Up @@ -77,10 +79,6 @@
import com.evolveum.midpoint.schema.result.OperationResultStatus;
import com.evolveum.midpoint.schema.util.ObjectQueryUtil;
import com.evolveum.midpoint.util.QNameUtil;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import com.evolveum.prism.xml.ns._public.types_3.PolyStringType;

Expand Down Expand Up @@ -380,10 +378,14 @@ private <T extends ObjectType> String executeAddObject(
long opHandle = registerOperationStart(OP_ADD_OBJECT, object);

try {
String oid = new AddObjectContext<>(sqlRepoContext, object)
.execute();
String oid = executeRetriable(OP_ADD_OBJECT, null, opHandle,
() -> new AddObjectContext<>(sqlRepoContext, object).execute());

invokeConflictWatchers((w) -> w.afterAddObject(oid, object));
return oid;
} catch (ObjectNotFoundException | RepositoryException e) {
// Should not happen
throw new SystemException(e);
} finally {
registerOperationFinish(opHandle);
}
Expand All @@ -398,36 +400,44 @@ private <T extends ObjectType> String executeOverwriteObject(
UUID oidUuid = SqaleUtils.oidToUuidMandatory(oid);

long opHandle = registerOperationStart(OP_ADD_OBJECT_OVERWRITE, newObject);
try (JdbcSession jdbcSession = sqlRepoContext.newJdbcSession().startTransaction()) {
try {
//noinspection ConstantConditions
RootUpdateContext<T, QObject<MObject>, MObject> updateContext =
prepareUpdateContext(jdbcSession, newObject.getCompileTimeClass(), oidUuid);
PrismObject<T> prismObject = updateContext.getPrismObject();
// no precondition check for overwrite

invokeConflictWatchers(w -> w.beforeModifyObject(prismObject));
newObject.setUserData(RepositoryService.KEY_ORIGINAL_OBJECT, prismObject.clone());
ObjectDelta<T> delta = prismObject.diff(newObject, EquivalenceStrategy.LITERAL);
Collection<? extends ItemDelta<?, ?>> modifications = delta.getModifications();

logger.trace("overwriteAddObjectAttempt: originalOid={}, modifications={}",
oid, modifications);
Collection<? extends ItemDelta<?, ?>> executedModifications =
updateContext.execute(modifications, false);
replaceObject(updateContext, updateContext.getPrismObject());
if (!executedModifications.isEmpty()) {
invokeConflictWatchers((w) -> w.afterModifyObject(oid));

try {
executeRetriable(OP_ADD_OBJECT_OVERWRITE, oidUuid, opHandle, () -> {
try (JdbcSession jdbcSession = sqlRepoContext.newJdbcSession().startTransaction()) {
try {
//noinspection ConstantConditions
RootUpdateContext<T, QObject<MObject>, MObject> updateContext =
prepareUpdateContext(jdbcSession, newObject.getCompileTimeClass(), oidUuid);
PrismObject<T> prismObject = updateContext.getPrismObject();
// no precondition check for overwrite

invokeConflictWatchers(w -> w.beforeModifyObject(prismObject));
newObject.setUserData(RepositoryService.KEY_ORIGINAL_OBJECT, prismObject.clone());
ObjectDelta<T> delta = prismObject.diff(newObject, EquivalenceStrategy.LITERAL);
Collection<? extends ItemDelta<?, ?>> modifications = delta.getModifications();

logger.trace("overwriteAddObjectAttempt: originalOid={}, modifications={}",
oid, modifications);
Collection<? extends ItemDelta<?, ?>> executedModifications =
updateContext.execute(modifications, false);
replaceObject(updateContext, updateContext.getPrismObject());
if (!executedModifications.isEmpty()) {
invokeConflictWatchers((w) -> w.afterModifyObject(oid));
}
logger.trace("OBJECT after:\n{}", prismObject.debugDumpLazily());
} catch (ObjectNotFoundException e) {
// so it is just plain addObject after all
new AddObjectContext<>(sqlRepoContext, newObject)
.execute(jdbcSession);
invokeConflictWatchers((w) -> w.afterAddObject(oid, newObject));
}
jdbcSession.commit();
return null;
}
logger.trace("OBJECT after:\n{}", prismObject.debugDumpLazily());
} catch (ObjectNotFoundException e) {
// so it is just plain addObject after all
new AddObjectContext<>(sqlRepoContext, newObject)
.execute(jdbcSession);
invokeConflictWatchers((w) -> w.afterAddObject(oid, newObject));
}
jdbcSession.commit();
});
return oid;
} catch (ObjectNotFoundException e) {
throw new SystemException("Should not happen", e);
} catch (RuntimeException e) {
SqaleUtils.handlePostgresException(e);
throw e;
Expand Down Expand Up @@ -486,15 +496,30 @@ public <T extends ObjectType> ModifyObjectResult<T> modifyObject(
.addArbitraryObjectCollectionAsParam("modifications", modifications)
.build();

long opHandle = registerOperationStart(OP_MODIFY_OBJECT, type);

try {
return executeModifyObject(type, oidUuid, modifications, precondition, options, operationResult);
return executeRetriable(OP_MODIFY_OBJECT, oidUuid, opHandle,
() -> {
try {
return executeModifyObject(type, oidUuid, modifications, precondition, options, operationResult);
} catch (PreconditionViolationException e) {
throw new TunnelException(e);
}
});
} catch (TunnelException e) {
if (e.getCause() instanceof PreconditionViolationException pve) {
throw pve;
}
throw e;
} catch (RepositoryException | RuntimeException e) {
throw handledGeneralException(e, operationResult);
} catch (Throwable t) {
recordFatalError(operationResult, t);
throw t;
} finally {
operationResult.close();
registerOperationFinish(opHandle);
OperationLogger.logModify(type, oid, modifications, precondition, options, operationResult);
}
}
Expand All @@ -508,9 +533,6 @@ private <T extends ObjectType> ModifyObjectResult<T> executeModifyObject(
@Nullable RepoModifyOptions options,
@NotNull OperationResult parentResult)
throws SchemaException, ObjectNotFoundException, PreconditionViolationException, RepositoryException {

long opHandle = registerOperationStart(OP_MODIFY_OBJECT, type);

try (JdbcSession jdbcSession = sqlRepoContext.newJdbcSession().startTransaction()) {
RootUpdateContext<T, QObject<MObject>, MObject> updateContext =
prepareUpdateContext(jdbcSession, type, modifications, oidUuid, options);
Expand All @@ -519,8 +541,6 @@ private <T extends ObjectType> ModifyObjectResult<T> executeModifyObject(
updateContext, modifications, precondition, options, parentResult);
jdbcSession.commit();
return rv;
} finally {
registerOperationFinish(opHandle);
}
}

Expand All @@ -547,9 +567,10 @@ private <T extends ObjectType> ModifyObjectResult<T> executeModifyObject(
.build();

ModifyObjectResult<T> rv = null;
long opHandle = registerOperationStart(OP_MODIFY_OBJECT_DYNAMICALLY, type);
try {
rv = executeModifyObjectDynamically(
type, oidUuid, getOptions, modificationsSupplier, modifyOptions, operationResult);
rv = executeRetriable(OP_MODIFY_OBJECT_DYNAMICALLY, oidUuid, opHandle, () -> executeModifyObjectDynamically(
type, oidUuid, getOptions, modificationsSupplier, modifyOptions, operationResult));
return rv;
} catch (ObjectNotFoundException e) {
throw handleObjectNotFound(e, operationResult, getOptions);
Expand All @@ -561,6 +582,7 @@ private <T extends ObjectType> ModifyObjectResult<T> executeModifyObject(
} finally {
operationResult.close();
OperationLogger.logModifyDynamically(type, oid, rv, modifyOptions, operationResult);
registerOperationFinish(opHandle);
}
}

Expand All @@ -582,9 +604,6 @@ private <T extends ObjectType> ModifyObjectResult<T> executeModifyObjectDynamica
@Nullable RepoModifyOptions modifyOptions,
@NotNull OperationResult parentResult)
throws SchemaException, ObjectNotFoundException, RepositoryException {

long opHandle = registerOperationStart(OP_MODIFY_OBJECT_DYNAMICALLY, type);

try (JdbcSession jdbcSession = sqlRepoContext.newJdbcSession().startTransaction()) {
RootUpdateContext<T, QObject<MObject>, MObject> updateContext =
prepareUpdateContext(jdbcSession, type, oidUuid, getOptions, modifyOptions);
Expand All @@ -600,8 +619,6 @@ private <T extends ObjectType> ModifyObjectResult<T> executeModifyObjectDynamica
} catch (PreconditionViolationException e) {
// no precondition is checked in this scenario, this should not happen
throw new AssertionError(e);
} finally {
registerOperationFinish(opHandle);
}
}

Expand Down Expand Up @@ -830,31 +847,33 @@ public ModifyObjectResult<SimulationResultType> deleteSimulatedProcessedObjects(
.addParam(OperationResult.PARAM_TYPE, type.getName())
.addParam(OperationResult.PARAM_OID, oid)
.build();
long opHandle = registerOperationStart(OP_DELETE_OBJECT, type);

try {
return executeDeleteObject(type, oid, oidUuid);
return executeRetriable(OP_DELETE_OBJECT, oidUuid, opHandle, () -> executeDeleteObject(type, oid, oidUuid));
} catch (SchemaException | ObjectAlreadyExistsException | RepositoryException e) {
throw new SystemException("Should not happen", e);
} catch (RuntimeException e) {
throw handledGeneralException(e, operationResult);
} catch (Throwable t) {
recordFatalError(operationResult, t);
throw t;
} finally {
operationResult.close();
registerOperationFinish(opHandle);
}
}

@NotNull
private <T extends ObjectType> DeleteObjectResult executeDeleteObject(
Class<T> type, String oid, UUID oidUuid) throws ObjectNotFoundException {

long opHandle = registerOperationStart(OP_DELETE_OBJECT, type);
try (JdbcSession jdbcSession = sqlRepoContext.newJdbcSession().startTransaction()) {
DeleteObjectResult result = deleteObjectAttempt(type, oidUuid, jdbcSession);
invokeConflictWatchers((w) -> w.afterDeleteObject(oid));

jdbcSession.commit();
return result;
} finally {
registerOperationFinish(opHandle);
}
}

Expand Down Expand Up @@ -948,22 +967,25 @@ private <T extends ObjectType> int executeCountObjects(
.addParam(OperationResult.PARAM_OPTIONS, String.valueOf(options))
.build();

long opHandle = registerOperationStart(OP_SEARCH_OBJECTS, type);
try (var sqaleResult = SqlBaseOperationTracker.with(operationResult)) {
logSearchInputParameters(type, query, "Search objects");

query = ObjectQueryUtil.simplifyQuery(query);
if (ObjectQueryUtil.isNoneQuery(query)) {
var finalQuery = ObjectQueryUtil.simplifyQuery(query);
if (ObjectQueryUtil.isNoneQuery(finalQuery)) {
return new SearchResultList<>();
}

return executeSearchObjects(type, query, options, OP_SEARCH_OBJECTS);
return executeRetriable(OP_SEARCH_OBJECTS, null, opHandle, () -> executeSearchObjects(type, finalQuery, options, OP_SEARCH_OBJECTS));
} catch (RepositoryException | RuntimeException e) {
throw handledGeneralException(e, operationResult);
} catch (ObjectNotFoundException | ObjectAlreadyExistsException e) {
throw new SystemException("Should not happen", e);
} catch (Throwable t) {
recordFatalError(operationResult, t);
throw t;
} finally {
operationResult.close();
registerOperationFinish(opHandle);
}
}

Expand All @@ -973,19 +995,13 @@ private <T extends ObjectType> SearchResultList<PrismObject<T>> executeSearchObj
Collection<SelectorOptions<GetOperationOptions>> options,
String operationKind)
throws RepositoryException, SchemaException {

long opHandle = registerOperationStart(operationKind, type);
try {
SearchResultList<T> result = sqlQueryExecutor.list(
SqaleQueryContext.from(type, sqlRepoContext),
query,
options);
//noinspection unchecked
return result.map(
o -> (PrismObject<T>) o.asPrismObject());
} finally {
registerOperationFinish(opHandle);
}
SearchResultList<T> result = sqlQueryExecutor.list(
SqaleQueryContext.from(type, sqlRepoContext),
query,
options);
//noinspection unchecked
return result.map(
o -> (PrismObject<T>) o.asPrismObject());
}

@Override
Expand All @@ -1006,8 +1022,8 @@ public <T extends ObjectType> SearchResultMetadata searchObjectsIterative(
try (var sqaleResult = SqlBaseOperationTracker.with(operationResult)) {
logSearchInputParameters(type, query, "Iterative search objects");

query = ObjectQueryUtil.simplifyQuery(query);
if (ObjectQueryUtil.isNoneQuery(query)) {
var finalQuery = ObjectQueryUtil.simplifyQuery(query);
if (ObjectQueryUtil.isNoneQuery(finalQuery)) {
return new SearchResultMetadata().approxNumberOfAllResults(0);
}

Expand Down Expand Up @@ -1086,6 +1102,8 @@ private <T extends ObjectType> SearchResultMetadata executeSearchObjectsIterativ

// we don't call public searchObject to avoid subresults and query simplification
logSearchInputParameters(type, pagedQuery, "Search object iterative page");

// Should we do retries here?
List<PrismObject<T>> objects = executeSearchObjects(
type, pagedQuery, options, OP_SEARCH_OBJECTS_ITERATIVE_PAGE);

Expand Down Expand Up @@ -1873,7 +1891,7 @@ public long advanceSequence(String oid, OperationResult parentResult)
try {
return executeRetriable(opNamePrefix + OP_ADVANCE_SEQUENCE, oidUuid, opHandle,
() -> executeAdvanceSequence(oidUuid));
} catch (RepositoryException | RuntimeException | SchemaException e) {
} catch (ObjectAlreadyExistsException | RepositoryException | RuntimeException | SchemaException e) {
throw handledGeneralException(e, operationResult);
} catch (Throwable t) {
recordFatalError(operationResult, t);
Expand Down Expand Up @@ -1928,7 +1946,7 @@ public void returnUnusedValuesToSequence(
try {
executeRetriable(opNamePrefix + OP_RETURN_UNUSED_VALUES_TO_SEQUENCE, oidUuid, opHandle,
() -> executeReturnUnusedValuesToSequence(oidUuid, unusedValues));
} catch (RepositoryException | RuntimeException | SchemaException e) {
} catch (RepositoryException | RuntimeException | ObjectAlreadyExistsException | SchemaException e) {
throw handledGeneralException(e, operationResult);
} catch (Throwable t) {
recordFatalError(operationResult, t);
Expand Down Expand Up @@ -2396,7 +2414,7 @@ private AggregateSearchContext aggregateQueryHandler(AggregateQuery<?> query, Op
// region Retries


private <R> R executeRetriable(String opName, UUID oid, long opHandle, RetriableOperation<R> operation) throws ObjectNotFoundException, SchemaException, RepositoryException {
private <R, E> R executeRetriable(String opName, UUID oid, long opHandle, RetriableOperation<R> operation) throws ObjectNotFoundException, SchemaException, RepositoryException, ObjectAlreadyExistsException {
var maxAttempts = 100;
var attempt = 1;
while (attempt < maxAttempts) {
Expand Down Expand Up @@ -2433,7 +2451,7 @@ private boolean isRetriableException(Exception e) {
}

private interface RetriableOperation<T> {
T execute() throws ObjectNotFoundException, SchemaException, RepositoryException;
T execute() throws ObjectNotFoundException, SchemaException, RepositoryException, ObjectAlreadyExistsException;

}

Expand Down

0 comments on commit aa49a89

Please sign in to comment.