From 1f4a338e0790ab0194874674afffb7734b9a89c0 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 16 Sep 2014 10:06:28 +0100 Subject: [PATCH 1/4] support for catching errors when resolving values, and test where time-limited access is passed in as a config value and retrieved --- .../brooklyn/entity/proxying/EntitySpec.java | 7 ++ .../entity/basic/BrooklynTaskTags.java | 4 +- .../main/java/brooklyn/util/task/Tasks.java | 1 + .../brooklyn/util/task/ValueResolver.java | 75 ++++++++++++++++++- .../java/brooklyn/util/task/TasksTest.java | 25 +++++++ .../brooklyn/util/task/ValueResolverTest.java | 57 ++++++++++++++ .../java/brooklyn/util/guava/Functionals.java | 4 + 7 files changed, 169 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java b/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java index 4ed953a070..e8acea2ad4 100644 --- a/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java +++ b/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java @@ -42,6 +42,7 @@ import brooklyn.policy.Policy; import brooklyn.policy.PolicySpec; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -384,6 +385,12 @@ public EntitySpec configure(ConfigKey key, Task val) { return this; } + public EntitySpec configure(ConfigKey key, Supplier val) { + checkMutable(); + config.put(checkNotNull(key, "key"), val); + return this; + } + public EntitySpec configure(HasConfigKey key, V val) { checkMutable(); config.put(checkNotNull(key, "key").getConfigKey(), val); diff --git a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java index b1197f289e..cbc4ca664b 100644 --- a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java +++ b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; + import org.codehaus.jackson.annotate.JsonProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -262,7 +264,7 @@ public static String tagForEffectorName(String name) { * and we are checking eff2, whether to match eff1 * @return whether the given task is part of the given effector */ - public static boolean isInEffectorTask(Task task, Entity entity, Effector effector, boolean allowNestedEffectorCalls) { + public static boolean isInEffectorTask(Task task, @Nullable Entity entity, @Nullable Effector effector, boolean allowNestedEffectorCalls) { Task t = task; while (t!=null) { Set tags = t.getTags(); diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java index 71b43b33cf..43c688beec 100644 --- a/core/src/main/java/brooklyn/util/task/Tasks.java +++ b/core/src/main/java/brooklyn/util/task/Tasks.java @@ -268,6 +268,7 @@ public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable ta } } + /** see also {@link #resolving(Object)} which gives much more control about submission, timeout, etc */ public static Supplier supplier(final TaskAdaptable task) { return new Supplier() { @Override diff --git a/core/src/main/java/brooklyn/util/task/ValueResolver.java b/core/src/main/java/brooklyn/util/task/ValueResolver.java index 5508b0d4f1..4bb557e307 100644 --- a/core/src/main/java/brooklyn/util/task/ValueResolver.java +++ b/core/src/main/java/brooklyn/util/task/ValueResolver.java @@ -25,9 +25,13 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import brooklyn.management.ExecutionContext; import brooklyn.management.Task; import brooklyn.management.TaskAdaptable; +import brooklyn.util.exceptions.Exceptions; import brooklyn.util.flags.TypeCoercions; import brooklyn.util.guava.Maybe; import brooklyn.util.time.CountdownTimer; @@ -47,7 +51,9 @@ *

* Fluent-style API exposes a number of other options. */ -public class ValueResolver { +public class ValueResolver implements DeferredSupplier { + + private static final Logger log = LoggerFactory.getLogger(ValueResolver.class); final Object value; final Class type; @@ -59,6 +65,10 @@ public class ValueResolver { /** timeout on execution, if possible, or if embedResolutionInTask is true */ Duration timeout; + T defaultValue = null; + boolean returnDefaultOnGet = false; + boolean swallowExceptions = false; + // internal fields final Object parentOriginalValue; final CountdownTimer parentTimer; @@ -89,6 +99,8 @@ public class ValueResolver { parentTimer = parent.parentTimer; if (parentTimer!=null && parentTimer.isExpired()) expired = true; + + // default value and swallow exceptions do not need to be nested } public static class ResolverBuilderPretype { @@ -100,6 +112,18 @@ public ValueResolver as(Class type) { return new ValueResolver(v, type); } } + + /** returns a copy of this resolver which can be queried, even if the original (single-use instance) has already been copied */ + public ValueResolver clone() { + ValueResolver result = new ValueResolver(value, type) + .context(exec).description(description) + .embedResolutionInTask(embedResolutionInTask) + .deep(forceDeep) + .timeout(timeout); + if (returnDefaultOnGet) result.defaultValue(defaultValue); + if (swallowExceptions) result.swallowExceptions(); + return result; + } /** execution context to use when resolving; required if resolving unsubmitted tasks or running with a time limit */ public ValueResolver context(ExecutionContext exec) { @@ -113,6 +137,38 @@ public ValueResolver description(String description) { return this; } + /** sets a default value which will be returned on a call to {@link #get()} if the task does not complete + * or completes with an error + *

+ * note that {@link #getMaybe()} returns an absent object even in the presence of + * a default, so that any error can still be accessed */ + public ValueResolver defaultValue(T defaultValue) { + this.defaultValue = defaultValue; + this.returnDefaultOnGet = true; + return this; + } + + /** indicates that no default value should be returned on a call to {@link #get()}, and instead it should throw + * (this is the default; this method is provided to undo a call to {@link #defaultValue(Object)}) */ + public ValueResolver noDefaultValue() { + this.returnDefaultOnGet = false; + this.defaultValue = null; + return this; + } + + /** indicates that exceptions in resolution should not be thrown on a call to {@link #getMaybe()}, + * but rather used as part of the {@link Maybe#get()} if it's absent, + * and swallowed altogether on a call to {@link #get()} in the presence of a {@link #defaultValue(Object)} */ + public ValueResolver swallowExceptions() { + this.swallowExceptions = true; + return this; + } + + public Maybe getDefault() { + if (returnDefaultOnGet) return Maybe.of(defaultValue); + else return Maybe.absent("No default value set"); + } + /** causes nested structures (maps, lists) to be descended and nested unresolved values resolved */ public ValueResolver deep(boolean forceDeep) { this.forceDeep = forceDeep; @@ -145,7 +201,10 @@ protected void checkTypeNotNull() { } public T get() { - return getMaybe().get(); + Maybe m = getMaybe(); + if (m.isPresent()) return m.get(); + if (returnDefaultOnGet) return defaultValue; + return m.get(); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -268,7 +327,17 @@ public Object call() throws Exception { } } catch (Exception e) { - throw new IllegalArgumentException("Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec+": "+e, e); + Exceptions.propagateIfFatal(e); + + IllegalArgumentException problem = new IllegalArgumentException("Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec+": "+e, e); + if (swallowExceptions) { + if (log.isDebugEnabled()) + log.debug("Resolution of "+this+" failed, swallowing and returning: "+e); + return Maybe.absent(problem); + } + if (log.isDebugEnabled()) + log.debug("Resolution of "+this+" failed, throwing: "+e); + throw problem; } return new ValueResolver(v, type, this).getMaybe(); diff --git a/core/src/test/java/brooklyn/util/task/TasksTest.java b/core/src/test/java/brooklyn/util/task/TasksTest.java index 8ba61e06cb..9ee9970016 100644 --- a/core/src/test/java/brooklyn/util/task/TasksTest.java +++ b/core/src/test/java/brooklyn/util/task/TasksTest.java @@ -28,8 +28,13 @@ import org.testng.annotations.Test; import brooklyn.entity.BrooklynAppUnitTestSupport; +import brooklyn.entity.basic.EntityFunctions; import brooklyn.management.ExecutionContext; +import brooklyn.management.Task; import brooklyn.test.entity.TestApplication; +import brooklyn.test.entity.TestEntity; +import brooklyn.util.guava.Functionals; +import brooklyn.util.time.Duration; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -102,4 +107,24 @@ private void assertResolvesValue(Object actual, Class type, Object expected) Object result = Tasks.resolveValue(actual, type, executionContext); assertEquals(result, expected); } + + @Test + public void testErrorsResolvingPropagatesOrSwallowedAllCorrectly() throws Exception { + app.setConfig(TestEntity.CONF_OBJECT, ValueResolverTest.newThrowTask(Duration.ZERO)); + Task t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build(); + ValueResolver v = Tasks.resolving(t).as(Object.class).context(app.getExecutionContext()); + + ValueResolverTest.assertThrowsOnMaybe(v); + ValueResolverTest.assertThrowsOnGet(v); + + v.swallowExceptions(); + ValueResolverTest.assertMaybeIsAbsent(v); + ValueResolverTest.assertThrowsOnGet(v); + + v.defaultValue("foo"); + ValueResolverTest.assertMaybeIsAbsent(v); + assertEquals(v.clone().get(), "foo"); + assertResolvesValue(v, Object.class, "foo"); + } + } diff --git a/core/src/test/java/brooklyn/util/task/ValueResolverTest.java b/core/src/test/java/brooklyn/util/task/ValueResolverTest.java index 43c156c915..9f4b2e1ba6 100644 --- a/core/src/test/java/brooklyn/util/task/ValueResolverTest.java +++ b/core/src/test/java/brooklyn/util/task/ValueResolverTest.java @@ -55,6 +55,15 @@ public String call() { ).build(); } + public static final Task newThrowTask(final Duration timeout) { + return Tasks.builder().body(new Callable() { + public String call() { + Time.sleep(timeout); + throw new IllegalStateException("intended, during tests"); + }} + ).build(); + } + public void testTimeoutZero() { Maybe result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(executionContext).timeout(Duration.ZERO).getMaybe(); Assert.assertFalse(result.isPresent()); @@ -72,4 +81,52 @@ public void testNoExecutionContextOnCompleted() { Assert.assertEquals(result.get(), "foo"); } + public static Throwable assertThrowsOnMaybe(ValueResolver result) { + try { + result = result.clone(); + result.getMaybe(); + Assert.fail("should have thrown"); + return null; + } catch (Exception e) { return e; } + } + public static Throwable assertThrowsOnGet(ValueResolver result) { + result = result.clone(); + try { + result.get(); + Assert.fail("should have thrown"); + return null; + } catch (Exception e) { return e; } + } + public static Maybe assertMaybeIsAbsent(ValueResolver result) { + result = result.clone(); + Maybe maybe = result.getMaybe(); + Assert.assertFalse(maybe.isPresent()); + return maybe; + } + + public void testSwallowError() { + ValueResolver result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions(); + assertMaybeIsAbsent(result); + assertThrowsOnGet(result); + } + + + public void testDontSwallowError() { + ValueResolver result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext); + assertThrowsOnMaybe(result); + assertThrowsOnGet(result); + } + + public void testDefaultWhenSwallowError() { + ValueResolver result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions().defaultValue("foo"); + assertMaybeIsAbsent(result); + Assert.assertEquals(result.get(), "foo"); + } + + public void testDefaultBeforeDelayAndError() { + ValueResolver result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(executionContext).timeout(Duration.ZERO).defaultValue("foo"); + assertMaybeIsAbsent(result); + Assert.assertEquals(result.get(), "foo"); + } + } diff --git a/utils/common/src/main/java/brooklyn/util/guava/Functionals.java b/utils/common/src/main/java/brooklyn/util/guava/Functionals.java index 6688babd8a..dcbd9daa55 100644 --- a/utils/common/src/main/java/brooklyn/util/guava/Functionals.java +++ b/utils/common/src/main/java/brooklyn/util/guava/Functionals.java @@ -26,6 +26,7 @@ import com.google.common.base.Functions; import com.google.common.base.Predicate; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; public class Functionals { @@ -122,5 +123,8 @@ public T call() { } return new SupplierAsCallable(); } + public static Callable callable(Function f, T x) { + return callable(Suppliers.compose(f, Suppliers.ofInstance(x))); + } } From 189f426e60601b64f4c04411121d095b3713e4ff Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 16 Sep 2014 21:12:39 +0100 Subject: [PATCH 2/4] introduce WeaklyPresent in maybe using weak references, and use that for streams so that large streams attached to tasks don't cause leaks; also use a strongly typed tag for effector tasks, including entity information --- .../entity/basic/BrooklynTaskTags.java | 92 +++++++++++++++---- .../management/internal/EffectorUtils.java | 2 +- .../util/task/system/ProcessTaskWrapper.java | 4 +- .../entity/basic/lifecycle/ScriptHelper.java | 6 +- .../BrooklynEntityMirrorImpl.java | 2 +- .../main/java/brooklyn/util/guava/Maybe.java | 41 +++++++++ .../java/brooklyn/util/stream/Streams.java | 10 ++ 7 files changed, 131 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java index cbc4ca664b..fe573a7313 100644 --- a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java +++ b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java @@ -34,12 +34,14 @@ import brooklyn.entity.Entity; import brooklyn.management.ExecutionManager; import brooklyn.management.Task; +import brooklyn.util.guava.Maybe; import brooklyn.util.stream.Streams; import brooklyn.util.task.TaskTags; import brooklyn.util.task.Tasks; import brooklyn.util.text.StringEscapes.BashStringEscapes; import brooklyn.util.text.Strings; +import com.google.common.base.Functions; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -86,6 +88,7 @@ public int hashCode() { } @Override public boolean equals(Object obj) { + if (this==obj) return true; if (!(obj instanceof WrappedEntity)) return false; return Objects.equal(entity, ((WrappedEntity)obj).entity) && @@ -202,6 +205,14 @@ public boolean equals(Object obj) { public static WrappedStream tagForStream(String streamType, ByteArrayOutputStream stream) { return new WrappedStream(streamType, stream); } + /** creates a tag suitable for marking a stream available on a task, but which might be GC'd */ + public static WrappedStream tagForStreamWeak(String streamType, ByteArrayOutputStream stream) { + Maybe weakStream = Maybe.weakThen(stream, + Maybe.of(Streams.byteArrayOfString(""))); + return new WrappedStream(streamType, + Suppliers.compose(Functions.toStringFunction(), weakStream), + Suppliers.compose(Streams.sizeFunction(), weakStream)); + } /** creates a tag suitable for marking a stream available on a task */ public static WrappedStream tagForStream(String streamType, Supplier contents, Supplier size) { @@ -241,18 +252,47 @@ public static WrappedStream stream(Task task, String streamType) { // ------ misc - public static void setTransient(Task task) { - addTagDynamically(task, TRANSIENT_TASK_TAG); - } + public static void setInessential(Task task) { addTagDynamically(task, INESSENTIAL_TASK); } + public static void setTransient(Task task) { addTagDynamically(task, TRANSIENT_TASK_TAG); } + public static boolean isTransient(Task task) { return hasTag(task, TRANSIENT_TASK_TAG); } + public static boolean isSubTask(Task task) { return hasTag(task, SUB_TASK_TAG); } + public static boolean isEffectorTask(Task task) { return hasTag(task, EFFECTOR_TAG); } - public static void setInessential(Task task) { - addTagDynamically(task, INESSENTIAL_TASK); - } - // ------ effector tags - public static String tagForEffectorName(String name) { - return EFFECTOR_TAG+":"+name; + public static class EffectorCallTag { + protected final String entityId; + protected final String effectorName; + protected EffectorCallTag(String entityId, String effectorName) { + this.entityId = entityId; + this.effectorName = effectorName; + } + public String toString() { + return EFFECTOR_TAG+"@"+entityId+":"+effectorName; + } + @Override + public int hashCode() { + return Objects.hashCode(entityId, effectorName); + } + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof EffectorCallTag)) return false; + EffectorCallTag other = (EffectorCallTag) obj; + return + Objects.equal(entityId, other.entityId) && + Objects.equal(effectorName, other.effectorName); + } + public String getEntityId() { + return entityId; + } + public String getEffectorName() { + return effectorName; + } + } + + public static EffectorCallTag tagForEffectorCall(Entity entity, String effectorName) { + return new EffectorCallTag(entity.getId(), effectorName); } /** @@ -269,29 +309,43 @@ public static boolean isInEffectorTask(Task task, @Nullable Entity entity, @N while (t!=null) { Set tags = t.getTags(); if (tags.contains(EFFECTOR_TAG)) { - boolean match = true; - if (entity!=null && !entity.equals(getTargetOrContextEntity(t))) - match = false; - if (effector!=null && !tags.contains(tagForEffectorName(effector.getName()))) - match = false; - if (match) return true; + for (Object tag: tags) { + if (tag instanceof EffectorCallTag) { + EffectorCallTag et = (EffectorCallTag)tag; + if (entity!=null && !et.entityId.equals(entity.getId())) + continue; + if (effector!=null && !et.effectorName.equals(effector.getName())) + continue; + return true; + } + } if (!allowNestedEffectorCalls) return false; } t = t.getSubmittedByTask(); } return false; } - - public static String getEffectorName(Task task) { + + /** finds the first {@link EffectorCallTag} tag on this tag, or optionally on submitters, or null */ + public static EffectorCallTag getEffectorCallTag(Task task, boolean recurse) { Task t = task; while (t!=null) { for (Object tag: task.getTags()) { - if (tag instanceof String && ((String)tag).startsWith(EFFECTOR_TAG+":")) - return Strings.removeFromStart((String)tag, EFFECTOR_TAG+":"); + if (tag instanceof EffectorCallTag) + return (EffectorCallTag)tag; } + if (!recurse) + return null; t = t.getSubmittedByTask(); } return null; } + /** finds the first {@link EffectorCallTag} tag on this tag or a submitter, and returns the effector name */ + public static String getEffectorName(Task task) { + EffectorCallTag result = getEffectorCallTag(task, true); + if (result==null) return null; + return result.effectorName; + } + } diff --git a/core/src/main/java/brooklyn/management/internal/EffectorUtils.java b/core/src/main/java/brooklyn/management/internal/EffectorUtils.java index ac813dbe2b..39bbcd1415 100644 --- a/core/src/main/java/brooklyn/management/internal/EffectorUtils.java +++ b/core/src/main/java/brooklyn/management/internal/EffectorUtils.java @@ -322,7 +322,7 @@ public static Map getTaskFlagsForEffectorInvocation(Entity entity .put("displayName", effector.getName()) .put("tags", MutableList.of( BrooklynTaskTags.EFFECTOR_TAG, - BrooklynTaskTags.tagForEffectorName(effector.getName()), + BrooklynTaskTags.tagForEffectorCall(entity, effector.getName()), BrooklynTaskTags.tagForTargetEntity(entity))) .build(); } diff --git a/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java b/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java index db4f0804eb..9910bad69f 100644 --- a/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java +++ b/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java @@ -55,8 +55,8 @@ public abstract class ProcessTaskWrapper extends ProcessTaskStub implements protected ProcessTaskWrapper(AbstractProcessTaskFactory constructor) { super(constructor); TaskBuilder tb = constructor.constructCustomizedTaskBuilder(); - if (stdout!=null) tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDOUT, stdout)); - if (stderr!=null) tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDERR, stderr)); + if (stdout!=null) tb.tag(BrooklynTaskTags.tagForStreamWeak(BrooklynTaskTags.STREAM_STDOUT, stdout)); + if (stderr!=null) tb.tag(BrooklynTaskTags.tagForStreamWeak(BrooklynTaskTags.STREAM_STDERR, stderr)); task = (Task) tb.body(new ProcessTaskInternalJob()).build(); } diff --git a/software/base/src/main/java/brooklyn/entity/basic/lifecycle/ScriptHelper.java b/software/base/src/main/java/brooklyn/entity/basic/lifecycle/ScriptHelper.java index e60e1d781d..a0dafe8a3b 100644 --- a/software/base/src/main/java/brooklyn/entity/basic/lifecycle/ScriptHelper.java +++ b/software/base/src/main/java/brooklyn/entity/basic/lifecycle/ScriptHelper.java @@ -269,7 +269,7 @@ public Integer call() throws Exception { stdin.write(line.getBytes()); stdin.write("\n".getBytes()); } - tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDIN, stdin)); + tb.tag(BrooklynTaskTags.tagForStreamWeak(BrooklynTaskTags.STREAM_STDIN, stdin)); } catch (IOException e) { log.warn("Error registering stream "+BrooklynTaskTags.STREAM_STDIN+" on "+tb+": "+e, e); } @@ -283,9 +283,9 @@ public Integer call() throws Exception { if (gatherOutput) { stdout = new ByteArrayOutputStream(); - tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDOUT, stdout)); + tb.tag(BrooklynTaskTags.tagForStreamWeak(BrooklynTaskTags.STREAM_STDOUT, stdout)); stderr = new ByteArrayOutputStream(); - tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDERR, stderr)); + tb.tag(BrooklynTaskTags.tagForStreamWeak(BrooklynTaskTags.STREAM_STDERR, stderr)); } task = tb.build(); if (isTransient) BrooklynTaskTags.setTransient(task); diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java index fb87f39df8..254ac9bde4 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java @@ -176,7 +176,7 @@ public static byte[] submit(HttpClient client, URI uri, Map args) Exceptions.propagateIfFatal(e); throw new IllegalStateException("Invalid response invoking "+uri+": "+e, e); } - Tasks.addTagDynamically(BrooklynTaskTags.tagForStream("http_response", Streams.byteArray(content))); + Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamWeak("http_response", Streams.byteArray(content))); if (!HttpTool.isStatusCodeHealthy(result.getResponseCode())) { log.warn("Invalid response invoking "+uri+": response code "+result.getResponseCode()+"\n"+result+": "+new String(content)); throw new IllegalStateException("Invalid response invoking "+uri+": response code "+result.getResponseCode()); diff --git a/utils/common/src/main/java/brooklyn/util/guava/Maybe.java b/utils/common/src/main/java/brooklyn/util/guava/Maybe.java index fb60bf8a9d..f4f8d13642 100644 --- a/utils/common/src/main/java/brooklyn/util/guava/Maybe.java +++ b/utils/common/src/main/java/brooklyn/util/guava/Maybe.java @@ -19,10 +19,12 @@ package brooklyn.util.guava; import java.io.Serializable; +import java.lang.ref.WeakReference; import java.util.Collections; import java.util.Iterator; import java.util.Set; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import brooklyn.util.javalang.JavaClassNames; @@ -69,6 +71,16 @@ public static Maybe of(@Nullable T value) { return new Present(value); } + /** creates an instance wrapping a {@link WeakReference}, so it might go absent later on */ + public static Maybe weak(T value) { + return weakThen(value, null); + } + /** creates an instance wrapping a {@link WeakReference}, using the second item given if lost */ + public static Maybe weakThen(T value, Maybe ifEmpty) { + if (value==null) return of((T)null); + return new WeaklyPresent(value).usingAfterExpiry(ifEmpty); + } + /** like {@link Optional#fromNullable(Object)}, returns absent if the argument is null */ public static Maybe fromNullable(@Nullable T value) { if (value==null) return absent(); @@ -193,6 +205,35 @@ public T get() { } } + public static class WeaklyPresent extends Maybe { + private static final long serialVersionUID = 436799990500336015L; + private final WeakReference value; + private Maybe defaultValue; + protected WeaklyPresent(@Nonnull T value) { + this.value = new WeakReference(value); + } + @Override + public T get() { + T result = value.get(); + if (result==null) { + if (defaultValue==null) throw new IllegalStateException("Weakly present item has been GC'd"); + return defaultValue.get(); + } + return result; + } + @Override + public boolean isPresent() { + return value.get()!=null || (defaultValue!=null && defaultValue.isPresent()); + } + public Maybe solidify() { + return Maybe.fromNullable(value.get()); + } + WeaklyPresent usingAfterExpiry(Maybe defaultValue) { + this.defaultValue = defaultValue; + return this; + } + } + @Override public String toString() { return JavaClassNames.simpleClassName(this)+"["+(isPresent()?"value="+get():"")+"]"; diff --git a/utils/common/src/main/java/brooklyn/util/stream/Streams.java b/utils/common/src/main/java/brooklyn/util/stream/Streams.java index 3e62e6b7e3..d7f0a7d4a2 100644 --- a/utils/common/src/main/java/brooklyn/util/stream/Streams.java +++ b/utils/common/src/main/java/brooklyn/util/stream/Streams.java @@ -38,6 +38,7 @@ import com.google.common.annotations.Beta; import com.google.common.base.Charsets; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.io.ByteStreams; @@ -139,6 +140,15 @@ public Integer get() { }; } + public static Function sizeFunction() { + return new Function() { + @Override + public Integer apply(ByteArrayOutputStream input) { + return input.size(); + } + }; + } + public static ByteArrayOutputStream byteArrayOfString(String in) { return byteArray(in.getBytes(Charsets.UTF_8)); } From 87596aa11587027956d28648ab6e4f918069e678 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 16 Sep 2014 21:40:09 +0100 Subject: [PATCH 3/4] big change to garbage collection, doing a much better job of keeping a balanced history of tasks -- the primary limit is now on tag, each tag is allowed to have by default 50 tasks; coupled with new EffectorCalllTag for entity+effector_name, this means we keep effector calls much longer. (the overall defaults have been increased also, for history to 30 days, and a new per-entity limit of 1000 tasks.) other miscellaneous cleanup to tasks also, fixing the leak for DST internal jobs, and simplifying how task-completion callbacks work (so attempting to get() a result in a callback will not deadlock!) --- .../brooklyn/entity/basic/ConfigKeys.java | 21 +- .../internal/BrooklynGarbageCollector.java | 478 ++++++++++++++---- .../util/task/BasicExecutionManager.java | 69 ++- .../java/brooklyn/util/task/BasicTask.java | 46 +- .../util/task/DynamicSequentialTask.java | 5 +- .../brooklyn/util/task/ExecutionListener.java | 5 + .../brooklyn/util/task/ForwardingTask.java | 8 +- .../brooklyn/util/task/ScheduledTask.java | 20 +- .../java/brooklyn/util/task/TaskInternal.java | 9 +- .../java/brooklyn/util/task/TaskTags.java | 6 +- .../internal/EntityExecutionManagerTest.java | 211 ++++++-- .../util/task/BasicTaskExecutionTest.java | 4 +- .../launcher/config/BrooklynGlobalConfig.java | 7 +- .../java/brooklyn/util/time/Duration.java | 18 + .../java/brooklyn/util/time/DurationTest.java | 8 + 15 files changed, 708 insertions(+), 207 deletions(-) diff --git a/core/src/main/java/brooklyn/entity/basic/ConfigKeys.java b/core/src/main/java/brooklyn/entity/basic/ConfigKeys.java index b79f4b79dc..83c877cd65 100644 --- a/core/src/main/java/brooklyn/entity/basic/ConfigKeys.java +++ b/core/src/main/java/brooklyn/entity/basic/ConfigKeys.java @@ -33,6 +33,7 @@ import brooklyn.event.basic.PortAttributeSensorAndConfigKey; import brooklyn.util.config.ConfigBag; import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; import com.google.common.base.CaseFormat; import com.google.common.base.Preconditions; @@ -154,11 +155,9 @@ public static ConfigKey convert(ConfigKey key, CaseFormat inputCaseStr public static ConfigKey newStringConfigKey(String name) { return newConfigKey(String.class, name); } - public static ConfigKey newStringConfigKey(String name, String description) { return newConfigKey(String.class, name, description); } - public static ConfigKey newStringConfigKey(String name, String description, String defaultValue) { return newConfigKey(String.class, name, description, defaultValue); } @@ -166,11 +165,9 @@ public static ConfigKey newStringConfigKey(String name, String descripti public static ConfigKey newIntegerConfigKey(String name) { return newConfigKey(Integer.class, name); } - public static ConfigKey newIntegerConfigKey(String name, String description) { return newConfigKey(Integer.class, name, description); } - public static ConfigKey newIntegerConfigKey(String name, String description, Integer defaultValue) { return newConfigKey(Integer.class, name, description, defaultValue); } @@ -178,11 +175,9 @@ public static ConfigKey newIntegerConfigKey(String name, String descrip public static ConfigKey newLongConfigKey(String name) { return newConfigKey(Long.class, name); } - public static ConfigKey newLongConfigKey(String name, String description) { return newConfigKey(Long.class, name, description); } - public static ConfigKey newLongConfigKey(String name, String description, Long defaultValue) { return newConfigKey(Long.class, name, description, defaultValue); } @@ -190,11 +185,9 @@ public static ConfigKey newLongConfigKey(String name, String description, public static ConfigKey newDoubleConfigKey(String name) { return newConfigKey(Double.class, name); } - public static ConfigKey newDoubleConfigKey(String name, String description) { return newConfigKey(Double.class, name, description); } - public static ConfigKey newDoubleConfigKey(String name, String description, Double defaultValue) { return newConfigKey(Double.class, name, description, defaultValue); } @@ -202,15 +195,23 @@ public static ConfigKey newDoubleConfigKey(String name, String descripti public static ConfigKey newBooleanConfigKey(String name) { return newConfigKey(Boolean.class, name); } - public static ConfigKey newBooleanConfigKey(String name, String description) { return newConfigKey(Boolean.class, name, description); } - public static ConfigKey newBooleanConfigKey(String name, String description, Boolean defaultValue) { return newConfigKey(Boolean.class, name, description, defaultValue); } + public static ConfigKey newDurationConfigKey(String name) { + return newConfigKey(Duration.class, name); + } + public static ConfigKey newDurationConfigKey(String name, String description) { + return newConfigKey(Duration.class, name, description); + } + public static ConfigKey newDurationConfigKey(String name, String description, Duration defaultValue) { + return newConfigKey(Duration.class, name, description, defaultValue); + } + public static class DynamicKeys { // TODO see below diff --git a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java b/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java index 870b4fee33..bf28c35377 100644 --- a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java +++ b/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java @@ -18,14 +18,19 @@ */ package brooklyn.management.internal; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.ConcurrentModificationException; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,19 +39,21 @@ import brooklyn.config.ConfigKey; import brooklyn.entity.Entity; import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.event.basic.BasicConfigKey; +import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity; +import brooklyn.entity.basic.BrooklynTaskTags.WrappedStream; +import brooklyn.entity.basic.ConfigKeys; import brooklyn.internal.storage.BrooklynStorage; import brooklyn.location.Location; import brooklyn.management.Task; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.exceptions.RuntimeInterruptedException; import brooklyn.util.task.BasicExecutionManager; import brooklyn.util.task.ExecutionListener; import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import com.google.common.annotations.Beta; /** * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory. @@ -54,15 +61,16 @@ * The deletion policy is configurable: *
    *
  • Period - how frequently to look at the existing tasks to delete some, if required - *
  • Max tasks per tag - the maximum number of tasks to be kept for a given tag (e.g. for - * effector calls invoked on a particular entity) *
  • Max task age - the time after which a completed task will be automatically deleted - * (i.e. any task completed more than maxTaskAge+period milliseconds ago will definitely - * be deleted. + * (i.e. any root task completed more than maxTaskAge ago will be deleted) + *
  • Max tasks per - the maximum number of tasks to be kept for a given tag, + * split into categories based on what is seeming to be useful *
* - * The default is to check with a period of one minute, to keep at most 100 tasks per tag, and to - * delete old completed tasks after one day. + * The default is to check with a period of one minute, deleting tasks after 30 days, + * and keeping at most 100000 tasks in the system, + * max 1000 tasks per entity, 50 per effector within that entity, and 50 per other non-effector tag + * within that entity (or global if not attached to an entity). * * @author aled */ @@ -70,39 +78,70 @@ public class BrooklynGarbageCollector { private static final Logger LOG = LoggerFactory.getLogger(BrooklynGarbageCollector.class); - public static final ConfigKey GC_PERIOD = new BasicConfigKey( - Long.class, "brooklyn.gc.period", "the period, in millisconds, for checking if any tasks need to be deleted", 60*1000L); + public static final ConfigKey GC_PERIOD = ConfigKeys.newDurationConfigKey( + "brooklyn.gc.period", "the period, in millisconds, for checking if any tasks need to be deleted", + Duration.minutes(1)); - public static final ConfigKey DO_SYSTEM_GC = new BasicConfigKey( - Boolean.class, "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false); + public static final ConfigKey DO_SYSTEM_GC = ConfigKeys.newBooleanConfigKey( + "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false); - public static final ConfigKey MAX_TASKS_PER_TAG = new BasicConfigKey( - Integer.class, "brooklyn.gc.maxTasksPerTag", - "the maximum number of tasks to be kept for a given tag (e.g. for effector calls invoked on a particular entity)", - 100); + /** + * should we check for tasks which are submitted by another but backgrounded, i.e. not a child of that task? + * default to yes, despite it can be some extra loops, to make sure we GC them promptly. + * @since 0.7.0 */ + // work offender is {@link DynamicSequentialTask} internal job tracker, but it is marked + // transient so it is destroyed prompty; there may be others, however; + // but OTOH it might be expensive to check for these all the time! + @Beta + public static final ConfigKey CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey( + "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true); + + public static final ConfigKey MAX_TASKS_PER_TAG = ConfigKeys.newIntegerConfigKey( + "brooklyn.gc.maxTasksPerTag", + "the maximum number of tasks to be kept for a given tag " + + "within an execution context (e.g. entity); " + + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full", + 50); - public static final ConfigKey MAX_TASK_AGE = new BasicConfigKey( - Long.class, "brooklyn.gc.maxTaskAge", + public static final ConfigKey MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey( + "brooklyn.gc.maxTasksPerEntity", + "the maximum number of tasks to be kept for a given entity", + 1000); + + public static final ConfigKey MAX_TASKS_GLOBAL = ConfigKeys.newIntegerConfigKey( + "brooklyn.gc.maxTasksGlobal", + "the maximum number of tasks to be kept across the entire system", + 100000); + + public static final ConfigKey MAX_TASK_AGE = ConfigKeys.newDurationConfigKey( + "brooklyn.gc.maxTaskAge", "the number of milliseconds after which a completed task will be automatically deleted", - TimeUnit.DAYS.toMillis(1)); + Duration.days(30)); + + protected final static Comparator> TASKS_OLDEST_FIRST_COMPARATOR = new Comparator>() { + @Override public int compare(Task t1, Task t2) { + long end1 = t1.getEndTimeUtc(); + long end2 = t2.getEndTimeUtc(); + return (end1 < end2) ? -1 : ((end1 == end2) ? 0 : 1); + } + }; private final BasicExecutionManager executionManager; private final BrooklynStorage storage; + private final BrooklynProperties brooklynProperties; private final ScheduledExecutorService executor; - private final long gcPeriodMs; - private final int maxTasksPerTag; - private final long maxTaskAge; + private ScheduledFuture activeCollector; + + private final Duration gcPeriod; private final boolean doSystemGc; private volatile boolean running = true; - public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) { this.executionManager = executionManager; this.storage = storage; + this.brooklynProperties = brooklynProperties; - gcPeriodMs = brooklynProperties.getConfig(GC_PERIOD); - maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG); - maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE); + gcPeriod = brooklynProperties.getConfig(GC_PERIOD); doSystemGc = brooklynProperties.getConfig(DO_SYSTEM_GC); executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @@ -115,35 +154,44 @@ public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExec BrooklynGarbageCollector.this.onTaskDone(task); }}); - executor.scheduleWithFixedDelay( + scheduleCollector(true); + } + + protected synchronized void scheduleCollector(boolean canInterruptCurrent) { + if (activeCollector != null) activeCollector.cancel(canInterruptCurrent); + + activeCollector = executor.scheduleWithFixedDelay( new Runnable() { @Override public void run() { - try { - logUsage("brooklyn gc (before)"); - gcTasks(); - logUsage("brooklyn gc (after)"); - - if (doSystemGc) { - // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing - // Empirically observed that (on OS X jvm at least) calling twice blocks - logs a significant - // amount of memory having been released, as though a full-gc had been run. But this is highly - // dependent on the JVM implementation. - System.gc(); System.gc(); - logUsage("brooklyn gc (after system gc)"); - } - } catch (RuntimeInterruptedException e) { - throw e; // graceful shutdown - } catch (Throwable t) { - LOG.warn("Error during management-context GC", t); - throw Exceptions.propagate(t); - } + gcIteration(); } }, - gcPeriodMs, - gcPeriodMs, + gcPeriod.toMillisecondsRoundingUp(), + gcPeriod.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS); } + public void gcIteration() { + try { + logUsage("brooklyn gc (before)"); + gcTasks(); + logUsage("brooklyn gc (after)"); + + if (doSystemGc) { + // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing + // Empirically observed that (on OS X jvm at least) calling twice blocks - logs a significant + // amount of memory having been released, as though a full-gc had been run. But this is highly + // dependent on the JVM implementation. + System.gc(); System.gc(); + logUsage("brooklyn gc (after system gc)"); + } + } catch (Throwable t) { + Exceptions.propagateIfFatal(t); + LOG.warn("Error during management-context GC: "+t, t); + // previously we bailed on all errors, but I don't think we should do that -Alex + } + } + public void logUsage(String prefix) { if (LOG.isDebugEnabled()) LOG.debug(prefix+" - using "+getUsageString()); @@ -163,6 +211,7 @@ public String getUsageString() { public void shutdownNow() { running = false; + if (activeCollector != null) activeCollector.cancel(true); if (executor != null) executor.shutdownNow(); } @@ -179,21 +228,30 @@ public void onUnmanaged(Location loc) { } public void onTaskDone(Task task) { - if (shouldDeleteTask(task)) { + if (shouldDeleteTaskImmediately(task)) { executionManager.deleteTask(task); } } + /** @deprecated since 0.7.0, method moved internal until semantics are clarified; see also {@link #shouldDeleteTaskImmediately(Task)} */ + @Deprecated public boolean shouldDeleteTask(Task task) { + return shouldDeleteTaskImmediately(task); + } + /** whether this task should be deleted on completion, + * because it is transient, or because it and all submitter ancestors are completed and neither an effector nor non-transient */ + protected boolean shouldDeleteTaskImmediately(Task task) { + if (!task.isDone()) return false; + Set tags = task.getTags(); if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true; if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return false; - if (task.getSubmittedByTask()!=null && !shouldDeleteTask(task.getSubmittedByTask())) + if (task.getSubmittedByTask()!=null && !shouldDeleteTaskImmediately(task.getSubmittedByTask())) return false; // e.g. scheduled tasks, sensor events, etc - // TODO (in future may keep these for a shorter duration, e.g. to investigate) + // TODO (in future may keep some of these with another limit, based on a new TagCategory) return true; } @@ -201,72 +259,288 @@ public boolean shouldDeleteTask(Task task) { * Deletes old tasks. The age/number of tasks to keep is controlled by fields like * {@link #maxTasksPerTag} and {@link #maxTaskAge}. */ - private void gcTasks() { - // TODO Must be careful with memory usage here : saw an OOME when copying the tasks set to sort it. - // (This happened when there were a *lot* of tasks (e.g. 100,000s) due to a crazy trigger for - // effector calls.) - // - // At that point we have potentially three copies of the list in-memory: - // - original in ExecutionManager - // - copy returned by getTasksWithTag(tag) - // - copy for sortedTasks (but at least now that only contains done tasks) - // - // It's hard to avoid fewer copies: getTasksWithTag(tag) should continue to return copy rather than - // mutable underlying list; and if we're going to sort then we need to take a copy. + protected synchronized int gcTasks() { + // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks. + // hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help. // // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That // is a far more memory efficient data structure (e.g. 4 bytes overhead per object rather than // 32 bytes overhead per object for HashSet). + // + // More notes on optimization is in the history of this file. + + if (!running) return 0; + + Duration newPeriod = brooklynProperties.getConfig(GC_PERIOD); + if (!gcPeriod.equals(newPeriod)) { + // caller has changed period, reschedule on next run + scheduleCollector(false); + } + + expireAgedTasks(); - if (!running) return; + // now look at overcapacity tags, non-entity tags first Set taskTags = executionManager.getTaskTags(); + + int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY); + int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG); + + Map taskNonEntityTagsOverCapacity = MutableMap.of(); + Map taskEntityTagsOverCapacity = MutableMap.of(); + + Map taskAllTagsOverCapacity = MutableMap.of(); + for (Object tag : taskTags) { - if (tag == null || tag.equals(ManagementContextInternal.EFFECTOR_TAG) - || tag.equals(ManagementContextInternal.SUB_TASK_TAG) - || tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG) - || tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) { - continue; // there'll be other tags - } + if (isTagIgnoredForGc(tag)) continue; - // For each tag, we find the tasks with that tag; we ignore sub-tasks - // as those will be automatically GC'ed by the parent task at the appropriate point. - Set> tasksWithTag = executionManager.getTasksWithTag(tag); + Set> tasksWithTag = executionManager.tasksWithTagLiveOrNull(tag); + if (tasksWithTag==null) continue; + AtomicInteger overA = null; + if (tag instanceof WrappedEntity) { + int over = tasksWithTag.size() - maxTasksPerEntity; + if (over>0) { + overA = new AtomicInteger(over); + taskEntityTagsOverCapacity.put(tag, overA); + } + } else { + int over = tasksWithTag.size() - maxTasksPerTag; + if (over>0) { + overA = new AtomicInteger(over); + taskNonEntityTagsOverCapacity.put(tag, overA); + } + } + if (overA!=null) { + taskAllTagsOverCapacity.put(tag, overA); + } + } + + int deletedCount = 0; + deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false); + deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true); + deletedCount += expireSubTasksWhoseSubmitterIsExpired(); + + int deletedGlobally = expireIfOverCapacityGlobally(); + deletedCount += deletedGlobally; + if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired(); + + return deletedCount; + } + + protected static boolean isTagIgnoredForGc(Object tag) { + if (tag == null) return true; + if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true; + if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true; + if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true; + if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true; + if (tag instanceof WrappedStream) { + return true; + } + + return false; + } + + protected void expireAgedTasks() { + Duration maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE); + + Collection> allTasks = executionManager.allTasksLive(); + Collection> tasksToDelete = MutableList.of(); + + try { + for (Task task: allTasks) { + if (!task.isDone()) continue; + if (BrooklynTaskTags.isSubTask(task)) continue; - Iterable> topTasksWithTag = Iterables.filter(tasksWithTag, new Predicate>() { - @Override public boolean apply(Task input) { - return input != null && input.isDone() && !input.getTags().contains(ManagementContextInternal.SUB_TASK_TAG); - }}); + if (maxTaskAge.isShorterThan(Duration.sinceUtc(task.getEndTimeUtc()))) + tasksToDelete.add(task); + } + + } catch (ConcurrentModificationException e) { + // delete what we've found so far + LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); + } + + for (Task task: tasksToDelete) { + executionManager.deleteTask(task); + } + } + + protected int expireSubTasksWhoseSubmitterIsExpired() { + // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS + if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS)) + return 0; + + Collection> allTasks = executionManager.allTasksLive(); + Collection> tasksToDelete = MutableList.of(); + try { + for (Task task: allTasks) { + if (!task.isDone()) continue; + Task submitter = task.getSubmittedByTask(); + // if we've leaked, ie a subtask which is not a child task, + // and the submitter is GC'd, then delete this also + if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) { + tasksToDelete.add(task); + } + } - int numTasksToDelete = (Iterables.size(topTasksWithTag) - maxTasksPerTag); - if (numTasksToDelete > 0 || maxTaskAge > 0) { - List> sortedTasks = Lists.newArrayList(topTasksWithTag); - topTasksWithTag = null; - tasksWithTag = null; + } catch (ConcurrentModificationException e) { + // delete what we've found so far + LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); + } + + for (Task task: tasksToDelete) { + executionManager.deleteTask(task); + } + return tasksToDelete.size(); + } + + protected enum TagCategory { + ENTITY, NON_ENTITY_NORMAL; + + public boolean acceptsTag(Object tag) { + if (isTagIgnoredForGc(tag)) return false; + if (tag instanceof WrappedEntity) return this==ENTITY; + if (this==ENTITY) return false; + return true; + } + } + + + /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */ + protected int expireOverCapacityTagsInCategory(Map taskTagsInCategoryOverCapacity, Map taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) { + if (emptyFilterNeeded) { + // previous run may have decremented counts + MutableList nowOkayTags = MutableList.of(); + for (Map.Entry entry: taskTagsInCategoryOverCapacity.entrySet()) { + if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey()); + } + for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag); + } + + if (taskTagsInCategoryOverCapacity.isEmpty()) + return 0; + + Collection> tasks = executionManager.allTasksLive(); + List> tasksToConsiderDeleting = MutableList.of(); + try { + for (Task task: tasks) { + if (!task.isDone()) continue; - Collections.sort(sortedTasks, new Comparator>() { - @Override public int compare(Task t1, Task t2) { - // know that tasks are all done; filtered those above - long end1 = t1.getEndTimeUtc(); - long end2 = t2.getEndTimeUtc(); - return (end1 < end2) ? -1 : ((end1 == end2) ? 0 : 1); - } - }); - if (numTasksToDelete > 0) { - for (Task taskToDelete : sortedTasks.subList(0, numTasksToDelete)) { - executionManager.deleteTask(taskToDelete); + Set tags = task.getTags(); + + int categoryTags = 0, tooFullCategoryTags = 0; + for (Object tag: tags) { + if (category.acceptsTag(tag)) { + categoryTags++; + if (taskTagsInCategoryOverCapacity.containsKey(tag)) + tooFullCategoryTags++; } } - if (maxTaskAge > 0) { - for (Task taskContender : sortedTasks.subList((numTasksToDelete > 0 ? numTasksToDelete : 0), sortedTasks.size())) { - if (System.currentTimeMillis() - taskContender.getEndTimeUtc() > maxTaskAge) { - executionManager.deleteTask(taskContender); - } else { - break; // all subsequent tasks will be newer; stop looking + if (tooFullCategoryTags>0) { + if (categoryTags==tooFullCategoryTags) { + // all buckets are full, delete this one + tasksToConsiderDeleting.add(task); + } else { + // if any bucket is under capacity, then give grace to the other buckets in this category + for (Object tag: tags) { + if (category.acceptsTag(tag)) { + AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag); + if (over!=null) { + if (over.decrementAndGet()<=0) { + // and remove it from over-capacity if so + taskTagsInCategoryOverCapacity.remove(tag); + if (taskTagsInCategoryOverCapacity.isEmpty()) + return 0; + } + } + } } } } } + + } catch (ConcurrentModificationException e) { + // do CME's happen with these data structures? + // if so, let's just delete what we've found so far + LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e); } + + if (LOG.isDebugEnabled()) + LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" " + + "tags over capacity, expiring old tasks; " + + tasksToConsiderDeleting.size()+" tasks under consideration; categories are: " + + taskTagsInCategoryOverCapacity); + + Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR); + // now try deleting tasks which are overcapacity for each (non-entity) tag + int deleted = 0; + for (Task task: tasksToConsiderDeleting) { + boolean delete = true; + for (Object tag: task.getTags()) { + if (!category.acceptsTag(tag)) + continue; + if (taskTagsInCategoryOverCapacity.get(tag)==null) { + // no longer over capacity in this tag + delete = false; + break; + } + } + if (delete) { + // delete this and update overcapacity info + deleted++; + executionManager.deleteTask(task); + for (Object tag: task.getTags()) { + AtomicInteger counter = taskAllTagsOverCapacity.get(tag); + if (counter!=null && counter.decrementAndGet()<=0) + taskTagsInCategoryOverCapacity.remove(tag); + } + if (LOG.isTraceEnabled()) + LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity); + if (taskTagsInCategoryOverCapacity.isEmpty()) + break; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; " + + "capacities now: " + taskTagsInCategoryOverCapacity); + return deleted; } + + protected int expireIfOverCapacityGlobally() { + Collection> tasksLive = executionManager.allTasksLive(); + if (tasksLive.size() <= brooklynProperties.getConfig(MAX_TASKS_GLOBAL)) + return 0; + LOG.debug("brooklyn-gc detected "+tasksLive.size()+" tasks in memory, over global limit, looking at deleting some"); + + try { + tasksLive = MutableList.copyOf(tasksLive); + } catch (ConcurrentModificationException e) { + tasksLive = executionManager.getTasksWithAllTags(MutableList.of()); + } + + MutableList> tasks = MutableList.of(); + for (Task task: tasksLive) { + if (task.isDone()) { + tasks.add(task); + } + } + + int numToDelete = tasks.size() - brooklynProperties.getConfig(MAX_TASKS_GLOBAL); + if (numToDelete <= 0) { + LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any"); + return 0; + } + + Collections.sort(tasks, TASKS_OLDEST_FIRST_COMPARATOR); + + int numDeleted = 0; + while (numDeleted < numToDelete && tasks.size()>numDeleted) { + executionManager.deleteTask( tasks.get(numDeleted++) ); + } + if (LOG.isDebugEnabled()) + LOG.debug("brooklyn-gc deleted "+numDeleted+" tasks as was over global limit, now have "+executionManager.allTasksLive().size()); + return numDeleted; + } + } diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java index 2f6e396784..db7a05b64a 100644 --- a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java +++ b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java @@ -51,9 +51,11 @@ import brooklyn.management.HasTaskChildren; import brooklyn.management.Task; import brooklyn.management.TaskAdaptable; +import brooklyn.util.collections.MutableList; import brooklyn.util.exceptions.Exceptions; import brooklyn.util.text.Identifiers; +import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CaseFormat; import com.google.common.base.Preconditions; @@ -189,7 +191,7 @@ public void deleteTask(Task task) { protected boolean deleteTaskNonRecursive(Task task) { Set tags = checkNotNull(task, "task").getTags(); for (Object tag : tags) { - Set> tasks = getMutableTasksWithTagOrNull(tag); + Set> tasks = tasksWithTagLiveOrNull(tag); if (tasks != null) tasks.remove(task); } Task removed = tasksById.remove(task.getId()); @@ -216,13 +218,15 @@ public long getNumInMemoryTasks() { return tasksById.size(); } - private Set> getMutableTasksWithTag(Object tag) { + private Set> tasksWithTagLiveNonNull(Object tag) { Preconditions.checkNotNull(tag); tasksByTag.putIfAbsent(tag, Collections.synchronizedSet(new LinkedHashSet>())); - return tasksByTag.get(tag); + return tasksWithTagLiveOrNull(tag); } - private Set> getMutableTasksWithTagOrNull(Object tag) { + /** exposes live view, for internal use only */ + @Beta + public Set> tasksWithTagLiveOrNull(Object tag) { return tasksByTag.get(tag); } @@ -231,9 +235,18 @@ public Task getTask(String id) { return tasksById.get(id); } + /** not on interface because potentially expensive */ + public List> getAllTasks() { + // not sure if synching makes any difference; have not observed CME's yet + // (and so far this is only called when a CME was caught on a previous operation) + synchronized (tasksById) { + return MutableList.copyOf(tasksById.values()); + } + } + @Override public Set> getTasksWithTag(Object tag) { - Set> result = getMutableTasksWithTag(tag); + Set> result = tasksWithTagLiveNonNull(tag); synchronized (result) { return (Set>)Collections.unmodifiableSet(new LinkedHashSet>(result)); } @@ -249,6 +262,7 @@ public Set> getTasksWithAnyTag(Iterable tags) { return Collections.unmodifiableSet(result); } + /** only works with at least one tag */ @Override public Set> getTasksWithAllTags(Iterable tags) { //NB: for this method retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!) @@ -269,6 +283,10 @@ public Set> getTasksWithAllTags(Iterable tags) { return Collections.unmodifiableSet(result); } + /** live view of all tasks, for internal use only */ + @Beta + public Collection> allTasksLive() { return tasksById.values(); } + public Set getTaskTags() { return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); } public Task submit(Runnable r) { return submit(new LinkedHashMap(1), r); } @@ -282,7 +300,7 @@ public Task submit(Map flags, TaskAdaptable task) { if (!(task instanceof Task)) task = task.asTask(); synchronized (task) { - if (((TaskInternal)task).getResult()!=null) return (Task)task; + if (((TaskInternal)task).getInternalFuture()!=null) return (Task)task; return submitNewTask(flags, (Task) task); } } @@ -290,7 +308,7 @@ public Task submit(Map flags, TaskAdaptable task) { public Task scheduleWith(Task task) { return scheduleWith(Collections.emptyMap(), task); } public Task scheduleWith(Map flags, Task task) { synchronized (task) { - if (((TaskInternal)task).getResult()!=null) return task; + if (((TaskInternal)task).getInternalFuture()!=null) return task; return submitNewTask(flags, task); } } @@ -300,10 +318,10 @@ protected Task submitNewScheduledTask(final Map flags, final ScheduledTa task.submitTimeUtc = System.currentTimeMillis(); tasksById.put(task.getId(), task); if (!task.isDone()) { - task.result = delayedRunner.schedule(new ScheduledTaskCallable(task, flags), + task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags), task.delay.toNanoseconds(), TimeUnit.NANOSECONDS); } else { - task.endTimeUtc = System.currentTimeMillis(); + task.setEndTimeUtc(System.currentTimeMillis()); } return task; } @@ -449,8 +467,27 @@ public boolean cancel(boolean mayInterruptIfRunning) { return result; } }; - - ((TaskInternal)task).initResult(listenableFuture); + listenableFuture.addListener(new Runnable() { + @Override + public void run() { + try { + ((TaskInternal)task).runListeners(); + } catch (Exception e) { + log.warn("Error running task listeners for task "+task+" done", e); + } + + for (ExecutionListener listener : listeners) { + try { + listener.onTaskDone(task); + } catch (Exception e) { + log.warn("Error running execution listener "+listener+" of task "+task+" done", e); + } + } + } + }, runner); + + ((TaskInternal)task).initInternalFuture(listenableFuture); + return task; } @@ -465,7 +502,7 @@ protected void beforeSubmit(Map flags, Task task) { if (flags.get("tags")!=null) ((TaskInternal)task).getMutableTags().addAll((Collection)flags.remove("tags")); for (Object tag: ((TaskInternal)task).getTags()) { - getMutableTasksWithTag(tag).add(task); + tasksWithTagLiveNonNull(tag).add(task); } } @@ -502,14 +539,6 @@ protected void afterEnd(Map flags, Task task) { } ((TaskInternal)task).setThread(null); synchronized (task) { task.notifyAll(); } - - for (ExecutionListener listener : listeners) { - try { - listener.onTaskDone(task); - } catch (Exception e) { - log.warn("Error notifying listener "+listener+" of task "+task+" done", e); - } - } } public TaskScheduler getTaskSchedulerForTag(Object tag) { diff --git a/core/src/main/java/brooklyn/util/task/BasicTask.java b/core/src/main/java/brooklyn/util/task/BasicTask.java index 44f2535dfa..33ecf56ea6 100644 --- a/core/src/main/java/brooklyn/util/task/BasicTask.java +++ b/core/src/main/java/brooklyn/util/task/BasicTask.java @@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -90,6 +91,7 @@ public class BasicTask implements TaskInternal { protected Task blockingTask = null; Object extraStatusText = null; + /** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */ protected final ExecutionList listeners = new ExecutionList(); /** @@ -191,13 +193,14 @@ public Task asTask() { protected volatile Thread thread = null; private volatile boolean cancelled = false; - protected volatile Future result = null; + /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */ + protected volatile Future internalFuture = null; @Override - public synchronized void initResult(ListenableFuture result) { - if (this.result != null) + public synchronized void initInternalFuture(ListenableFuture result) { + if (this.internalFuture != null) throw new IllegalStateException("task "+this+" is being given a result twice"); - this.result = result; + this.internalFuture = result; notifyAll(); } @@ -221,7 +224,7 @@ public synchronized void initResult(ListenableFuture result) { public long getEndTimeUtc() { return endTimeUtc; } @Override - public Future getResult() { return result; } + public Future getInternalFuture() { return internalFuture; } @Override public Task getSubmittedByTask() { return submittedByTask; } @@ -282,8 +285,8 @@ public synchronized boolean cancel(boolean mayInterruptIfRunning) { if (isDone()) return false; boolean cancel = true; cancelled = true; - if (result!=null) { - cancel = result.cancel(mayInterruptIfRunning); + if (internalFuture!=null) { + cancel = internalFuture.cancel(mayInterruptIfRunning); } notifyAll(); return cancel; @@ -291,12 +294,15 @@ public synchronized boolean cancel(boolean mayInterruptIfRunning) { @Override public boolean isCancelled() { - return cancelled || (result!=null && result.isCancelled()); + return cancelled || (internalFuture!=null && internalFuture.isCancelled()); } @Override public boolean isDone() { - return cancelled || (result!=null && result.isDone()); + // if endTime is set, result might not be completed yet, but it will be set very soon + // (the two values are set close in time, result right after the endTime; + // but callback hooks might not see the result yet) + return cancelled || (internalFuture!=null && internalFuture.isDone()) || endTimeUtc>0; } /** @@ -326,7 +332,7 @@ public T get() throws InterruptedException, ExecutionException { if (!isDone()) Tasks.setBlockingTask(this); blockUntilStarted(); - return result.get(); + return internalFuture.get(); } finally { Tasks.resetBlockingTask(); } @@ -351,7 +357,7 @@ public synchronized boolean blockUntilStarted(Duration timeout) { Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp(); while (true) { if (cancelled) throw new CancellationException(); - if (result==null) + if (internalFuture==null) try { if (timeout==null) { wait(); @@ -366,7 +372,7 @@ public synchronized boolean blockUntilStarted(Duration timeout) { Thread.currentThread().interrupt(); Throwables.propagate(e); } - if (result!=null) return true; + if (internalFuture!=null) return true; } } @@ -382,11 +388,11 @@ public boolean blockUntilEnded(Duration timeout) { boolean started = blockUntilStarted(timeout); if (!started) return false; if (timeout==null) { - result.get(); + internalFuture.get(); } else { long remaining = endTime - System.currentTimeMillis(); if (remaining>0) - result.get(remaining, TimeUnit.MILLISECONDS); + internalFuture.get(remaining, TimeUnit.MILLISECONDS); } return isDone(); } catch (Throwable t) { @@ -408,22 +414,22 @@ public T get(Duration duration) throws InterruptedException, ExecutionException, Long end = duration==null ? null : start + duration.toMillisecondsRoundingUp(); while (end==null || end > System.currentTimeMillis()) { if (cancelled) throw new CancellationException(); - if (result == null) { + if (internalFuture == null) { synchronized (this) { long remaining = end - System.currentTimeMillis(); - if (result==null && remaining>0) + if (internalFuture==null && remaining>0) wait(remaining); } } - if (result != null) break; + if (internalFuture != null) break; } Long remaining = end==null ? null : end - System.currentTimeMillis(); if (isDone()) { - return result.get(1, TimeUnit.MILLISECONDS); + return internalFuture.get(1, TimeUnit.MILLISECONDS); } else if (remaining == null) { - return result.get(); + return internalFuture.get(); } else if (remaining > 0) { - return result.get(remaining, TimeUnit.MILLISECONDS); + return internalFuture.get(remaining, TimeUnit.MILLISECONDS); } else { throw new TimeoutException(); } diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java index 70fcc2a602..d991a4b576 100644 --- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java +++ b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java @@ -40,7 +40,6 @@ import brooklyn.util.exceptions.Exceptions; import brooklyn.util.time.CountdownTimer; import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableList; @@ -241,6 +240,10 @@ public T call() throws Exception { // or use some kind of single threaded executor for the queued tasks Task> secondaryJobMaster = Tasks.>builder().dynamic(false) .name("DST manager (internal)") + // TODO marking it transient helps it be GC'd sooner, + // but ideally we wouldn't have this, + // or else it would be a child + .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) .body(new Callable>() { @Override diff --git a/core/src/main/java/brooklyn/util/task/ExecutionListener.java b/core/src/main/java/brooklyn/util/task/ExecutionListener.java index eb39d92abf..1cb7f9ff80 100644 --- a/core/src/main/java/brooklyn/util/task/ExecutionListener.java +++ b/core/src/main/java/brooklyn/util/task/ExecutionListener.java @@ -22,5 +22,10 @@ public interface ExecutionListener { + /** invoked when a task completes: + * {@link Task#getEndTimeUtc()} and {@link Task#isDone()} are guaranteed to be set, + * and {@link Task#get()} should return immediately for most Task implementations + * (care has been taken to avoid potential deadlocks here, waiting for a result!) */ public void onTaskDone(Task task); + } diff --git a/core/src/main/java/brooklyn/util/task/ForwardingTask.java b/core/src/main/java/brooklyn/util/task/ForwardingTask.java index e54ebeff55..001c1ba4d7 100644 --- a/core/src/main/java/brooklyn/util/task/ForwardingTask.java +++ b/core/src/main/java/brooklyn/util/task/ForwardingTask.java @@ -178,8 +178,8 @@ public T getUnchecked(Duration duration) { } @Override - public void initResult(ListenableFuture result) { - delegate().initResult(result); + public void initInternalFuture(ListenableFuture result) { + delegate().initInternalFuture(result); } @Override @@ -188,8 +188,8 @@ public long getQueuedTimeUtc() { } @Override - public Future getResult() { - return delegate().getResult(); + public Future getInternalFuture() { + return delegate().getInternalFuture(); } @Override diff --git a/core/src/main/java/brooklyn/util/task/ScheduledTask.java b/core/src/main/java/brooklyn/util/task/ScheduledTask.java index c6447b3856..b6b3d9d1e5 100644 --- a/core/src/main/java/brooklyn/util/task/ScheduledTask.java +++ b/core/src/main/java/brooklyn/util/task/ScheduledTask.java @@ -34,15 +34,15 @@ import com.google.common.base.Throwables; +/** + * A task which runs with a fixed period. + *

+ * Note that some termination logic, including {@link #addListener(Runnable, java.util.concurrent.Executor)}, + * is not precisely defined. + */ +// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten, +// reduce external assumptions about internal structure, and clarify "done" semantics public class ScheduledTask extends BasicTask { - - // TODO It looks like now groovy callers construct ScheduledTask with these values in the map constructor. - // How does that work in pure-java!? - // Adding builder-like-setters so can be used in Java. But would be much nicer if this was immutable. - // Previous (out-of-date?) todo was: - // See BasicExecutionManager.submitNewScheduledTask for where these fields are actually set. - // Would be nice if the scheduledTask was more self-contained, rather than its fields being - // modified by a different class in a non-obvious way... final Callable> taskFactory; /** initial delay before running, set as flag in constructor; defaults to 0 */ @@ -58,7 +58,7 @@ public class ScheduledTask extends BasicTask { protected Task recentRun, nextRun; public int getRunCount() { return runCount; } - public ScheduledFuture getNextScheduled() { return (ScheduledFuture)result; } + public ScheduledFuture getNextScheduled() { return (ScheduledFuture)internalFuture; } public ScheduledTask(Callable> taskFactory) { this(MutableMap.of(), taskFactory); @@ -160,6 +160,6 @@ public void blockUntilEnded() { public Object get() throws InterruptedException, ExecutionException { blockUntilStarted(); blockUntilFirstScheduleStarted(); - return (truth(recentRun)) ? recentRun.get() : result.get(); + return (truth(recentRun)) ? recentRun.get() : internalFuture.get(); } } diff --git a/core/src/main/java/brooklyn/util/task/TaskInternal.java b/core/src/main/java/brooklyn/util/task/TaskInternal.java index d3a4ce88c9..222164b177 100644 --- a/core/src/main/java/brooklyn/util/task/TaskInternal.java +++ b/core/src/main/java/brooklyn/util/task/TaskInternal.java @@ -20,6 +20,7 @@ import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import brooklyn.management.ExecutionManager; @@ -44,14 +45,16 @@ @Beta public interface TaskInternal extends Task { - void initResult(ListenableFuture result); + /** sets the internal future object used to record the association to a job submitted to an {@link ExecutorService} */ + void initInternalFuture(ListenableFuture result); + /** returns the underlying future where this task's results will come in; see {@link #initInternalFuture(ListenableFuture)} */ + Future getInternalFuture(); + /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here; * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */ long getQueuedTimeUtc(); - Future getResult(); - boolean isQueuedOrSubmitted(); boolean isQueuedAndNotSubmitted(); boolean isQueued(); diff --git a/core/src/main/java/brooklyn/util/task/TaskTags.java b/core/src/main/java/brooklyn/util/task/TaskTags.java index dc71e56200..624bae54fd 100644 --- a/core/src/main/java/brooklyn/util/task/TaskTags.java +++ b/core/src/main/java/brooklyn/util/task/TaskTags.java @@ -56,7 +56,11 @@ public Void apply(@Nullable Set input) { public static boolean isInessential(Task task) { - return task.getTags().contains(INESSENTIAL_TASK); + return hasTag(task, INESSENTIAL_TASK); + } + + public static boolean hasTag(Task task, Object tag) { + return task.getTags().contains(tag); } public static > V markInessential(V task) { diff --git a/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java b/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java index e8d8986fe2..7116d3e3a6 100644 --- a/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java +++ b/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java @@ -25,12 +25,13 @@ import java.io.Serializable; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Semaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -41,14 +42,23 @@ import brooklyn.entity.basic.BrooklynTaskTags; import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity; import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityInternal; import brooklyn.entity.proxying.EntitySpec; import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.management.ExecutionManager; import brooklyn.management.Task; import brooklyn.test.Asserts; import brooklyn.test.entity.LocalManagementContextForTests; import brooklyn.test.entity.TestApplication; import brooklyn.test.entity.TestEntity; import brooklyn.util.collections.MutableMap; +import brooklyn.util.javalang.JavaClassNames; +import brooklyn.util.task.BasicExecutionManager; +import brooklyn.util.task.ExecutionListener; +import brooklyn.util.task.TaskBuilder; +import brooklyn.util.task.Tasks; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -56,13 +66,16 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Callables; +/** Includes many tests for {@link BrooklynGarbageCollector} */ public class EntityExecutionManagerTest { private static final Logger LOG = LoggerFactory.getLogger(EntityExecutionManagerTest.class); - private static final int TIMEOUT_MS = 10*1000; + private static final Duration TIMEOUT_MS = Duration.TEN_SECONDS; + private ManagementContextInternal mgmt; private TestApplication app; private TestEntity e; @@ -74,24 +87,162 @@ public void setUp() throws Exception { public void tearDown() throws Exception { if (app != null) Entities.destroyAll(app.getManagementContext()); app = null; + if (mgmt != null) Entities.destroyAll(mgmt); } @Test - public void testGetTasksOfEntity() throws Exception { + public void testOnDoneCallback() throws InterruptedException { + mgmt = LocalManagementContextForTests.newInstance(); + ExecutionManager em = mgmt.getExecutionManager(); + BasicExecutionManager bem = (BasicExecutionManager)em; + final Map,Duration> completedTasks = MutableMap.of(); + final Semaphore sema4 = new Semaphore(-1); + bem.addListener(new ExecutionListener() { + @Override + public void onTaskDone(Task task) { + Assert.assertTrue(task.isDone()); + Assert.assertEquals(task.getUnchecked(), "foo"); + completedTasks.put(task, Duration.sinceUtc(task.getEndTimeUtc())); + sema4.release(); + } + }); + Task t1 = em.submit( Tasks.builder().name("t1").dynamic(false).body(Callables.returning("foo")).build() ); + t1.getUnchecked(); + Task t2 = em.submit( Tasks.builder().name("t2").dynamic(false).body(Callables.returning("foo")).build() ); + sema4.acquire(); + Assert.assertEquals(completedTasks.size(), 2, "completed tasks are: "+completedTasks); + completedTasks.get(t1).isShorterThan(Duration.TEN_SECONDS); + completedTasks.get(t2).isShorterThan(Duration.TEN_SECONDS); + } + + protected void forceGc() { + ((LocalManagementContext)app.getManagementContext()).getGarbageCollector().gcIteration(); + } + + protected static Task runEmptyTaskWithNameAndTags(Entity target, String name, Object ...tags) { + TaskBuilder tb = newEmptyTask(name); + for (Object tag: tags) tb.tag(tag); + Task task = ((EntityInternal)target).getExecutionContext().submit(tb.build()); + task.getUnchecked(); + return task; + } + + protected static TaskBuilder newEmptyTask(String name) { + return Tasks.builder().name(name).dynamic(false).body(Callables.returning(null)); + } + + protected static void assertTaskCountForEntity(Entity entity, int expectedCount) { + Collection> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity); + Assert.assertEquals(tasks.size(), expectedCount, "Tasks were "+tasks); + } + + @Test + public void testGetTasksAndGcBoringTags() throws Exception { app = TestApplication.Factory.newManagedInstanceForTests(); e = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - final CountDownLatch latch = new CountDownLatch(1); - Task task = e.getExecutionContext().submit( - MutableMap.of("tag", ManagementContextInternal.NON_TRANSIENT_TASK_TAG), - new Runnable() { - @Override public void run() { - latch.countDown(); - }}); - latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); + Task task = runEmptyTaskWithNameAndTags(e, "should-be-kept", ManagementContextInternal.NON_TRANSIENT_TASK_TAG); + runEmptyTaskWithNameAndTags(e, "should-be-gcd", ManagementContextInternal.TRANSIENT_TASK_TAG); + // dead task (and initialization task) should have been GC'd on completion Collection> tasks = BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e); - assertEquals(tasks, ImmutableList.of(task)); + assertEquals(tasks, ImmutableList.of(task), "Mismatched tasks, got: "+tasks); + } + + @Test + public void testGcTaskAtNormalTagLimit() throws Exception { + app = TestApplication.Factory.newManagedInstanceForTests(); + e = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + + ((BrooklynProperties)app.getManagementContext().getConfig()).put( + BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); + + for (int count=0; count<5; count++) + runEmptyTaskWithNameAndTags(e, "task"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + + forceGc(); + assertTaskCountForEntity(e, 2); + } + + @Test + public void testGcTaskAtEntityLimit() throws Exception { + app = TestApplication.Factory.newManagedInstanceForTests(); + e = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + + ((BrooklynProperties)app.getManagementContext().getConfig()).put( + BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 2); + + for (int count=0; count<5; count++) + runEmptyTaskWithNameAndTags(e, "task-e-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + for (int count=0; count<5; count++) + runEmptyTaskWithNameAndTags(app, "task-app-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + + forceGc(); + assertTaskCountForEntity(app, 2); + assertTaskCountForEntity(e, 2); + } + + @Test + public void testGcTaskWithTagAndEntityLimit() throws Exception { + app = TestApplication.Factory.newManagedInstanceForTests(); + e = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + + ((BrooklynProperties)app.getManagementContext().getConfig()).put( + BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 6); + ((BrooklynProperties)app.getManagementContext().getConfig()).put( + BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); + + int count=0; + + runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + Time.sleep(Duration.ONE_MILLISECOND); + // should keep the 2 below, because all the other borings get grace, but delete the ones above + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e"); + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e"); + // should keep both the above + + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag"); + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag"); + Time.sleep(Duration.ONE_MILLISECOND); + runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag"); + // should keep the below since they have unique tags, but remove one of the e tasks above + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag", "and-another-tag"); + runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag"); + runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag"); + + forceGc(); + assertTaskCountForEntity(e, 6); + assertTaskCountForEntity(app, 3); + + // now with a lowered limit, we should remove one more e + ((BrooklynProperties)app.getManagementContext().getConfig()).put( + BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 5); + forceGc(); + assertTaskCountForEntity(e, 5); + } + + @Test + public void testGcDynamicTaskAtNormalTagLimit() throws Exception { + app = TestApplication.Factory.newManagedInstanceForTests(); + e = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + + ((BrooklynProperties)app.getManagementContext().getConfig()).put( + BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); + + for (int count=0; count<5; count++) { + TaskBuilder tb = Tasks.builder().name("task-"+count).dynamic(true).body(new Runnable() { @Override public void run() {}}) + .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo"); + ((EntityInternal)e).getExecutionContext().submit(tb.build()); + } + + forceGc(); + + final int EXTRA_TASKS_PER_DYNAMIC = 1; + assertTaskCountForEntity(e, 2*(1+EXTRA_TASKS_PER_DYNAMIC)); } @Test @@ -132,12 +283,14 @@ public void testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked() throws Exce BasicAttributeSensor byteArrayAttrib = new BasicAttributeSensor(Object.class, "test.byteArray", ""); for (int i = 0; i < 1000; i++) { + if (i%100==0) LOG.info(JavaClassNames.niceClassAndMethod()+": iteration "+i); try { LOG.debug("testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked: iteration="+i); TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); entity.setAttribute(byteArrayAttrib, new BigObject(10*1000*1000)); entity.invoke(TestEntity.MY_EFFECTOR, ImmutableMap.of()).get(); Entities.destroy(entity); + forceGc(); } catch (OutOfMemoryError e) { LOG.info("testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked: OOME at iteration="+i); throw e; @@ -153,19 +306,20 @@ public void testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked() throws Exce public void testEffectorTasksGcedSoNoOome() throws Exception { BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); - brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, 1); + brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.ONE_MILLISECOND); brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); app = ApplicationBuilder.newManagedApp(TestApplication.class, LocalManagementContextForTests.newInstance(brooklynProperties)); TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); for (int i = 0; i < 1000; i++) { + if (i%100==0) LOG.info(JavaClassNames.niceClassAndMethod()+": iteration "+i); try { LOG.debug("testEffectorTasksGced: iteration="+i); entity.invoke(TestEntity.IDENTITY_EFFECTOR, ImmutableMap.of("arg", new BigObject(10*1000*1000))).get(); - Thread.sleep(1); // Give GC thread a chance to run - + Time.sleep(Duration.ONE_MILLISECOND); // Give GC thread a chance to run + forceGc(); } catch (OutOfMemoryError e) { LOG.info("testEffectorTasksGced: OOME at iteration="+i); throw e; @@ -173,17 +327,11 @@ public void testEffectorTasksGcedSoNoOome() throws Exception { } } - // FIXME DynamicSequentialTask creates a secondaryJobMaster task (DstJob) so we have these extra tasks interfering. - // We can't just mark that task as transient, as all sub-tasks of the sequential-task have that as its - // context so are automatically deleted. We probably don't want to make that secondaryJobMaster a child of the - // DynamicSequentialTask to have it deleted automatically because then it would be listed in the web-console's - // task view. - // The "right" solution is probably to get rid of that task altogether, and rely on the newTaskEndCallback. - @Test(groups={"Integration", "WIP"}) + @Test(groups={"Integration"}) public void testEffectorTasksGcedForMaxPerTag() throws Exception { int maxNumTasks = 2; BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); - brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, 1000); + brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.ONE_SECOND); brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); app = ApplicationBuilder.newManagedApp(TestApplication.class, LocalManagementContextForTests.newInstance(brooklynProperties)); @@ -214,11 +362,11 @@ public void testEffectorTasksGcedForMaxPerTag() throws Exception { @Test(groups="Integration") public void testEffectorTasksGcedForAge() throws Exception { - int maxTaskAge = 100; - int maxOverhead = 250; - int earlyReturnGrace = 10; + Duration maxTaskAge = Duration.millis(100); + Duration maxOverhead = Duration.millis(250); + Duration earlyReturnGrace = Duration.millis(10); BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); - brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, 1); + brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.ONE_MILLISECOND); brooklynProperties.put(BrooklynGarbageCollector.MAX_TASK_AGE, maxTaskAge); app = ApplicationBuilder.newManagedApp(TestApplication.class, LocalManagementContextForTests.newInstance(brooklynProperties)); @@ -236,10 +384,9 @@ public void testEffectorTasksGcedForAge() throws Exception { assertEquals(storedTasks, ImmutableSet.of(), "storedTasks="+storedTasks); }}); - long timeToGc = stopwatch.elapsed(TimeUnit.MILLISECONDS); - - assertTrue(timeToGc > (maxTaskAge-earlyReturnGrace), "timeToGc="+timeToGc+"; maxTaskAge="+maxTaskAge); - assertTrue(timeToGc < (maxTaskAge+maxOverhead), "timeToGc="+timeToGc+"; maxTaskAge="+maxTaskAge); + Duration timeToGc = Duration.of(stopwatch); + assertTrue(timeToGc.isLongerThan(maxTaskAge.subtract(earlyReturnGrace)), "timeToGc="+timeToGc+"; maxTaskAge="+maxTaskAge); + assertTrue(timeToGc.isShorterThan(maxTaskAge.add(maxOverhead)), "timeToGc="+timeToGc+"; maxTaskAge="+maxTaskAge); } private static class BigObject implements Serializable { diff --git a/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java b/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java index cd0d2b134b..d427633633 100644 --- a/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java +++ b/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java @@ -372,14 +372,14 @@ public Integer call() throws Exception { }}); assertEquals(null, t.submittedByTask); assertEquals(-1, t.submitTimeUtc); - assertNull(t.getResult()); + assertNull(t.getInternalFuture()); em.submit(MutableMap.of("tag", "A"), t); assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); assertTrue(t.submitTimeUtc > 0); assertTrue(t.startTimeUtc >= t.submitTimeUtc); - assertNotNull(t.getResult()); + assertNotNull(t.getInternalFuture()); assertEquals(-1, t.endTimeUtc); assertEquals(false, t.isCancelled()); diff --git a/usage/launcher/src/main/java/brooklyn/launcher/config/BrooklynGlobalConfig.java b/usage/launcher/src/main/java/brooklyn/launcher/config/BrooklynGlobalConfig.java index d5911cb050..08a8c578db 100644 --- a/usage/launcher/src/main/java/brooklyn/launcher/config/BrooklynGlobalConfig.java +++ b/usage/launcher/src/main/java/brooklyn/launcher/config/BrooklynGlobalConfig.java @@ -27,6 +27,7 @@ import brooklyn.rest.BrooklynWebConfig; import brooklyn.util.internal.BrooklynSystemProperties; import brooklyn.util.internal.StringSystemProperty; +import brooklyn.util.time.Duration; /** * Convenience collection of popular global configuration values. @@ -47,10 +48,12 @@ public class BrooklynGlobalConfig { public static final ConfigKey REQUIRE_HTTPS = BrooklynWebConfig.HTTPS_REQUIRED; - public static final ConfigKey GC_PERIOD = BrooklynGarbageCollector.GC_PERIOD; + public static final ConfigKey GC_PERIOD = BrooklynGarbageCollector.GC_PERIOD; public static final ConfigKey DO_SYSTEM_GC = BrooklynGarbageCollector.DO_SYSTEM_GC; public static final ConfigKey MAX_TASKS_PER_TAG = BrooklynGarbageCollector.MAX_TASKS_PER_TAG; - public static final ConfigKey MAX_TASK_AGE = BrooklynGarbageCollector.MAX_TASK_AGE; + public static final ConfigKey MAX_TASKS_PER_ENTITY = BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY; + public static final ConfigKey MAX_TASKS_GLOBAL = BrooklynGarbageCollector.MAX_TASKS_GLOBAL; + public static final ConfigKey MAX_TASK_AGE = BrooklynGarbageCollector.MAX_TASK_AGE; public static final StringSystemProperty LOCALHOST_IP_ADDRESS = BrooklynServiceAttributes.LOCALHOST_IP_ADDRESS; diff --git a/utils/common/src/main/java/brooklyn/util/time/Duration.java b/utils/common/src/main/java/brooklyn/util/time/Duration.java index 3fc86584b9..73cc31430f 100644 --- a/utils/common/src/main/java/brooklyn/util/time/Duration.java +++ b/utils/common/src/main/java/brooklyn/util/time/Duration.java @@ -145,6 +145,16 @@ public static Duration parse(String textualDescription) { return new Duration(Time.parseTimeString(textualDescription), TimeUnit.MILLISECONDS); } + /** creates new {@link Duration} instance of the given length of time */ + public static Duration days(Number n) { + return new Duration((long) (n.doubleValue() * TimeUnit.DAYS.toNanos(1)), TimeUnit.NANOSECONDS); + } + + /** creates new {@link Duration} instance of the given length of time */ + public static Duration hours(Number n) { + return new Duration((long) (n.doubleValue() * TimeUnit.HOURS.toNanos(1)), TimeUnit.NANOSECONDS); + } + /** creates new {@link Duration} instance of the given length of time */ public static Duration minutes(Number n) { return new Duration((long) (n.doubleValue() * TimeUnit.MINUTES.toNanos(1)), TimeUnit.NANOSECONDS); @@ -263,4 +273,12 @@ public boolean isPositive() { return nanos()>0; } + public boolean isLongerThan(Duration x) { + return compareTo(x) > 0; + } + + public boolean isShorterThan(Duration x) { + return compareTo(x) < 0; + } + } diff --git a/utils/common/src/test/java/brooklyn/util/time/DurationTest.java b/utils/common/src/test/java/brooklyn/util/time/DurationTest.java index f5548d2a50..b59c73b857 100644 --- a/utils/common/src/test/java/brooklyn/util/time/DurationTest.java +++ b/utils/common/src/test/java/brooklyn/util/time/DurationTest.java @@ -96,4 +96,12 @@ public void testNotRoundingNegative() { Assert.assertEquals(Duration.nanos(-1).toMillisecondsRoundingUp(), -1); } + public void testComparison() { + Assert.assertTrue(Duration.seconds(1.8).isLongerThan(Duration.millis(1600))); + Assert.assertTrue(Duration.millis(1600).isShorterThan(Duration.seconds(1.8))); + + Assert.assertTrue(Duration.seconds(1).isLongerThan(Duration.ZERO)); + Assert.assertFalse(Duration.seconds(-1).isLongerThan(Duration.ZERO)); + } + } From 6b054e991686986a1053eff9eefed274e5321228 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 16 Sep 2014 23:30:08 +0100 Subject: [PATCH 4/4] use an internal proxy on tasks so that sub-tasks submitted by the internal job show up as submitted by their parent, making it easier to GC the right things and show the right things in the dashboard; also tweak how sub-tasks are cleaned up. works nicely in gui now! --- .../internal/BrooklynGarbageCollector.java | 36 +++- .../util/task/BasicExecutionManager.java | 198 +++++++++++------- .../java/brooklyn/util/task/BasicTask.java | 57 ++++- .../util/task/DynamicSequentialTask.java | 2 + .../brooklyn/util/task/ForwardingTask.java | 5 + .../java/brooklyn/util/task/TaskInternal.java | 5 + .../main/java/brooklyn/util/task/Tasks.java | 30 +-- .../internal/EntityExecutionManagerTest.java | 71 ++++--- .../util/task/BasicTaskExecutionTest.java | 6 +- .../util/task/ScheduledExecutionTest.java | 4 +- 10 files changed, 282 insertions(+), 132 deletions(-) diff --git a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java b/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java index bf28c35377..a09477bdda 100644 --- a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java +++ b/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java @@ -42,8 +42,10 @@ import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity; import brooklyn.entity.basic.BrooklynTaskTags.WrappedStream; import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.Entities; import brooklyn.internal.storage.BrooklynStorage; import brooklyn.location.Location; +import brooklyn.management.HasTaskChildren; import brooklyn.management.Task; import brooklyn.util.collections.MutableList; import brooklyn.util.collections.MutableMap; @@ -54,6 +56,7 @@ import brooklyn.util.time.Duration; import com.google.common.annotations.Beta; +import com.google.common.collect.Iterables; /** * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory. @@ -92,6 +95,12 @@ public class BrooklynGarbageCollector { // work offender is {@link DynamicSequentialTask} internal job tracker, but it is marked // transient so it is destroyed prompty; there may be others, however; // but OTOH it might be expensive to check for these all the time! + // TODO probably we can set this false (remove this and related code), + // and just rely on usual GC to pick up background tasks; the lifecycle of background task + // should normally be independent of the submitter. (DST was the exception, and marking + // transient there fixes the main problem, which is when the submitter is GC'd but the submitted is not, + // and we don't want the submitted to show up at the root in the GUI, which it will if its + // submitter has been GC'd) @Beta public static final ConfigKey CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey( "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true); @@ -239,7 +248,7 @@ public boolean shouldDeleteTask(Task task) { return shouldDeleteTaskImmediately(task); } /** whether this task should be deleted on completion, - * because it is transient, or because it and all submitter ancestors are completed and neither an effector nor non-transient */ + * because it is transient, or because it is submitted background without much context information */ protected boolean shouldDeleteTaskImmediately(Task task) { if (!task.isDone()) return false; @@ -248,10 +257,31 @@ protected boolean shouldDeleteTaskImmediately(Task task) { return true; if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return false; - if (task.getSubmittedByTask()!=null && !shouldDeleteTaskImmediately(task.getSubmittedByTask())) - return false; + + if (task.getSubmittedByTask()!=null) { + Task parent = task.getSubmittedByTask(); + if (executionManager.getTask(parent.getId())==null) { + // parent is already cleaned up + return true; + } + if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) { + // it is a child, let the parent manage this task's death + return false; + } + Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task); + if (associatedEntity!=null) { + // this is associated to an entity; destroy only if the entity is unmanaged + return !Entities.isManaged(associatedEntity); + } + // if not associated to an entity, then delete immediately + return true; + } + // e.g. scheduled tasks, sensor events, etc // TODO (in future may keep some of these with another limit, based on a new TagCategory) + // there may also be a server association for server-side tasks which should be kept + // (but be careful not to keep too many subscriptions!) + return true; } diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java index db7a05b64a..f7113158bc 100644 --- a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java +++ b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java @@ -61,6 +61,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -132,6 +133,13 @@ public BasicExecutionManager(String contextid) { delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); } + private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler { + @Override + public void uncaughtException(Thread t, Throwable e) { + log.error("Uncaught exception in thread "+t.getName(), e); + } + } + /** * For use by overriders to use custom thread factory. * But be extremely careful: called by constructor, so before sub-class' constructor will @@ -140,11 +148,7 @@ public BasicExecutionManager(String contextid) { protected ThreadFactory newThreadFactory(String contextid) { return new ThreadFactoryBuilder() .setNameFormat("brooklyn-execmanager-"+contextid+"-%d") - .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - log.error("Uncaught exception in thread "+t.getName(), e); - }}) + .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation()) .build(); } @@ -377,6 +381,107 @@ public String toString() { } } + private final class SubmissionCallable implements Callable { + private final Map flags; + private final Task task; + + private SubmissionCallable(Map flags, Task task) { + this.flags = flags; + this.task = task; + } + + public T call() { + try { + T result = null; + Throwable error = null; + String oldThreadName = Thread.currentThread().getName(); + try { + if (RENAME_THREADS) { + String newThreadName = oldThreadName+"-"+task.getDisplayName()+ + "["+task.getId().substring(0, 8)+"]"; + Thread.currentThread().setName(newThreadName); + } + beforeStart(flags, task); + if (!task.isCancelled()) { + result = ((TaskInternal)task).getJob().call(); + } else throw new CancellationException(); + } catch(Throwable e) { + error = e; + } finally { + if (RENAME_THREADS) { + Thread.currentThread().setName(oldThreadName); + } + afterEnd(flags, task); + } + if (error!=null) { + /* we throw, after logging debug. + * the throw means the error is available for task submitters to monitor. + * however it is possible no one is monitoring it, in which case we will have debug logging only for errors. + * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) + */ + if (log.isDebugEnabled()) { + // debug only here, because most submitters will handle failures + log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error); + if (log.isTraceEnabled()) + log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error); + } + throw Exceptions.propagate(error); + } + return result; + } finally { + ((TaskInternal)task).runListeners(); + } + } + + @Override + public String toString() { + return "BEM.call("+task+","+flags+")"; + } + } + + private final static class ListenableForwardingFutureForTask extends ListenableForwardingFuture { + private final Task task; + + private ListenableForwardingFutureForTask(Future delegate, ExecutionList list, Task task) { + super(delegate, list); + this.task = task; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean result = false; + if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning); + result |= super.cancel(mayInterruptIfRunning); + ((TaskInternal)task).runListeners(); + return result; + } + } + + private final class SubmissionListenerToCallOtherListeners implements Runnable { + private final Task task; + + private SubmissionListenerToCallOtherListeners(Task task) { + this.task = task; + } + + @Override + public void run() { + try { + ((TaskInternal)task).runListeners(); + } catch (Exception e) { + log.warn("Error running task listeners for task "+task+" done", e); + } + + for (ExecutionListener listener : listeners) { + try { + listener.onTaskDone(task); + } catch (Exception e) { + log.warn("Error running execution listener "+listener+" of task "+task+" done", e); + } + } + } + } + @SuppressWarnings("unchecked") protected Task submitNewTask(final Map flags, final Task task) { if (task instanceof ScheduledTask) @@ -390,54 +495,7 @@ protected Task submitNewTask(final Map flags, final Task task) { if (((TaskInternal)task).getJob() == null) throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied."); - Callable job = new Callable() { - public T call() { - try { - T result = null; - Throwable error = null; - String oldThreadName = Thread.currentThread().getName(); - try { - if (RENAME_THREADS) { - String newThreadName = oldThreadName+"-"+task.getDisplayName()+ - "["+task.getId().substring(0, 8)+"]"; - Thread.currentThread().setName(newThreadName); - } - beforeStart(flags, task); - if (!task.isCancelled()) { - result = ((TaskInternal)task).getJob().call(); - } else throw new CancellationException(); - } catch(Throwable e) { - error = e; - } finally { - if (RENAME_THREADS) { - Thread.currentThread().setName(oldThreadName); - } - afterEnd(flags, task); - } - if (error!=null) { - /* we throw, after logging debug. - * the throw means the error is available for task submitters to monitor. - * however it is possible no one is monitoring it, in which case we will have debug logging only for errors. - * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) - */ - if (log.isDebugEnabled()) { - // debug only here, because most submitters will handle failures - log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error); - if (log.isTraceEnabled()) - log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error); - } - throw Exceptions.propagate(error); - } - return result; - } finally { - ((TaskInternal)task).runListeners(); - } - } - @Override - public String toString() { - return "BEM.call("+task+","+flags+")"; - } - }; + Callable job = new SubmissionCallable(flags, task); // If there's a scheduler then use that; otherwise execute it directly Set schedulers = null; @@ -457,34 +515,12 @@ public String toString() { } // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel // (and we make sure the same ExecutionList is used in the future as in the task) - ListenableFuture listenableFuture = new ListenableForwardingFuture(future, ((TaskInternal)task).getListeners()) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean result = false; - if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning); - result |= super.cancel(mayInterruptIfRunning); - ((TaskInternal)task).runListeners(); - return result; - } - }; - listenableFuture.addListener(new Runnable() { - @Override - public void run() { - try { - ((TaskInternal)task).runListeners(); - } catch (Exception e) { - log.warn("Error running task listeners for task "+task+" done", e); - } - - for (ExecutionListener listener : listeners) { - try { - listener.onTaskDone(task); - } catch (Exception e) { - log.warn("Error running execution listener "+listener+" of task "+task+" done", e); - } - } - } - }, runner); + ListenableFuture listenableFuture = new ListenableForwardingFutureForTask(future, ((TaskInternal)task).getListeners(), task); + // doesn't matter whether the listener is added to the listenableFuture or the task, + // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown + // (avoid a bunch of ugly warnings in tests which start and stop things a lot!) + // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement] + ((TaskInternal)task).addListener(new SubmissionListenerToCallOtherListeners(task), runner); ((TaskInternal)task).initInternalFuture(listenableFuture); diff --git a/core/src/main/java/brooklyn/util/task/BasicTask.java b/core/src/main/java/brooklyn/util/task/BasicTask.java index 33ecf56ea6..d1ca9d14cf 100644 --- a/core/src/main/java/brooklyn/util/task/BasicTask.java +++ b/core/src/main/java/brooklyn/util/task/BasicTask.java @@ -36,6 +36,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -48,6 +49,7 @@ import brooklyn.management.Task; import brooklyn.util.GroovyJavaMethods; import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; import brooklyn.util.text.Identifiers; import brooklyn.util.text.Strings; import brooklyn.util.time.Duration; @@ -59,6 +61,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Callables; import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; @@ -86,6 +89,8 @@ public class BasicTask implements TaskInternal { protected final Set tags = Sets.newConcurrentHashSet(); // for debugging, to record where tasks were created // { tags.add(new Throwable("Creation stack trace")); } + + protected Task proxyTargetTask = null; protected String blockingDetails = null; protected Task blockingTask = null; @@ -189,7 +194,7 @@ public Task asTask() { protected long submitTimeUtc = -1; protected long startTimeUtc = -1; protected long endTimeUtc = -1; - protected Task submittedByTask; + protected Maybe> submittedByTask; protected volatile Thread thread = null; private volatile boolean cancelled = false; @@ -227,7 +232,10 @@ public synchronized void initInternalFuture(ListenableFuture result) { public Future getInternalFuture() { return internalFuture; } @Override - public Task getSubmittedByTask() { return submittedByTask; } + public Task getSubmittedByTask() { + if (submittedByTask==null) return null; + return submittedByTask.orNull(); + } /** the thread where the task is running, if it is running */ @Override @@ -780,9 +788,35 @@ protected void finalize() throws Throwable { finalizer.onTaskFinalization(this); } + public static class SubmissionErrorCatchingExecutor implements Executor { + final Executor target; + public SubmissionErrorCatchingExecutor(Executor target) { + this.target = target; + } + @Override + public void execute(Runnable command) { + if (isShutdown()) { + log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown."); + return; + } + try { + target.execute(command); + } catch (Exception e) { + if (isShutdown()) { + log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown."); + } else { + log.warn("Execution of task callback hook "+command+" failed: "+e, e); + } + } + } + protected boolean isShutdown() { + return target instanceof ExecutorService && ((ExecutorService)target).isShutdown(); + } + } + @Override public void addListener(Runnable listener, Executor executor) { - listeners.add(listener, executor); + listeners.add(listener, new SubmissionErrorCatchingExecutor(executor)); } @Override @@ -820,9 +854,18 @@ public void setSubmitTimeUtc(long val) { submitTimeUtc = val; } + private static Task newGoneTaskFor(Task task) { + Task t = Tasks.builder().dynamic(false).name(task.getDisplayName()) + .description("Details of the original task "+task+" have been forgotten.") + .body(Callables.returning((T)null)).build(); + ((BasicTask)t).ignoreIfNotRun(); + return t; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void setSubmittedByTask(Task task) { - submittedByTask = task; + submittedByTask = (Maybe)Maybe.weakThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task))); } @Override @@ -839,5 +882,9 @@ public void setStartTimeUtc(long val) { public void applyTagModifier(Function,Void> modifier) { modifier.apply(tags); } - + + @Override + public Task getProxyTarget() { + return proxyTargetTask; + } } diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java index d991a4b576..266921b508 100644 --- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java +++ b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java @@ -311,6 +311,8 @@ public List call() throws Exception { return result; } }).build(); + ((BasicTask)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this; + submitBackgroundInheritingContext(secondaryJobMaster); T result = null; diff --git a/core/src/main/java/brooklyn/util/task/ForwardingTask.java b/core/src/main/java/brooklyn/util/task/ForwardingTask.java index 001c1ba4d7..91e4dfe16b 100644 --- a/core/src/main/java/brooklyn/util/task/ForwardingTask.java +++ b/core/src/main/java/brooklyn/util/task/ForwardingTask.java @@ -316,4 +316,9 @@ public void setStartTimeUtc(long currentTimeMillis) { public void applyTagModifier(Function, Void> modifier) { delegate().applyTagModifier(modifier); } + + @Override + public Task getProxyTarget() { + return delegate().getProxyTarget(); + } } diff --git a/core/src/main/java/brooklyn/util/task/TaskInternal.java b/core/src/main/java/brooklyn/util/task/TaskInternal.java index 222164b177..b731b4d92b 100644 --- a/core/src/main/java/brooklyn/util/task/TaskInternal.java +++ b/core/src/main/java/brooklyn/util/task/TaskInternal.java @@ -116,4 +116,9 @@ public interface TaskInternal extends Task { void setStartTimeUtc(long currentTimeMillis); void applyTagModifier(Function,Void> modifier); + + /** if a task is a proxy for another one (used mainly for internal tasks), + * this returns the "real" task represented by this one */ + Task getProxyTarget(); + } diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java index 43c688beec..2afd539bf4 100644 --- a/core/src/main/java/brooklyn/util/task/Tasks.java +++ b/core/src/main/java/brooklyn/util/task/Tasks.java @@ -97,10 +97,20 @@ public static T withBlockingDetails(String description, Callable code) th } } - /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null */ + /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null; + * if the current task is a proxy, this returns the target of that proxy */ @SuppressWarnings("rawtypes") - public static Task current() { return BasicExecutionManager.getPerThreadCurrentTask().get(); } + public static Task current() { + return getFinalProxyTarget(BasicExecutionManager.getPerThreadCurrentTask().get()); + } + public static Task getFinalProxyTarget(Task task) { + if (task==null) return null; + Task proxy = ((TaskInternal)task).getProxyTarget(); + if (proxy==null || proxy.equals(task)) return task; + return getFinalProxyTarget(proxy); + } + /** creates a {@link ValueResolver} instance which allows significantly more customization than * the various {@link #resolveValue(Object, Class, ExecutionContext)} methods here */ public static ValueResolver resolving(Object v, Class type) { @@ -177,39 +187,31 @@ private static Task[] asTasks(TaskAdaptable ...tasks) { public static Task> parallel(TaskAdaptable ...tasks) { return parallelInternal("parallelised tasks", asTasks(tasks)); } - public static Task> parallel(String name, TaskAdaptable ...tasks) { return parallelInternal(name, asTasks(tasks)); } - public static Task> parallel(String name, Iterable> tasks) { return parallelInternal(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); } - - public static Task> parallelInternal(String name, Task[] tasks) { + private static Task> parallelInternal(String name, Task[] tasks) { return Tasks.>builder().name(name).parallel(true).add(tasks).build(); } public static Task> sequential(TaskAdaptable ...tasks) { return sequentialInternal("sequential tasks", asTasks(tasks)); } - public static Task> sequential(String name, TaskAdaptable ...tasks) { return sequentialInternal(name, asTasks(tasks)); } - - private static Task> sequentialInternal(String name, Task[] tasks) { - return Tasks.>builder().name(name).parallel(false).add(tasks).build(); - } - public static TaskFactory sequential(TaskFactory ...taskFactories) { return sequentialInternal("sequential tasks", taskFactories); } - public static TaskFactory sequential(String name, TaskFactory ...taskFactories) { return sequentialInternal(name, taskFactories); } - + private static Task> sequentialInternal(String name, Task[] tasks) { + return Tasks.>builder().name(name).parallel(false).add(tasks).build(); + } private static TaskFactory sequentialInternal(final String name, final TaskFactory ...taskFactories) { return new TaskFactory>() { @Override diff --git a/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java b/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java index 7116d3e3a6..0e79b97f51 100644 --- a/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java +++ b/core/src/test/java/brooklyn/management/internal/EntityExecutionManagerTest.java @@ -236,13 +236,14 @@ public void testGcDynamicTaskAtNormalTagLimit() throws Exception { for (int count=0; count<5; count++) { TaskBuilder tb = Tasks.builder().name("task-"+count).dynamic(true).body(new Runnable() { @Override public void run() {}}) .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo"); - ((EntityInternal)e).getExecutionContext().submit(tb.build()); + ((EntityInternal)e).getExecutionContext().submit(tb.build()).getUnchecked(); } forceGc(); - final int EXTRA_TASKS_PER_DYNAMIC = 1; - assertTaskCountForEntity(e, 2*(1+EXTRA_TASKS_PER_DYNAMIC)); + // might need an eventually here, if the internal job completion and GC is done in the background + // (if there are no test failures for a few months, since Sept 2014, then we can remove this comment) + assertTaskCountForEntity(e, 2); } @Test @@ -260,8 +261,6 @@ public void testUnmanagedEntityCanBeGcedEvenIfPreviouslyTagged() throws Exceptio assertTrue(tags.contains(BrooklynTaskTags.tagForContextEntity(e)), "tags="+tags); Entities.destroy(e); - e = null; - for (int i = 0; i < 5; i++) System.gc(); Set tags2 = app.getManagementContext().getExecutionManager().getTaskTags(); for (Object tag : tags2) { @@ -277,27 +276,25 @@ public void testUnmanagedEntityCanBeGcedEvenIfPreviouslyTagged() throws Exceptio } @Test(groups="Integration") - public void testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked() throws Exception { + public void testSubscriptionAndEffectorTasksGced() throws Exception { app = TestApplication.Factory.newManagedInstanceForTests(); + BasicExecutionManager em = (BasicExecutionManager) app.getManagementContext().getExecutionManager(); + // allow background enrichers to complete + Time.sleep(Duration.ONE_SECOND); + forceGc(); + List> t1 = em.getAllTasks(); - BasicAttributeSensor byteArrayAttrib = new BasicAttributeSensor(Object.class, "test.byteArray", ""); - - for (int i = 0; i < 1000; i++) { - if (i%100==0) LOG.info(JavaClassNames.niceClassAndMethod()+": iteration "+i); - try { - LOG.debug("testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked: iteration="+i); - TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - entity.setAttribute(byteArrayAttrib, new BigObject(10*1000*1000)); - entity.invoke(TestEntity.MY_EFFECTOR, ImmutableMap.of()).get(); - Entities.destroy(entity); - forceGc(); - } catch (OutOfMemoryError e) { - LOG.info("testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked: OOME at iteration="+i); - throw e; - } - } + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + entity.setAttribute(TestEntity.NAME, "bob"); + entity.invoke(TestEntity.MY_EFFECTOR, ImmutableMap.of()).get(); + Entities.destroy(entity); + Time.sleep(Duration.ONE_SECOND); + forceGc(); + List> t2 = em.getAllTasks(); + + Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n"); } - + /** * Invoke effector many times, where each would claim 10MB because it stores the return value. * If it didn't gc the tasks promptly, it would consume 10GB ram (so would OOME before that). @@ -321,7 +318,33 @@ public void testEffectorTasksGcedSoNoOome() throws Exception { Time.sleep(Duration.ONE_MILLISECOND); // Give GC thread a chance to run forceGc(); } catch (OutOfMemoryError e) { - LOG.info("testEffectorTasksGced: OOME at iteration="+i); + LOG.warn(JavaClassNames.niceClassAndMethod()+": OOME at iteration="+i); + throw e; + } + } + } + + @Test(groups="Integration") + public void testUnmanagedEntityGcedOnUnmanageEvenIfEffectorInvoked() throws Exception { + app = TestApplication.Factory.newManagedInstanceForTests(); + + BasicAttributeSensor byteArrayAttrib = new BasicAttributeSensor(Object.class, "test.byteArray", ""); + + for (int i = 0; i < 1000; i++) { + if (i%100==0) LOG.info(JavaClassNames.niceClassAndMethod()+": iteration "+i); + try { + LOG.debug(JavaClassNames.niceClassAndMethod()+": iteration="+i); + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + entity.setAttribute(byteArrayAttrib, new BigObject(10*1000*1000)); + entity.invoke(TestEntity.MY_EFFECTOR, ImmutableMap.of()).get(); + Entities.destroy(entity); + forceGc(); + System.gc(); System.gc(); + } catch (OutOfMemoryError e) { + LOG.warn(JavaClassNames.niceClassAndMethod()+": OOME at iteration="+i); + ExecutionManager em = app.getManagementContext().getExecutionManager(); + Collection> tasks = ((BasicExecutionManager)em).getAllTasks(); + LOG.info("TASKS count "+tasks.size()+": "+tasks); throw e; } } diff --git a/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java b/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java index d427633633..2471f87c01 100644 --- a/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java +++ b/core/src/test/java/brooklyn/util/task/BasicTaskExecutionTest.java @@ -370,7 +370,7 @@ public Integer call() throws Exception { allowCompletion.await(); return 42; }}); - assertEquals(null, t.submittedByTask); + assertEquals(null, t.getSubmittedByTask()); assertEquals(-1, t.submitTimeUtc); assertNull(t.getInternalFuture()); @@ -411,9 +411,9 @@ public Integer call() throws Exception { BasicTask tb = (BasicTask) em.getTasksWithTag("B").iterator().next(); assertEquals( 46, tb.get() ); assertEquals( t, em.getTasksWithTag("A").iterator().next() ); - assertNull( t.submittedByTask ); + assertNull( t.getSubmittedByTask() ); - BasicTask submitter = (BasicTask) tb.submittedByTask; + BasicTask submitter = (BasicTask) tb.getSubmittedByTask(); assertNotNull(submitter); assertEquals("sample", submitter.displayName); assertEquals("some descr", submitter.description); diff --git a/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java b/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java index 9cda495fef..654168e17b 100644 --- a/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java +++ b/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java @@ -79,7 +79,7 @@ public void testScheduledTaskSelfEnding() throws Exception { public Task call() throws Exception { return new BasicTask(new Callable() { public Integer call() { - ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).submittedByTask; + ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask(); if (i.get() >= 4) submitter.period = null; log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); return i.incrementAndGet(); @@ -109,7 +109,7 @@ public Task call() throws Exception { return new BasicTask(new Callable() { public Integer call() { log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).submittedByTask; + ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask(); i.incrementAndGet(); if (i.get() >= 5) submitter.cancel(); return i.get();