From 5b9f89653ee5979665bff445d38750e1d0f234ae Mon Sep 17 00:00:00 2001 From: Sam Corbett Date: Wed, 16 Nov 2016 15:36:45 +0000 Subject: [PATCH 1/2] Readability --- .../brooklyn/core/effector/Effectors.java | 8 ++++-- .../entity/group/DynamicClusterImpl.java | 3 ++- .../brooklyn/util/core/task/ParallelTask.java | 27 ++++++++++++++----- .../util/core/task/SequentialTask.java | 27 +++++++++++++------ 4 files changed, 47 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java index 9b10d1d488..6240240418 100644 --- a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java +++ b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java @@ -183,14 +183,18 @@ public static TaskAdaptable> invocation(Effector eff, Map params public static TaskAdaptable> invocationParallel(Effector eff, Map params, Iterable entities) { List> tasks = new ArrayList>(); for (Entity e: entities) tasks.add(invocation(e, eff, params)); - return Tasks.parallel("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()])); + return Tasks.parallel( + "invoking " + eff + " on " + tasks.size() + " node" + (Strings.s(tasks.size())), + tasks.toArray(new TaskAdaptable[tasks.size()])); } /** as {@link #invocationParallel(Effector, Map, Iterable)} but executing sequentially */ public static TaskAdaptable> invocationSequential(Effector eff, Map params, Iterable entities) { List> tasks = new ArrayList>(); for (Entity e: entities) tasks.add(invocation(e, eff, params)); - return Tasks.sequential("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()])); + return Tasks.sequential( + "invoking " + eff + " on " + tasks.size() + " node" + (Strings.s(tasks.size())), + tasks.toArray(new TaskAdaptable[tasks.size()])); } /** returns an unsubmitted task which will invoke the given effector on the given entities diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java index eedefa7dbf..8725b12d4a 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java @@ -787,7 +787,8 @@ protected Collection shrink(int delta) { Collection removedEntities = pickAndRemoveMembers(delta * -1); // FIXME symmetry in order of added as child, managed, started, and added to group - Task invoke = Entities.invokeEffector(this, (Iterable)(Iterable)Iterables.filter(removedEntities, Startable.class), Startable.STOP, Collections.emptyMap()); + final Iterable removedStartables = (Iterable) (Iterable) Iterables.filter(removedEntities, Startable.class); + Task invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP, Collections.emptyMap()); try { invoke.get(); return removedEntities; diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java index 82ec564552..a2425f7428 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java @@ -38,13 +38,26 @@ * order they were passed as arguments. */ public class ParallelTask extends CompoundTask { - public ParallelTask(Object... tasks) { super(tasks); } - - public ParallelTask(Map flags, Collection tasks) { super(flags, tasks); } - public ParallelTask(Collection tasks) { super(tasks); } - - public ParallelTask(Map flags, Iterable tasks) { super(flags, ImmutableList.copyOf(tasks)); } - public ParallelTask(Iterable tasks) { super(ImmutableList.copyOf(tasks)); } + + public ParallelTask(Object... tasks) { + super(tasks); + } + + public ParallelTask(Map flags, Collection tasks) { + super(flags, tasks); + } + + public ParallelTask(Collection tasks) { + super(tasks); + } + + public ParallelTask(Map flags, Iterable tasks) { + super(flags, ImmutableList.copyOf(tasks)); + } + + public ParallelTask(Iterable tasks) { + super(ImmutableList.copyOf(tasks)); + } @Override protected List runJobs() throws InterruptedException, ExecutionException { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java index 9bd40af030..1a9fbac160 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java @@ -28,19 +28,30 @@ import com.google.common.collect.ImmutableList; - /** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD; * (currently is all the return values of individual tasks, but we * might want some pipeline support and eventually only to return final value...) */ public class SequentialTask extends CompoundTask { - public SequentialTask(Object... tasks) { super(tasks); } - - public SequentialTask(Map flags, Collection tasks) { super(flags, tasks); } - public SequentialTask(Collection tasks) { super(tasks); } - - public SequentialTask(Map flags, Iterable tasks) { super(flags, ImmutableList.copyOf(tasks)); } - public SequentialTask(Iterable tasks) { super(ImmutableList.copyOf(tasks)); } + public SequentialTask(Object... tasks) { + super(tasks); + } + + public SequentialTask(Map flags, Collection tasks) { + super(flags, tasks); + } + + public SequentialTask(Collection tasks) { + super(tasks); + } + + public SequentialTask(Map flags, Iterable tasks) { + super(flags, ImmutableList.copyOf(tasks)); + } + + public SequentialTask(Iterable tasks) { + super(ImmutableList.copyOf(tasks)); + } protected List runJobs() throws InterruptedException, ExecutionException { setBlockingDetails("Executing "+ From 4e30074ca9b09a49c6a4b052d6f28d05608b36eb Mon Sep 17 00:00:00 2001 From: Sam Corbett Date: Tue, 29 Nov 2016 16:11:12 +0000 Subject: [PATCH 2/2] Adds maxConcurrentChildCommands parameter to DynamicCluster The option configures the maximum number of simultaneous Startable effector invocations that will be made on members of the group. --- .../apache/brooklyn/core/entity/Entities.java | 2 +- .../brooklyn/entity/group/DynamicCluster.java | 11 +- .../entity/group/DynamicClusterImpl.java | 153 +++++++++++++++++- .../group/DynamicClusterRebindTest.java | 54 +++++++ .../entity/group/DynamicClusterTest.java | 132 +++++++++++++++ 5 files changed, 343 insertions(+), 9 deletions(-) create mode 100644 core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java index 282165211e..69670ecd4c 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java @@ -625,7 +625,7 @@ public static boolean isDescendant(Entity ancestor, Entity potentialDescendant) /** * Return all descendants of given entity matching the given predicate and optionally the entity itself. * - * @see {@link EntityPredicates} for useful second arguments. + * @see EntityPredicates */ @SuppressWarnings("unused") public static Iterable descendants(Entity root, Predicate matching, boolean includeSelf) { diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java index f2112e8f7e..3f62f823e8 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java @@ -103,7 +103,7 @@ interface ZoneFailureDetector { "dynamiccluster.restartMode", "How this cluster should handle restarts; " + "by default it is disallowed, but this key can specify a different mode. " - + "Modes supported by dynamic cluster are 'off', 'sequqential', or 'parallel'. " + + "Modes supported by dynamic cluster are 'off', 'sequential', or 'parallel'. " + "However subclasses can define their own modes or may ignore this.", null); @SetFromFlag("quarantineFailedEntities") @@ -183,6 +183,15 @@ interface ZoneFailureDetector { ConfigKey CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey( "cluster.member.id", "The unique ID number (sequential) of a member of a cluster"); + @Beta + @SetFromFlag("maxConcurrentChildCommands") + ConfigKey MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class) + .name("dynamiccluster.maxConcurrentChildCommands") + .description("[Beta] The maximum number of effector invocations that will be made on children at once " + + "(e.g. start, stop, restart). Any value null or less than or equal to zero means invocations are unbounded") + .defaultValue(0) + .build(); + AttributeSensor> SUB_LOCATIONS = new BasicAttributeSensor>( new TypeToken>() {}, "dynamiccluster.subLocations", "Locations for each availability zone to use"); diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java index 8725b12d4a..4ed0ac0fc4 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java @@ -30,10 +30,12 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import javax.annotation.Nullable; +import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.entity.Group; @@ -98,6 +100,8 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; /** * A cluster of entities that can dynamically increase or decrease the number of entities. @@ -108,6 +112,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus private static final AttributeSensor> NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken>() {}, "next.cluster.member.id", "Returns the ID number of the next member to be added"); + /** + * Controls the maximum number of effector invocations the cluster will make on members at once. + * Only used if {@link #MAX_CONCURRENT_CHILD_COMMANDS} is configured. + */ + private transient Semaphore childTaskSemaphore; + private volatile FunctionFeed clusterOneAndAllMembersUp; // TODO better mechanism for arbitrary class name to instance type coercion @@ -212,9 +222,16 @@ public DynamicClusterImpl() { public void init() { super.init(); initialiseMemberId(); + initialiseTaskPermitSemaphore(); connectAllMembersUp(); } + @Override + public void rebind() { + super.rebind(); + initialiseTaskPermitSemaphore(); + } + private void initialiseMemberId() { synchronized (mutex) { if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) { @@ -223,6 +240,17 @@ private void initialiseMemberId() { } } + private void initialiseTaskPermitSemaphore() { + synchronized (mutex) { + if (getChildTaskSemaphore() == null) { + Integer maxChildTasks = config().get(MAX_CONCURRENT_CHILD_COMMANDS); + if (maxChildTasks != null && maxChildTasks > 0) { + childTaskSemaphore = new Semaphore(maxChildTasks); + } + } + } + } + private void connectAllMembersUp() { clusterOneAndAllMembersUp = FunctionFeed.builder() .entity(this) @@ -551,8 +579,9 @@ public void restart() { Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged())))); } else if ("parallel".equalsIgnoreCase(mode)) { ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING); - DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, null, - Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged())))); + for (Entity member : Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))) { + DynamicTasks.queue(newThrottledEffectorTask(member, Startable.RESTART, Collections.emptyMap())); + } } else { throw new IllegalArgumentException("Unknown "+RESTART_MODE.getName()+" '"+mode+"'"); } @@ -788,7 +817,12 @@ protected Collection shrink(int delta) { // FIXME symmetry in order of added as child, managed, started, and added to group final Iterable removedStartables = (Iterable) (Iterable) Iterables.filter(removedEntities, Startable.class); - Task invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP, Collections.emptyMap()); + ImmutableList.Builder> tasks = ImmutableList.builder(); + for (Entity member : removedStartables) { + tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap())); + } + Task invoke = Tasks.parallel(tasks.build()); + DynamicTasks.queueIfPossible(invoke).orSubmitAsync(); try { invoke.get(); return removedEntities; @@ -826,8 +860,11 @@ protected ReferenceWithError> addInEachLocation(Iterable args = ImmutableMap.of("locations", MutableList.builder().addIfNotNull(loc).buildImmutable()); - Task task = Effectors.invocation(entity, Startable.START, args).asTask(); + Task task = newThrottledEffectorTask(entity, Startable.START, args, privileged); tasks.put(entity, task); } } @@ -1041,14 +1078,116 @@ protected void discardNode(Entity entity) { protected void stopAndRemoveNode(Entity member) { removeMember(member); - try { if (member instanceof Startable) { - Task task = member.invoke(Startable.STOP, Collections.emptyMap()); + Task task = newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()); + DynamicTasks.queueIfPossible(task).orSubmitAsync(); task.getUnchecked(); } } finally { Entities.unmanage(member); } } + + @Nullable + protected Semaphore getChildTaskSemaphore() { + return childTaskSemaphore; + } + + /** + * @return An unprivileged effector task. + * @see #newThrottledEffectorTask(Entity, Effector, Map, boolean) + */ + protected Task newThrottledEffectorTask(Entity target, Effector effector, Map arguments) { + return newThrottledEffectorTask(target, effector, arguments, false); + } + + /** + * Creates tasks that obtain permits from {@link #childTaskSemaphore} before invoking effector + * on target. Permits are released in a {@link ListenableFuture#addListener listener}. No + * permits are obtained if {@link #childTaskSemaphore} is null. + * @param target Entity to invoke effector on + * @param effector Effector to invoke on target + * @param arguments Effector arguments + * @param isPrivileged If true the method obtains a permit from {@link #childTaskSemaphore} + * immediately and returns the effector invocation task, otherwise it + * returns a task that sequentially obtains a permit then runs the effector. + * @return An unsubmitted task. + */ + protected Task newThrottledEffectorTask(Entity target, Effector effector, Map arguments, boolean isPrivileged) { + final Task toSubmit; + final Task effectorTask = Effectors.invocation(target, effector, arguments).asTask(); + if (getChildTaskSemaphore() != null) { + // permitObtained communicates to the release task whether the permit should really be released + // or not. ObtainPermit sets it to true when a permit is acquired. + final AtomicBoolean permitObtained = new AtomicBoolean(); + final String description = "Waiting for permit to run " + effector.getName() + " on " + target; + final Runnable obtain = new ObtainPermit(getChildTaskSemaphore(), description, permitObtained); + // Acquire the permit now for the privileged task and just queue the effector invocation. + // If it's unprivileged then queue a task to obtain a permit first. + if (isPrivileged) { + obtain.run(); + toSubmit = effectorTask; + } else { + Task obtainMutex = Tasks.builder() + .description(description) + .body(new ObtainPermit(getChildTaskSemaphore(), description, permitObtained)) + .build(); + toSubmit = Tasks.sequential( + "Waiting for permit then running " + effector.getName() + " on " + target, + obtainMutex, effectorTask); + } + toSubmit.addListener(new ReleasePermit(getChildTaskSemaphore(), permitObtained), MoreExecutors.sameThreadExecutor()); + } else { + toSubmit = effectorTask; + } + return toSubmit; + } + + private static class ObtainPermit implements Runnable { + private final Semaphore permit; + private final String description; + private final AtomicBoolean hasObtainedPermit; + + private ObtainPermit(Semaphore permit, String description, AtomicBoolean hasObtainedPermit) { + this.permit = permit; + this.description = description; + this.hasObtainedPermit = hasObtainedPermit; + } + + @Override + public void run() { + String oldDetails = Tasks.setBlockingDetails(description); + LOG.debug("{} acquiring permit from {}", this, permit); + try { + permit.acquire(); + hasObtainedPermit.set(true); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + Tasks.setBlockingDetails(oldDetails); + } + } + } + + private static class ReleasePermit implements Runnable { + private final Semaphore permit; + private final AtomicBoolean wasPermitObtained; + + private ReleasePermit(Semaphore permit, AtomicBoolean wasPermitObtained) { + this.permit = permit; + this.wasPermitObtained = wasPermitObtained; + } + + @Override + public void run() { + if (wasPermitObtained.get()) { + LOG.debug("{} releasing permit from {}", this, permit); + permit.release(); + } else { + LOG.debug("{} not releasing a permit from {} because it appears one was never obtained", this, permit); + } + } + } + } diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java new file mode 100644 index 0000000000..bbf3a2ab90 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.entity.group; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.mgmt.rebind.RebindOptions; +import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; + +public class DynamicClusterRebindTest extends RebindTestFixtureWithApp { + + @Test + public void testThrottleAppliesAfterRebind() throws Exception { + DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1) + .configure(DynamicCluster.INITIAL_SIZE, 1) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(DynamicClusterTest.ThrowOnAsyncStartEntity.class)) + .configure(DynamicClusterTest.ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger())); + app().start(ImmutableList.of(origApp.newLocalhostProvisioningLocation())); + EntityAsserts.assertAttributeEquals(cluster, DynamicCluster.GROUP_SIZE, 1); + + rebind(RebindOptions.create().terminateOrigManagementContext(true)); + cluster = Entities.descendants(app(), DynamicCluster.class).iterator().next(); + cluster.resize(10); + EntityAsserts.assertAttributeEqualsEventually(cluster, DynamicCluster.GROUP_SIZE, 10); + EntityAsserts.assertAttributeEquals(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + } + +} \ No newline at end of file diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java index 36d3c39745..c3e7d7f6ce 100644 --- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java +++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java @@ -32,6 +32,7 @@ import java.util.NoSuchElementException; import java.util.Random; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -41,12 +42,16 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.location.NoMachinesAvailableException; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityAsserts; @@ -59,6 +64,7 @@ import org.apache.brooklyn.core.entity.trait.Resizable; import org.apache.brooklyn.core.location.SimulatedLocation; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.core.sensor.DependentConfiguration; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.core.test.entity.TestEntityImpl; @@ -67,10 +73,15 @@ import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.google.common.base.Function; @@ -1225,4 +1236,125 @@ private void assertFirstAndNonFirstCounts(Collection members, int expect assertEquals(found.size(), expectedNonFirstCount); } + @DataProvider + public Object[][] maxConcurrentCommandsTestProvider() { + return new Object[][]{{1}, {2}, {3}}; + } + + @Test(dataProvider = "maxConcurrentCommandsTestProvider") + public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) { + EntitySpec memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class) + .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, maxConcurrentCommands) + .configure(ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger()); + DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands) + .configure(DynamicCluster.INITIAL_SIZE, 10) + .configure(DynamicCluster.MEMBER_SPEC, memberSpec)); + app.start(ImmutableList.of(app.newSimulatedLocation())); + assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING); + } + + // Tests handling of the first member of a cluster by asserting that a group, whose + // other members wait for the first, always starts. + @Test + public void testFirstMemberInFirstBatchWhenMaxConcurrentCommandsSet() throws Exception { + final AtomicInteger counter = new AtomicInteger(); + final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1) + .configure(DynamicCluster.INITIAL_SIZE, 3)); + + Task firstMemberUp = Tasks.builder() + .body(new Callable() { + @Override + public Boolean call() throws Exception { + Task first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST); + DynamicTasks.queueIfPossible(first).orSubmitAsync(); + final Entity source = first.get(); + final Task booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP); + DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync(); + return booleanTask.get(); + } + }) + .build(); + + EntitySpec firstMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class) + .configure(ThrowOnAsyncStartEntity.COUNTER, counter) + .configure(ThrowOnAsyncStartEntity.START_LATCH, true); + + EntitySpec memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class) + .configure(ThrowOnAsyncStartEntity.COUNTER, counter) + .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp); + + cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, firstMemberSpec); + cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec); + + // app.start blocks so in the failure case this test would block forever. + Asserts.assertReturnsEventually(new Runnable() { + public void run() { + app.start(ImmutableList.of(app.newSimulatedLocation())); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + } + }, Asserts.DEFAULT_LONG_TIMEOUT); + } + + @Test + public void testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission() { + // Tests that permits are not released when their start task is cancelled. + // Expected behaviour is: + // - permit obtained for first member. cancelled task submitted. permit released. + // - no permit obtained for second member. cancelled task submitted. no permit released. + DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(CancelEffectorInvokeCluster.class) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class)) + .configure(DynamicCluster.INITIAL_SIZE, 2) + .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)); + final DynamicClusterImpl clusterImpl = DynamicClusterImpl.class.cast(Entities.deproxy(cluster)); + assertNotNull(clusterImpl.getChildTaskSemaphore()); + assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1); + try { + app.start(ImmutableList.of(app.newSimulatedLocation())); + Asserts.shouldHaveFailedPreviously("Cluster start should have failed because the member start was cancelled"); + } catch (Exception e) { + // ignored. + } + assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1); + } + + @ImplementedBy(ThrowOnAsyncStartEntityImpl.class) + public interface ThrowOnAsyncStartEntity extends TestEntity { + ConfigKey MAX_CONCURRENCY = ConfigKeys.newConfigKey(Integer.class, "concurrency", "max concurrency", 1); + ConfigKey COUNTER = ConfigKeys.newConfigKey(AtomicInteger.class, "counter"); + ConfigKey START_LATCH = ConfigKeys.newConfigKey(Boolean.class, "startlatch"); + } + + public static class ThrowOnAsyncStartEntityImpl extends TestEntityImpl implements ThrowOnAsyncStartEntity { + private static final Logger LOG = LoggerFactory.getLogger(ThrowOnAsyncStartEntityImpl.class); + @Override + public void start(Collection locs) { + int count = config().get(COUNTER).incrementAndGet(); + try { + LOG.debug("{} starting (first={})", new Object[]{this, sensors().get(AbstractGroup.FIRST_MEMBER)}); + config().get(START_LATCH); + // Throw if more than one entity is starting at the same time as this. + assertTrue(count <= config().get(MAX_CONCURRENCY), "expected " + count + " <= " + config().get(MAX_CONCURRENCY)); + super.start(locs); + } finally { + config().get(COUNTER).decrementAndGet(); + } + } + } + + /** Used in {@link #testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission}. */ + @ImplementedBy(CancelEffectorInvokeClusterImpl.class) + public interface CancelEffectorInvokeCluster extends DynamicCluster {} + + /** Overrides {@link DynamicClusterImpl#newThrottledEffectorTask} to cancel each task before it's submitted. */ + public static class CancelEffectorInvokeClusterImpl extends DynamicClusterImpl implements CancelEffectorInvokeCluster { + @Override + protected Task newThrottledEffectorTask(Entity target, Effector effector, Map arguments, boolean isPrivileged) { + Task unsubmitted = super.newThrottledEffectorTask(target, effector, arguments, isPrivileged); + unsubmitted.cancel(true); + return unsubmitted; + } + } + }