From 42db86403e0c4f96651b93a6f720c520f92bd619 Mon Sep 17 00:00:00 2001 From: Svetoslav Neykov Date: Sun, 2 Oct 2016 12:20:58 +0300 Subject: [PATCH 1/4] Adds optional task startup jitter Useful for troubleshooting/testing concurrency related code. --- .../core/BrooklynFeatureEnablement.java | 9 ++++ .../util/core/task/BasicExecutionManager.java | 43 +++++++++++++++++-- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java b/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java index f2e8a0ac2d..8bc2ffd298 100644 --- a/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java +++ b/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java @@ -83,6 +83,14 @@ public class BrooklynFeatureEnablement { */ public static final String FEATURE_RENAME_THREADS = "brooklyn.executionManager.renameThreads"; + /** + * Add a jitter to the startup of tasks for testing concurrency code. + * Use {@code brooklyn.executionManager.jitterThreads.maxDelay} to tune the maximum time task + * startup gets delayed in milliseconds. The actual time will be a random value between [0, maxDelay). + * Default is 200 milliseconds. + */ + public static final String FEATURE_JITTER_THREADS = "brooklyn.executionManager.jitterThreads"; + /** * When rebinding to state created from very old versions, the catalogItemId properties will be missing which * results in errors when OSGi bundles are used. When enabled the code tries to infer the catalogItemId from @@ -149,6 +157,7 @@ static void setDefaults() { setDefault(FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY, false); setDefault(FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, false); setDefault(FEATURE_RENAME_THREADS, false); + setDefault(FEATURE_JITTER_THREADS, false); setDefault(FEATURE_BACKWARDS_COMPATIBILITY_INFER_CATALOG_ITEM_ON_REBIND, true); setDefault(FEATURE_AUTO_FIX_CATALOG_REF_ON_REBIND, false); setDefault(FEATURE_SSH_ASYNC_EXEC, false); diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 8f43827f3d..70672f9af2 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -19,7 +19,6 @@ package org.apache.brooklyn.util.core.task; import static com.google.common.base.Preconditions.checkNotNull; -import groovy.lang.Closure; import java.util.Collection; import java.util.Collections; @@ -41,6 +40,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -70,12 +70,13 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import groovy.lang.Closure; + /** * Manages the execution of atomic tasks and scheduled (recurring) tasks, * including setting tags and invoking callbacks. @@ -84,7 +85,11 @@ public class BasicExecutionManager implements ExecutionManager { private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class); private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS); - + private static final String JITTER_THREADS_MAX_DELAY_PROPERTY = BrooklynFeatureEnablement.FEATURE_JITTER_THREADS + ".maxDelay"; + + private boolean jitterThreads = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_JITTER_THREADS); + private int jitterThreadsMaxDelay = Integer.getInteger(JITTER_THREADS_MAX_DELAY_PROPERTY, 200); + private static class PerThreadCurrentTaskHolder { public static final ThreadLocal> perThreadCurrentTask = new ThreadLocal>(); } @@ -146,6 +151,10 @@ public BasicExecutionManager(String contextid) { daemonThreadFactory); delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); + + if (jitterThreads) { + log.info("Task startup jittering enabled with a maximum of " + jitterThreadsMaxDelay + " delay."); + } } private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler { @@ -757,9 +766,23 @@ protected void internalBeforeStart(Map flags, Task task) { PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); ((TaskInternal)task).setStartTimeUtc(System.currentTimeMillis()); } + + jitterThreadStart(task); + invokeCallback(flags.get("newTaskStartCallback"), task); } + private void jitterThreadStart(Task task) { + if (jitterThreads) { + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(jitterThreadsMaxDelay)); + } catch (InterruptedException e) { + log.warn("Task " + task + " got cancelled before starting because of jitter."); + throw Exceptions.propagate(e); + } + } + } + @SuppressWarnings({ "unchecked", "rawtypes" }) // not ideal, such loose typing on the callback -- should prefer Function // but at least it's package-private @@ -890,4 +913,18 @@ public ConcurrentMap getSchedulerByTag() { return schedulerByTag; } + public void setJitterThreads(boolean jitterThreads) { + this.jitterThreads = jitterThreads; + if (jitterThreads) { + log.info("Task startup jittering enabled with a maximum of " + jitterThreadsMaxDelay + " delay."); + } else { + log.info("Disabled task startup jittering"); + } + } + + public void setJitterThreadsMaxDelay(int jitterThreadsMaxDelay) { + this.jitterThreadsMaxDelay = jitterThreadsMaxDelay; + log.info("Setting task startup jittering maximum delay to " + jitterThreadsMaxDelay); + } + } From 5fc8344c8640ac641dadaa9482cfe2314db0bbb5 Mon Sep 17 00:00:00 2001 From: Svetoslav Neykov Date: Sun, 2 Oct 2016 15:26:49 +0200 Subject: [PATCH 2/4] Logging - link thread name to running task --- .../apache/brooklyn/util/core/task/BasicExecutionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 70672f9af2..8bb7705bf7 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -754,7 +754,7 @@ protected void internalBeforeStart(Map flags, Task task) { activeTaskCount.incrementAndGet(); //set thread _before_ start time, so we won't get a null thread when there is a start-time - if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task); + if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task + " running on thread " + Thread.currentThread().getName()); if (!task.isCancelled()) { Thread thread = Thread.currentThread(); ((TaskInternal)task).setThread(thread); From 05622be2cd4ad252d1e0be6bdc086fe81563ff9d Mon Sep 17 00:00:00 2001 From: Svetoslav Neykov Date: Sun, 2 Oct 2016 12:52:13 +0300 Subject: [PATCH 3/4] Remove duplicate thread renaming The code in beforeStartAtomicTask overrides the value set here. Keeping the other one as it's called for scheduled tasks as well. --- .../brooklyn/util/core/task/BasicExecutionManager.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 8bb7705bf7..1bf5ecaf9b 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -516,13 +516,7 @@ public T call() { try { T result = null; Throwable error = null; - String oldThreadName = Thread.currentThread().getName(); try { - if (RENAME_THREADS) { - String newThreadName = oldThreadName+"-"+task.getDisplayName()+ - "["+task.getId().substring(0, 8)+"]"; - Thread.currentThread().setName(newThreadName); - } beforeStartAtomicTask(flags, task); if (!task.isCancelled()) { result = ((TaskInternal)task).getJob().call(); @@ -530,9 +524,6 @@ public T call() { } catch(Throwable e) { error = e; } finally { - if (RENAME_THREADS) { - Thread.currentThread().setName(oldThreadName); - } afterEndAtomicTask(flags, task); } if (error!=null) { From 670f83d19ce90ab94c5446506ebeb81c23cd2a56 Mon Sep 17 00:00:00 2001 From: Svetoslav Neykov Date: Sun, 2 Oct 2016 15:19:46 +0200 Subject: [PATCH 4/4] Test Transformer reliability when resolving DSL --- .../TransformerEnricherWithDslTest.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java new file mode 100644 index 0000000000..2f4465ad44 --- /dev/null +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.brooklyn.camp.brooklyn.enricher; + +import static org.testng.Assert.assertEquals; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.EnricherSpec; +import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.enricher.stock.Transformer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +public class TransformerEnricherWithDslTest extends BrooklynAppUnitTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(TransformerEnricherWithDslTest.class); + + int START_PORT = 10000; + + @Test(groups="Broken") + // See https://issues.apache.org/jira/browse/BROOKLYN-356 + public void testTransformerResolvesResolvableValues() { + testTransformerResolvesResolvableValues(START_PORT, 200); + } + + @Test(groups={"Integration", "Broken"}, invocationCount=10) + // See https://issues.apache.org/jira/browse/BROOKLYN-356 + public void testTransformerResolvesResolvableValuesIntegration() { + LOG.info("Starting 1000 iterations"); + testTransformerResolvesResolvableValues(START_PORT, 1000); + } + + private void testTransformerResolvesResolvableValues(int portStart, int portCount) { + // Note: The test gets progressively slower with iterations, probably due to the GC triggering much more frequently. + // There's no memory leak, but doesn't seem right to be putting so much pressure on the GC with such a simple test. + AttributeSensor sourceSensor = Sensors.newIntegerSensor("port"); + AttributeSensor targetSensor = Sensors.newStringSensor("port.transformed"); + app.enrichers().add(EnricherSpec.create(Transformer.class) + .configure(Transformer.SOURCE_SENSOR, sourceSensor) + .configure(Transformer.TARGET_SENSOR, targetSensor) + .configure(Transformer.TARGET_VALUE, + // Can only use the inner-most sensor, but including the + // wrapping formatStrings amplifies the resolving effort, making + // a bug more probable to manifest. + BrooklynDslCommon.formatString("%s", + BrooklynDslCommon.formatString("%d", + BrooklynDslCommon.attributeWhenReady("port"))))); + + int failures = 0; + for (int port = portStart; port < portStart + portCount; port++) { + app.sensors().set(sourceSensor, port); + try { + EntityAsserts.assertAttributeEqualsEventually(app, targetSensor, Integer.toString(port)); + } catch (Exception e) { + failures++; + LOG.warn("Assertion failed, port=" + port + ", transformed sensor is " + app.sensors().get(targetSensor), e); + } + } + + assertEquals(failures, 0, failures + " assertion failures while transforming sensor; see logs for detailed errors"); + } + +}