Skip to content
Permalink
Browse files
This closes #1136
  • Loading branch information
aledsage committed Jan 15, 2016
2 parents 848f750 + 4245bd6 commit d9d13371c70037a9ceb1c20ba9024b33ed9b983d
Showing 12 changed files with 620 additions and 48 deletions.
@@ -31,13 +31,31 @@
*/
public interface Resizable {

/**
* Indicates that resizing up to the desired size is not possible - only resized to the
* {@link Resizable#getCurrentSize()}, because there is insufficient capacity.
*/
public static class InsufficientCapacityException extends RuntimeException {
private static final long serialVersionUID = 953230498564942446L;

public InsufficientCapacityException(String msg) {
super(msg);
}
public InsufficientCapacityException(String msg, Throwable cause) {
super(msg, cause);
}
}

MethodEffector<Integer> RESIZE = new MethodEffector<Integer>(Resizable.class, "resize");

/**
* Grow or shrink this entity to the desired size.
*
* @param desiredSize the new size of the entity group.
* @return the new size of the group.
*
* @throws InsufficientCapacityException If the request was to grow, but there is no capacity to grow to
* the desired size.
*/
@Effector(description="Changes the size of the entity (e.g. the number of nodes in a cluster)")
Integer resize(@EffectorParam(name="desiredSize", description="The new size of the cluster") Integer desiredSize);
@@ -48,6 +48,7 @@

import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
@@ -101,6 +102,15 @@ interface ZoneFailureDetector {
ConfigKey<Boolean> QUARANTINE_FAILED_ENTITIES = ConfigKeys.newBooleanConfigKey(
"dynamiccluster.quarantineFailedEntities", "If true, will quarantine entities that fail to start; if false, will get rid of them (i.e. delete them)", true);

@SetFromFlag("quarantineFilter")
ConfigKey<Predicate<? super Throwable>> QUARANTINE_FILTER = ConfigKeys.newConfigKey(
new TypeToken<Predicate<? super Throwable>>() {},
"dynamiccluster.quarantineFilter",
"Quarantine the failed nodes that pass this filter (given the exception thrown by the node). "
+ "Default is those that did not fail with NoMachinesAvailableException "
+ "(Config ignored if quarantineFailedEntities is false)",
null);

AttributeSensor<Lifecycle> SERVICE_STATE_ACTUAL = Attributes.SERVICE_STATE_ACTUAL;

BasicNotificationSensor<Entity> ENTITY_QUARANTINED = new BasicNotificationSensor<Entity>(Entity.class, "dynamiccluster.entityQuarantined", "Entity failed to start, and has been quarantined");
@@ -38,6 +38,7 @@
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.MachineProvisioningLocation;
import org.apache.brooklyn.api.location.NoMachinesAvailableException;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -50,6 +51,7 @@
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
import org.apache.brooklyn.core.entity.trait.Resizable;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.entity.trait.StartableMethods;
import org.apache.brooklyn.core.location.Locations;
@@ -80,6 +82,7 @@
import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@@ -331,6 +334,19 @@ protected QuarantineGroup getQuarantineGroup() {
return getAttribute(QUARANTINE_GROUP);
}

protected Predicate<? super Throwable> getQuarantineFilter() {
Predicate<? super Throwable> result = getConfig(QUARANTINE_FILTER);
if (result != null) {
return result;
} else {
return new Predicate<Throwable>() {
@Override public boolean apply(Throwable input) {
return Exceptions.getFirstThrowableOfType(input, NoMachinesAvailableException.class) == null;
}
};
}
}

protected int getInitialQuorumSize() {
int initialSize = getConfig(INITIAL_SIZE).intValue();
int initialQuorumSize = getConfig(INITIAL_QUORUM_SIZE).intValue();
@@ -518,7 +534,20 @@ public Integer resize(Integer desiredSize) {
} else {
if (LOG.isDebugEnabled()) LOG.debug("Resize no-op {} from {} to {}", new Object[] {this, originalSize, desiredSize});
}
resizeByDelta(delta);
// If we managed to grow at all, then expect no exception.
// Otherwise, if failed because NoMachinesAvailable, then propagate as InsufficientCapacityException.
// This tells things like the AutoScalerPolicy to not keep retrying.
try {
resizeByDelta(delta);
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
NoMachinesAvailableException nmae = Exceptions.getFirstThrowableOfType(e, NoMachinesAvailableException.class);
if (nmae != null) {
throw new Resizable.InsufficientCapacityException("Failed to resize", e);
} else {
throw Exceptions.propagate(e);
}
}
}
return getCurrentSize();
}
@@ -669,7 +698,7 @@ public Collection<Entity> resizeByDelta(int delta) {
}
}

/** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
/** <strong>Note</strong> for sub-classes; this method can be called while synchronized on {@link #mutex}. */
protected Collection<Entity> grow(int delta) {
Preconditions.checkArgument(delta > 0, "Must call grow with positive delta.");

@@ -696,8 +725,10 @@ protected Collection<Entity> grow(int delta) {
chosenLocations = Collections.nCopies(delta, getLocation());
}

// create and start the entities
return addInEachLocation(chosenLocations, ImmutableMap.of()).getWithError();
// create and start the entities.
// if any fail, then propagate the error.
ReferenceWithError<Collection<Entity>> result = addInEachLocation(chosenLocations, ImmutableMap.of());
return result.getWithError();
}

/** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
@@ -786,7 +817,7 @@ protected ReferenceWithError<Collection<Entity>> addInEachLocation(Iterable<Loca
// quarantine/cleanup as necessary
if (!errors.isEmpty()) {
if (isQuarantineEnabled()) {
quarantineFailedNodes(errors.keySet());
quarantineFailedNodes(errors);
} else {
cleanupFailedNodes(errors.keySet());
}
@@ -796,11 +827,18 @@ protected ReferenceWithError<Collection<Entity>> addInEachLocation(Iterable<Loca
return ReferenceWithError.newInstanceWithoutError(result);
}

protected void quarantineFailedNodes(Collection<Entity> failedEntities) {
for (Entity entity : failedEntities) {
sensors().emit(ENTITY_QUARANTINED, entity);
getQuarantineGroup().addMember(entity);
removeMember(entity);
protected void quarantineFailedNodes(Map<Entity, Throwable> failedEntities) {
for (Map.Entry<Entity, Throwable> entry : failedEntities.entrySet()) {
Entity entity = entry.getKey();
Throwable cause = entry.getValue();
if (cause == null || getQuarantineFilter().apply(cause)) {
sensors().emit(ENTITY_QUARANTINED, entity);
getQuarantineGroup().addMember(entity);
removeMember(entity);
} else {
LOG.info("Cluster {} discarding failed node {}, rather than quarantining", this, entity);
discardNode(entity);
}
}
}

@@ -60,7 +60,7 @@ public interface FailingEntity extends TestEntity {
ConfigKey<Predicate<? super FailingEntity>> FAIL_ON_RESTART_CONDITION = (ConfigKey) ConfigKeys.newConfigKey(Predicate.class, "failOnRestartCondition", "Whether to throw exception on call to restart", null);

@SetFromFlag("exceptionClazz")
ConfigKey<Class<? extends RuntimeException>> EXCEPTION_CLAZZ = (ConfigKey) ConfigKeys.newConfigKey(Class.class, "exceptionClazz", "Type of exception to throw", IllegalStateException.class);
ConfigKey<Class<? extends Exception>> EXCEPTION_CLAZZ = (ConfigKey) ConfigKeys.newConfigKey(Class.class, "exceptionClazz", "Type of exception to throw", IllegalStateException.class);

@SetFromFlag("execOnFailure")
ConfigKey<Function<? super FailingEntity,?>> EXEC_ON_FAILURE = (ConfigKey) ConfigKeys.newConfigKey(Function.class, "execOnFailure", "Callback to execute before throwing an exception, on any failure", Functions.identity());
@@ -79,7 +79,12 @@ private RuntimeException fail(final String msg) {

private RuntimeException newException(String msg) {
try {
return getConfig(EXCEPTION_CLAZZ).getConstructor(String.class).newInstance("Simulating entity stop failure for test");
Exception result = getConfig(EXCEPTION_CLAZZ).getConstructor(String.class).newInstance("Simulating entity stop failure for test");
if (!(result instanceof RuntimeException)) {
return new RuntimeException("wrapping", result);
} else {
return (RuntimeException)result;
}
} catch (Exception e) {
throw Exceptions.propagate(e);
}
@@ -18,13 +18,23 @@
*/
package org.apache.brooklyn.core.test.entity;

import java.util.List;

import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.ImplementedBy;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.entity.group.DynamicCluster;

/**
* Mock cluster entity for testing.
*/
@ImplementedBy(TestClusterImpl.class)
public interface TestCluster extends DynamicCluster, EntityLocal {

ConfigKey<Integer> MAX_SIZE = ConfigKeys.newIntegerConfigKey("testCluster.maxSize", "Size after which it will throw InsufficientCapacityException", Integer.MAX_VALUE);

List<Integer> getSizeHistory();

List<Integer> getDesiredSizeHistory();
}
@@ -18,22 +18,32 @@
*/
package org.apache.brooklyn.core.test.entity;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.entity.group.DynamicClusterImpl;
import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;

import com.google.common.collect.ImmutableList;

/**
* Mock cluster entity for testing.
*/
public class TestClusterImpl extends DynamicClusterImpl implements TestCluster {
private volatile int size;

private final List<Integer> desiredSizeHistory = Collections.synchronizedList(new ArrayList<Integer>());
private final List<Integer> sizeHistory = Collections.synchronizedList(new ArrayList<Integer>());

public TestClusterImpl() {
}

@Override
public void init() {
super.init();
sizeHistory.add(size);
size = getConfig(INITIAL_SIZE);
sensors().set(Startable.SERVICE_UP, true);
}
@@ -48,10 +58,35 @@ protected void initEnrichers() {

@Override
public Integer resize(Integer desiredSize) {
this.size = desiredSize;
desiredSizeHistory.add(desiredSize);
int achievableSize = Math.min(desiredSize, getConfig(MAX_SIZE));

if (achievableSize != size) {
this.sizeHistory.add(achievableSize);
this.size = achievableSize;
}

if (desiredSize > achievableSize) {
throw new InsufficientCapacityException("Simulating insufficient capacity (desiredSize="+desiredSize+"; maxSize="+getConfig(MAX_SIZE)+"; newSize="+size+")");
}

return size;
}

@Override
public List<Integer> getSizeHistory() {
synchronized (sizeHistory) {
return ImmutableList.copyOf(sizeHistory);
}
}

@Override
public List<Integer> getDesiredSizeHistory() {
synchronized (desiredSizeHistory) {
return ImmutableList.copyOf(desiredSizeHistory);
}
}

@Override
public void stop() {
size = 0;

0 comments on commit d9d1337

Please sign in to comment.