Skip to content

Commit

Permalink
sqale: Introduced retry mechanism and use it for advance sequence.
Browse files Browse the repository at this point in the history
  • Loading branch information
tonydamage committed Mar 7, 2024
1 parent 1c1be26 commit 0db1736
Showing 1 changed file with 115 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.evolveum.midpoint.util.MiscUtil;

import com.evolveum.midpoint.util.backoff.BackoffComputer;
import com.evolveum.midpoint.util.backoff.ExponentialBackoffComputer;

import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;

import com.google.common.base.Strings;
import com.google.common.collect.ObjectArrays;
import com.querydsl.core.Tuple;
Expand All @@ -26,6 +33,7 @@
import org.apache.commons.lang3.Validate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.postgresql.util.PSQLException;
import org.springframework.beans.factory.annotation.Autowired;

import com.evolveum.midpoint.common.SequenceUtil;
Expand Down Expand Up @@ -1790,7 +1798,6 @@ private String pagingCookie(Containerable t) {
// endregion

@Override

public <O extends ObjectType> boolean isDescendant(
PrismObject<O> object, String ancestorOrgOid) {
Validate.notNull(object, "object must not be null");
Expand Down Expand Up @@ -1861,22 +1868,22 @@ public long advanceSequence(String oid, OperationResult parentResult)
OperationResult operationResult = parentResult.subresult(opNamePrefix + OP_ADVANCE_SEQUENCE)
.addParam(OperationResult.PARAM_OID, oid)
.build();

long opHandle = registerOperationStart(OP_ADVANCE_SEQUENCE, SequenceType.class);
try {
return executeAdvanceSequence(oidUuid);
return executeRetriable(opNamePrefix + OP_ADVANCE_SEQUENCE, oid, opHandle, () -> executeAdvanceSequence(oidUuid));
} catch (RepositoryException | RuntimeException | SchemaException e) {
throw handledGeneralException(e, operationResult);
} catch (Throwable t) {
recordFatalError(operationResult, t);
throw t;
} finally {
registerOperationFinish(opHandle);
operationResult.close();
}
}

private long executeAdvanceSequence(UUID oid)
throws ObjectNotFoundException, SchemaException, RepositoryException {
long opHandle = registerOperationStart(OP_ADVANCE_SEQUENCE, SequenceType.class);

try (JdbcSession jdbcSession = sqlRepoContext.newJdbcSession().startTransaction()) {
RootUpdateContext<SequenceType, QObject<MObject>, MObject> updateContext =
Expand All @@ -1893,8 +1900,6 @@ private long executeAdvanceSequence(UUID oid)
updateContext.finishExecutionOwn();
jdbcSession.commit();
return returnValue;
} finally {
registerOperationFinish(opHandle);
}
}

Expand Down Expand Up @@ -2386,4 +2391,108 @@ private AggregateSearchContext aggregateQueryHandler(AggregateQuery<?> query, Op
return new AggregateSearchContext(query, queryContext, result);
}

// region Retries


/**
 * Runs the given operation and repeats it for as long as it keeps failing with an exception
 * that {@link #isRetriableException(Exception)} recognizes as retriable.
 *
 * Each retriable failure is reported to the performance monitor under {@code opHandle} and then
 * handed to {@code prepareNextRetry}, which supplies the number of the next attempt.
 * Non-retriable failures are rethrown unchanged.
 *
 * @param opName operation name passed on to {@code prepareNextRetry}
 * @param oid OID of the affected object, passed on to {@code prepareNextRetry}
 * @param opHandle handle of the already registered operation, used when registering a new attempt
 * @param operation the work to execute
 * @return whatever the operation returns on its first successful execution
 * @throws SystemException when the loop ends without a successful execution
 */
private <R> R executeRetriable(String opName, String oid, long opHandle, RetriableOperation<R> operation)
        throws ObjectNotFoundException, SchemaException, RepositoryException {
    final int attemptLimit = 100;
    for (int currentAttempt = 1; currentAttempt < attemptLimit; ) {
        try {
            return operation.execute();
        } catch (RuntimeException | RepositoryException failure) {
            if (isRetriableException(failure)) {
                performanceMonitor.registerOperationNewAttempt(opHandle, currentAttempt);
                // The helper decides what the next attempt number is.
                currentAttempt = prepareNextRetry(opName, oid, currentAttempt, failure);
                continue;
            }
            throw failure;
        }
    }
    throw new SystemException("MAX Attempt count reached");
}

/**
 * Error message reported for a concurrent update conflict under repeatable read isolation.
 * This error message is part of the PostgreSQL contract as described in its documentation:
 *
 * https://www.postgresql.org/docs/16/transaction-iso.html
 */
private static final String PSQL_CONCURRENT_UPDATE_MESSAGE = "ERROR: could not serialize access due to concurrent update";

/**
 * Tells whether the failure was caused by a PostgreSQL serialization conflict, in which case
 * the whole operation can be safely executed again.
 *
 * The exception and its whole cause chain are inspected. A {@link PSQLException} is considered
 * retriable when it carries SQLSTATE {@code 40001} (serialization_failure), or when its message
 * equals {@link #PSQL_CONCURRENT_UPDATE_MESSAGE}. The SQLSTATE check does not depend on the
 * exact message text, which may differ from the expected constant.
 *
 * @param e the failure to examine
 * @return {@code true} if the operation should be retried, {@code false} otherwise
 */
private boolean isRetriableException(Exception e) {
    // SQLSTATE that PostgreSQL documents for serialization failures.
    final String serializationFailureState = "40001";
    for (Throwable toCheck = e; toCheck != null; toCheck = toCheck.getCause()) {
        if (!(toCheck instanceof PSQLException)) {
            continue;
        }
        // Constant-first equals: both getSQLState() and getMessage() may legally return null.
        if (serializationFailureState.equals(((PSQLException) toCheck).getSQLState())
                || PSQL_CONCURRENT_UPDATE_MESSAGE.equals(toCheck.getMessage())) {
            return true;
        }
    }
    return false;
}

/**
 * A unit of repository work that may be executed more than once, typically supplied as a lambda.
 *
 * @param <T> type of the value produced by the operation
 */
@FunctionalInterface
private interface RetriableOperation<T> {

    /**
     * Executes the operation once.
     *
     * @return result of the operation
     */
    T execute() throws ObjectNotFoundException, SchemaException, RepositoryException;
}

/**
 * Dedicated logger whose name is this class name with the ".contention" suffix.
 * Retry-related messages are written to it in addition to the main logger.
 */
private static final Trace CONTENTION_LOGGER =
TraceManager.getTrace(SqaleRepositoryService.class.getName() + ".contention");

/**
 * How many times we want to repeat an operation after a lock acquisition,
 * pessimistic or optimistic exception.
 * Passed as the first argument to the backoff computer and reported in the retry log message.
 */
public static final int LOCKING_MAX_RETRIES = 40;

/**
 * Timeout will be a random number between 0 and LOCKING_DELAY_INTERVAL_BASE * 2^exp
 * where exp is either the real attempt number minus 1, or LOCKING_EXP_THRESHOLD (whichever is lesser).
 */
public static final long LOCKING_DELAY_INTERVAL_BASE = 50;

// Cap for the exponent in the formula above: 50 * 2^7 = 6400.
public static final int LOCKING_EXP_THRESHOLD = 7; // i.e. up to 6400ms wait time

// From this attempt number on, the contention logger reports retries on DEBUG instead of TRACE.
public static final int CONTENTION_LOG_DEBUG_THRESHOLD = 3;
// From this attempt number on, the main logger reports retries on WARN instead of DEBUG.
public static final int MAIN_LOG_WARN_THRESHOLD = 8;


/**
 * Prepares the next attempt of an operation that failed with a retriable exception:
 * asks the backoff computer for the delay, logs the failure and sleeps for that delay.
 *
 * Log level grows with the attempt number: the contention logger uses DEBUG from
 * {@link #CONTENTION_LOG_DEBUG_THRESHOLD} on (TRACE before), the main logger uses WARN from
 * {@link #MAIN_LOG_WARN_THRESHOLD} on (DEBUG before).
 *
 * If the thread is interrupted while sleeping, the wait ends early and the interrupt flag
 * is restored so that the callers can still observe the interruption.
 *
 * @param operation name of the operation, used only in log messages
 * @param oid OID of the affected object, used only in log messages
 * @param attempt number of the attempt that has just failed
 * @param ex the failure of that attempt
 * @return number of the next attempt, i.e. {@code attempt + 1}
 * @throws SystemException wrapping {@code ex} when the backoff computer allows no more retries
 */
private int prepareNextRetry(String operation, String oid, int attempt, Exception ex) {
    BackoffComputer backoffComputer = new ExponentialBackoffComputer(
            LOCKING_MAX_RETRIES, LOCKING_DELAY_INTERVAL_BASE, LOCKING_EXP_THRESHOLD, null);
    long waitTime;
    try {
        waitTime = backoffComputer.computeDelay(attempt);
    } catch (BackoffComputer.NoMoreRetriesException e) {
        CONTENTION_LOGGER.error("A serialization-related problem occurred, maximum attempts ({}) reached.", attempt, ex);
        LOGGER.error("A serialization-related problem occurred, maximum attempts ({}) reached.", attempt, ex);
        // The original failure is the interesting one, so it becomes the cause (not 'e').
        throw new SystemException(ex.getMessage() + " [attempts: " + attempt + "]", ex);
    }

    String message = "A serialization-related problem occurred when {} object with oid '{}', retrying after "
            + "{} ms (this is retry {} of {})\n{}: {}";
    Object[] objects = { operation, oid, waitTime, attempt, LOCKING_MAX_RETRIES, ex.getClass().getSimpleName(), ex.getMessage() };

    if (attempt >= CONTENTION_LOG_DEBUG_THRESHOLD) {
        CONTENTION_LOGGER.debug(message, objects);
    } else {
        CONTENTION_LOGGER.trace(message, objects);
    }
    if (attempt >= MAIN_LOG_WARN_THRESHOLD) {
        LOGGER.warn(message, objects);
    } else {
        LOGGER.debug(message, objects);
    }

    if (waitTime > 0) {
        try {
            Thread.sleep(waitTime);
        } catch (InterruptedException interrupted) {
            // Do not fail the retry because of this, but keep the interruption visible:
            // Thread.sleep() cleared the flag when it threw.
            Thread.currentThread().interrupt();
        }
    }
    return attempt + 1;
}


}

0 comments on commit 0db1736

Please sign in to comment.