Skip to content

Commit

Permalink
Fix minor issues in work buckets management
Browse files Browse the repository at this point in the history
  • Loading branch information
mederly committed Apr 16, 2018
1 parent 83e2c9c commit f3d0e91
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 15 deletions.
Expand Up @@ -684,7 +684,7 @@ private TaskRunResult executeWorkBucketAwareTaskHandler(WorkBucketAwareTaskHandl
.item(TaskType.F_WORK_STATE).replace()
.asItemDeltas();
task.applyDeltasImmediate(itemDeltas, executionResult);
} catch (SchemaException | ObjectAlreadyExistsException | ObjectNotFoundException e) {
} catch (SchemaException | ObjectAlreadyExistsException | ObjectNotFoundException | RuntimeException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Couldn't remove work state from (completed) task {} -- continuing", e, task);
}
}
Expand Down Expand Up @@ -737,7 +737,7 @@ private TaskRunResult executeWorkBucketAwareTaskHandler(WorkBucketAwareTaskHandl
}
try {
taskManagerImpl.getWorkStateManager().completeWorkBucket(task.getOid(), bucket.getSequentialNumber(), executionResult);
} catch (ObjectAlreadyExistsException | ObjectNotFoundException | SchemaException e) {
} catch (ObjectAlreadyExistsException | ObjectNotFoundException | SchemaException | RuntimeException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Couldn't complete work bucket for task {}", e, task);
return createFailureTaskRunResult("Couldn't complete work bucket: " + e.getMessage(), e);
}
Expand Down
Expand Up @@ -66,7 +66,7 @@ public GetBucketResult getBucket(@NotNull TaskWorkStateType workState) throws Sc
.content(newBucketContent)
.state(WorkBucketStateType.READY));
}
return new GetBucketResult.NewBuckets(newBuckets);
return new GetBucketResult.NewBuckets(newBuckets, 0);
} else {
return new NothingFound(!somethingDelegated);
}
Expand Down
Expand Up @@ -184,9 +184,10 @@ private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTi
try {
if (response instanceof NewBuckets) {
NewBuckets newBucketsResponse = (NewBuckets) response;
int selected = newBucketsResponse.selected;
List<WorkBucketType> newCoordinatorBuckets = new ArrayList<>(coordinatorWorkState.getBucket());
for (int i = 0; i < newBucketsResponse.newBuckets.size(); i++) {
if (i == 0) {
if (i == selected) {
newCoordinatorBuckets.add(newBucketsResponse.newBuckets.get(i).clone().state(WorkBucketStateType.DELEGATED));
} else {
newCoordinatorBuckets.add(newBucketsResponse.newBuckets.get(i).clone());
Expand All @@ -196,8 +197,8 @@ private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTi
bucketsReplaceDeltas(newCoordinatorBuckets),
bucketsReplacePrecondition(coordinatorWorkState.getBucket()), null, result);
repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(),
bucketsAddDeltas(newBucketsResponse.newBuckets), null, result);
return newBucketsResponse.newBuckets.get(0);
bucketsAddDeltas(newBucketsResponse.newBuckets.subList(selected, selected+1)), null, result);
return newBucketsResponse.newBuckets.get(selected);
} else if (response instanceof FoundExisting) {
FoundExisting existingResponse = (FoundExisting) response;
repositoryService.modifyObject(TaskType.class, ctx.coordinatorTask.getOid(),
Expand Down Expand Up @@ -255,7 +256,7 @@ private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTi

private BackoffComputer createBackoffComputer() {
TaskManagerConfiguration c = configuration;
return new ExponentialBackoffComputer(c.getWorkAllocationMaxRetries(), c.getWorkAllocationInitialDelay(),
return new ExponentialBackoffComputer(c.getWorkAllocationMaxRetries(), c.getWorkAllocationRetryInterval(),
c.getWorkAllocationRetryExponentialThreshold());
}

Expand Down Expand Up @@ -339,7 +340,7 @@ private WorkBucketType getWorkBucketStandalone(Context ctx, OperationResult resu
NewBuckets newBucketsResponse = (NewBuckets) response;
repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(),
bucketsAddDeltas(newBucketsResponse.newBuckets), null, result);
return newBucketsResponse.newBuckets.get(0);
return newBucketsResponse.newBuckets.get(newBucketsResponse.selected);
} else if (response instanceof NothingFound) {
if (!((NothingFound) response).definite) {
throw new AssertionError("Unexpected 'indefinite' answer when looking for next bucket in a standalone task: " + ctx.workerTask);
Expand Down Expand Up @@ -550,11 +551,7 @@ private ModificationPrecondition<TaskType> bucketUnchangedPrecondition(WorkBucke
WorkBucketType currentBucket = findBucketByNumber(getWorkStateOrNew(taskObject).getBucket(),
originalBucket.getSequentialNumber());
// performance is not optimal but OK for precondition checking
boolean rv = currentBucket != null && cloneNoId(currentBucket).equals(cloneNoId(originalBucket));
if (!rv) {
System.out.println("Hi");
}
return rv;
return currentBucket != null && cloneNoId(currentBucket).equals(cloneNoId(originalBucket));
};
}

Expand Down
Expand Up @@ -71,12 +71,14 @@ public FoundExisting(@NotNull WorkBucketType bucket) {
*/
public static class NewBuckets extends GetBucketResult {
/**
* New buckets. The first one is to be returned as the one to be processed.
* New buckets.
*/
@NotNull public final List<WorkBucketType> newBuckets;
public final int selected;

public NewBuckets(@NotNull List<WorkBucketType> newBuckets) {
public NewBuckets(@NotNull List<WorkBucketType> newBuckets, int selected) {
this.newBuckets = newBuckets;
this.selected = selected;
}
}
}
Expand Down

0 comments on commit f3d0e91

Please sign in to comment.