Skip to content
Permalink
Browse files
support dynamic cluster restart
  • Loading branch information
ahgittin committed Jan 21, 2016
1 parent 9141c99 commit ecbd90e22c989c7d24983942b7426c8a17c29463
Showing 3 changed files with 45 additions and 3 deletions.
@@ -173,14 +173,26 @@ public static <V> ConfigKey<V> asConfigKey(ParameterType<V> paramType) {
return ConfigKeys.newConfigKey(paramType.getParameterClass(), paramType.getName(), paramType.getDescription(), paramType.getDefaultValue());
}

/** returns an unsubmitted task which will invoke the given effector on the given entities;
* return type is Task<List<T>> (but haven't put in the blood sweat toil and tears to make the generics work) */
/** convenience for {@link #invocationParallel(Effector, Map, Iterable)} */
public static TaskAdaptable<List<?>> invocation(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
return invocationParallel(eff, params, entities);
}

/** returns an unsubmitted task which will invoke the given effector on the given entities in parallel;
* return type is Task<List<T>> (but haven't put in the blood sweat toil and tears to make the generics work) */
public static TaskAdaptable<List<?>> invocationParallel(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
List<TaskAdaptable<?>> tasks = new ArrayList<TaskAdaptable<?>>();
for (Entity e: entities) tasks.add(invocation(e, eff, params));
return Tasks.parallel("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()]));
}

/** as {@link #invocationParallel(Effector, Map, Iterable)} but executing sequentially */
public static TaskAdaptable<List<?>> invocationSequential(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
List<TaskAdaptable<?>> tasks = new ArrayList<TaskAdaptable<?>>();
for (Entity e: entities) tasks.add(invocation(e, eff, params));
return Tasks.sequential("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()]));
}

/** returns an unsubmitted task which will invoke the given effector on the given entities
* (this form of method is a convenience for {@link #invocation(Effector, Map, Iterable)}) */
public static TaskAdaptable<List<?>> invocation(Effector<?> eff, MutableMap<?, ?> params, Entity ...entities) {
@@ -98,6 +98,14 @@ interface ZoneFailureDetector {

MethodEffector<Collection<Entity>> RESIZE_BY_DELTA = new MethodEffector<Collection<Entity>>(DynamicCluster.class, "resizeByDelta");

@SetFromFlag("restartMode")
ConfigKey<String> RESTART_MODE = ConfigKeys.newStringConfigKey(
"dynamiccluster.restartMode",
"How this cluster should handle restarts; "
+ "by default it is disallowed, but this key can specify a different mode. "
+ "Modes supported by dynamic cluster are 'off', 'sequqential', or 'parallel'. "
+ "However subclasses can define their own modes or may ignore this.", null);

@SetFromFlag("quarantineFailedEntities")
ConfigKey<Boolean> QUARANTINE_FAILED_ENTITIES = ConfigKeys.newBooleanConfigKey(
"dynamiccluster.quarantineFailedEntities", "If true, will quarantine entities that fail to start; if false, will get rid of them (i.e. delete them)", true);
@@ -47,6 +47,7 @@
import org.apache.brooklyn.core.config.render.RendererHints;
import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.core.entity.factory.EntityFactory;
import org.apache.brooklyn.core.entity.factory.EntityFactoryForLocation;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
@@ -522,7 +523,28 @@ public void stop() {

@Override
public void restart() {
throw new UnsupportedOperationException();
String mode = getConfig(RESTART_MODE);
if (mode==null) {
throw new UnsupportedOperationException("Restart not supported for this cluster: "+RESTART_MODE.getName()+" is not configured.");
}
if ("off".equalsIgnoreCase(mode)) {
throw new UnsupportedOperationException("Restart not supported for this cluster.");
}

if ("sequential".equalsIgnoreCase(mode)) {
ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
DynamicTasks.queue(Effectors.invocationSequential(Startable.RESTART, null,
Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
} else if ("parallel".equalsIgnoreCase(mode)) {
ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, null,
Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
} else {
throw new IllegalArgumentException("Unknown "+RESTART_MODE.getName()+" '"+mode+"'");
}

DynamicTasks.waitForLast();
ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING);
}

@Override

0 comments on commit ecbd90e

Please sign in to comment.