Skip to content

Commit

Permalink
Relieve work bucket allocation contention
Browse files Browse the repository at this point in the history
New configuration parameters related to work bucket allocation
contention management were added: workAllocationMaxAttempts,
workAllocationRetryInterval, workAllocationInitialDelay,
workAllocationDefaultFreeBucketWaitInterval.

They are part of <taskManager> configuration section.
  • Loading branch information
mederly committed Apr 6, 2018
1 parent 33102dc commit f086922
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 24 deletions.
Expand Up @@ -75,6 +75,11 @@ public class TaskManagerConfiguration {
private static final String RUN_NOW_KEEPS_ORIGINAL_SCHEDULE_CONFIG_ENTRY = "runNowKeepsOriginalSchedule";
private static final String SCHEDULER_INITIALLY_STOPPED_CONFIG_ENTRY = "schedulerInitiallyStopped";

private static final String WORK_ALLOCATION_MAX_ATTEMPTS_ENTRY = "workAllocationMaxAttempts";
private static final String WORK_ALLOCATION_RETRY_INTERVAL_ENTRY = "workAllocationRetryInterval";
private static final String WORK_ALLOCATION_INITIAL_DELAY_ENTRY = "workAllocationInitialDelay";
private static final String WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_ENTRY = "workAllocationDefaultFreeBucketWaitInterval";

private static final String MIDPOINT_NODE_ID_PROPERTY = "midpoint.nodeId";
private static final String MIDPOINT_JMX_HOST_NAME_PROPERTY = "midpoint.jmxHostName";
private static final String JMX_PORT_PROPERTY = "com.sun.management.jmxremote.port";
Expand All @@ -99,6 +104,11 @@ public class TaskManagerConfiguration {
private static final int STALLED_TASKS_REPEATED_NOTIFICATION_INTERVAL_DEFAULT = 3600;
private static final boolean RUN_NOW_KEEPS_ORIGINAL_SCHEDULE_DEFAULT = false;

private static final int WORK_ALLOCATION_MAX_ATTEMPTS_DEFAULT = 40;
private static final long WORK_ALLOCATION_RETRY_INTERVAL_DEFAULT = 5000L;
private static final long WORK_ALLOCATION_INITIAL_DELAY_DEFAULT = 5000L;
private static final long WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_DEFAULT = 20000L;

private boolean stopOnInitializationFailure;
private int threads;
private boolean jdbcJobStore;
Expand All @@ -117,6 +127,11 @@ public class TaskManagerConfiguration {
private boolean runNowKeepsOriginalSchedule;
private boolean schedulerInitiallyStopped;

private int workAllocationMaxAttempts;
private long workAllocationRetryInterval;
private long workAllocationInitialDelay;
private long workAllocationDefaultFreeBucketWaitInterval;

// JMX credentials for connecting to remote nodes
private String jmxUsername;
private String jmxPassword;
Expand Down Expand Up @@ -178,7 +193,11 @@ public class TaskManagerConfiguration {
STALLED_TASKS_THRESHOLD_CONFIG_ENTRY,
STALLED_TASKS_REPEATED_NOTIFICATION_INTERVAL_CONFIG_ENTRY,
RUN_NOW_KEEPS_ORIGINAL_SCHEDULE_CONFIG_ENTRY,
SCHEDULER_INITIALLY_STOPPED_CONFIG_ENTRY
SCHEDULER_INITIALLY_STOPPED_CONFIG_ENTRY,
WORK_ALLOCATION_MAX_ATTEMPTS_ENTRY,
WORK_ALLOCATION_RETRY_INTERVAL_ENTRY,
WORK_ALLOCATION_INITIAL_DELAY_ENTRY,
WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_ENTRY
);

void checkAllowedKeys(MidpointConfiguration masterConfig) throws TaskManagerConfigurationException {
Expand Down Expand Up @@ -269,6 +288,12 @@ void setBasicInformation(MidpointConfiguration masterConfig) throws TaskManagerC
stalledTasksRepeatedNotificationInterval = c.getInt(STALLED_TASKS_REPEATED_NOTIFICATION_INTERVAL_CONFIG_ENTRY, STALLED_TASKS_REPEATED_NOTIFICATION_INTERVAL_DEFAULT);
runNowKeepsOriginalSchedule = c.getBoolean(RUN_NOW_KEEPS_ORIGINAL_SCHEDULE_CONFIG_ENTRY, RUN_NOW_KEEPS_ORIGINAL_SCHEDULE_DEFAULT);
schedulerInitiallyStopped = c.getBoolean(SCHEDULER_INITIALLY_STOPPED_CONFIG_ENTRY, false);

workAllocationMaxAttempts = c.getInt(WORK_ALLOCATION_MAX_ATTEMPTS_ENTRY, WORK_ALLOCATION_MAX_ATTEMPTS_DEFAULT);
workAllocationRetryInterval = c.getLong(WORK_ALLOCATION_RETRY_INTERVAL_ENTRY, WORK_ALLOCATION_RETRY_INTERVAL_DEFAULT);
workAllocationInitialDelay = c.getLong(WORK_ALLOCATION_INITIAL_DELAY_ENTRY, WORK_ALLOCATION_INITIAL_DELAY_DEFAULT);
workAllocationDefaultFreeBucketWaitInterval = c.getLong(WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_ENTRY,
WORK_ALLOCATION_DEFAULT_FREE_BUCKET_WAIT_INTERVAL_DEFAULT);
}

private static final Map<String,String> schemas = new HashMap<>();
Expand Down Expand Up @@ -339,8 +364,6 @@ void setJdbcJobStoreInformation(MidpointConfiguration masterConfig, SqlRepositor

/**
* Check configuration, except for JDBC JobStore-specific parts.
*
* @throws TaskManagerConfigurationException
*/
void validateBasicInformation() throws TaskManagerConfigurationException {

Expand Down Expand Up @@ -525,4 +548,20 @@ public String getDataSource() {
public boolean isSchedulerInitiallyStopped() {
return schedulerInitiallyStopped;
}

public int getWorkAllocationMaxAttempts() {
return workAllocationMaxAttempts;
}

public long getWorkAllocationRetryInterval() {
return workAllocationRetryInterval;
}

public long getWorkAllocationInitialDelay() {
return workAllocationInitialDelay;
}

public long getWorkAllocationDefaultFreeBucketWaitInterval() {
return workAllocationDefaultFreeBucketWaitInterval;
}
}
Expand Up @@ -2141,6 +2141,6 @@ public TaskHandler createAndRegisterPartitioningTaskHandler(String handlerUri, F

@Override
public void setFreeBucketWaitInterval(long value) {
workStateManager.setFreeBucketWaitInterval(value);
workStateManager.setFreeBucketWaitIntervalOverride(value);
}
}
Expand Up @@ -689,6 +689,12 @@ private TaskRunResult executeWorkBucketAwareTaskHandler(WorkBucketAwareTaskHandl
}
}

try {
workStateManager.executeInitialDelay(task);
} catch (InterruptedException e) {
return createInterruptedTaskRunResult();
}

TaskWorkBucketProcessingResult runResult = null;
for (;;) {
WorkBucketType bucket;
Expand Down
Expand Up @@ -34,6 +34,9 @@
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskExecutionStatus;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.quartzimpl.TaskManagerConfiguration;
import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl;
import com.evolveum.midpoint.task.quartzimpl.TaskQuartzImpl;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandler;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandlerRegistry;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy;
Expand Down Expand Up @@ -81,12 +84,9 @@ public class WorkStateManager {
@Autowired private WorkSegmentationStrategyFactory strategyFactory;
@Autowired private WorkBucketContentHandlerRegistry handlerFactory;

private static final int MAX_ATTEMPTS = 40; // temporary
private static final long DELAY_INTERVAL = 5000L; // temporary
private static final long DEFAULT_FREE_BUCKET_WAIT_INTERVAL = 20000L;
private static final long DYNAMIC_SLEEP_INTERVAL = 100L;

private long freeBucketWaitInterval = DEFAULT_FREE_BUCKET_WAIT_INTERVAL;
private Long freeBucketWaitIntervalOverride = null;

private class Context {
Task workerTask;
Expand All @@ -113,15 +113,15 @@ public void reloadWorkerTask(OperationResult result) throws SchemaException, Obj
workerTask = taskManager.getTask(workerTask.getOid(), null, result);
}

public boolean canRun() {
return canRunSupplier == null || BooleanUtils.isTrue(canRunSupplier.get());
}

public TaskWorkManagementType getWorkStateConfiguration() {
return isStandalone() ? workerTask.getWorkManagement() : coordinatorTask.getWorkManagement();
}
}

public boolean canRun(Supplier<Boolean> canRunSupplier) {
return canRunSupplier == null || BooleanUtils.isTrue(canRunSupplier.get());
}

/**
* Allocates work bucket. If no free work buckets are currently present it tries to create one.
* If there is already allocated work bucket for given worker task, it is returned.
Expand Down Expand Up @@ -172,8 +172,8 @@ private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTi

waitForAvailableBucket: // this cycle exits when something is found OR when a definite 'no more buckets' answer is received
for (;;) {
waitForConflictLessUpdate: // this cycle exits when coordinator task update succeeds
for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
waitForConflictLessUpdate: // this cycle exits when coordinator task update succeeds
for (int attempt = 0; attempt < getMaxAttempts(); attempt++) {
TaskWorkStateType coordinatorWorkState = getWorkStateOrNew(ctx.coordinatorTask.getTaskPrismObject());
GetBucketResult response = workStateStrategy.getBucket(coordinatorWorkState);
LOGGER.trace("getWorkBucketMultiNode: workStateStrategy returned {} for worker task {}, coordinator {}", response, ctx.workerTask, ctx.coordinatorTask);
Expand Down Expand Up @@ -215,7 +215,7 @@ private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTi
return null;
}
//System.out.println("*** No free work bucket -- waiting ***");
dynamicSleep(Math.min(toWait, freeBucketWaitInterval), ctx);
dynamicSleep(Math.min(toWait, getFreeBucketWaitInterval()), ctx);
ctx.reloadCoordinatorTask(result);
ctx.reloadWorkerTask(result);
if (reclaimWronglyAllocatedBuckets(ctx.coordinatorTask, result)) {
Expand All @@ -229,8 +229,10 @@ private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTi
throw new AssertionError(response);
}
} catch (PreconditionViolationException e) {
long delay = (long) (Math.random() * DELAY_INTERVAL);
LOGGER.debug("getWorkBucketMultiNode: conflict; this was attempt #{}; waiting {} ms", attempt, delay, e);
long delay = (long) (Math.random() * getDelayInterval());
// temporary
LOGGER.info("getWorkBucketMultiNode: conflict; this was attempt #{}; waiting {} ms in {}, worker {}",
attempt, delay, ctx.coordinatorTask, ctx.workerTask, e);
dynamicSleep(delay, ctx);
ctx.reloadCoordinatorTask(result);
ctx.reloadWorkerTask(result);
Expand All @@ -243,6 +245,27 @@ private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTi
}
}

private long getFreeBucketWaitInterval() {
return freeBucketWaitIntervalOverride != null ? freeBucketWaitIntervalOverride :
getConfiguration().getWorkAllocationDefaultFreeBucketWaitInterval();
}

private int getMaxAttempts() {
return getConfiguration().getWorkAllocationMaxAttempts();
}

private long getDelayInterval() {
return getConfiguration().getWorkAllocationRetryInterval();
}

private long getInitialDelay() {
return getConfiguration().getWorkAllocationInitialDelay();
}

private TaskManagerConfiguration getConfiguration() {
return ((TaskManagerQuartzImpl) taskManager).getConfiguration();
}

private void setOrUpdateEstimatedNumberOfBuckets(Task task, WorkSegmentationStrategy workStateStrategy, OperationResult result)
throws SchemaException, ObjectAlreadyExistsException, ObjectNotFoundException {
Integer number = workStateStrategy.estimateNumberOfBuckets(task.getWorkState());
Expand All @@ -255,8 +278,12 @@ private void setOrUpdateEstimatedNumberOfBuckets(Task task, WorkSegmentationStra
}

private void dynamicSleep(long delay, Context ctx) throws InterruptedException {
dynamicSleep(delay, ctx.canRunSupplier);
}

private void dynamicSleep(long delay, Supplier<Boolean> canRunSupplier) throws InterruptedException {
while (delay > 0) {
if (!ctx.canRun()) {
if (!canRun(canRunSupplier)) {
throw new InterruptedException();
}
Thread.sleep(Math.min(delay, DYNAMIC_SLEEP_INTERVAL));
Expand Down Expand Up @@ -556,8 +583,8 @@ private TaskWorkStateType getWorkState(Task task) throws SchemaException {
}
}

public void setFreeBucketWaitInterval(long freeBucketWaitInterval) {
this.freeBucketWaitInterval = freeBucketWaitInterval;
public void setFreeBucketWaitIntervalOverride(Long value) {
this.freeBucketWaitIntervalOverride = value;
}

// TODO
Expand Down Expand Up @@ -593,4 +620,15 @@ public ObjectQuery narrowQueryForWorkBucket(Task workerTask, ObjectQuery query,
// TODO update sorting criteria
return updatedQuery;
}

public void executeInitialDelay(TaskQuartzImpl task) throws InterruptedException {
if (task.getWorkManagement() != null && task.getWorkManagement().getTaskKind() == TaskKindType.WORKER) {
long delay = (long) (Math.random() * getInitialDelay());
if (delay != 0) {
// temporary info level logging
LOGGER.info("executeInitialDelay: waiting {} ms in {}", delay, task);
dynamicSleep(delay, task::canRun);
}
}
}
}
Expand Up @@ -95,7 +95,7 @@ protected String coordinatorTaskOid(String TEST_NAME) {
@PostConstruct
public void initialize() throws Exception {
super.initialize();
workStateManager.setFreeBucketWaitInterval(1000L);
workStateManager.setFreeBucketWaitIntervalOverride(1000L);
DebugUtil.setPrettyPrintBeansAs(PrismContext.LANG_YAML);
}

Expand Down
Expand Up @@ -106,7 +106,7 @@ protected String coordinatorTaskOid(String TEST_NAME) {
@PostConstruct
public void initialize() throws Exception {
super.initialize();
workStateManager.setFreeBucketWaitInterval(1000L);
workStateManager.setFreeBucketWaitIntervalOverride(1000L);
DebugUtil.setPrettyPrintBeansAs(PrismContext.LANG_YAML);
}

Expand Down
Expand Up @@ -114,7 +114,7 @@ protected String coordinatorTaskOid(String TEST_NAME) {
@PostConstruct
public void initialize() throws Exception {
super.initialize();
workStateManager.setFreeBucketWaitInterval(1000L);
workStateManager.setFreeBucketWaitIntervalOverride(1000L);
DebugUtil.setPrettyPrintBeansAs(PrismContext.LANG_YAML);
}

Expand Down
Expand Up @@ -102,7 +102,7 @@ protected String coordinatorTaskOid(String TEST_NAME) {
@PostConstruct
public void initialize() throws Exception {
super.initialize();
workStateManager.setFreeBucketWaitInterval(1000L);
workStateManager.setFreeBucketWaitIntervalOverride(1000L);
DebugUtil.setPrettyPrintBeansAs(PrismContext.LANG_YAML);
}

Expand Down

0 comments on commit f086922

Please sign in to comment.