From 33f1e49e5919ce58f600336f9929947be2778e7d Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Fri, 19 Jun 2015 08:31:19 -0700 Subject: [PATCH] sensors at riak cluster level and other minor tidies for riak and couchbase --- .../nosql/couchbase/CouchbaseClusterImpl.java | 1 + .../nosql/couchbase/CouchbaseNodeImpl.java | 3 - .../entity/nosql/riak/RiakCluster.java | 4 ++ .../entity/nosql/riak/RiakClusterImpl.java | 55 +++++++++++++++++-- .../brooklyn/entity/nosql/riak/RiakNode.java | 11 ++-- .../entity/nosql/riak/RiakNodeImpl.java | 4 ++ 6 files changed, 65 insertions(+), 13 deletions(-) diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java index dcdafb6c09..5c47fe7300 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -414,6 +414,7 @@ public Object apply(Set input) { if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) { // TODO Only leaving CouchbaseQuorumCheck here in case it is contained in persisted state. // If so, need a transformer and then to delete it + @SuppressWarnings({ "unused", "hiding" }) @Deprecated class CouchbaseQuorumCheck implements QuorumCheck { @Override diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java index e4b1c0ae93..d7439ca4ce 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java @@ -21,7 +21,6 @@ import static java.lang.String.format; import java.net.URI; -import java.nio.charset.Charset; import java.util.Collection; import java.util.Map; import java.util.Set; @@ -37,7 +36,6 @@ import brooklyn.event.AttributeSensor; import brooklyn.event.SensorEvent; import brooklyn.event.SensorEventListener; -import brooklyn.event.basic.DependentConfiguration; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import brooklyn.event.feed.http.HttpValueFunctions; @@ -55,7 +53,6 @@ import brooklyn.util.http.HttpTool; import brooklyn.util.http.HttpToolResponse; import brooklyn.util.net.Urls; -import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.Tasks; import brooklyn.util.text.Strings; import brooklyn.util.time.Duration; diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java index 41aa6be36f..876ebde7c0 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java @@ -58,4 +58,8 @@ public interface RiakCluster extends DynamicCluster { AttributeSensor RIAK_CONSOLE_URI = Attributes.MAIN_URI; + AttributeSensor NODE_GETS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.gets.1m.perNode", "Gets in the last minute, averaged across cluster"); + AttributeSensor NODE_PUTS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.puts.1m.perNode", "Puts in the last minute, averaged across cluster"); + AttributeSensor NODE_OPS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.ops.1m.perNode", "Sum of node gets and puts in the last minute, averaged across cluster"); + } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java index 637346e1a8..7b256c0780 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java @@ -41,10 +41,11 @@ import brooklyn.entity.group.DynamicClusterImpl; import brooklyn.entity.proxying.EntitySpec; import brooklyn.entity.trait.Startable; +import brooklyn.event.AttributeSensor; import brooklyn.event.basic.DependentConfiguration; -import brooklyn.location.Location; import brooklyn.policy.EnricherSpec; import brooklyn.policy.PolicySpec; +import brooklyn.util.task.Tasks; import brooklyn.util.time.Duration; import brooklyn.util.time.Time; @@ -53,6 +54,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -71,13 +73,20 @@ public void init() { } @Override - public void start(Collection locations) { - super.start(locations); + protected void doStart() { + super.doStart(); connectSensors(); - Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER)); + try { + Duration delay = getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER); + Tasks.setBlockingDetails("Sleeping for "+delay+" before advertising cluster available"); + Time.sleep(delay); + } finally { + Tasks.resetBlockingDetails(); + } //FIXME: add a quorum to tolerate failed nodes before setting on fire. + @SuppressWarnings("unchecked") Optional anyNode = Iterables.tryFind(getMembers(), Predicates.and( Predicates.instanceOf(RiakNode.class), EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), @@ -91,8 +100,9 @@ public void start(Collection locations) { } protected EntitySpec getMemberSpec() { - return getConfig(MEMBER_SPEC, EntitySpec.create(RiakNode.class)); - + EntitySpec result = config().get(MEMBER_SPEC); + if (result!=null) return result; + return EntitySpec.create(RiakNode.class); } protected void connectSensors() { @@ -112,6 +122,38 @@ public URI apply(Collection input) { .fromMembers() .build(); addEnricher(first); + + Map, ? extends AttributeSensor> enricherSetup = + ImmutableMap., AttributeSensor>builder() + .put(RiakNode.NODE_PUTS, RiakCluster.NODE_PUTS_1MIN_PER_NODE) + .put(RiakNode.NODE_GETS, RiakCluster.NODE_GETS_1MIN_PER_NODE) + .put(RiakNode.NODE_OPS, RiakCluster.NODE_OPS_1MIN_PER_NODE) + .build(); + // construct sum and average over cluster + for (AttributeSensor nodeSensor : enricherSetup.keySet()) { + addSummingMemberEnricher(nodeSensor); + addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor)); + } + } + + private void addAveragingMemberEnricher(AttributeSensor fromSensor, AttributeSensor toSensor) { + addEnricher(Enrichers.builder() + .aggregating(fromSensor) + .publishing(toSensor) + .fromMembers() + .computingAverage() + .build() + ); + } + + private void addSummingMemberEnricher(AttributeSensor source) { + addEnricher(Enrichers.builder() + .aggregating(source) + .publishing(source) + .fromMembers() + .computingSum() + .build() + ); } protected void onServerPoolMemberChanged(final Entity member) { @@ -161,6 +203,7 @@ protected void onServerPoolMemberChanged(final Entity member) { } else { if (nodes != null && nodes.containsKey(member)) { DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES); + @SuppressWarnings("unchecked") Optional anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( Predicates.instanceOf(RiakNode.class), EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java index d82c3f471c..1f29366d39 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java @@ -126,10 +126,10 @@ public interface RiakNode extends SoftwareProcess { PortAttributeSensorAndConfigKey SEARCH_SOLR_PORT = new PortAttributeSensorAndConfigKey("search.solr.port", "Solr port", "8093+"); PortAttributeSensorAndConfigKey SEARCH_SOLR_JMX_PORT = new PortAttributeSensorAndConfigKey("search.solr.jmx_port", "Solr port", "8985+"); - AttributeSensor NODE_GETS = Sensors.newIntegerSensor("riak.node.gets"); - AttributeSensor NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total"); - AttributeSensor NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts"); - AttributeSensor NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total"); + AttributeSensor NODE_GETS = Sensors.newIntegerSensor("riak.node.gets", "Gets in the last minute"); + AttributeSensor NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total", "Total gets since node started"); + AttributeSensor NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts", "Puts in the last minute"); + AttributeSensor NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total", "Total puts since node started"); AttributeSensor VNODE_GETS = Sensors.newIntegerSensor("riak.vnode.gets"); AttributeSensor VNODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.vnode.gets.total"); @@ -147,6 +147,9 @@ public interface RiakNode extends SoftwareProcess { @SuppressWarnings("serial") AttributeSensor> RING_MEMBERS = Sensors.newSensor(new TypeToken>() {}, "ring.members", "all the riak nodes in the ring"); + + AttributeSensor NODE_OPS = Sensors.newIntegerSensor("riak.node.ops", "Sum of node gets and puts in the last minute"); + AttributeSensor NODE_OPS_TOTAL = Sensors.newIntegerSensor("riak.node.ops.total", "Sum of node gets and puts since the node started"); MethodEffector JOIN_RIAK_CLUSTER = new MethodEffector(RiakNode.class, "joinCluster"); MethodEffector LEAVE_RIAK_CLUSTER = new MethodEffector(RiakNode.class, "leaveCluster"); diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java index dea8b87dd8..590cb3a972 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; +import brooklyn.enricher.Enrichers; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.entity.webapp.WebAppServiceMethods; @@ -78,6 +79,7 @@ public void init() { maxOpenFiles, defaultMaxOpenFiles); } + @SuppressWarnings("rawtypes") public boolean isPackageDownloadUrlProvided() { AttributeSensorAndConfigKey[] downloadProperties = { DOWNLOAD_URL_RHEL_CENTOS, DOWNLOAD_URL_UBUNTU, DOWNLOAD_URL_DEBIAN }; for (AttributeSensorAndConfigKey property : downloadProperties) { @@ -182,6 +184,8 @@ public List apply(@Nullable String[] strings) { httpFeed = httpFeedBuilder.build(); + addEnricher(Enrichers.builder().combining(NODE_GETS, NODE_PUTS).computingSum().publishing(NODE_OPS).build()); + addEnricher(Enrichers.builder().combining(NODE_GETS_TOTAL, NODE_PUTS_TOTAL).computingSum().publishing(NODE_OPS_TOTAL).build()); WebAppServiceMethods.connectWebAppServerPolicies(this); }