From 92da714c31e368b56ecb00cff21b0f7939faeb95 Mon Sep 17 00:00:00 2001 From: Sam Corbett Date: Tue, 18 Oct 2016 15:59:30 +0100 Subject: [PATCH 1/2] Fix Javadoc references --- .../networking/JcloudsLocationSecurityGroupCustomizer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java index e15ad68fdf..a5bff7d58e 100644 --- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java +++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java @@ -168,13 +168,13 @@ public JcloudsLocationSecurityGroupCustomizer setSshCidrSupplier(Supplier return this; } - /** @see #addPermissionsToLocation(JcloudsSshMachineLocation, java.lang.Iterable) */ + /** @see #addPermissionsToLocation(JcloudsMachineLocation, java.lang.Iterable) */ public JcloudsLocationSecurityGroupCustomizer addPermissionsToLocation(final JcloudsMachineLocation location, IpPermission... permissions) { addPermissionsToLocation(location, ImmutableList.copyOf(permissions)); return this; } - /** @see #addPermissionsToLocation(JcloudsSshMachineLocation, java.lang.Iterable) */ + /** @see #addPermissionsToLocation(JcloudsMachineLocation, java.lang.Iterable) */ public JcloudsLocationSecurityGroupCustomizer addPermissionsToLocation(final JcloudsMachineLocation location, SecurityGroupDefinition securityGroupDefinition) { addPermissionsToLocation(location, securityGroupDefinition.getPermissions()); return this; From a413cedf5cffec87a343ee3041b37db538d6a03a Mon Sep 17 00:00:00 2001 From: Sam Corbett Date: Fri, 28 Oct 2016 11:52:55 +0100 Subject: [PATCH 2/2] Add AbstractInvokeEffectorPolicy Supports tracking the number of effector tasks that are ongoing and publishing that as an "is busy" sensor on the entity the policy is attached to. --- .../policy/AbstractInvokeEffectorPolicy.java | 145 ++++++++++++++++++ ...nvokeEffectorOnCollectionSensorChange.java | 14 +- .../policy/InvokeEffectorOnSensorChange.java | 54 +++++-- .../AbstractInvokeEffectorPolicyTest.java | 105 +++++++++++++ ...eEffectorOnCollectionSensorChangeTest.java | 25 ++- ...EffectorOnSensorChangeIntegrationTest.java | 87 +++++++++++ 6 files changed, 415 insertions(+), 15 deletions(-) create mode 100644 core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java create mode 100644 core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java create mode 100644 core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java diff --git a/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java b/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java new file mode 100644 index 0000000000..d0b5e6705e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.policy; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nonnull; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.policy.AbstractPolicy; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.text.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; +import com.google.common.util.concurrent.MoreExecutors; + +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class); + + public static final ConfigKey IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey( + "isBusySensor", + "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " + + "If unset running tasks will not be tracked."); + + private final AtomicInteger taskCounter = new AtomicInteger(); + + /** + * Indicates that onEvent was notified of an value of is not the latest sensor value. + */ + private boolean moreUpdatesComing; + /** + * The timestamp of the event that informed moreUpdatesComing. Subsequent notifications + * of earlier events will not cause updates of moreUpdatesComing. + */ + private long mostRecentUpdate = 0; + /** + * Guards {@link #moreUpdatesComing} and {@link #mostRecentUpdate}. + */ + private final Object[] moreUpdatesLock = new Object[0]; + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + if (isBusySensorEnabled()) { + // Republishes when the entity rebinds. + publishIsBusy(); + } + } + + /** + * Invoke effector with parameters on the entity that the policy is attached to. + */ + protected Task invoke(Effector effector, Map parameters) { + if (isBusySensorEnabled()) { + getTaskCounter().incrementAndGet(); + publishIsBusy(); + } + Task task = entity.invoke(effector, parameters); + if (isBusySensorEnabled()) { + task.addListener(new EffectorListener(), MoreExecutors.sameThreadExecutor()); + } + return task; + } + + protected boolean isBusy() { + synchronized (moreUpdatesLock) { + return getTaskCounter().get() != 0 || moreUpdatesComing; + } + } + + protected boolean isBusySensorEnabled() { + return Strings.isNonBlank(getIsBusySensorName()); + } + + protected Maybe> getEffectorNamed(String name) { + return entity.getEntityType().getEffectorByName(name); + } + + @Nonnull + protected String getIsBusySensorName() { + return getConfig(IS_BUSY_SENSOR_NAME); + } + + /** + * Indicates that when the policy was notified of eventValue, occurring at time + * eventTimestamp, it observed the current sensor value to be current. This + * informs the value for {@link #moreUpdatesComing}. + */ + protected void setMoreUpdatesComing(long eventTimestamp, T eventValue, T current) { + if (eventTimestamp >= mostRecentUpdate) { + synchronized (moreUpdatesLock) { + if (eventTimestamp >= mostRecentUpdate) { + moreUpdatesComing = !Objects.equal(eventValue, current); + mostRecentUpdate = eventTimestamp; + } + } + } + } + + private AtomicInteger getTaskCounter() { + return taskCounter; + } + + private void publishIsBusy() { + final boolean busy = isBusy(); + LOG.trace("{} taskCount={}, isBusy={}", new Object[]{this, getTaskCounter().get(), busy}); + AttributeSensor sensor = Sensors.newBooleanSensor(getIsBusySensorName()); + entity.sensors().set(sensor, busy); + } + + private class EffectorListener implements Runnable { + @Override + public void run() { + getTaskCounter().decrementAndGet(); + publishIsBusy(); + } + } + +} diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java index fad9be6cfe..352d40d13d 100644 --- a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java +++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java @@ -37,7 +37,6 @@ import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.guava.Maybe; @@ -72,7 +71,7 @@ * there are no guarantees that the `onAdded' task will have finished before the * corresponding `onRemoved' task is invoked. */ -public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy implements SensorEventListener> { +public class InvokeEffectorOnCollectionSensorChange extends AbstractInvokeEffectorPolicy implements SensorEventListener> { private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnCollectionSensorChange.class); @@ -132,6 +131,15 @@ public void onEvent(SensorEvent> event) { final Set newValue = event.getValue() != null ? new LinkedHashSet<>(event.getValue()) : ImmutableSet.of(); + if (isBusySensorEnabled()) { + // There are more events coming that this policy hasn't been notified of if the + // value received in the event does not match the current value of the sensor. + final Collection sensorVal = entity.sensors().get(getTriggerSensor()); + final Set sensorValSet = sensorVal != null + ? new LinkedHashSet<>(sensorVal) + : ImmutableSet.of(); + setMoreUpdatesComing(event.getTimestamp(), newValue, sensorValSet); + } final Set added = new LinkedHashSet<>(), removed = new LinkedHashSet<>(); // It's only necessary to hold updateLock just to calculate the difference but // it is useful to guarantee that all the effectors are queued before the next @@ -174,7 +182,7 @@ private void onEvent(String effectorName, Object parameter) { } LOG.debug("{} invoking {} on {} with parameters {}", new Object[]{this, effector, entity, parameters}); - entity.invoke(effector.get(), parameters); + invoke(effector.get(), parameters); } } diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java index ebb6da8427..662fd1d150 100644 --- a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java +++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java @@ -18,19 +18,23 @@ */ package org.apache.brooklyn.policy; +import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.text.StringPredicates; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.reflect.TypeToken; /** * Invokes the given effector when the policy changes. @@ -40,28 +44,56 @@ * * support conditions * * allow to be triggered by sensors on members */ -public class InvokeEffectorOnSensorChange extends AbstractPolicy implements SensorEventListener { +public class InvokeEffectorOnSensorChange extends AbstractInvokeEffectorPolicy implements SensorEventListener { private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnSensorChange.class); - public static final ConfigKey SENSOR = ConfigKeys.newConfigKey(Object.class, - "sensor", "Sensor to be monitored, as string or sensor type"); + public static final ConfigKey SENSOR = ConfigKeys.builder(Object.class) + .name("sensor") + .description("Sensor to be monitored, as string or sensor type") + .constraint(Predicates.notNull()) + .build(); - public static final ConfigKey EFFECTOR = ConfigKeys.newStringConfigKey( - "effector", "Name of effector to invoke"); + public static final ConfigKey EFFECTOR = ConfigKeys.builder(String.class) + .name("effector") + .description("Name of effector to invoke") + .constraint(StringPredicates.isNonBlank()) + .build(); + + private AttributeSensor sensor; @Override public void setEntity(EntityLocal entity) { super.setEntity(entity); Preconditions.checkNotNull(getConfig(EFFECTOR), EFFECTOR); - Object sensor = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR); - if (sensor instanceof String) sensor = Sensors.newSensor(Object.class, (String)sensor); - subscriptions().subscribe(entity, (Sensor)sensor, this); + sensor = getSensor(); + subscriptions().subscribe(entity, sensor, this); + LOG.debug("{} subscribed to {} events on {}", new Object[]{this, sensor, entity}); } @Override public void onEvent(SensorEvent event) { - entity.invoke(entity.getEntityType().getEffectorByName(getConfig(EFFECTOR)).get(), MutableMap.of()); + final Effector eff = getEffectorNamed(getConfig(EFFECTOR)).get(); + if (isBusySensorEnabled()) { + final Object currentSensorValue = entity.sensors().get(sensor); + setMoreUpdatesComing(event.getTimestamp(), event.getValue(), currentSensorValue); + } + invoke(eff, MutableMap.of()); + } + + private AttributeSensor getSensor() { + final Object configVal = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR); + final AttributeSensor sensor; + if (configVal == null) { + throw new NullPointerException("Value for " + SENSOR.getName() + " is null"); + } else if (configVal instanceof String) { + sensor = Sensors.newSensor(Object.class, (String) configVal); + } else if (configVal instanceof AttributeSensor) { + sensor = (AttributeSensor) configVal; + } else { + sensor = TypeCoercions.tryCoerce(configVal, new TypeToken>() {}).get(); + } + return sensor; } } diff --git a/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java b/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java new file mode 100644 index 0000000000..15f3b48bd8 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.policy; + +import java.util.concurrent.CountDownLatch; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.testng.annotations.Test; + +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; + +public class AbstractInvokeEffectorPolicyTest extends BrooklynAppUnitTestSupport { + + @Test + public void testCountReflectsNumberOfExecutingEffectors() { + final CountDownLatch effectorLatch = new CountDownLatch(1); + final AttributeSensor policyIsBusy = Sensors.newBooleanSensor( + "policyIsBusy"); + final Effector blockingEffector = Effectors.effector(Void.class, "abstract-invoke-effector-policy-test") + .impl(new BlockingEffector(effectorLatch)) + .build(); + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + final TestAbstractInvokeEffectorPolicy policy = entity.policies().add( + PolicySpec.create(TestAbstractInvokeEffectorPolicy.class) + .configure(AbstractInvokeEffectorPolicy.IS_BUSY_SENSOR_NAME, policyIsBusy.getName())); + final Task effectorTask = policy.invoke(blockingEffector, ImmutableMap.of()); + + // expect isbusy on entity, effector incomplete. + Supplier effectorTaskDoneSupplier = new Supplier() { + @Override + public Boolean get() { + return effectorTask.isDone(); + } + }; + Asserts.continually(effectorTaskDoneSupplier, Predicates.equalTo(false)); + EntityAsserts.assertAttributeEqualsEventually(entity, policyIsBusy, true); + + effectorLatch.countDown(); + + Asserts.eventually(effectorTaskDoneSupplier, Predicates.equalTo(true)); + EntityAsserts.assertAttributeEqualsEventually(entity, policyIsBusy, false); + } + + public static class TestAbstractInvokeEffectorPolicy extends AbstractInvokeEffectorPolicy { + public TestAbstractInvokeEffectorPolicy() { + } + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + } + } + + private static class BlockingEffector extends EffectorBody { + final CountDownLatch latch; + + private BlockingEffector(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public Void call(ConfigBag config) { + try { + latch.await(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + return null; + } + } + +} diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java index ab72f71e5f..49d8c33b2b 100644 --- a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java +++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -34,6 +35,8 @@ import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.policy.PolicySpec; import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.core.effector.AddEffector; import org.apache.brooklyn.core.effector.EffectorBody; import org.apache.brooklyn.core.effector.Effectors; @@ -48,6 +51,7 @@ import com.google.common.base.Predicates; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -58,6 +62,9 @@ public class InvokeEffectorOnCollectionSensorChangeTest extends BrooklynAppUnitT private static final AttributeSensor> DEFAULT_SENSOR = Sensors.newSensor(new TypeToken>() {}, "invokeeffectoronsetchangetest.sensor"); + private static final AttributeSensor IS_BUSY_SENSOR = Sensors.newBooleanSensor( + "invokeeffectoronsetchangetest.isBusy"); + LinkedBlockingQueue onAddedParameters; LinkedBlockingQueue onRemovedParameters; Effector onAddedEffector; @@ -202,13 +209,29 @@ public void testErrorIfTriggerSensorNotSet() { .configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, onRemovedEffector.getName())); } + @Test + public void testPublishesIsBusySensor() { + final List isBusyValues = new CopyOnWriteArrayList<>(); + testEntity.subscriptions().subscribe(testEntity, IS_BUSY_SENSOR, new SensorEventListener() { + @Override + public void onEvent(SensorEvent event) { + isBusyValues.add(event.getValue()); + } + }); + addSetChangePolicy(true, false); + testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1)); + List expected = ImmutableList.of(false, true, false); + Asserts.eventually(Suppliers.ofInstance(isBusyValues), Predicates.equalTo(expected)); + } + private void addSetChangePolicy(boolean includeOnAdded, boolean includeOnRemoved) { addSetChangePolicy(DEFAULT_SENSOR, includeOnAdded, includeOnRemoved); } private void addSetChangePolicy(AttributeSensor> sensor, boolean includeOnAdded, boolean includeOnRemoved) { PolicySpec policySpec = PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class) - .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor); + .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor) + .configure(InvokeEffectorOnCollectionSensorChange.IS_BUSY_SENSOR_NAME, IS_BUSY_SENSOR.getName()); if (includeOnAdded) { policySpec.configure(InvokeEffectorOnCollectionSensorChange.ON_ADDED_EFFECTOR_NAME, onAddedEffector.getName()); } diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java new file mode 100644 index 0000000000..cfeaf8389d --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.policy; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.effector.AddEffector; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.entity.stock.BasicEntity; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.testng.annotations.Test; + +public class InvokeEffectorOnSensorChangeIntegrationTest extends BrooklynAppUnitTestSupport { + + @Test(groups = "Integration") + public void testIsBusySensorAlwaysFalseAtEnd() throws InterruptedException { + /* + * Stress-test isBusy. Reliably failed with insufficient synchronisation + * in AbstractInvokeEffectorPolicy. + */ + final AttributeSensor sensor = Sensors.newStringSensor("my-sensor"); + final AttributeSensor isBusy = Sensors.newBooleanSensor("is-busy"); + Effector effector = Effectors.effector(Void.class, "effector") + .impl(new DoNothingEffector()) + .build(); + final BasicEntity entity = app.createAndManageChild(EntitySpec.create(BasicEntity.class) + .addInitializer(new AddEffector(effector)) + .policy(PolicySpec.create(InvokeEffectorOnSensorChange.class) + .configure(InvokeEffectorOnSensorChange.SENSOR, sensor) + .configure(InvokeEffectorOnSensorChange.EFFECTOR, "effector") + .configure(InvokeEffectorOnSensorChange.IS_BUSY_SENSOR_NAME, isBusy.getName()))); + final AtomicInteger threadId = new AtomicInteger(); + Thread[] threads = new Thread[10]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(new Runnable() { + private int count = 0; + @Override + public void run() { + int id = threadId.incrementAndGet(); + while (count++ < 1000) { + entity.sensors().set(sensor, "thread-" + id + "-" + count); + } + } + }); + threads[i].start(); + } + for (Thread thread : threads) { + thread.join(); + } + + EntityAsserts.assertAttributeEqualsEventually(entity, isBusy, false); + } + + + private static class DoNothingEffector extends EffectorBody { + @Override + public Void call(ConfigBag config) { + return null; + } + } + +} \ No newline at end of file