Permalink
Browse files

Initial commit

  • Loading branch information...
1 parent dd37b0f commit 9b1bc5668af32c17c0d0bb1158bcdee43df189fc Jake Luciani committed Oct 2, 2012
Showing with 391 additions and 3 deletions.
  1. +51 −3 README.md
  2. +117 −0 pom.xml
  3. +223 −0 src/main/java/com/bluemountain/RiemannCassandraClient.java
View
@@ -1,4 +1,52 @@
-riemann-cassandra
-=================
+# riemann-cassandra : A tool for emitting Cassandra metrics to Riemann
-riemann tool for cassandra
+## Background
+
+[Riemann] along with [Graphite] make it possible to easily monitor any service.
+
+This tool lets you monitor [Cassandra] by emitting metrics in JMX as Riemann events
+You should run one of these per Cassandra node.
+
+## Usage
+``` bash
+ java -jar riemann-cassandra-0.0.1.jar
+ -riemann_host <arg> #defaults to localhost
+ -riemann_port <arg> #defaults to 5555
+ -cassandra_host <arg> #defaults to localhost
+ -jmx_port <arg> #defaults to 7199
+ -jmx_username <arg> #defaults to null
+ -jmx_password <arg> #defaults to null
+ -interval_seconds <arg> #defaults to 5
+```
+
+## Metrics tracked
+
+ * General
+ * cassandra.heap_committed_mb
+ * cassandra.heap_used_mb
+ * cassandra.exception_count
+ * cassandra.recent_timeouts
+ * cassandra.pending_compactions
+ * cassandra.total_sstable_mb
+
+ * Per ThreadPool
+ * cassandra.tp.active
+ * cassandra.tp.blocked
+ * cassandra.tp.pending
+
+ * Per Keyspace/ColumnFamily
+ * cassandra.db.keys
+ * cassandra.db.mean_row_size
+ * cassandra.db.min_row_size
+ * cassandra.db.max_row_size
+ * cassandra.db.sstable_count
+ * cassandra.db.total_sstable_mb
+ * cassandra.db.total_bloom_mb
+ * cassamdra.db.bloom_fp_rate
+ * cassandra.db.memtable_size_mb
+ * cassandra.db.read_latency
+ * cassandra.db.write_latency
+
+[Cassandra]:http://cassandra.apache.org
+[Riemann]:http://riemann.apache.org
+[Graphite]:http://graphite.wikidot.org
View
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.bluemountain</groupId>
+ <artifactId>riemann-cassandra</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>Riemann tool for streaming cassandra metrics</name>
+
+ <repositories>
+ <repository>
+ <id>boundary-site</id>
+ <url>http://maven.boundary.com/artifactory/repo</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.aphyr</groupId>
+ <artifactId>riemann-java-client</artifactId>
+ <version>0.0.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-all</artifactId>
+ <version>1.1.5</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.1.2</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <outputDirectory />
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.6</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.bluemountain.RiemannCassandraClient</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.0</version>
+ <configuration>
+ <skip>true</skip>
+ <skipDeploy>true</skipDeploy>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
@@ -0,0 +1,223 @@
+package com.bluemountain;
+
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
+
+/**
+ * Monitoring client that polls Cassandra MBeans for metrics and streams them to
+ * riemann as events
+ *
+ * @author jluciani
+ *
+ */
+public class RiemannCassandraClient {
+
+ private static Options options = null;
+
+ static {
+ options = new Options();
+
+ options.addOption("riemann_host", true, "hostname for riemann server");
+ options.addOption("riemann_port", true, "port number for riemann server");
+ options.addOption("cassandra_host", true, "hostname for cassandra node");
+ options.addOption("jmx_port", true, "port number for jmx on cassandra node");
+ options.addOption("jmx_username", true, "username for cassandra jmx agent");
+ options.addOption("jmx_password", true, "password cassandra jmx agent");
+ options.addOption("interval_seconds", true, "number of seconds between updates");
+ }
+
+ final RiemannClient riemannClient;
+ volatile NodeProbe jmxClient = null;
+
+ final String cassandraHost;
+ final Integer cassandraJmxPort;
+ final String jmxUsername;
+ final String jmxPassword;
+ final Event protoEvent;
+
+ public RiemannCassandraClient(String riemannHost, Integer riemannPort, String cassandraHost, Integer cassandraJmxPort, String jmxUsername,
+ String jmxPassword) {
+
+ this.cassandraHost = cassandraHost;
+ this.cassandraJmxPort = cassandraJmxPort;
+ this.jmxUsername = jmxUsername;
+ this.jmxPassword = jmxPassword;
+
+ protoEvent = Event.newBuilder().setHost(cassandraHost).addTags("cassandra").setState("ok").setTtl(5).build();
+
+ riemannClient = new RiemannClient(new InetSocketAddress(riemannHost, riemannPort));
+
+ if (!reconnectJMX())
+ System.err.println(String.format("Unable to connect to Cassandra JMX (%s:%d) will continue to try silently....", cassandraHost, cassandraJmxPort));
+ }
+
+ private synchronized boolean reconnectJMX() {
+
+ try {
+ if (jmxUsername == null)
+ jmxClient = new NodeProbe(cassandraHost, cassandraJmxPort);
+ else
+ jmxClient = new NodeProbe(cassandraHost, cassandraJmxPort, jmxUsername, jmxPassword);
+
+ return true;
+ } catch (Exception e) {
+ // We silently continue
+ }
+
+ jmxClient = null;
+ return false;
+ }
+
+ private void add(List<Event> events, String name, float val) {
+ events.add(Event.newBuilder(protoEvent).setService(name).setMetricF(val).build());
+ }
+
+ private void add(List<Event> events, String name, float val, String desc) {
+ events.add(Event.newBuilder(protoEvent).setService(name).setMetricF(val).setDescription(desc).build());
+ }
+
+ private long emitColumnFamilyMetrics() {
+ List<Event> events = new ArrayList<Event>();
+
+ Iterator<Entry<String, ColumnFamilyStoreMBean>> it = jmxClient.getColumnFamilyStoreMBeanProxies();
+
+ // CF metrics
+ long totalBytes = 0;
+ while (it.hasNext()) {
+ Entry<String, ColumnFamilyStoreMBean> e = it.next();
+
+ String name = "cassandra.db." + e.getKey() + "." + e.getValue().getColumnFamilyName();
+ ColumnFamilyStoreMBean v = e.getValue();
+
+ add(events, name + ".keys", v.estimateKeys() / 1000);
+ add(events, name + ".total_sstable_mb", v.getLiveDiskSpaceUsed() / (1024 * 1024));
+ add(events, name + ".total_bloom_mb", v.getBloomFilterDiskSpaceUsed() / (1024 * 1024));
+ add(events, name + ".bloom_fp_rate", (float) v.getRecentBloomFilterFalseRatio());
+ add(events, name + ".max_row_size_kb", v.getMaxRowSize() / 1024);
+ add(events, name + ".min_row_size_kb", v.getMinRowSize() / 1024);
+ add(events, name + ".mean_row_size_kb", v.getMeanRowSize() / 1024);
+ add(events, name + ".sstable_count", v.getLiveSSTableCount());
+ add(events, name + ".memtable_size_mb", (float) v.getMemtableDataSize() / (1024 * 1024));
+
+ // latencies can return NaN
+ Float f = (float) v.getRecentReadLatencyMicros();
+ add(events, name + ".read_latency", f.equals(Float.NaN) ? 0.0f : f);
+
+ f = (float) e.getValue().getRecentWriteLatencyMicros();
+ add(events, name + ".write_latency", f.equals(Float.NaN) ? 0.0f : f);
+
+ totalBytes += e.getValue().getLiveDiskSpaceUsed();
+
+ riemannClient.sendEvents(events.toArray(new Event[] {}));
+ events.clear();
+ }
+
+ return totalBytes;
+ }
+
+ private void emitThreadPoolMetrics() {
+
+ List<Event> events = new ArrayList<Event>();
+
+ Iterator<Entry<String, JMXEnabledThreadPoolExecutorMBean>> it = jmxClient.getThreadPoolMBeanProxies();
+ while (it.hasNext()) {
+ Entry<String, JMXEnabledThreadPoolExecutorMBean> p = it.next();
+
+ String name = "cassandra.tp." + p.getKey();
+ JMXEnabledThreadPoolExecutorMBean v = p.getValue();
+
+ add(events, name + ".active", v.getActiveCount());
+ add(events, name + ".pending", v.getPendingTasks());
+ add(events, name + ".blocked", v.getCurrentlyBlockedTasks());
+
+ riemannClient.sendEvents(events.toArray(new Event[] {}));
+ events.clear();
+ }
+ }
+
+ private void emitMetrics() {
+
+ if (jmxClient == null && !reconnectJMX())
+ return;
+
+ try {
+
+ // TP Metrics
+ emitThreadPoolMetrics();
+
+ // CF Metrics
+ long totalSSTableBytes = emitColumnFamilyMetrics();
+
+ // Basic metrics
+ List<Event> events = new ArrayList<Event>();
+
+ add(events, "cassandra.exception_count", jmxClient.getExceptionCount());
+ add(events, "cassandra.heap_used_mb", jmxClient.getHeapMemoryUsage().getUsed() / (1024 * 1024));
+ add(events, "cassandra.heap_max_mb", jmxClient.getHeapMemoryUsage().getMax() / (1024 * 1024));
+ add(events, "cassandra.heap_committed_mb", jmxClient.getHeapMemoryUsage().getCommitted() / (1024 * 1024));
+ add(events, "cassandra.recent_timeouts", jmxClient.msProxy.getRecentTotalTimouts(), FBUtilities.json(jmxClient.msProxy.getRecentTimeoutsPerHost()));
+ add(events, "cassandra.pending_compactions", jmxClient.getCompactionManagerProxy().getPendingTasks());
+ add(events, "cassandra.total_sstable_mb", totalSSTableBytes / (1024 * 1024));
+
+ riemannClient.sendEvents(events.toArray(new Event[] {}));
+ } catch (Throwable t) {
+ // Try again later
+ t.printStackTrace();
+ }
+ }
+
+ public static void printUsage() {
+ final PrintWriter writer = new PrintWriter(System.out);
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printUsage(writer, 80, "riemann-cassandra", options);
+ writer.close();
+ }
+
+ public static void main(String[] args) {
+
+ BasicParser parser = new BasicParser();
+ CommandLine cl = null;
+
+ try {
+ cl = parser.parse(options, args);
+ } catch (ParseException e) {
+ printUsage();
+ System.exit(1);
+ }
+
+ // Extracted options
+ String cassandraHost = cl.getOptionValue("cassandra_host", "localhost");
+ String riemannHost = cl.getOptionValue("riemann_host", "localhost");
+ String jmxUsername = cl.getOptionValue("jmx_username");
+ String jmxPassword = cl.getOptionValue("jmx_password");
+ Integer jmxPort = Integer.valueOf(cl.getOptionValue("jmx_port", "7199"));
+ Integer riemannPort = Integer.valueOf(cl.getOptionValue("riemann_port", "5555"));
+ Integer intervalSeconds = Integer.valueOf(cl.getOptionValue("interval_seconds", "5"));
+
+ RiemannCassandraClient cli = new RiemannCassandraClient(riemannHost, riemannPort, cassandraHost, jmxPort, jmxUsername, jmxPassword);
+
+ while (true) {
+ cli.emitMetrics();
+ try {
+ Thread.sleep(intervalSeconds * 1000);
+ } catch (InterruptedException e) {}
+ }
+ }
+}

0 comments on commit 9b1bc56

Please sign in to comment.