From cb3b19f491c52a6b8542e6c0e4517d815395753a Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Thu, 3 Aug 2017 14:37:48 +0100 Subject: [PATCH 1/2] Adds DynamicCluster.START_TIMEOUT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Similar to SoftwareProcess, this allows a dynamic cluster to wait for service.isUp before returning from `start`. It defaults to not waiting (for backwards compatibility). Previously, it would return as soon as all the initial members’ start() effectors had finished executing. --- .../brooklyn/entity/group/DynamicCluster.java | 8 + .../entity/group/DynamicClusterImpl.java | 23 ++- .../entity/group/DynamicClusterTest.java | 140 +++++++++++++++++- 3 files changed, 166 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java index c66b2a8fc5..ff4163f924 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java @@ -18,6 +18,8 @@ */ package org.apache.brooklyn.entity.group; +import static org.apache.brooklyn.core.config.ConfigKeys.newConfigKey; + import java.util.Collection; import java.util.List; import java.util.Map; @@ -175,6 +177,12 @@ interface ZoneFailureDetector { ConfigKey CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey( "cluster.member.id", "The unique ID number (sequential) of a member of a cluster"); + ConfigKey START_TIMEOUT = newConfigKey( + Duration.class, + "start.timeout", + "Time to wait (after members' start() effectors return) for SERVICE_UP before failing (default is not to wait)", + null); + @Beta @SetFromFlag("maxConcurrentChildCommands") ConfigKey MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class) diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java index 0b2ec02078..a1f2b164b0 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import javax.annotation.Nullable; import org.apache.brooklyn.api.effector.Effector; @@ -408,14 +409,19 @@ public void start(Collection locsO) { try { doStart(); DynamicTasks.waitForLast(); - ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING); - } catch (Exception e) { ServiceProblemsLogic.updateProblemsIndicator(this, START, "start failed with error: "+e); ServiceStateLogic.setExpectedStateRunningWithErrors(this); - throw Exceptions.propagate(e); } + + // Don't set problem-indicator if it's just our waitForServiceUp that fails; + // we want to be able to recover if the indicator is subsequently cleared. 
+ try { + waitForServiceUp(); + } finally { + ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING); + } } protected void doStart() { @@ -485,6 +491,17 @@ protected void doStart() { } } + protected void waitForServiceUp() { + Duration timeout = getConfig(START_TIMEOUT); + if (timeout != null) { + waitForServiceUp(timeout); + } + } + + protected void waitForServiceUp(Duration duration) { + Entities.waitForServiceUp(this, duration); + } + protected List findSubLocations(Location loc) { if (!loc.hasExtension(AvailabilityZoneExtension.class)) { throw new IllegalStateException("Availability zone extension not supported for location " + loc); diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java index f4a303db8e..54b5731789 100644 --- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java +++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java @@ -62,11 +62,11 @@ import org.apache.brooklyn.core.entity.trait.Changeable; import org.apache.brooklyn.core.entity.trait.FailingEntity; import org.apache.brooklyn.core.entity.trait.Resizable; +import org.apache.brooklyn.core.entity.trait.Startable; import org.apache.brooklyn.core.location.SimulatedLocation; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.sensor.DependentConfiguration; import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.BlockingEntity; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.core.test.entity.TestEntityImpl; @@ -74,7 +74,6 @@ import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.collections.CollectionFunctionals; import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.collections.MutableSet; import 
org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.Tasks; @@ -90,9 +89,11 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Predicates; +import com.google.common.base.Stopwatch; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -100,6 +101,8 @@ public class DynamicClusterTest extends AbstractDynamicClusterOrFabricTest { + private static final Logger LOG = LoggerFactory.getLogger(DynamicClusterTest.class); + private static final int TIMEOUT_MS = 2000; SimulatedLocation loc; @@ -519,6 +522,139 @@ public void testInitialQuorumSizeSufficientForStartup() throws Exception { } } + // TODO Questionable whether we want this behaviour, but including the test to demonstrate + // what we currently do (and so we can preserve backwards compatibility, if we so choose). + // + // We report service.isUp=true before start() has completed, as soon as the quorum is up. + // This might be surprising for entities that use the cluster's `service.isUp` as a latch + // for when the cluster is ready to use. + // + // The default UP_QUORUM_CHECK is `atLeastOne`. 
+ @Test + public void testReportsServiceUpAsSoonAsQuorumSize() throws Exception { + final Duration shortWait = Duration.millis(50); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class) + .configure("initialSize", 2) + .configure("firstMemberSpec", EntitySpec.create(BlockingEntity.class) + .configure(BlockingEntity.STARTUP_LATCH, latch1)) + .configure("memberSpec", EntitySpec.create(BlockingEntity.class) + .configure(BlockingEntity.STARTUP_LATCH, latch2))); + + // Invoke start: should report starting + Task task = cluster.invoke(Startable.START, ImmutableMap.of("locations", ImmutableList.of(loc))); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, false); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STARTING); + EntityAsserts.assertAttributeEqualsEventually(cluster, DynamicCluster.GROUP_SIZE, 2); + + // Allow first member to complete: we are now quorate, so should report service.isUp=true; + // but will still be starting. + latch1.countDown(); + + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, true); + EntityAsserts.assertAttributeEqualsContinually(ImmutableMap.of("timeout", shortWait), cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STARTING); + assertFalse(task.isDone()); + + // Allow second member to complete; we are now fully up. 
+ latch2.countDown(); + task.get(); + + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, true); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + } + + @Test + public void testWaitForServiceUpDefaultsToNotChecking() throws Exception { + DynamicCluster cluster = app.addChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class)) + .configure("initialSize", 1)); + + // Indicate that the cluster is not healthy + ServiceStateLogic.updateMapSensorEntry(cluster, ServiceStateLogic.SERVICE_NOT_UP_INDICATORS, "simulateNotUpKey", "myVal"); + + // Start - expect it to complete promptly + Stopwatch stopwatch = Stopwatch.createStarted(); + app.start(ImmutableList.of()); + Duration startTime = Duration.of(stopwatch); + LOG.info("start-time "+startTime); + assertTrue(startTime.isShorterThan(Asserts.DEFAULT_LONG_TIMEOUT), "startTime="+startTime); + + // Should be on-fire + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, false); + + // Clearing the notUp indicator should allow it to recover + ServiceStateLogic.updateMapSensorEntry(cluster, ServiceStateLogic.SERVICE_NOT_UP_INDICATORS, "simulateNotUpKey", Entities.REMOVE); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, true); + } + + @Test + public void testWaitForServiceFails() throws Exception { + DynamicCluster cluster = app.addChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.START_TIMEOUT, Duration.ONE_MILLISECOND) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class)) + .configure("initialSize", 1)); + + // Indicate that the cluster is not healthy + 
ServiceStateLogic.updateMapSensorEntry(cluster, ServiceStateLogic.SERVICE_NOT_UP_INDICATORS, "simulateNotUpKey", "myVal"); + + // Start - expect it to fail promptly + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + app.start(ImmutableList.of()); + Asserts.shouldHaveFailedPreviously(); + } catch (Exception e) { + Asserts.expectedFailureContains(e, "Timeout waiting for SERVICE_UP"); + } + Duration startTime = Duration.of(stopwatch); + LOG.info("start-time "+startTime); + assertTrue(startTime.isShorterThan(Asserts.DEFAULT_LONG_TIMEOUT), "startTime="+startTime); + + // Should be on-fire + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, false); + + // Clearing the notUp indicator should allow it to recover + ServiceStateLogic.updateMapSensorEntry(cluster, ServiceStateLogic.SERVICE_NOT_UP_INDICATORS, "simulateNotUpKey", Entities.REMOVE); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, true); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + } + + @Test + public void testWaitForServiceSucceedsEventually() throws Exception { + Map veryShortWait = ImmutableMap.of("timeout", Duration.millis(50)); + + DynamicCluster cluster = app.addChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.START_TIMEOUT, Asserts.DEFAULT_LONG_TIMEOUT.multiply(2)) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class)) + .configure("initialSize", 1)); + + // Indicate that the cluster is not healthy + ServiceStateLogic.updateMapSensorEntry(cluster, ServiceStateLogic.SERVICE_NOT_UP_INDICATORS, "simulateNotUpKey", "myVal"); + + // Start in a background thread + Task task = app.invoke(Startable.START, ImmutableMap.of("locations", ImmutableList.of())); + + // The member should start, but we should still be waiting for the 
cluster's service.isUp + EntityAsserts.assertGroupSizeEqualsEventually(cluster, 1); + TestEntity member = (TestEntity) Iterables.find(cluster.getChildren(), Predicates.instanceOf(TestEntity.class)); + EntityAsserts.assertAttributeEqualsEventually(member, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + + EntityAsserts.assertAttributeEqualsContinually(veryShortWait, cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STARTING); + EntityAsserts.assertAttributeEqualsContinually(veryShortWait, cluster, Attributes.SERVICE_UP, false); + assertFalse(task.isDone()); + + // Clearing the not-up-indicator will allow it to return successfully + ServiceStateLogic.updateMapSensorEntry(cluster, ServiceStateLogic.SERVICE_NOT_UP_INDICATORS, "simulateNotUpKey", Entities.REMOVE); + + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, true); + EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + task.get(Asserts.DEFAULT_LONG_TIMEOUT); + } + @Test public void testInitialQuorumSizeDefaultsToInitialSize() throws Exception { final int failNum = 1; From a6166dc0124aa15419d6b6e15c3e9088b32dcef8 Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Thu, 3 Aug 2017 17:50:44 +0100 Subject: [PATCH 2/2] Fix non-deterministic CatalogMakeOsgiBundleTest --- .../camp/brooklyn/catalog/CatalogMakeOsgiBundleTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/catalog/CatalogMakeOsgiBundleTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/catalog/CatalogMakeOsgiBundleTest.java index 290c9e87f1..f5ac4bd22a 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/catalog/CatalogMakeOsgiBundleTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/catalog/CatalogMakeOsgiBundleTest.java @@ -120,7 +120,7 @@ private Entity assertBasic1DeploysAndHasSensor() throws 
Exception { "- type: " + "basic1"; Entity app = createAndStartApplication(yaml); Entity basic1 = Iterables.getOnlyElement( app.getChildren() ); - EntityAsserts.assertAttribute(basic1, Sensors.newStringSensor("a.sensor"), Predicates.equalTo("A")); + EntityAsserts.assertAttributeEqualsEventually(basic1, Sensors.newStringSensor("a.sensor"), "A"); return basic1; }