Skip to content
Permalink
Browse files
clearer semantics for cancel, and cancel dependent submitted tasks by…
… default

prevents leaks where dependent tasks (eg resolveValue) are submitted in the background,
then the caller is cancelled; the interruption was not propagated.  now by default it is,
to children tasks and to submitted transients, with options for other (weaker and stronger) cancellations.
see TaskInternal.cancel(TaskCancellationOptions), and new tests for cancelling children in DynamicSequentialTaskTest.

also remove deprecated loose-typing ExecutionUtils.invoke,
more TRACE logging for activities,
and wrap batch config in a task so more resolutions are nested
  • Loading branch information
ahgittin committed Jan 20, 2016
1 parent 2f3e465 commit aed078633aaf40a4a9da907469700808a245cc78
Showing 20 changed files with 409 additions and 121 deletions.
@@ -49,3 +49,8 @@ parent or application root in YAML.

For changes in prior versions, please refer to the release notes for
[0.8.0](/v/0.8.0-incubating/misc/release-notes.html).

3. Task cancellation is now propagated to dependent submitted tasks, including backgrounded tasks if they are transient.
Previously when a task was cancelled the API did not guarantee semantics but the behaviour was to cancel sub-tasks only
in very limited cases. Now the semantics are more precise and controllable, and more sub-tasks are cancelled.
This can prevent some leaked waits on `attributeWhenReady`.
@@ -80,6 +80,24 @@
*/
public boolean isError();

/**
* As {@link Future#isDone()}. In particular if cancelled, this will return true
* as soon as it is cancelled. The thread for this task may still be running,
* if the cancellation (often an interruption, but may be weaker) has not applied,
* and submitted threads may also be running depending on cancellation parameters.
* <p>
* {@link #get()} is guaranteed to return immediately, throwing in the case of cancellation
* prior to completion (and including the case above where a thread may still be running).
* <p>
* To check whether cancelled threads for this task have completed,
* inspect {@link #getEndTimeUtc()}, which is guaranteed to be set when threads complete
* if the thread is started (as determinable by whether {@link #getStartTimeUtc()} is set).
* (The threads of submitted/child tasks will usually be independent; to determine their
* completion requires inspecting the {@link ExecutionManager}.)
*/
@Override
public boolean isDone();

/**
* Causes calling thread to block until the task is started.
*/
@@ -115,7 +115,7 @@ public final T get() {

try {
if (log.isDebugEnabled())
log.debug("Queuing task to resolve "+dsl);
log.debug("Queuing task to resolve "+dsl+", called by "+Tasks.current());

EntityInternal entity = (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
ExecutionContext exec =
@@ -307,12 +307,16 @@ protected <T extends Entity> void initEntityAndDescendants(String entityId, fina

/* Marked transient so that the task is not needlessly kept around at the highest level.
* Note that the task is not normally visible in the GUI, because
* (a) while it is running, the entity is parentless (and so not in the tree);
* (a) while it is running, the entity is often parentless (and so not in the tree);
* and (b) when it is completed it is GC'd, as it is transient.
* However task info is available via the API if you know its ID,
* and if better subtask querying is available it will be picked up as a background task
* of the parent entity creating this child entity
* (note however such subtasks are currently filtered based on parent entity so is excluded).
* <p>
* Some of these (initializers and enrichers) submit background scheduled tasks,
* which currently show up at the top level once the initializer task completes.
* TODO It would be nice if these schedule tasks were grouped in a bucket!
*/
((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization")
.tag(BrooklynTaskTags.tagForContextEntity(entity))
@@ -248,7 +248,7 @@ public V call() {

// return immediately if either the ready predicate or the abort conditions hold
if (ready(value)) return postProcess(value);

final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
long start = System.currentTimeMillis();

@@ -790,6 +790,7 @@ public Task<V> build() {
.displayName("waiting on "+sensor.getName())
.description("Waiting on sensor "+sensor.getName()+" from "+source)
.tag("attributeWhenReady")
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.body(new WaitInTaskForAttributeReady<T,V>(this))
.build();
}
@@ -175,18 +175,18 @@ public T call() {
}

final Object startCallback = properties.get("newTaskStartCallback");
properties.put("newTaskStartCallback", new Function<Object,Void>() {
public Void apply(Object it) {
properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
public Void apply(Task<?> it) {
registerPerThreadExecutionContext();
if (startCallback!=null) ExecutionUtils.invoke(startCallback, it);
if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
return null;
}});

final Object endCallback = properties.get("newTaskEndCallback");
properties.put("newTaskEndCallback", new Function<Object,Void>() {
public Void apply(Object it) {
properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
public Void apply(Task<?> it) {
try {
if (endCallback!=null) ExecutionUtils.invoke(endCallback, it);
if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
} finally {
clearPerThreadExecutionContext();
}
@@ -19,6 +19,7 @@
package org.apache.brooklyn.util.core.task;

import static com.google.common.base.Preconditions.checkNotNull;
import groovy.lang.Closure;

import java.util.Collection;
import java.util.Collections;
@@ -51,16 +52,22 @@
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.config.Sanitizer;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
@@ -368,7 +375,6 @@ protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTa
return submitSubsequentScheduledTask(flags, task);
}

@SuppressWarnings("unchecked")
protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
if (!task.isDone()) {
task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
@@ -508,9 +514,15 @@ public T call() {
*/
if (log.isDebugEnabled()) {
// debug only here, because most submitters will handle failures
log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
if (log.isTraceEnabled())
log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
if (error instanceof InterruptedException || error instanceof RuntimeInterruptedException) {
log.debug("Detected interruption on task "+task+" (rethrowing)" +
(Strings.isNonBlank(error.getMessage()) ? ": "+error.getMessage() : ""));
} else {
log.debug("Exception running task "+task+" (rethrowing): "+error);
}
if (log.isTraceEnabled()) {
log.trace("Trace for exception running task "+task+" (rethrowing): "+error, error);
}
}
throw Exceptions.propagate(error);
}
@@ -526,19 +538,63 @@ public String toString() {
}
}

private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
@SuppressWarnings("deprecation")
// TODO do we even need a listenable future here? possibly if someone wants to interrogate the future it might
// be interesting, so possibly it is useful that we implement ListenableFuture...
private final static class CancellingListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
private final Task<T> task;
private BasicExecutionManager execMgmt;

private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) {
super(delegate, list);
this.execMgmt = execMgmt;
this.task = task;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
public boolean cancel(TaskCancellationMode mode) {
boolean result = false;
if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
result |= super.cancel(mayInterruptIfRunning);
if (log.isTraceEnabled()) {
log.trace("CLFFT cancelling "+task+" mode "+mode);
}
if (!task.isCancelled()) result |= ((TaskInternal<T>)task).cancel(mode);
result |= delegate().cancel(mode.isAllowedToInterruptTask());

if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) {
int subtasksFound=0;
int subtasksReallyCancelled=0;

if (task instanceof HasTaskChildren) {
for (Task<?> child: ((HasTaskChildren)task).getChildren()) {
if (log.isTraceEnabled()) {
log.trace("Cancelling "+child+" on recursive cancellation of "+task);
}
subtasksFound++;
if (((TaskInternal<?>)child).cancel(mode)) {
result = true;
subtasksReallyCancelled++;
}
}
}
for (Task<?> t: execMgmt.getAllTasks()) {
if (task.equals(t.getSubmittedByTask())) {
if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) {
if (log.isTraceEnabled()) {
log.trace("Cancelling "+t+" on recursive cancellation of "+task);
}
subtasksFound++;
if (((TaskInternal<?>)t).cancel(mode)) {
result = true;
subtasksReallyCancelled++;
}
}
}
}
if (log.isTraceEnabled()) {
log.trace("On cancel of "+task+", applicable subtask count "+subtasksFound+", of which "+subtasksReallyCancelled+" were actively cancelled");
}
}

((TaskInternal<?>)task).runListeners();
return result;
}
@@ -571,9 +627,15 @@ public void run() {

@SuppressWarnings("unchecked")
protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
if (log.isTraceEnabled()) log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}",
if (log.isTraceEnabled()) {
log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}",
new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(),
(task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>")});
(task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>"),
Tasks.current() });
if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) {
log.trace("Stack trace for unparented submission of transient "+task, new Throwable("trace only (not an error)"));
}
}

if (task instanceof ScheduledTask)
return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
@@ -604,15 +666,16 @@ protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
} else {
future = runner.submit(job);
}
// on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
// (and we make sure the same ExecutionList is used in the future as in the task)
ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
// doesn't matter whether the listener is added to the listenableFuture or the task,
// except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
// (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
// [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
// SubmissionCallable (above) invokes the listeners on completion;
// this future allows a caller to add custom listeners
// (it does not notify the listeners; that's our job);
// except on cancel we want to listen
ListenableFuture<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task);
// and we want to make sure *our* (manager) listeners are given suitable callback
((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
// NB: can the above mean multiple callbacks to TaskInternal#runListeners?

// finally expose the future to callers
((TaskInternal<T>)task).initInternalFuture(listenableFuture);

return task;
@@ -665,9 +728,27 @@ protected void internalBeforeStart(Map<?,?> flags, Task<?> task) {
PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
}
ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
invokeCallback(flags.get("newTaskStartCallback"), task);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
// not ideal, such loose typing on the callback -- should prefer Function<Task,Object>
// but at least it's package-private
static Object invokeCallback(Object callable, Task<?> task) {
if (callable instanceof Closure) return ((Closure<?>)callable).call(task);
if (callable instanceof Callable) {
try {
return ((Callable<?>)callable).call();
} catch (Throwable t) {
throw Throwables.propagate(t);
}
}
if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
if (callable instanceof Function) { return ((Function)callable).apply(task); }
if (callable==null) return null;
throw new IllegalArgumentException("Cannot invoke unexpected callback object "+callable+" of type "+callable.getClass()+" on "+task);
}

/** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
internalAfterEnd(flags, task, false, true);
@@ -693,7 +774,7 @@ protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInT
}
if (isEndingAllIterations) {
incompleteTaskIds.remove(task.getId());
ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
invokeCallback(flags.get("newTaskEndCallback"), task);
((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
}

@@ -155,7 +155,7 @@ public String toString() {
(Strings.isNonEmpty(displayName) ?
displayName :
(job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
":"+getId()+"]";
"]@"+getId();
}

@Override
@@ -196,7 +196,7 @@ public Task<T> asTask() {
protected Maybe<Task<?>> submittedByTask;

protected volatile Thread thread = null;
private volatile boolean cancelled = false;
protected volatile boolean cancelled = false;
/** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
protected volatile Future<T> internalFuture = null;

@@ -288,15 +288,33 @@ public synchronized boolean uncancel() {
}

@Override
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
// semantics changed in 2016-01, previously "true" was INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS
return cancel(mayInterruptIfRunning ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS
: TaskCancellationMode.DO_NOT_INTERRUPT);
}

public synchronized boolean cancel(TaskCancellationMode mode) {
if (isDone()) return false;
boolean cancel = true;
if (log.isTraceEnabled()) {
log.trace("BT cancelling "+this+" mode "+mode);
}
cancelled = true;
doCancel(mode);
notifyAll();
return true;
}

@SuppressWarnings("deprecation")
protected boolean doCancel(TaskCancellationMode mode) {
if (internalFuture!=null) {
cancel = internalFuture.cancel(mayInterruptIfRunning);
if (internalFuture instanceof ListenableForwardingFuture) {
return ((ListenableForwardingFuture<?>)internalFuture).cancel(mode);
} else {
return internalFuture.cancel(mode.isAllowedToInterruptTask());
}
}
notifyAll();
return cancel;
return true;
}

@Override

0 comments on commit aed0786

Please sign in to comment.