From 0918608c829c398cd9aa5cabfa0612b4af44866d Mon Sep 17 00:00:00 2001 From: Svetoslav Neykov Date: Wed, 22 Oct 2014 11:51:19 +0300 Subject: [PATCH] BrooklynNode cluster + upgrade effector Also effectors to: * select master in the cluster * set HA priority * set HA state --- .../entity/basic/EntityPredicates.java | 9 + .../entity/brooklynnode/BrooklynCluster.java | 56 ++++ .../brooklynnode/BrooklynClusterImpl.java | 110 ++++++++ .../BrooklynEntityMirrorImpl.java | 2 - .../entity/brooklynnode/BrooklynNode.java | 30 +- .../entity/brooklynnode/BrooklynNodeImpl.java | 16 +- .../effector/SelectMasterEffectorBody.java | 173 ++++++++++++ .../effector/SetHAModeEffectorBody.java | 64 +++++ .../effector/SetHAPriorityEffectorBody.java | 55 ++++ .../effector/UpgradeClusterEffectorBody.java | 199 +++++++++++++ .../effector/CallbackEntityHttpClient.java | 90 ++++++ .../effector/SelectMasterEffectorTest.java | 267 ++++++++++++++++++ .../brooklynnode/effector/TestHttpEntity.java | 66 +++++ .../java/brooklyn/rest/api/ServerApi.java | 23 +- .../rest/resources/ServerResource.java | 26 +- 15 files changed, 1178 insertions(+), 8 deletions(-) create mode 100644 software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java create mode 100644 software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java create mode 100644 software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java create mode 100644 software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java create mode 100644 software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java create mode 100644 software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java create mode 100644 software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java create mode 100644 software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java create mode 100644 software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java diff --git a/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java b/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java index e0db9e9058..b359b1e2bc 100644 --- a/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java +++ b/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java @@ -90,6 +90,15 @@ public boolean apply(@Nullable Entity input) { }; } + public static Predicate attributeNotEqualTo(final AttributeSensor attribute, final T val) { + return new SerializablePredicate() { + @Override + public boolean apply(@Nullable Entity input) { + return (input != null) && !Objects.equal(input.getAttribute(attribute), val); + } + }; + } + public static Predicate configEqualTo(final ConfigKey configKey, final T val) { return new SerializablePredicate() { @Override diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java new file mode 100644 index 0000000000..a3c21574a5 --- /dev/null +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Effector; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; + +@ImplementedBy(BrooklynClusterImpl.class) +public interface BrooklynCluster extends DynamicCluster { + public static final AttributeSensor MASTER_NODE = new BasicAttributeSensor( + BrooklynNode.class, "brooklyncluster.master", "Pointer to the child node with MASTER state in the cluster"); + + public interface SelectMasterEffector { + ConfigKey NEW_MASTER_ID = ConfigKeys.newStringConfigKey( + "brooklyncluster.new_master_id", "The ID of the node to become master", null); + Effector SELECT_MASTER = Effectors.effector(Void.class, "selectMaster") + .description("Select a new master in the cluster") + .parameter(NEW_MASTER_ID) + .buildAbstract(); + } + + public static final Effector SELECT_MASTER = SelectMasterEffector.SELECT_MASTER; + + public interface UpgradeClusterEffector { + Effector UPGRADE_CLUSTER = Effectors.effector(Void.class, "upgradeCluster") + .description("Upgrade the cluster with new distribution version") + .parameter(SoftwareProcess.DOWNLOAD_URL.getConfigKey()) + .buildAbstract(); + } + + public static final Effector UPGRADE_CLUSTER = UpgradeClusterEffector.UPGRADE_CLUSTER; + +} diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java new file mode 100644 index 0000000000..bfaf33c443 --- /dev/null +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode; + +import java.util.Collection; +import java.util.concurrent.Callable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; +import brooklyn.entity.brooklynnode.effector.SelectMasterEffectorBody; +import brooklyn.entity.brooklynnode.effector.UpgradeClusterEffectorBody; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.management.ha.ManagementNodeState; +import brooklyn.util.time.Duration; + +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; + +public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynCluster { + + private static final String MSG_NO_MASTER = "No master node in cluster"; + + private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class); + + //TODO set MEMBER_SPEC + + private FunctionFeed scanMaster; + + @Override + public void init() { + super.init(); + getMutableEntityType().addEffector(SelectMasterEffectorBody.SELECT_MASTER); + getMutableEntityType().addEffector(UpgradeClusterEffectorBody.UPGRADE_CLUSTER); + + ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER); + scanMaster = FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig(MASTER_NODE) + .period(Duration.ONE_SECOND) + .callable(new Callable() { + @Override + public BrooklynNode call() throws Exception { + return findMasterChild(); + } + })) + .build(); + } + + private BrooklynNode findMasterChild() { + Collection masters = FluentIterable.from(getMembers()) + .filter(EntityPredicates.attributeEqualTo(BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER)) + .toList(); + + if (masters.size() == 0) { + ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER); + return null; + } else if (masters.size() == 1) { + ServiceNotUpLogic.clearNotUpIndicator(this, MASTER_NODE); + return (BrooklynNode)Iterables.getOnlyElement(masters); + } else if (masters.size() == 2) { + //Probably hit a window where we have a new master + //its BrooklynNode picked it up, but the BrooklynNode + //for the old master hasn't refreshed its state yet. + //Just pick one of them, should sort itself out in next update. + LOG.warn("Two masters detected, probably a handover just occured: " + masters); + + //Don't clearNotUpIndicator - if there were no masters previously + //why have two now. + + return (BrooklynNode)Iterables.getOnlyElement(masters); + } else { + //Set on fire? + String msg = "Multiple (>=3) master nodes in cluster: " + masters; + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + + @Override + public void stop() { + super.stop(); + + if (scanMaster != null && scanMaster.isActivated()) { + scanMaster.stop(); + } + } + +} diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java index a71402f7e8..b0fb72800d 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java @@ -25,8 +25,6 @@ import javax.annotation.Nullable; import org.apache.http.HttpStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import brooklyn.entity.Effector; import brooklyn.entity.basic.AbstractEntity; diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java index b6ffb97ba4..cde11b4925 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java @@ -40,6 +40,8 @@ import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; import brooklyn.event.basic.MapConfigKey; import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.management.ha.HighAvailabilityMode; +import brooklyn.management.ha.ManagementNodeState; import brooklyn.util.collections.MutableMap; import brooklyn.util.flags.SetFromFlag; import brooklyn.util.net.Networking; @@ -215,8 +217,11 @@ static enum ExistingFileBehaviour { "brooklynnode.webconsole.portMapper", "Function for mapping private to public ports, for use in inferring the brooklyn URI", Functions.identity()); public static final AttributeSensor WEB_CONSOLE_URI = new BasicAttributeSensor( - URI.class, "brooklynnode.webconsole.url", "URL of the brooklyn web-console"); - + URI.class, "brooklynnode.webconsole.url", "URL of the brooklyn web-console"); + + public static final AttributeSensor MANAGEMENT_NODE_STATE = new BasicAttributeSensor( + ManagementNodeState.class, "brooklynnode.ha.state", "High-availability state of the management node (MASTER, HOT_STANDBY, etc)"); + @SetFromFlag("noShutdownOnExit") public static final ConfigKey NO_SHUTDOWN_ON_EXIT = ConfigKeys.newBooleanConfigKey("brooklynnode.noshutdownonexit", "Whether to shutdown entities on exit", false); @@ -274,6 +279,25 @@ public interface StopNodeAndKillAppsEffector { public static final Effector STOP_NODE_AND_KILL_APPS = StopNodeAndKillAppsEffector.STOP_NODE_AND_KILL_APPS; - public EntityHttpClient http(); + public interface SetHAPriorityEffector { + ConfigKey PRIORITY = ConfigKeys.newIntegerConfigKey("priority", "HA priority"); + Effector SET_HA_PRIORITY = Effectors.effector(Integer.class, "setHAPriotity") + .description("Set HA priority on the node, returns the old priority") + .parameter(PRIORITY) + .buildAbstract(); + } + public static final Effector SET_HA_PRIORITY = SetHAPriorityEffector.SET_HA_PRIORITY; + + public interface SetHAModeEffector { + ConfigKey MODE = ConfigKeys.newConfigKey(HighAvailabilityMode.class, "mode", "HA mode"); + Effector SET_HA_MODE = Effectors.effector(ManagementNodeState.class, "setHAMode") + .description("Set HA mode on the node, returns the existing state") + .parameter(MODE) + .buildAbstract(); + } + + public static final Effector SET_HA_MODE = SetHAModeEffector.SET_HA_MODE; + + public EntityHttpClient http(); } diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java index 56582c6cf2..9c4488e554 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java @@ -37,13 +37,17 @@ import brooklyn.entity.basic.Lifecycle; import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.brooklynnode.effector.SetHAModeEffectorBody; +import brooklyn.entity.brooklynnode.effector.SetHAPriorityEffectorBody; import brooklyn.entity.effector.EffectorBody; import brooklyn.entity.effector.Effectors; import brooklyn.event.feed.ConfigToAttributes; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; import brooklyn.management.TaskAdaptable; +import brooklyn.management.ha.ManagementNodeState; import brooklyn.util.collections.Jsonya; import brooklyn.util.collections.MutableMap; import brooklyn.util.config.ConfigBag; @@ -51,6 +55,7 @@ import brooklyn.util.exceptions.PropagatedRuntimeException; import brooklyn.util.guava.Functionals; import brooklyn.util.http.HttpToolResponse; +import brooklyn.util.javalang.Enums; import brooklyn.util.javalang.JavaClassNames; import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.TaskTags; @@ -60,6 +65,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNode { @@ -92,6 +98,8 @@ public void init() { getMutableEntityType().addEffector(ShutdownEffectorBody.SHUTDOWN); getMutableEntityType().addEffector(StopNodeButLeaveAppsEffectorBody.STOP_NODE_BUT_LEAVE_APPS); getMutableEntityType().addEffector(StopNodeAndKillAppsEffectorBody.STOP_NODE_AND_KILL_APPS); + getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY); + getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE); } @Override @@ -186,7 +194,9 @@ public Void call(ConfigBag parameters) { .addIfNotNull("delayForHttpReturn", toNullableString(parameters.get(DELAY_FOR_HTTP_RETURN))); try { HttpToolResponse resp = ((BrooklynNode)entity()).http() - .post("/v1/server/shutdown", MutableMap.of(), formParams); + .post("/v1/server/shutdown", + ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"), + formParams); if (resp.getResponseCode() != HttpStatus.SC_NO_CONTENT) { throw new IllegalStateException("Response code "+resp.getResponseCode()); } @@ -322,6 +332,10 @@ protected void connectSensors() { .poll(new HttpPollConfig(WEB_CONSOLE_ACCESSIBLE) .onSuccess(HttpValueFunctions.responseCodeEquals(200)) .setOnFailureOrException(false)) + .poll(new HttpPollConfig(MANAGEMENT_NODE_STATE) + .suburl("/v1/server/ha/state") + .onSuccess(Functionals.chain(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(String.class)), Enums.fromStringFunction(ManagementNodeState.class))) + .setOnFailureOrException(null)) .build(); if (!Lifecycle.RUNNING.equals(getAttribute(SERVICE_STATE_ACTUAL))) { diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java new file mode 100644 index 0000000000..255b27cade --- /dev/null +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode.effector; + +import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Effector; +import brooklyn.entity.Entity; +import brooklyn.entity.Group; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.brooklynnode.BrooklynCluster; +import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector; +import brooklyn.entity.brooklynnode.BrooklynNode; +import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector; +import brooklyn.entity.brooklynnode.BrooklynNode.SetHAPriorityEffector; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.entity.effector.Effectors; +import brooklyn.management.ha.HighAvailabilityMode; +import brooklyn.management.ha.ManagementNodeState; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.time.Duration; + +import com.google.api.client.util.Preconditions; +import com.google.common.collect.Iterables; + +public class SelectMasterEffectorBody extends EffectorBody implements SelectMasterEffector { + public static final Effector SELECT_MASTER = Effectors.effector(SelectMasterEffector.SELECT_MASTER).impl(new SelectMasterEffectorBody()).build(); + + private static final Logger LOG = LoggerFactory.getLogger(SelectMasterEffectorBody.class); + + private static final int HA_STANDBY_PRIORITY = 0; + private static final int HA_MASTER_PRIORITY = 1; + + private AtomicBoolean selectMasterInProgress = new AtomicBoolean(); + + @Override + public Void call(ConfigBag parameters) { + if (!selectMasterInProgress.compareAndSet(false, true)) { + throw new IllegalStateException("A master change is already in progress."); + } + + try { + selectMaster(parameters); + } finally { + selectMasterInProgress.set(false); + } + return null; + } + + private void selectMaster(ConfigBag parameters) { + String newMasterId = parameters.get(NEW_MASTER_ID); + Preconditions.checkNotNull(newMasterId, NEW_MASTER_ID.getName() + " parameter is required"); + + final Entity oldMaster = entity().getAttribute(BrooklynCluster.MASTER_NODE); + if (oldMaster != null && oldMaster.getId().equals(newMasterId)) { + LOG.info(newMasterId + " is already the current master, no change needed."); + return; + } + + final Entity newMaster = getMember(newMasterId); + + //1. Increase the priority of the node we wish to become master + setNodePriority(newMaster, HA_MASTER_PRIORITY); + + //2. Denote the existing master so a new election takes place + try { + //If no master was yet selected, at least wait to see + //if the new master will be what we expect. + if (oldMaster != null) { + setNodeState(oldMaster, HighAvailabilityMode.HOT_STANDBY); + } + + waitMasterHandover(oldMaster, newMaster); + } finally { + //3. Revert the priority of the node once it has become master + setNodePriority(newMaster, HA_STANDBY_PRIORITY); + } + + checkMasterSelected(newMaster); + } + + private void waitMasterHandover(final Entity oldMaster, final Entity newMaster) { + boolean masterChanged = Repeater.create() + .backoff(Duration.millis(500), 1.2, Duration.FIVE_SECONDS) + .limitTimeTo(Duration.ONE_MINUTE) + .until(new Callable() { + @Override + public Boolean call() throws Exception { + Entity master = getMasterNode(); + return master != oldMaster && master != null; + } + }) + .run(); + if (!masterChanged) { + LOG.warn("Timeout waiting for node to become master: " + newMaster + "."); + } + } + + private void setNodeState(Entity oldMaster, HighAvailabilityMode mode) { + ManagementNodeState oldState = DynamicTasks.queue( + Effectors.invocation( + oldMaster, + BrooklynNode.SET_HA_MODE, + MutableMap.of(SetHAModeEffector.MODE, mode)) + ).asTask().getUnchecked(); + + if (oldState != ManagementNodeState.MASTER) { + LOG.warn("The previous HA state on node " + oldMaster.getId() + " was " + oldState + + ", while the expected value is " + ManagementNodeState.MASTER + "."); + } + } + + private void setNodePriority(Entity newMaster, int newPriority) { + Integer oldPriority = DynamicTasks.queue( + Effectors.invocation( + newMaster, + BrooklynNode.SET_HA_PRIORITY, + MutableMap.of(SetHAPriorityEffector.PRIORITY, newPriority)) + ).asTask().getUnchecked(); + + Integer expectedPriority = (newPriority == HA_MASTER_PRIORITY ? HA_STANDBY_PRIORITY : HA_MASTER_PRIORITY); + if (oldPriority != expectedPriority) { + LOG.warn("The previous HA priority on node " + newMaster.getId() + " was " + oldPriority + + ", while the expected value is " + expectedPriority + " (while setting priority " + + newPriority + ")."); + } + } + + private void checkMasterSelected(Entity newMaster) { + Entity actualMaster = getMasterNode(); + if (actualMaster != newMaster) { + throw new IllegalStateException("Expected node " + newMaster + " to be master, but found that " + + "master is " + actualMaster + " instead."); + } + } + + private Entity getMember(String memberId) { + Group cluster = (Group)entity(); + try { + return Iterables.find(cluster.getMembers(), EntityPredicates.idEqualTo(memberId)); + } catch (NoSuchElementException e) { + throw new IllegalStateException(memberId + " is not an ID of brooklyn node in this cluster"); + } + } + + private Entity getMasterNode() { + return entity().getAttribute(BrooklynCluster.MASTER_NODE); + } +} diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java new file mode 100644 index 0000000000..36a51d08f3 --- /dev/null +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode.effector; + +import org.apache.http.HttpStatus; + +import brooklyn.entity.Effector; +import brooklyn.entity.brooklynnode.BrooklynNode; +import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector; +import brooklyn.entity.brooklynnode.EntityHttpClient; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.entity.effector.Effectors; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; +import brooklyn.management.ha.HighAvailabilityMode; +import brooklyn.management.ha.ManagementNodeState; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.guava.Functionals; +import brooklyn.util.http.HttpToolResponse; +import brooklyn.util.javalang.Enums; + +import com.google.api.client.util.Preconditions; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; + +public class SetHAModeEffectorBody extends EffectorBody implements SetHAModeEffector { + public static final Effector SET_HA_MODE = Effectors.effector(SetHAModeEffector.SET_HA_MODE).impl(new SetHAModeEffectorBody()).build(); + + @Override + public ManagementNodeState call(ConfigBag parameters) { + HighAvailabilityMode mode = parameters.get(MODE); + Preconditions.checkNotNull(mode, MODE.getName() + " parameter is required"); + + EntityHttpClient httpClient = ((BrooklynNode)entity()).http(); + HttpToolResponse resp = httpClient.post("/v1/server/ha/state", + ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"), + ImmutableMap.of("mode", mode.toString())); + + if (resp.getResponseCode() == HttpStatus.SC_OK) { + Function parseRespone = Functionals.chain( + Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(String.class)), + Enums.fromStringFunction(ManagementNodeState.class)); + return parseRespone.apply(resp); + } else { + throw new IllegalStateException("Unexpected response code: " + resp.getResponseCode() + "\n" + resp.getContentAsString()); + } + } +} diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java new file mode 100644 index 0000000000..94a961c034 --- /dev/null +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode.effector; + +import org.apache.http.HttpStatus; + +import brooklyn.entity.Effector; +import brooklyn.entity.brooklynnode.BrooklynNode; +import brooklyn.entity.brooklynnode.BrooklynNode.SetHAPriorityEffector; +import brooklyn.entity.brooklynnode.EntityHttpClient; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.entity.effector.Effectors; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.http.HttpToolResponse; + +import com.google.api.client.util.Preconditions; +import com.google.common.collect.ImmutableMap; + +public class SetHAPriorityEffectorBody extends EffectorBody implements SetHAPriorityEffector { + public static final Effector SET_HA_PRIORITY = Effectors.effector(SetHAPriorityEffector.SET_HA_PRIORITY).impl(new SetHAPriorityEffectorBody()).build(); + + @Override + public Integer call(ConfigBag parameters) { + Integer priority = parameters.get(PRIORITY); + Preconditions.checkNotNull(priority, PRIORITY.getName() + " parameter is required"); + + EntityHttpClient httpClient = ((BrooklynNode)entity()).http(); + HttpToolResponse resp = httpClient.post("/v1/server/ha/priority", + ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"), + ImmutableMap.of("priority", priority.toString())); + + if (resp.getResponseCode() == HttpStatus.SC_OK) { + return Integer.valueOf(resp.getContentAsString()); + } else { + throw new IllegalStateException("Unexpected response code: " + resp.getResponseCode() + "\n" + resp.getContentAsString()); + } + } + +} diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java new file mode 100644 index 0000000000..70b184afc0 --- /dev/null +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode.effector; + +import java.io.File; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Effector; +import brooklyn.entity.Entity; +import brooklyn.entity.Group; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.brooklynnode.BrooklynCluster; +import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector; +import brooklyn.entity.brooklynnode.BrooklynCluster.UpgradeClusterEffector; +import brooklyn.entity.brooklynnode.BrooklynNode; +import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.AttributeSensor; +import brooklyn.management.ha.HighAvailabilityMode; +import brooklyn.management.ha.ManagementNodeState; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.net.Urls; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.task.Tasks; +import brooklyn.util.time.Duration; + +import com.google.api.client.util.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.collect.Iterables; + +public class UpgradeClusterEffectorBody extends EffectorBody implements UpgradeClusterEffector { + public static final Effector UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new UpgradeClusterEffectorBody()).build(); + + private AtomicBoolean upgradeInProgress = new AtomicBoolean(); + + @Override + public Void call(ConfigBag parameters) { + if (!upgradeInProgress.compareAndSet(false, true)) { + throw new IllegalStateException("An upgrade is already in progress."); + } + + EntitySpec memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC); + Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + " is required for " + UpgradeClusterEffector.class.getName()); + + Map, Object> specCfg = memberSpec.getConfig(); + String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL); + String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL); + String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey()); + String newUploadUrl = inferUploadUrl(newDownloadUrl); + try { + memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newUploadUrl); + memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl); + upgrade(parameters); + } catch (Exception e) { + memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl); + memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl); + throw Exceptions.propagate(e); + } finally { + upgradeInProgress.set(false); + } + return null; + } + + private String inferUploadUrl(String newDownloadUrl) { + boolean isLocal = "file".equals(Urls.getProtocol(newDownloadUrl)) || new File(newDownloadUrl).exists(); + if (isLocal) { + return newDownloadUrl; + } else { + return null; + } + } + + private void upgrade(ConfigBag parameters) { + //TODO might be worth separating each step in a task for better UI + + Group cluster = (Group)entity(); + Collection initialMembers = cluster.getMembers(); + int initialClusterSize = initialMembers.size(); + + //1. Initially create a single node to check if it will launch successfully + Entity initialNode = Iterables.getOnlyElement(createNodes(1)); + + //2. If everything is OK with the first node launch the rest as well + Collection remainingNodes = createNodes(initialClusterSize - 1); + + //3. Once we have all nodes running without errors switch master + DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID, initialNode.getId()))).asTask().getUnchecked(); + + //4. Stop the nodes which were running at the start of the upgrade call, but keep them around. + // Should we create a quarantine-like zone for old stopped version? + // For members that were created meanwhile - they will be using the new version already. If the new version + // isn't good then they will fail to start as well, forcing the policies to retry (and succeed once the + // URL is reverted). + HashSet oldMembers = new HashSet(initialMembers); + oldMembers.removeAll(remainingNodes); + oldMembers.remove(initialNode); + DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(), oldMembers)).asTask().getUnchecked(); + } + + private Collection createNodes(int nodeCnt) { + DynamicCluster cluster = (DynamicCluster)entity(); + + //1. Create the nodes + Collection newNodes = cluster.resizeByDelta(nodeCnt); + + //2. Wait for them to be RUNNING + waitAttributeNotEqualTo( + newNodes, + BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING); + + //3. Set HOT_STANDBY in case it is not enabled on the command line ... + DynamicTasks.queue(Effectors.invocation( + BrooklynNode.SET_HA_MODE, + MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY), + newNodes)).asTask().getUnchecked(); + + //4. ... and wait until all of the nodes change state + //TODO if the REST call is blocking this is not needed + waitAttributeEqualTo( + newNodes, + BrooklynNode.MANAGEMENT_NODE_STATE, + ManagementNodeState.HOT_STANDBY); + + //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well) + Collection failedNodes = Collections2.filter(newNodes, EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP, Boolean.FALSE)); + if (!failedNodes.isEmpty()) { + throw new IllegalStateException("Nodes " + failedNodes + " are not " + BrooklynNode.SERVICE_UP + " though successfully in " + ManagementNodeState.HOT_STANDBY); + } + return newNodes; + } + + private void waitAttributeEqualTo(Collection nodes, AttributeSensor sensor, T value) { + waitPredicate( + nodes, + EntityPredicates.attributeEqualTo(sensor, value), + "Waiting for nodes " + nodes + ", sensor " + sensor + " to be " + value, + "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change to " + value); + } + + private void waitAttributeNotEqualTo(Collection nodes, AttributeSensor sensor, T value) { + waitPredicate( + nodes, + EntityPredicates.attributeNotEqualTo(sensor, value), + "Waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value, + "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value); + } + + private void waitPredicate(Collection nodes, Predicate waitPredicate, String blockingMsg, String errorMsg) { + Tasks.setBlockingDetails(blockingMsg); + boolean pollSuccess = Repeater.create(blockingMsg) + .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS) + .limitTimeTo(Duration.ONE_HOUR) + .until(nodes, allMatch(waitPredicate)) + .run(); + Tasks.resetBlockingDetails(); + + if (!pollSuccess) { + throw new IllegalStateException(errorMsg); + } + } + + public static Predicate> allMatch(final Predicate predicate) { + return new Predicate>() { + @Override + public boolean apply(Collection input) { + return Iterables.all(input, predicate); + } + }; + } +} diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java new file mode 100644 index 0000000000..8c8004b848 --- /dev/null +++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode.effector; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; + +import brooklyn.entity.Entity; +import brooklyn.entity.brooklynnode.EntityHttpClient; +import brooklyn.util.http.HttpTool.HttpClientBuilder; +import brooklyn.util.http.HttpToolResponse; + +import com.google.common.base.Function; + +public class CallbackEntityHttpClient implements EntityHttpClient { + public static class Request { + private Entity entity; + private String method; + private String path; + private Map params; + public Request(Entity entity, String method, String path, Map params) { + this.entity = entity; + this.method = method; + this.path = path; + this.params = params; + } + public Entity getEntity() { + return entity; + } + public String getMethod() { + return method; + } + public String getPath() { + return path; + } + public Map getParams() { + return params; + } + } + private Function callback; + private Entity entity; + + public CallbackEntityHttpClient(Entity entity, Function callback) { + this.entity = entity; + this.callback = callback; + } + + @Override + public HttpClientBuilder getHttpClientForBrooklynNode() { + throw new IllegalStateException("Method call not expected"); + } + + @Override + public HttpToolResponse get(String path) { + String result = callback.apply(new Request(entity, HttpGet.METHOD_NAME, path, Collections.emptyMap())); + return new HttpToolResponse(HttpStatus.SC_OK, null, result.getBytes(), 0, 0, 0); + } + + @Override + public HttpToolResponse post(String path, Map headers, byte[] body) { + throw new IllegalStateException("Method call not expected"); + } + + @Override + public HttpToolResponse post(String path, Map headers, Map formParams) { + String result = callback.apply(new Request(entity, HttpPost.METHOD_NAME, path, formParams)); + return new HttpToolResponse(HttpStatus.SC_OK, Collections.>emptyMap(), result.getBytes(), 0, 0, 0); + } +} diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java new file mode 100644 index 0000000000..6036507e0e --- /dev/null +++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode.effector; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Callable; + +import org.apache.http.client.methods.HttpPost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.Entity; +import brooklyn.entity.Group; +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.BasicApplication; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.brooklynnode.BrooklynCluster; +import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector; +import brooklyn.entity.brooklynnode.BrooklynClusterImpl; +import brooklyn.entity.brooklynnode.BrooklynNode; +import brooklyn.entity.brooklynnode.effector.CallbackEntityHttpClient.Request; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.feed.AttributePollHandler; +import brooklyn.event.feed.DelegatingPollHandler; +import brooklyn.event.feed.Poller; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.management.ManagementContext; +import brooklyn.management.ha.ManagementNodeState; +import brooklyn.test.EntityTestUtils; +import brooklyn.test.entity.LocalManagementContextForTests; +import brooklyn.util.task.BasicExecutionContext; +import brooklyn.util.task.BasicExecutionManager; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class SelectMasterEffectorTest { + private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class); + + protected ManagementContext mgmt; + protected BasicApplication app; + protected BasicExecutionContext ec; + protected BrooklynCluster cluster; + protected FunctionFeed scanMaster; + protected Poller poller; + + @BeforeMethod + public void setUp() { + mgmt = new LocalManagementContextForTests(); + EntitySpec appSpec = EntitySpec.create(BasicApplication.class) + .child(EntitySpec.create(BrooklynCluster.class)); + app = ApplicationBuilder.newManagedApp(appSpec, mgmt); + cluster = (BrooklynCluster)Iterables.getOnlyElement(app.getChildren()); + + BasicExecutionManager em = new BasicExecutionManager("mycontext"); + ec = new BasicExecutionContext(em); + + poller = new Poller((EntityLocal)app, false); + poller.scheduleAtFixedRate( + new Callable() { + @Override + public Void call() throws Exception { + masterFailoverIfNeeded(); + return null; + } + }, + new DelegatingPollHandler(Collections.>emptyList()), + Duration.millis(200)); + poller.start(); + } + + @AfterMethod + public void tearDown() { + poller.stop(); + } + + @Test + public void testInvalidNewMasterIdFails() { + try { + BrooklynCluster cluster = app.addChild(EntitySpec.create(BrooklynCluster.class)); + selectMaster(cluster, "1234"); + fail("Non-existend entity ID provided."); + } catch (Exception e) { + assertTrue(e.toString().contains("1234 is not an ID of brooklyn node in this cluster")); + } + } + + @Test + public void testSelectMaster() { + HttpCallback cb = new HttpCallback(); + BrooklynNode node1 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class) + .impl(TestHttpEntity.class) + .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb)); + BrooklynNode node2 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class) + .impl(TestHttpEntity.class) + .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb)); + + cluster.addMemberChild(node1); + cluster.addMemberChild(node2); + + setManagementState(node1, ManagementNodeState.MASTER); + EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, node1); + + selectMaster(cluster, node2.getId()); + checkMaster(cluster, node2); + } + + @Test(groups="WIP") + //after throwing an exception in HttpCallback tasks are no longer executed, why? + public void testSelectMasterFailsAtChangeState() { + HttpCallback cb = new HttpCallback(); + cb.setFailAtStateChange(true); + + BrooklynNode node1 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class) + .impl(TestHttpEntity.class) + .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb)); + BrooklynNode node2 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class) + .impl(TestHttpEntity.class) + .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb)); + + cluster.addMemberChild(node1); + cluster.addMemberChild(node2); + + setManagementState(node1, ManagementNodeState.MASTER); + EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, node1); + + selectMaster(cluster, node2.getId()); + checkMaster(cluster, node1); + } + + private void checkMaster(Group cluster, Entity node) { + assertEquals(node.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.MASTER); + assertEquals(cluster.getAttribute(BrooklynCluster.MASTER_NODE), node); + for (Entity member : cluster.getMembers()) { + if (member != node) { + assertEquals(member.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.HOT_STANDBY); + } + assertEquals((int)member.getAttribute(TestHttpEntity.HA_PRIORITY), 0); + } + } + + private static class HttpCallback implements Function { + private enum State { + INITIAL, + PROMOTED + } + private State state = State.INITIAL; + private boolean failAtStateChange; + + @Override + public String apply(Request input) { + if ("/v1/server/ha/state".equals(input.getPath())) { + if (failAtStateChange) { + throw new RuntimeException("Testing failure at chaning node state"); + } + + checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/state", "mode", "HOT_STANDBY"); + Entity entity = input.getEntity(); + EntityTestUtils.assertAttributeEquals(entity, BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER); + EntityTestUtils.assertAttributeEquals(entity, TestHttpEntity.HA_PRIORITY, 0); + + setManagementState(entity, ManagementNodeState.HOT_STANDBY); + + return "MASTER"; + } else { + switch(state) { + case INITIAL: + checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "1"); + state = State.PROMOTED; + setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority"))); + return "0"; + case PROMOTED: + checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "0"); + state = State.INITIAL; + setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority"))); + return "1"; + default: throw new IllegalStateException("Illegal call at state " + state + ". Request = " + input.getMethod() + " " + input.getPath()); + } + } + } + + public void checkRequest(Request input, String methodName, String path, String... keyValue) { + if (!input.getMethod().equals(methodName) || !input.getPath().equals(path)) { + throw new IllegalStateException("Request doesn't match expected state. Expected = " + input.getMethod() + " " + input.getPath() + ". " + + "Actual = " + methodName + " " + path); + } + for(int i = 0; i < keyValue.length / 2; i++) { + String key = keyValue[i]; + String value = keyValue[i+1]; + String inputValue = input.getParams().get(key); + if(!Objects.equal(value, inputValue)) { + throw new IllegalStateException("Request doesn't match expected parameter " + methodName + " " + path + ". Parameter " + key + + " expected = " + value + ", actual = " + inputValue); + } + } + } + + public void setFailAtStateChange(boolean failAtStateChange) { + this.failAtStateChange = failAtStateChange; + } + + } + + private void masterFailoverIfNeeded() { + if (cluster.getAttribute(BrooklynCluster.MASTER_NODE) == null) { + Collection members = cluster.getMembers(); + if (members.size() > 0) { + for (Entity member : members) { + if (member.getAttribute(TestHttpEntity.HA_PRIORITY) == 1) { + masterFailover(member); + return; + } + } + masterFailover(members.iterator().next()); + } + } + } + + private void masterFailover(Entity member) { + LOG.debug("Master failover to " + member); + setManagementState(member, ManagementNodeState.MASTER); + EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, (BrooklynNode)member); + return; + } + + public static void setManagementState(Entity entity, ManagementNodeState state) { + ((EntityLocal)entity).setAttribute(BrooklynNode.MANAGEMENT_NODE_STATE, state); + } + + public static void setPriority(Entity entity, int priority) { + ((EntityLocal)entity).setAttribute(TestHttpEntity.HA_PRIORITY, priority); + } + + private void selectMaster(DynamicCluster cluster, String id) { + ec.submit(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, ImmutableMap.of(SelectMasterEffector.NEW_MASTER_ID.getName(), id))).asTask().getUnchecked(); + } + +} diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java new file mode 100644 index 0000000000..259a271cd3 --- /dev/null +++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.brooklynnode.effector; + +import java.util.Collection; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.brooklynnode.BrooklynNode; +import brooklyn.entity.brooklynnode.EntityHttpClient; +import brooklyn.entity.brooklynnode.effector.CallbackEntityHttpClient.Request; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.location.Location; + +import com.google.common.base.Function; +import com.google.common.reflect.TypeToken; + +public class TestHttpEntity extends AbstractEntity implements BrooklynNode { + @SuppressWarnings("serial") + public static final ConfigKey> HTTP_CLIENT_CALLBACK = ConfigKeys.newConfigKey(new TypeToken>(){}, "httpClientCallback"); + public static final AttributeSensor HA_PRIORITY = new BasicAttributeSensor(Integer.class, "priority"); + + @Override + public void init() { + super.init(); + getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY); + getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE); + setAttribute(HA_PRIORITY, 0); + } + + @Override + public EntityHttpClient http() { + return new CallbackEntityHttpClient(this, getConfig(HTTP_CLIENT_CALLBACK)); + } + + @Override + public void start(Collection locations) { + } + + @Override + public void stop() { + } + + @Override + public void restart() { + } + +} diff --git a/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java b/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java index e0ba7eaf4a..5d1ca9ba54 100644 --- a/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java +++ b/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java @@ -27,6 +27,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import brooklyn.management.ha.HighAvailabilityMode; import brooklyn.management.ha.ManagementNodeState; import brooklyn.rest.apidoc.Apidoc; import brooklyn.rest.domain.HighAvailabilitySummary; @@ -90,14 +91,34 @@ public void shutdown( @ApiOperation(value = "Returns the HA state of this management node") public ManagementNodeState getHighAvailabilityNodeState(); + @POST + @Path("/ha/state") + @ApiOperation(value = "Changes the HA state of this management node") + public ManagementNodeState setHighAvailabilityNodeState( + @ApiParam(name = "state", value = "The state to change to") + @FormParam("mode") HighAvailabilityMode mode); + @GET @Path("/ha/states") @ApiOperation(value = "Returns the HA states and detail for all nodes in this management plane", responseClass = "brooklyn.rest.domain.HighAvailabilitySummary") public HighAvailabilitySummary getHighAvailabilityPlaneStates(); + + @GET + @Path("/ha/priority") + @ApiOperation(value = "Returns the HA node priority for MASTER failover") + public long getHighAvailabitlityPriority(); + @POST + @Path("/ha/priority") + @ApiOperation(value = "Sets the HA node priority for MASTER failover") + public long setHighAvailabilityPriority( + @ApiParam(name = "priority", value = "The priority to be set") + @FormParam("priority") long priority); + @GET @Path("/user") @ApiOperation(value = "Return user information for this Brooklyn instance", responseClass = "String", multiValueResponse = false) - public String getUser(); + public String getUser(); + } diff --git a/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java b/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java index 55fe968dcb..a9cf8a565c 100644 --- a/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java +++ b/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java @@ -38,6 +38,8 @@ import brooklyn.management.Task; import brooklyn.management.entitlement.EntitlementContext; import brooklyn.management.entitlement.Entitlements; +import brooklyn.management.ha.HighAvailabilityManager; +import brooklyn.management.ha.HighAvailabilityMode; import brooklyn.management.ha.ManagementNodeState; import brooklyn.management.ha.ManagementPlaneSyncRecord; import brooklyn.management.internal.ManagementContextInternal; @@ -228,7 +230,28 @@ public HighAvailabilitySummary getHighAvailability() { public ManagementNodeState getHighAvailabilityNodeState() { return mgmt().getHighAvailabilityManager().getNodeState(); } - + + @Override + public ManagementNodeState setHighAvailabilityNodeState(HighAvailabilityMode mode) { + HighAvailabilityManager haMgr = mgmt().getHighAvailabilityManager(); + ManagementNodeState existingState = haMgr.getNodeState(); + haMgr.changeMode(mode); + return existingState; + } + + @Override + public long getHighAvailabitlityPriority() { + return mgmt().getHighAvailabilityManager().getPriority(); + } + + @Override + public long setHighAvailabilityPriority(long priority) { + HighAvailabilityManager haMgr = mgmt().getHighAvailabilityManager(); + long oldPrio = haMgr.getPriority(); + haMgr.setPriority(priority); + return oldPrio; + } + @Override public HighAvailabilitySummary getHighAvailabilityPlaneStates() { ManagementPlaneSyncRecord memento = mgmt().getHighAvailabilityManager().getManagementPlaneSyncState(); @@ -244,4 +267,5 @@ public String getUser() { return null; //User can be null if no authentication was requested } } + }