Permalink
Browse files

Merge pull request #1473 from Nakomis/couchbase-sensors

Couchbase sensors
  • Loading branch information...
2 parents 7358c58 + 3dfdda5 commit b6512014ae62683877b556c98bec66ef340e7f96 @ahgittin ahgittin committed Jun 13, 2014
@@ -41,4 +41,8 @@
@SetFromFlag("serviceUpTimeOut")
ConfigKey<Duration> SERVICE_UP_TIME_OUT = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.serviceUpTimeOut", "Service up time out duration for all the couchbase nodes", Duration.seconds(3 * 60));
+
+ @SuppressWarnings("serial")
+ AttributeSensor<List<String>> COUCHBASE_CLUSTER_UP_NODE_ADDRESSES = Sensors.newSensor(new TypeToken<List<String>>() {},
+ "couchbase.cluster.node.addresses", "List of host:port of all active nodes in the cluster (http admin port, and public hostname/IP)");
}
@@ -3,11 +3,13 @@
import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
import java.util.Collection;
+import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import brooklyn.enricher.Enrichers;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.Entities;
@@ -17,6 +19,7 @@
import brooklyn.entity.group.DynamicClusterImpl;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.trait.Startable;
+import brooklyn.event.AttributeSensor;
import brooklyn.event.SensorEvent;
import brooklyn.event.SensorEventListener;
import brooklyn.location.Location;
@@ -25,7 +28,9 @@
import brooklyn.util.task.Tasks;
import brooklyn.util.time.Time;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster {
@@ -35,6 +40,46 @@
public void init() {
log.info("Initializing the Couchbase cluster...");
super.init();
+
+ addEnricher(
+ Enrichers.builder()
+ .transforming(COUCHBASE_CLUSTER_UP_NODES)
+ .from(this)
+ .publishing(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
+ .computing(new Function<Set<Entity>, List<String>>() {
+ @Override public List<String> apply(Set<Entity> input) {
+ List<String> addresses = Lists.newArrayList();
+ for (Entity entity : input) {
+ addresses.add(String.format("%s:%s", entity.getAttribute(Attributes.ADDRESS),
+ entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)));
+ }
+ return addresses;
+ }
+ }).build()
+ );
+
+ addSummingMemberEnricher(CouchbaseNode.OPS);
+ addSummingMemberEnricher(CouchbaseNode.COUCH_DOCS_DATA_SIZE);
+ addSummingMemberEnricher(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE);
+ addSummingMemberEnricher(CouchbaseNode.EP_BG_FETCHED);
+ addSummingMemberEnricher(CouchbaseNode.MEM_USED);
+ addSummingMemberEnricher(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE);
+ addSummingMemberEnricher(CouchbaseNode.CURR_ITEMS);
+ addSummingMemberEnricher(CouchbaseNode.VB_REPLICA_CURR_ITEMS);
+ addSummingMemberEnricher(CouchbaseNode.COUCH_VIEWS_DATA_SIZE);
+ addSummingMemberEnricher(CouchbaseNode.GET_HITS);
+ addSummingMemberEnricher(CouchbaseNode.CMD_GET);
+ addSummingMemberEnricher(CouchbaseNode.CURR_ITEMS_TOT);
+ }
+
+ private void addSummingMemberEnricher(AttributeSensor<Integer> source) {
+ addEnricher(Enrichers.builder()
+ .aggregating(source)
+ .publishing(source)
+ .fromMembers()
+ .computingSum()
+ .build()
+ );
}
@Override
@@ -255,4 +300,5 @@ public boolean isClusterInitialized() {
public boolean isMemberInCluster(Entity e) {
return Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false);
}
+
}
@@ -53,6 +53,32 @@
AttributeSensor<Boolean> IS_IN_CLUSTER = Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the current couchbase node has been added to a cluster");
public static final AttributeSensor<String> COUCHBASE_WEB_ADMIN_URL = WebAppServiceConstants.ROOT_URL; // By using this specific sensor, the value will be shown in the summary tab
+ // Interesting stats
+ AttributeSensor<Integer> OPS = Sensors.newIntegerSensor("couchbase.stats.ops",
+ "Retrieved from pools/nodes/<current node>/interestingStats/ops");
+ AttributeSensor<Integer> COUCH_DOCS_DATA_SIZE = Sensors.newIntegerSensor("couchbase.stats.couch.docs.data.size",
+ "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_data_size");
+ AttributeSensor<Integer> COUCH_DOCS_ACTUAL_DISK_SIZE = Sensors.newIntegerSensor("couchbase.stats.couch.docs.actual.disk.size",
+ "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size");
+ AttributeSensor<Integer> EP_BG_FETCHED = Sensors.newIntegerSensor("couchbase.stats.ep.bg.fetched",
+ "Retrieved from pools/nodes/<current node>/interestingStats/ep_bg_fetched");
+ AttributeSensor<Integer> MEM_USED = Sensors.newIntegerSensor("couchbase.stats.mem.used",
+ "Retrieved from pools/nodes/<current node>/interestingStats/mem_used");
+ AttributeSensor<Integer> COUCH_VIEWS_ACTUAL_DISK_SIZE = Sensors.newIntegerSensor("couchbase.stats.couch.views.actual.disk.size",
+ "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size");
+ AttributeSensor<Integer> CURR_ITEMS = Sensors.newIntegerSensor("couchbase.stats.curr.items",
+ "Retrieved from pools/nodes/<current node>/interestingStats/curr_items");
+ AttributeSensor<Integer> VB_REPLICA_CURR_ITEMS = Sensors.newIntegerSensor("couchbase.stats.vb.replica.curr.items",
+ "Retrieved from pools/nodes/<current node>/interestingStats/vb_replica_curr_items");
+ AttributeSensor<Integer> COUCH_VIEWS_DATA_SIZE = Sensors.newIntegerSensor("couchbase.stats.couch.views.data.size",
+ "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_data_size");
+ AttributeSensor<Integer> GET_HITS = Sensors.newIntegerSensor("couchbase.stats.get.hits",
+ "Retrieved from pools/nodes/<current node>/interestingStats/get_hits");
+ AttributeSensor<Integer> CMD_GET = Sensors.newIntegerSensor("couchbase.stats.cmd.get",
+ "Retrieved from pools/nodes/<current node>/interestingStats/cmd_get");
+ AttributeSensor<Integer> CURR_ITEMS_TOT = Sensors.newIntegerSensor("couchbase.stats.curr.items.tot",
+ "Retrieved from pools/nodes/<current node>/interestingStats/curr_items_tot");
+
// this class is added because the ROOT_URL relies on a static initialization which unfortunately
// can't be added to
// an interface.
@@ -10,12 +10,27 @@
import brooklyn.entity.basic.SoftwareProcessImpl;
import brooklyn.event.SensorEvent;
import brooklyn.event.SensorEventListener;
+import brooklyn.event.feed.http.HttpFeed;
+import brooklyn.event.feed.http.HttpPollConfig;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
import brooklyn.location.MachineProvisioningLocation;
+import brooklyn.location.access.BrooklynAccessUtils;
import brooklyn.location.cloud.CloudLocationConfig;
import brooklyn.util.collections.MutableSet;
import brooklyn.util.config.ConfigBag;
+import brooklyn.util.http.HttpToolResponse;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseNode {
+
+ HttpFeed httpFeed;
@Override
public Class<CouchbaseNodeDriver> getDriverInterface() {
@@ -77,11 +92,78 @@ public void rebalance() {
public void connectSensors() {
super.connectSensors();
connectServiceUpIsRunning();
+
+ Function<HttpToolResponse, JsonElement> getThisNodesStats = HttpValueFunctions.chain(
+ HttpValueFunctions.jsonContents(),
+ JsonFunctions.walk("nodes"),
+ new Function<JsonElement, JsonElement>() {
+ @Override public JsonElement apply(JsonElement input) {
+ JsonArray nodes = input.getAsJsonArray();
+ for (JsonElement element : nodes) {
+ if (Boolean.TRUE.equals(element.getAsJsonObject().get("thisNode").getAsBoolean())) {
+ return element.getAsJsonObject().get("interestingStats");
+ }
+ }
+ return null;
+ }}
+ );
+
+ Integer rawPort = getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
+ Preconditions.checkNotNull(rawPort, "HTTP_PORT sensors not set for %s; is an acceptable port available?", this);
+ HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, rawPort);
+
+ String adminUrl = String.format("http://%s:%s", hp.getHostText(), hp.getPort());
+
+ httpFeed = HttpFeed.builder()
+ .entity(this)
+ .period(1000)
+ .baseUri(adminUrl + "/pools/nodes/")
+ .credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.OPS)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("ops"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.COUCH_DOCS_DATA_SIZE)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("couch_docs_data_size"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("couch_docs_actual_disk_size"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.EP_BG_FETCHED)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("ep_bg_fetched"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.MEM_USED)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("mem_used"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("couch_views_actual_disk_size"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.CURR_ITEMS)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("curr_items"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.VB_REPLICA_CURR_ITEMS)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("vb_replica_curr_items"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.COUCH_VIEWS_DATA_SIZE)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("couch_views_data_size"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.GET_HITS)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("get_hits"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.CMD_GET)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("cmd_get"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .poll(new HttpPollConfig<Integer>(CouchbaseNode.CURR_ITEMS_TOT)
+ .onSuccess(HttpValueFunctions.chain(getThisNodesStats, JsonFunctions.walk("curr_items_tot"), JsonFunctions.cast(Integer.class)))
+ .onFailureOrException(Functions.<Integer>constant(null)))
+ .build();
}
public void disconnectSensors() {
super.disconnectSensors();
disconnectServiceUpIsRunning();
+ if (httpFeed != null) {
+ httpFeed.stop();
+ }
}

0 comments on commit b651201

Please sign in to comment.