Skip to content
Permalink
Browse files
BROOKLYN-212: more improvements
- Resizable.resize: throw InsufficientCapacityException if didn’t 
  manage to reach target size, rather than only if didn’t manage to
  increase in size at all.
- DynamicCluster: delete nodes that throw NoMachinesAvailableException,
  rather than putting them in quarantine.
- Fix AutoScalerPolicy’s max-capacity high-water mark, when 
  resizeUpStabilizationDelay is used.
  • Loading branch information
aledsage committed Jan 14, 2016
1 parent ffbad25 commit 4245bd628172ad25bfbf745a9ad259f11050b3ea
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 64 deletions.
@@ -32,7 +32,8 @@
public interface Resizable {

/**
* Indicates that resizing up (at all) is not possible, because there is insufficient capacity.
* Indicates that resizing up to the desired size is not possible - only resized to the
* {@link Resizable#getCurrentSize()}, because there is insufficient capacity.
*/
public static class InsufficientCapacityException extends RuntimeException {
private static final long serialVersionUID = 953230498564942446L;
@@ -53,7 +54,8 @@ public InsufficientCapacityException(String msg, Throwable cause) {
* @param desiredSize the new size of the entity group.
* @return the new size of the group.
*
* @throws InsufficientCapacityException If the request was to grow, but there is no capacity to grow at all
* @throws InsufficientCapacityException If the request was to grow, but there is no capacity to grow to
* the desired size.
*/
@Effector(description="Changes the size of the entity (e.g. the number of nodes in a cluster)")
Integer resize(@EffectorParam(name="desiredSize", description="The new size of the cluster") Integer desiredSize);
@@ -48,6 +48,7 @@

import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
@@ -101,6 +102,15 @@ interface ZoneFailureDetector {
ConfigKey<Boolean> QUARANTINE_FAILED_ENTITIES = ConfigKeys.newBooleanConfigKey(
"dynamiccluster.quarantineFailedEntities", "If true, will quarantine entities that fail to start; if false, will get rid of them (i.e. delete them)", true);

@SetFromFlag("quarantineFilter")
ConfigKey<Predicate<? super Throwable>> QUARANTINE_FILTER = ConfigKeys.newConfigKey(
new TypeToken<Predicate<? super Throwable>>() {},
"dynamiccluster.quarantineFilter",
"Quarantine the failed nodes that pass this filter (given the exception thrown by the node). "
+ "Default is those that did not fail with NoMachinesAvailableException "
+ "(Config ignored if quarantineFailedEntities is false)",
null);

AttributeSensor<Lifecycle> SERVICE_STATE_ACTUAL = Attributes.SERVICE_STATE_ACTUAL;

BasicNotificationSensor<Entity> ENTITY_QUARANTINED = new BasicNotificationSensor<Entity>(Entity.class, "dynamiccluster.entityQuarantined", "Entity failed to start, and has been quarantined");
@@ -82,6 +82,7 @@
import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@@ -333,6 +334,19 @@ protected QuarantineGroup getQuarantineGroup() {
return getAttribute(QUARANTINE_GROUP);
}

protected Predicate<? super Throwable> getQuarantineFilter() {
Predicate<? super Throwable> result = getConfig(QUARANTINE_FILTER);
if (result != null) {
return result;
} else {
return new Predicate<Throwable>() {
@Override public boolean apply(Throwable input) {
return Exceptions.getFirstThrowableOfType(input, NoMachinesAvailableException.class) == null;
}
};
}
}

protected int getInitialQuorumSize() {
int initialSize = getConfig(INITIAL_SIZE).intValue();
int initialQuorumSize = getConfig(INITIAL_QUORUM_SIZE).intValue();
@@ -711,16 +725,10 @@ protected Collection<Entity> grow(int delta) {
chosenLocations = Collections.nCopies(delta, getLocation());
}

// create and start the entities
// create and start the entities.
// if any fail, then propagate the error.
ReferenceWithError<Collection<Entity>> result = addInEachLocation(chosenLocations, ImmutableMap.of());

// If any entities were created, return them (even if we didn't manage to create them all).
// Otherwise, propagate any error that happened.
if (result.get().size() > 0) {
return result.get();
} else {
return result.getWithError();
}
return result.getWithError();
}

/** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
@@ -809,7 +817,7 @@ protected ReferenceWithError<Collection<Entity>> addInEachLocation(Iterable<Loca
// quarantine/cleanup as necessary
if (!errors.isEmpty()) {
if (isQuarantineEnabled()) {
quarantineFailedNodes(errors.keySet());
quarantineFailedNodes(errors);
} else {
cleanupFailedNodes(errors.keySet());
}
@@ -819,11 +827,18 @@ protected ReferenceWithError<Collection<Entity>> addInEachLocation(Iterable<Loca
return ReferenceWithError.newInstanceWithoutError(result);
}

protected void quarantineFailedNodes(Collection<Entity> failedEntities) {
for (Entity entity : failedEntities) {
sensors().emit(ENTITY_QUARANTINED, entity);
getQuarantineGroup().addMember(entity);
removeMember(entity);
protected void quarantineFailedNodes(Map<Entity, Throwable> failedEntities) {
for (Map.Entry<Entity, Throwable> entry : failedEntities.entrySet()) {
Entity entity = entry.getKey();
Throwable cause = entry.getValue();
if (cause == null || getQuarantineFilter().apply(cause)) {
sensors().emit(ENTITY_QUARANTINED, entity);
getQuarantineGroup().addMember(entity);
removeMember(entity);
} else {
LOG.info("Cluster {} discarding failed node {}, rather than quarantining", this, entity);
discardNode(entity);
}
}
}

@@ -59,16 +59,17 @@ protected void initEnrichers() {
@Override
public Integer resize(Integer desiredSize) {
desiredSizeHistory.add(desiredSize);
int achievableSize = Math.min(desiredSize, getConfig(MAX_SIZE));

if (desiredSize > size) {
if (size < getConfig(MAX_SIZE)) {
desiredSize = Math.min(desiredSize, getConfig(MAX_SIZE));
} else {
throw new InsufficientCapacityException("Simulating insufficient capacity (desiredSize="+desiredSize+"; maxSize="+getConfig(MAX_SIZE)+"; currentSize="+size+")");
}
if (achievableSize != size) {
this.sizeHistory.add(achievableSize);
this.size = achievableSize;
}
this.sizeHistory.add(desiredSize);
this.size = desiredSize;

if (desiredSize > achievableSize) {
throw new InsufficientCapacityException("Simulating insufficient capacity (desiredSize="+desiredSize+"; maxSize="+getConfig(MAX_SIZE)+"; newSize="+size+")");
}

return size;
}

@@ -222,7 +222,7 @@ public void testResizeWhereChildThrowsNoMachineAvailableExceptionIsPropagatedAsI
}

@Test
public void testResizeWhereSubsetOfChildrenThrowsNoMachineAvailableExceptionReturnsNormally() throws Exception {
public void testResizeWhereSubsetOfChildrenThrowsNoMachineAvailableExceptionIsPropagatedAsInsuffientCapacityException() throws Exception {
final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
.configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
.configure(FailingEntity.FAIL_ON_START_CONDITION, new Predicate<FailingEntity>() {
@@ -236,19 +236,31 @@ public void testResizeWhereSubsetOfChildrenThrowsNoMachineAvailableExceptionRetu
.configure(DynamicCluster.INITIAL_SIZE, 0));
cluster.start(ImmutableList.of(loc));

// Managed to partially resize, so should not fail entirely.
// Instead just say how big we managed to get.
Integer newSize = cluster.resize(2);
assertEquals(newSize, (Integer)1);
// Managed to partially resize, but will still throw exception.
// The getCurrentSize will report how big we managed to get.
// The children that failed due to NoMachinesAvailableException will have been unmanaged automatically.
try {
cluster.resize(2);
Asserts.shouldHaveFailedPreviously();
} catch (Exception e) {
Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
}
assertEquals(cluster.getCurrentSize(), (Integer)1);

// This attempt will fail, because all new children will fail
Iterable<FailingEntity> children1 = Iterables.filter(cluster.getChildren(), FailingEntity.class);
assertEquals(Iterables.size(children1), 1);
assertEquals(Iterables.getOnlyElement(children1).sensors().get(TestEntity.SERVICE_UP), Boolean.TRUE);

// This attempt will also fail, because all new children will fail
try {
cluster.resize(2);
Asserts.shouldHaveFailedPreviously();
} catch (Exception e) {
Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
}
assertEquals(cluster.getCurrentSize(), (Integer)1);
Iterable<FailingEntity> children2 = Iterables.filter(cluster.getChildren(), FailingEntity.class);
assertEquals(Iterables.size(children2), 1);
assertEquals(Iterables.getOnlyElement(children2), Iterables.getOnlyElement(children1));
}

/** This can be sensitive to order, e.g. if TestEntity set expected RUNNING before setting SERVICE_UP,
@@ -471,7 +483,7 @@ public void failingEntitiesDontBreakClusterActions() throws Exception {
}}));

cluster.start(ImmutableList.of(loc));
cluster.resize(3);
resizeExpectingError(cluster, 3);
assertEquals(cluster.getCurrentSize(), (Integer)2);
assertEquals(cluster.getMembers().size(), 2);
for (Entity member : cluster.getMembers()) {
@@ -583,7 +595,7 @@ public void testCanQuarantineFailedEntities() throws Exception {
}}));

cluster.start(ImmutableList.of(loc));
cluster.resize(3);
resizeExpectingError(cluster, 3);
assertEquals(cluster.getCurrentSize(), (Integer)2);
assertEquals(cluster.getMembers().size(), 2);
assertEquals(Iterables.size(Iterables.filter(cluster.getChildren(), Predicates.instanceOf(FailingEntity.class))), 3);
@@ -620,7 +632,7 @@ public void testDoNotQuarantineFailedEntities() throws Exception {
assertEquals(cluster.getChildren().size(), 0, "children="+cluster.getChildren());

// Failed node will not be a member or child
cluster.resize(3);
resizeExpectingError(cluster, 3);
assertEquals(cluster.getCurrentSize(), (Integer)2);
assertEquals(cluster.getMembers().size(), 2);
assertEquals(cluster.getChildren().size(), 2, "children="+cluster.getChildren());
@@ -632,6 +644,62 @@ public void testDoNotQuarantineFailedEntities() throws Exception {
}
}

@Test
public void testQuarantineFailedEntitiesRespectsCustomFilter() throws Exception {
Predicate<Throwable> filter = new Predicate<Throwable>() {
@Override public boolean apply(Throwable input) {
return Exceptions.getFirstThrowableOfType(input, AllowedException.class) != null;
}
};
runQuarantineFailedEntitiesRespectsFilter(AllowedException.class, DisallowedException.class, filter);
}
@SuppressWarnings("serial")
public static class AllowedException extends RuntimeException {
public AllowedException(String message) {
super(message);
}
}
@SuppressWarnings("serial")
public static class DisallowedException extends RuntimeException {
public DisallowedException(String message) {
super(message);
}
}

@Test
public void testQuarantineFailedEntitiesRespectsDefaultFilter() throws Exception {
Predicate<Throwable> filter = null;
runQuarantineFailedEntitiesRespectsFilter(AllowedException.class, NoMachinesAvailableException.class, filter);
}

protected void runQuarantineFailedEntitiesRespectsFilter(Class<? extends Exception> allowedException,
Class<? extends Exception> disallowedException, Predicate<Throwable> quarantineFilter) throws Exception {
final List<Class<? extends Exception>> failureCauses = ImmutableList.<Class<? extends Exception>>of(allowedException, disallowedException);
final AtomicInteger counter = new AtomicInteger(0);
DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
.configure("quarantineFailedEntities", true)
.configure("initialSize", 0)
.configure("quarantineFilter", quarantineFilter)
.configure("factory", new EntityFactory() {
@Override public Entity newEntity(Map flags, Entity parent) {
int num = counter.getAndIncrement();
return app.getManagementContext().getEntityManager().createEntity(EntitySpec.create(FailingEntity.class)
.configure(flags)
.configure(FailingEntity.FAIL_ON_START, true)
.configure(FailingEntity.EXCEPTION_CLAZZ, failureCauses.get(num))
.parent(parent));
}}));

cluster.start(ImmutableList.of(loc));
resizeExpectingError(cluster, 2);
Iterable<FailingEntity> children = Iterables.filter(cluster.getChildren(), FailingEntity.class);
Collection<Entity> quarantineMembers = cluster.sensors().get(DynamicCluster.QUARANTINE_GROUP).getMembers();

assertEquals(cluster.getCurrentSize(), (Integer)0);
assertEquals(Iterables.getOnlyElement(children).config().get(FailingEntity.EXCEPTION_CLAZZ), allowedException);
assertEquals(Iterables.getOnlyElement(quarantineMembers), Iterables.getOnlyElement(children));
}

@Test
public void defaultRemovalStrategyShutsDownNewestFirstWhenResizing() throws Exception {
final List<Entity> creationOrder = Lists.newArrayList();
@@ -20,7 +20,6 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
import groovy.lang.Closure;

import java.util.Map;
import java.util.concurrent.Callable;
@@ -30,8 +29,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
@@ -55,13 +52,17 @@
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import groovy.lang.Closure;


/**
* Policy that is attached to a {@link Resizable} entity and dynamically adjusts its size in response to
@@ -869,6 +870,10 @@ private void analyze(ScalingData data, String description) {
onNewUnboundedPoolSize(desiredSizeUnconstrained);
}

private int applyMinMaxConstraints(long desiredSize) {
return applyMinMaxConstraints(desiredSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)desiredSize);
}

private int applyMinMaxConstraints(int desiredSize) {
int minSize = getMinPoolSize();
int maxSize = getMaxPoolSize();
@@ -1014,40 +1019,42 @@ private void notifyMaxReachedIfRequiredNow() {
}

private void resizeNow() {
final long currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
final int currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
CalculatedDesiredPoolSize calculatedDesiredPoolSize = calculateDesiredPoolSize(currentPoolSize);
final long desiredPoolSize = calculatedDesiredPoolSize.size;
long desiredPoolSize = calculatedDesiredPoolSize.size;
boolean stable = calculatedDesiredPoolSize.stable;

final int targetPoolSize = applyMinMaxConstraints(desiredPoolSize);

if (!stable) {
// the desired size fluctuations are not stable; ensure we check again later (due to time-window)
// even if no additional events have been received
// (note we continue now with as "good" a resize as we can given the instability)
if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling resize check for {}, as desired size not stable (current {}, desired {}); continuing with resize...",
new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
new Object[] {this, poolEntity, currentPoolSize, targetPoolSize});
scheduleResize();
}
if (currentPoolSize == desiredPoolSize) {
if (currentPoolSize == targetPoolSize) {
if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} to {}",
new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
new Object[] {this, poolEntity, currentPoolSize, targetPoolSize});
return;
}

if (LOG.isDebugEnabled()) LOG.debug("{} requesting resize to {}; current {}, min {}, max {}",
new Object[] {this, desiredPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});
new Object[] {this, targetPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});

Entities.submit(entity, Tasks.<Void>builder().displayName("Auto-scaler")
.description("Auto-scaler recommending resize from "+currentPoolSize+" to "+desiredPoolSize)
.description("Auto-scaler recommending resize from "+currentPoolSize+" to "+targetPoolSize)
.tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
.body(new Callable<Void>() {
@Override
public Void call() throws Exception {
// TODO Should we use int throughout, rather than casting here?
try {
getResizeOperator().resize(poolEntity, (int) desiredPoolSize);
getResizeOperator().resize(poolEntity, (int) targetPoolSize);
} catch (Resizable.InsufficientCapacityException e) {
// cannot resize beyond this; set the high-water mark
int insufficientCapacityHighWaterMark = (currentPoolSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)currentPoolSize);
int insufficientCapacityHighWaterMark = getCurrentSizeOperator().apply(poolEntity);
LOG.warn("{} failed to resize {} due to insufficient capacity; setting high-water mark to {}, "
+ "and will not attempt to resize above that level again",
new Object[] {AutoScalerPolicy.this, poolEntity, insufficientCapacityHighWaterMark});

0 comments on commit 4245bd6

Please sign in to comment.