Skip to content
Permalink
Browse files
This closes #1160
  • Loading branch information
ahgittin committed Jan 20, 2016
2 parents 2a164a8 + 54cba8a commit 8303fea866a9d39c58dc7174f73543477b6e73bc
Showing 20 changed files with 411 additions and 121 deletions.
@@ -51,3 +51,8 @@ referenced types blueprint plan source code and the catalog item ID are now set

For changes in prior versions, please refer to the release notes for
[0.8.0](/v/0.8.0-incubating/misc/release-notes.html).

3. Task cancellation is now propagated to dependent submitted tasks, including backgrounded tasks if they are transient.
Previously when a task was cancelled the API did not guarantee semantics but the behaviour was to cancel sub-tasks only
in very limited cases. Now the semantics are more precise and controllable, and more sub-tasks are cancelled.
This can prevent some leaked waits on `attributeWhenReady`.
@@ -80,6 +80,24 @@
*/
public boolean isError();

/**
* As {@link Future#isDone()}. In particular if cancelled, this will return true
* as soon as it is cancelled. The thread for this task may still be running,
* if the cancellation (often an interruption, but may be weaker) has not applied,
* and submitted threads may also be running depending on cancellation parameters.
* <p>
* {@link #get()} is guaranteed to return immediately, throwing in the case of cancellation
* prior to completion (and including the case above where a thread may still be running).
* <p>
* To check whether cancelled threads for this task have completed,
* inspect {@link #getEndTimeUtc()}, which is guaranteed to be set when threads complete
* if the thread is started (as determinable by whether {@link #getStartTimeUtc()} is set).
* (The threads of submitted/child tasks will usually be independent; to determine their
* completion requires inspecting the {@link ExecutionManager}.)
*/
@Override
public boolean isDone();

/**
* Causes calling thread to block until the task is started.
*/
@@ -115,7 +115,7 @@ public final T get() {

try {
if (log.isDebugEnabled())
log.debug("Queuing task to resolve "+dsl);
log.debug("Queuing task to resolve "+dsl+", called by "+Tasks.current());

EntityInternal entity = (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
ExecutionContext exec =
@@ -307,12 +307,16 @@ protected <T extends Entity> void initEntityAndDescendants(String entityId, fina

/* Marked transient so that the task is not needlessly kept around at the highest level.
* Note that the task is not normally visible in the GUI, because
* (a) while it is running, the entity is parentless (and so not in the tree);
* (a) while it is running, the entity is often parentless (and so not in the tree);
* and (b) when it is completed it is GC'd, as it is transient.
* However task info is available via the API if you know its ID,
* and if better subtask querying is available it will be picked up as a background task
* of the parent entity creating this child entity
* (note however such subtasks are currently filtered based on parent entity so is excluded).
* <p>
* Some of these (initializers and enrichers) submit background scheduled tasks,
* which currently show up at the top level once the initializer task completes.
* TODO It would be nice if these schedule tasks were grouped in a bucket!
*/
((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization")
.tag(BrooklynTaskTags.tagForContextEntity(entity))
@@ -248,7 +248,7 @@ public V call() {

// return immediately if either the ready predicate or the abort conditions hold
if (ready(value)) return postProcess(value);

final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
long start = System.currentTimeMillis();

@@ -790,6 +790,7 @@ public Task<V> build() {
.displayName("waiting on "+sensor.getName())
.description("Waiting on sensor "+sensor.getName()+" from "+source)
.tag("attributeWhenReady")
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.body(new WaitInTaskForAttributeReady<T,V>(this))
.build();
}
@@ -175,18 +175,18 @@ public T call() {
}

final Object startCallback = properties.get("newTaskStartCallback");
properties.put("newTaskStartCallback", new Function<Object,Void>() {
public Void apply(Object it) {
properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
public Void apply(Task<?> it) {
registerPerThreadExecutionContext();
if (startCallback!=null) ExecutionUtils.invoke(startCallback, it);
if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
return null;
}});

final Object endCallback = properties.get("newTaskEndCallback");
properties.put("newTaskEndCallback", new Function<Object,Void>() {
public Void apply(Object it) {
properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
public Void apply(Task<?> it) {
try {
if (endCallback!=null) ExecutionUtils.invoke(endCallback, it);
if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
} finally {
clearPerThreadExecutionContext();
}
@@ -19,6 +19,7 @@
package org.apache.brooklyn.util.core.task;

import static com.google.common.base.Preconditions.checkNotNull;
import groovy.lang.Closure;

import java.util.Collection;
import java.util.Collections;
@@ -51,16 +52,22 @@
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.config.Sanitizer;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
@@ -368,7 +375,6 @@ protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTa
return submitSubsequentScheduledTask(flags, task);
}

@SuppressWarnings("unchecked")
protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
if (!task.isDone()) {
task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
@@ -508,9 +514,15 @@ public T call() {
*/
if (log.isDebugEnabled()) {
// debug only here, because most submitters will handle failures
log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
if (log.isTraceEnabled())
log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
if (error instanceof InterruptedException || error instanceof RuntimeInterruptedException) {
log.debug("Detected interruption on task "+task+" (rethrowing)" +
(Strings.isNonBlank(error.getMessage()) ? ": "+error.getMessage() : ""));
} else {
log.debug("Exception running task "+task+" (rethrowing): "+error);
}
if (log.isTraceEnabled()) {
log.trace("Trace for exception running task "+task+" (rethrowing): "+error, error);
}
}
throw Exceptions.propagate(error);
}
@@ -526,19 +538,64 @@ public String toString() {
}
}

private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
@SuppressWarnings("deprecation")
// TODO do we even need a listenable future here? possibly if someone wants to interrogate the future it might
// be interesting, so possibly it is useful that we implement ListenableFuture...
private final static class CancellingListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
private final Task<T> task;
private BasicExecutionManager execMgmt;

private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) {
super(delegate, list);
this.execMgmt = execMgmt;
this.task = task;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
public boolean cancel(TaskCancellationMode mode) {
boolean result = false;
if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
result |= super.cancel(mayInterruptIfRunning);
if (log.isTraceEnabled()) {
log.trace("CLFFT cancelling "+task+" mode "+mode);
}
if (!task.isCancelled()) result |= ((TaskInternal<T>)task).cancel(mode);
result |= delegate().cancel(mode.isAllowedToInterruptTask());

if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) {
int subtasksFound=0;
int subtasksReallyCancelled=0;

if (task instanceof HasTaskChildren) {
for (Task<?> child: ((HasTaskChildren)task).getChildren()) {
if (log.isTraceEnabled()) {
log.trace("Cancelling "+child+" on recursive cancellation of "+task);
}
subtasksFound++;
if (((TaskInternal<?>)child).cancel(mode)) {
result = true;
subtasksReallyCancelled++;
}
}
}
// TODO this is inefficient; might want to keep an index on submitted-by
for (Task<?> t: execMgmt.getAllTasks()) {
if (task.equals(t.getSubmittedByTask())) {
if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) {
if (log.isTraceEnabled()) {
log.trace("Cancelling "+t+" on recursive cancellation of "+task);
}
subtasksFound++;
if (((TaskInternal<?>)t).cancel(mode)) {
result = true;
subtasksReallyCancelled++;
}
}
}
}
if (log.isTraceEnabled()) {
log.trace("On cancel of "+task+", applicable subtask count "+subtasksFound+", of which "+subtasksReallyCancelled+" were actively cancelled");
}
}

((TaskInternal<?>)task).runListeners();
return result;
}
@@ -571,9 +628,15 @@ public void run() {

@SuppressWarnings("unchecked")
protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
if (log.isTraceEnabled()) log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}",
if (log.isTraceEnabled()) {
log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}",
new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(),
(task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>")});
(task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>"),
Tasks.current() });
if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) {
log.trace("Stack trace for unparented submission of transient "+task, new Throwable("trace only (not an error)"));
}
}

if (task instanceof ScheduledTask)
return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
@@ -604,15 +667,16 @@ protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
} else {
future = runner.submit(job);
}
// on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
// (and we make sure the same ExecutionList is used in the future as in the task)
ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
// doesn't matter whether the listener is added to the listenableFuture or the task,
// except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
// (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
// [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
// SubmissionCallable (above) invokes the listeners on completion;
// this future allows a caller to add custom listeners
// (it does not notify the listeners; that's our job);
// except on cancel we want to listen
ListenableFuture<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task);
// and we want to make sure *our* (manager) listeners are given suitable callback
((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
// NB: can the above mean multiple callbacks to TaskInternal#runListeners?

// finally expose the future to callers
((TaskInternal<T>)task).initInternalFuture(listenableFuture);

return task;
@@ -665,9 +729,27 @@ protected void internalBeforeStart(Map<?,?> flags, Task<?> task) {
PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
}
ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
invokeCallback(flags.get("newTaskStartCallback"), task);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
// not ideal, such loose typing on the callback -- should prefer Function<Task,Object>
// but at least it's package-private
static Object invokeCallback(Object callable, Task<?> task) {
if (callable instanceof Closure) return ((Closure<?>)callable).call(task);
if (callable instanceof Callable) {
try {
return ((Callable<?>)callable).call();
} catch (Throwable t) {
throw Exceptions.propagate(t);
}
}
if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
if (callable instanceof Function) { return ((Function)callable).apply(task); }
if (callable==null) return null;
throw new IllegalArgumentException("Cannot invoke unexpected callback object "+callable+" of type "+callable.getClass()+" on "+task);
}

/** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
internalAfterEnd(flags, task, false, true);
@@ -693,7 +775,7 @@ protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInT
}
if (isEndingAllIterations) {
incompleteTaskIds.remove(task.getId());
ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
invokeCallback(flags.get("newTaskEndCallback"), task);
((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
}

@@ -155,7 +155,7 @@ public String toString() {
(Strings.isNonEmpty(displayName) ?
displayName :
(job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
":"+getId()+"]";
"]@"+getId();
}

@Override
@@ -196,7 +196,7 @@ public Task<T> asTask() {
protected Maybe<Task<?>> submittedByTask;

protected volatile Thread thread = null;
private volatile boolean cancelled = false;
protected volatile boolean cancelled = false;
/** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
protected volatile Future<T> internalFuture = null;

@@ -288,15 +288,34 @@ public synchronized boolean uncancel() {
}

@Override
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
// semantics changed in 2016-01, previously "true" was INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS
return cancel(mayInterruptIfRunning ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS
: TaskCancellationMode.DO_NOT_INTERRUPT);
}

@Override @Beta
public synchronized boolean cancel(TaskCancellationMode mode) {
if (isDone()) return false;
boolean cancel = true;
if (log.isTraceEnabled()) {
log.trace("BT cancelling "+this+" mode "+mode);
}
cancelled = true;
doCancel(mode);
notifyAll();
return true;
}

@SuppressWarnings("deprecation")
protected boolean doCancel(TaskCancellationMode mode) {
if (internalFuture!=null) {
cancel = internalFuture.cancel(mayInterruptIfRunning);
if (internalFuture instanceof ListenableForwardingFuture) {
return ((ListenableForwardingFuture<?>)internalFuture).cancel(mode);
} else {
return internalFuture.cancel(mode.isAllowedToInterruptTask());
}
}
notifyAll();
return cancel;
return true;
}

@Override

0 comments on commit 8303fea

Please sign in to comment.