From 61ffb20b49393b8fef5888c5565a478849b097ba Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Wed, 3 Sep 2014 15:28:22 +0100 Subject: [PATCH 1/4] Updates live tests to fail if node.hasJoinedCluster() does not become true, or if cluster fails to reach SERVICE_UP --- .../nosql/riak/RiakClusterEc2LiveTest.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java index 743c8d8606..ca0dd057cf 100644 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java +++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java @@ -22,15 +22,19 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; - import brooklyn.entity.AbstractEc2LiveTest; +import brooklyn.entity.basic.Attributes; import brooklyn.entity.proxying.EntitySpec; import brooklyn.location.Location; +import brooklyn.test.Asserts; import brooklyn.test.EntityTestUtils; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + public class RiakClusterEc2LiveTest extends AbstractEc2LiveTest { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(RiakNodeEc2LiveTest.class); @@ -47,11 +51,21 @@ protected void doTest(Location loc) throws Exception { RiakNode first = (RiakNode) Iterables.get(cluster.getMembers(), 0); RiakNode second = (RiakNode) Iterables.get(cluster.getMembers(), 1); - EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.SERVICE_UP, true); - - EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true); - EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true); + assertNodesUpAndInCluster(first, second); + + EntityTestUtils.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_UP, true); + } + + private void assertNodesUpAndInCluster(final RiakNode... nodes) { + for (final RiakNode node : nodes) { + EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.SERVICE_UP, true); + EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true); + Asserts.eventually(new Supplier() { + @Override public Boolean get() { + return node.hasJoinedCluster(); + } + }, Predicates.alwaysTrue()); + } } @Test(enabled = false) From 4fe224b84428592f03a87bcbfba04bafc622a0df Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Wed, 3 Sep 2014 14:51:57 +0100 Subject: [PATCH 2/4] Fixes Riak cluster startup --- .../src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java index 81777a3c85..b880c42852 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java @@ -176,7 +176,7 @@ public void commitCluster() { @Override public boolean hasJoinedCluster() { - return Boolean.TRUE.equals(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER); + return Boolean.TRUE.equals(getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)); } @Override From 629923bf0288f381a6d89b8249ce0e081333e3e9 Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Wed, 3 Sep 2014 14:52:53 +0100 Subject: [PATCH 3/4] Upps version number to 1.4.10, fail (rather than logs) if riakName is null --- .../entity/nosql/riak/RiakClusterImpl.java | 79 +++++++++---------- .../brooklyn/entity/nosql/riak/RiakNode.java | 2 +- 2 files changed, 38 insertions(+), 43 deletions(-) diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java index a75c422242..80a9683af3 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java @@ -21,7 +21,6 @@ import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,12 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - import brooklyn.entity.Entity; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityInternal; @@ -50,6 +43,12 @@ import brooklyn.policy.PolicySpec; import brooklyn.util.time.Time; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { private static final Logger log = LoggerFactory.getLogger(RiakClusterImpl.class); @@ -108,49 +107,45 @@ protected synchronized void onServerPoolMemberChanged(Entity member) { // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there? // TODO and can we do join as part of node starting? - if (nodes == null) nodes = Maps.newLinkedHashMap(); + if (nodes == null) { + nodes = Maps.newLinkedHashMap(); + } + String riakName = getRiakName(member); + Preconditions.checkNotNull(riakName); - if (riakName == null) { - log.error("Unable to get riak name for node: {}", member.getId()); - } else { - //flag a first node to be the first node in the riak cluster. - if (!isFirstNodeSet.get()) { - nodes.put(member, riakName); - setAttribute(RIAK_CLUSTER_NODES, nodes); - - ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE); - isFirstNodeSet.set(true); - - log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this, member, getRiakName(member)}); + // flag a first node to be the first node in the riak cluster. + if (!isFirstNodeSet.getAndSet(true)) { + nodes.put(member, riakName); + setAttribute(RIAK_CLUSTER_NODES, nodes); - } else { + ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE); - //TODO: be wary of erreneous nodes but are still flagged 'in cluster' - //add the new node to be part of the riak cluster. - Optional anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate() { - @Override - public boolean apply(@Nullable Entity node) { - return (node instanceof RiakNode && hasMemberJoinedCluster(node)); - } - }); - - if (anyNodeInCluster.isPresent()) { - if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) { + log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) }); + } else { + // TODO: be wary of erroneous nodes but are still flagged 'in cluster' + // add the new node to be part of the riak cluster. + Optional anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate() { + @Override + public boolean apply(@Nullable Entity node) { + return (node instanceof RiakNode && hasMemberJoinedCluster(node)); + } + }); + if (anyNodeInCluster.isPresent()) { + if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) { - String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME); - Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName); - if (getAttribute(IS_CLUSTER_INIT)) { - Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER); - } - nodes.put(member, riakName); - setAttribute(RIAK_CLUSTER_NODES, nodes); - log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this, member, getRiakName(member)}); + String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME); + Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName); + if (getAttribute(IS_CLUSTER_INIT)) { + Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER); } - } else { - log.error("entity {}: is not present", member.getId()); + nodes.put(member, riakName); + setAttribute(RIAK_CLUSTER_NODES, nodes); + log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) }); } + } else { + log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId()); } } } else { diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java index b51c66acc4..0f5a47a82c 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java @@ -39,7 +39,7 @@ public interface RiakNode extends SoftwareProcess { @SetFromFlag("version") ConfigKey SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, - "1.4.8"); + "1.4.10"); @SetFromFlag("vmArgsTemplateUrl") ConfigKey RIAK_VM_ARGS_TEMPLATE_URL = ConfigKeys.newStringConfigKey( From bb884ecaea4a71edbc30ce0efe964d908c4c9f11 Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Wed, 3 Sep 2014 15:25:45 +0100 Subject: [PATCH 4/4] Minor fixes, removes compiler warnings --- .../entity/nosql/riak/RiakCluster.java | 1 + .../entity/nosql/riak/RiakNodeImpl.java | 9 ++++---- .../entity/nosql/riak/RiakNodeSshDriver.java | 22 +++++-------------- .../nosql/riak/RiakNodeIntegrationTest.java | 7 +++--- 4 files changed, 15 insertions(+), 24 deletions(-) diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java index 2a406dc1d6..c4fabedda6 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java @@ -35,6 +35,7 @@ @ImplementedBy(RiakClusterImpl.class) public interface RiakCluster extends DynamicCluster { + @SuppressWarnings("serial") AttributeSensor> RIAK_CLUSTER_NODES = Sensors.newSensor(new TypeToken>() { }, "riak.cluster.nodes", "Names of all active Riak nodes in the cluster "); diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java index b880c42852..a962b19a31 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java @@ -27,9 +27,6 @@ import javax.annotation.Nullable; -import com.google.common.base.Function; -import com.google.common.base.Functions; - import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.entity.webapp.WebAppServiceMethods; @@ -40,6 +37,10 @@ import brooklyn.location.cloud.CloudLocationConfig; import brooklyn.util.collections.MutableSet; import brooklyn.util.config.ConfigBag; +import brooklyn.util.guava.Functionals; + +import com.google.common.base.Function; +import com.google.common.base.Functions; public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { @@ -135,7 +136,7 @@ public void connectSensors() { .onSuccess(HttpValueFunctions.jsonContents("pbc_active", Integer.class)) .onFailureOrException(Functions.constant(-1))) .poll(new HttpPollConfig>(RING_MEMBERS) - .onSuccess(HttpValueFunctions.chain( + .onSuccess(Functionals.chain( HttpValueFunctions.jsonContents("ring_members", String[].class), new Function>() { @Nullable diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java index 372b0bdb55..13250ce1e3 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java @@ -18,7 +18,12 @@ */ package brooklyn.entity.nosql.riak; -import static brooklyn.util.ssh.BashCommands.*; +import static brooklyn.util.ssh.BashCommands.INSTALL_CURL; +import static brooklyn.util.ssh.BashCommands.INSTALL_TAR; +import static brooklyn.util.ssh.BashCommands.alternatives; +import static brooklyn.util.ssh.BashCommands.chainGroup; +import static brooklyn.util.ssh.BashCommands.commandToDownloadUrlAs; +import static brooklyn.util.ssh.BashCommands.sudo; import static java.lang.String.format; import java.util.List; @@ -28,7 +33,6 @@ import org.slf4j.LoggerFactory; import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; -import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.lifecycle.ScriptHelper; import brooklyn.entity.software.SshEffectorTasks; @@ -39,7 +43,6 @@ import brooklyn.util.os.Os; import brooklyn.util.task.DynamicTasks; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -249,10 +252,6 @@ private String getRiakAdminCmd() { return isPackageInstall ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin"); } - private String getAppConfigLocation() { - return Urls.mergePaths(getRiakEtcDir(), "app.config"); - } - @Override public void joinCluster(String nodeName) { //FIXME: find a way to batch commit the changes, instead of committing for every operation. @@ -370,15 +369,6 @@ private String getVmArgsLocation() { return Urls.mergePaths(getRiakEtcDir(), "vm.args"); } - private String getPrivateIp() { - Optional subnetAddress = Optional.fromNullable(entity.getAttribute(Attributes.SUBNET_ADDRESS)); - - if (subnetAddress.isPresent()) - return subnetAddress.get(); - else - throw new IllegalArgumentException("Subnet address is not set."); - } - private Boolean hasJoinedCluster() { return ((RiakNode) entity).hasJoinedCluster(); } diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeIntegrationTest.java index 0833b870d6..1b92e25c9e 100644 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeIntegrationTest.java +++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeIntegrationTest.java @@ -24,9 +24,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableList; - -import brooklyn.entity.basic.ApplicationBuilder; import brooklyn.entity.basic.Entities; import brooklyn.entity.proxying.EntitySpec; import brooklyn.entity.trait.Startable; @@ -34,6 +31,8 @@ import brooklyn.test.EntityTestUtils; import brooklyn.test.entity.TestApplication; +import com.google.common.collect.ImmutableList; + public class RiakNodeIntegrationTest { private TestApplication app; @@ -42,7 +41,7 @@ public class RiakNodeIntegrationTest { @BeforeMethod(alwaysRun = true) public void setUp() throws Exception { localhostProvisioningLocation = new LocalhostMachineProvisioningLocation(); - app = ApplicationBuilder.newManagedApp(TestApplication.class); + app = TestApplication.Factory.newManagedInstanceForTests(); } @AfterMethod(alwaysRun = true)