Permalink
Browse files

Add configurable graphite path. Fixes issue 87

Deprecate the "graphite_prefix" attribute in favor of "graphite_path"
which is configurable via macros (%s %m %h). Sane defaults ensure that
existing configurations are not broken by this change. This should solve
issue 87 and any future needs in its general direction. I've moved code
relating to graphite and memcached from rrdhelpers.(c|h), to
export_helpers. This is housekeeping that reflects how those features
should have been implemented in my original ganglia patch.
  • Loading branch information...
1 parent 80c26e3 commit 1783bac9a5609f96b6a008021bf54b979691eed7 Dave Josephsen committed Apr 10, 2013
Showing with 341 additions and 248 deletions.
  1. +1 −1 gmetad/Makefile.am
  2. +6 −7 gmetad/conf.c.in
  3. +1 −1 gmetad/conf.h
  4. +292 −0 gmetad/export_helpers.c
  5. +24 −0 gmetad/export_helpers.h
  6. +1 −0 gmetad/gmetad.c
  7. +15 −2 gmetad/gmetad.conf.in
  8. +1 −0 gmetad/process_xml.c
  9. +0 −222 gmetad/rrd_helpers.c
  10. +0 −15 gmetad/rrd_helpers.h
View
@@ -22,7 +22,7 @@ cmdline.c: cmdline.c.in $(FIXCONFIG)
gmetad_SOURCES = gmetad.c cmdline.c.in cmdline.c cmdline.h gmetad.h data_thread.c \
server.c process_xml.c rrd_helpers.c conf.c conf.h type_hash.c \
xml_hash.c cleanup.c rrd_helpers.h daemon_init.c daemon_init.h \
- server_priv.h
+ server_priv.h export_helpers.h export_helpers.h
gmetad_LDADD = $(top_builddir)/lib/libganglia.la -lrrd -lm \
$(GLDADD) $(DEPS_LIBS)
View
@@ -278,22 +278,21 @@ static DOTCONF_CB(cb_carbon_timeout)
return NULL;
}
-static DOTCONF_CB(cb_memcached_parameters)
+static DOTCONF_CB(cb_graphite_prefix)
{
gmetad_config_t *c = (gmetad_config_t*) cmd->option->info;
- debug_msg("Enabling memcached parameters to %s", cmd->data.str);
- c->memcached_parameters = strdup (cmd->data.str);
+ debug_msg("Enabling Graphite proxy to %s", cmd->data.str);
+ c->graphite_prefix = strdup (cmd->data.str);
return NULL;
}
-static DOTCONF_CB(cb_graphite_prefix)
+static DOTCONF_CB(cb_graphite_path)
{
gmetad_config_t *c = (gmetad_config_t*) cmd->option->info;
debug_msg("Enabling Graphite proxy to %s", cmd->data.str);
- c->graphite_prefix = strdup (cmd->data.str);
+ c->graphite_path = strdup (cmd->data.str);
return NULL;
}
-
static DOTCONF_CB(cb_unsummarized_metrics)
{
int i;
@@ -338,8 +337,8 @@ static configoption_t gmetad_options[] =
{"carbon_server", ARG_STR, cb_carbon_server, &gmetad_config, 0},
{"carbon_port", ARG_INT, cb_carbon_port, &gmetad_config, 0},
{"carbon_timeout", ARG_INT, cb_carbon_timeout, &gmetad_config, 0},
- {"memcached_parameters", ARG_STR, cb_memcached_parameters, &gmetad_config, 0},
{"graphite_prefix", ARG_STR, cb_graphite_prefix, &gmetad_config, 0},
+ {"graphite_path", ARG_STR, cb_graphite_path, &gmetad_config, 0},
{"unsummarized_metrics", ARG_LIST, cb_unsummarized_metrics, &gmetad_config, 0},
LAST_OPTION
};
View
@@ -21,8 +21,8 @@ typedef struct
char *carbon_server;
int carbon_port;
int carbon_timeout;
- char *memcached_parameters;
char *graphite_prefix;
+ char *graphite_path;
int scalable_mode;
int write_rrds;
int all_trusted;
View
@@ -0,0 +1,292 @@
+#include <ctype.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <rrd.h>
+#include <gmetad.h>
+#include <errno.h>
+#include <pthread.h>
+#include <time.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <sys/poll.h>
+
+#ifdef WITH_MEMCACHED
+#include <libmemcached-1.0/memcached.h>
+#include <libmemcachedutil-1.0/util.h>
+#endif /* WITH_MEMCACHED */
+
+#include "export_helpers.h"
+
+#define PATHSIZE 4096
+extern gmetad_config_t gmetad_config;
+
+
+void init_sockaddr (struct sockaddr_in *name, const char *hostname, uint16_t port)
+{
+ struct hostent *hostinfo;
+
+ name->sin_family = AF_INET;
+ name->sin_port = htons (port);
+ hostinfo = gethostbyname (hostname);
+ if (hostinfo == NULL)
+ {
+ fprintf (stderr, "Unknown host %s.\n", hostname);
+ exit (EXIT_FAILURE);
+ }
+ name->sin_addr = *(struct in_addr *) hostinfo->h_addr;
+}
+
+static int
+push_data_to_carbon( char *graphite_msg)
+{
+ int port;
+ int carbon_socket;
+ struct sockaddr_in server;
+ int carbon_timeout ;
+ int nbytes;
+ struct pollfd carbon_struct_poll;
+ int poll_rval;
+ int fl;
+
+ if (gmetad_config.carbon_timeout)
+ carbon_timeout=gmetad_config.carbon_timeout;
+ else
+ carbon_timeout = 500;
+
+ if (gmetad_config.carbon_port)
+ port=gmetad_config.carbon_port;
+ else
+ port=2003;
+
+
+ debug_msg("Carbon Proxy:: sending \'%s\' to %s", graphite_msg, gmetad_config.carbon_server);
+
+ /* Create a socket. */
+ carbon_socket = socket (PF_INET, SOCK_STREAM, 0);
+ if (carbon_socket < 0)
+ {
+ err_msg("socket (client): %s", strerror(errno));
+ close (carbon_socket);
+ return EXIT_FAILURE;
+ }
+
+ /* Set the socket to not block */
+ fl = fcntl(carbon_socket,F_GETFL,0);
+ fcntl(carbon_socket,F_SETFL,fl | O_NONBLOCK);
+
+ /* Connect to the server. */
+ init_sockaddr (&server, gmetad_config.carbon_server, port);
+ connect (carbon_socket, (struct sockaddr *) &server, sizeof (server));
+
+ /* Start Poll */
+ carbon_struct_poll.fd=carbon_socket;
+ carbon_struct_poll.events = POLLOUT;
+ poll_rval = poll( &carbon_struct_poll, 1, carbon_timeout ); // default timeout .5s
+
+ /* Send data to the server when the socket becomes ready */
+ if( poll_rval < 0 ) {
+ debug_msg("carbon proxy:: poll() error");
+ } else if ( poll_rval == 0 ) {
+ debug_msg("carbon proxy:: Timeout connecting to %s",gmetad_config.carbon_server);
+ } else {
+ if( carbon_struct_poll.revents & POLLOUT ) {
+ /* Ready to send data to the server. */
+ debug_msg("carbon proxy:: %s is ready to receive",gmetad_config.carbon_server);
+ nbytes = write (carbon_socket, graphite_msg, strlen(graphite_msg) + 1);
+ if (nbytes < 0) {
+ err_msg("write: %s", strerror(errno));
+ close(carbon_socket);
+ return EXIT_FAILURE;
+ }
+ } else if ( carbon_struct_poll.revents & POLLHUP ) {
+ debug_msg("carbon proxy:: Recvd an RST from %s during transmission",gmetad_config.carbon_server);
+ close(carbon_socket);
+ return EXIT_FAILURE;
+ } else if ( carbon_struct_poll.revents & POLLERR ) {
+ debug_msg("carbon proxy:: Recvd an POLLERR from %s during transmission",gmetad_config.carbon_server);
+ close(carbon_socket);
+ return EXIT_FAILURE;
+ }
+ }
+ close (carbon_socket);
+ return EXIT_SUCCESS;
+}
+
+#ifdef WITH_MEMCACHED
+#define MEMCACHED_MAX_KEY_LENGTH 250 /* Maximum allowed by memcached */
+int
+write_data_to_memcached ( const char *cluster, const char *host, const char *metric,
+ const char *sum, unsigned int process_time, unsigned int expiry )
+{
+ time_t expiry_time;
+ char s_path[MEMCACHED_MAX_KEY_LENGTH];
+ if (strlen(cluster) + strlen(host) + strlen(metric) + 3 > MEMCACHED_MAX_KEY_LENGTH) {
+ debug_msg("Cluster + host + metric + 3 > %d", MEMCACHED_MAX_KEY_LENGTH);
+ return EXIT_FAILURE;
+ }
+ sprintf(s_path, "%s/%s/%s", cluster, host, metric);
+
+ if (expiry != 0) {
+ expiry_time = expiry;
+ } else {
+ expiry_time = (time_t) 0;
+ }
+
+ memcached_return_t rc;
+ memcached_st *memc = memcached_pool_pop(memcached_connection_pool, false, &rc);
+ if (rc != MEMCACHED_SUCCESS) {
+ debug_msg("Unable to retrieve a memcached connection from the pool");
+ return EXIT_FAILURE;
+ }
+ rc = memcached_set(memc, s_path, strlen(s_path), sum, strlen(sum), expiry_time, (uint32_t)0);
+ if (rc != MEMCACHED_SUCCESS) {
+ debug_msg("Unable to push %s value %s to the memcached server(s) - %s", s_path, sum, memcached_strerror(memc, rc));
+ memcached_pool_push(memcached_connection_pool, memc);
+ return EXIT_FAILURE;
+ } else {
+ debug_msg("Pushed %s value %s to the memcached server(s)", s_path, sum);
+ memcached_pool_push(memcached_connection_pool, memc);
+ return EXIT_SUCCESS;
+ }
+}
+#endif /* WITH_MEMCACHED */
+
+/* This function replaces the macros (%s, %m etc..) in the given graphite_string*/
+char *
+path_macro_replace(char *path, graphite_path_macro *patrn)
+{
+ char *final=malloc(PATHSIZE); //heap-side so we can pass it back
+ char path_cp[PATHSIZE]; //copy of path so we can clobber it
+ char *prefix;
+ char *suffix;
+ char *offset;
+
+ strncpy(final, path, PATHSIZE);
+ strncpy(path_cp, path, PATHSIZE);
+ for(int i=0; patrn[i].torepl != 0; i++){
+ while((offset = strstr(path_cp, patrn[i].torepl)))
+ {
+ prefix=path_cp; //pointer to the beginning of path_cp (for clarity)
+ suffix=offset+(strlen(patrn[i].torepl));// get a pointer to after patrn
+ *offset='\0'; // split the path_cp string at the first byte of patrn
+ snprintf(final,PATHSIZE,"%s%s%s",prefix,patrn[i].replwith,suffix); //build a new final from the pieces
+ strncpy(path_cp, final,PATHSIZE);
+ }
+ }
+ return final;
+}
+
+int
+write_data_to_carbon ( const char *source, const char *host, const char *metric,
+ const char *sum, unsigned int process_time )
+{
+
+ int hostlen=strlen(host);
+ char hostcp[hostlen+1];
+ int sourcelen=strlen(source);
+ char sourcecp[sourcelen+1];
+ int metriclen=strlen(metric);
+ char metriccp[metriclen+1];
+ char s_process_time[15];
+ char graphite_msg[ PATHSIZE + 1 ];
+ int i;
+
+ /* if process_time is undefined, we set it to the current time */
+ if (!process_time) process_time = time(0);
+ sprintf(s_process_time, "%u", process_time);
+
+ /* prepend everything with graphite_prefix if it's set */
+ if (gmetad_config.graphite_prefix != NULL && strlen(gmetad_config.graphite_prefix) > 1) {
+ strncpy(graphite_msg, gmetad_config.graphite_prefix, PATHSIZE);
+ }
+
+ /*prep the source name*/
+ if (source) {
+
+ /* find and replace space for _ in the sourcename*/
+ for(i=0; i<=sourcelen; i++){
+ if ( source[i] == ' ') {
+ sourcecp[i]='_';
+ }else{
+ sourcecp[i]=source[i];
+ }
+ }
+ sourcecp[i+1]=0;
+ }
+
+
+ /* prep the host name*/
+ if (host) {
+ /* find and replace . for _ in the hostname*/
+ for(i=0; i<=hostlen; i++){
+ if ( host[i] == '.') {
+ hostcp[i]='_';
+ }else{
+ hostcp[i]=host[i];
+ }
+ }
+ hostcp[i+1]=0;
+ i = strlen(graphite_msg);
+ if(gmetad_config.case_sensitive_hostnames == 0) {
+ /* Convert the hostname to lowercase */
+ for( ; graphite_msg[i] != 0; i++)
+ graphite_msg[i] = tolower(graphite_msg[i]);
+ }
+ }
+
+ /*if graphite_path is set, then process it*/
+ if (gmetad_config.graphite_path != NULL && strlen(gmetad_config.graphite_path) > 1) {
+ graphite_path_macro patrn[4]; //macros we need to replace in graphite_path
+ char graphite_path_cp[ PATHSIZE + 1 ]; //copy of graphite_path
+ char *graphite_path_ptr; //a pointer to catch returns from path_macro_replace()
+ strncpy(graphite_path_cp,gmetad_config.graphite_path,PATHSIZE);
+
+ patrn[0].torepl="%s";
+ patrn[0].replwith=sourcecp;
+ patrn[1].torepl="%h";
+ patrn[1].replwith=hostcp;
+ patrn[2].torepl="%m";
+ patrn[2].replwith=metriccp;
+ patrn[3].torepl='\0'; //explicitly cap the array
+
+ graphite_path_ptr=path_macro_replace(graphite_path_cp, patrn);
+ strncpy(graphite_path_cp,graphite_path_ptr,PATHSIZE);
+ free(graphite_path_ptr);//malloc'd in path_macro_replace()
+
+ /* add the graphite_path to graphite_msg (with a dot first if prefix exists) */
+ if (gmetad_config.graphite_prefix != NULL && strlen(gmetad_config.graphite_prefix) > 1) {
+ strncat(graphite_msg, ".", PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, graphite_path_cp, PATHSIZE-strlen(graphite_msg));
+ } else {
+ strncpy(graphite_msg, sourcecp, PATHSIZE);
+ }
+
+ }else{ /* no graphite_path specified, so do things the old way */
+ if (gmetad_config.graphite_prefix != NULL && strlen(gmetad_config.graphite_prefix) > 1) {
+ strncat(graphite_msg, ".", PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, sourcecp, PATHSIZE-strlen(graphite_msg));
+ } else {
+ strncpy(graphite_msg, sourcecp, PATHSIZE);
+ }
+ strncat(graphite_msg, ".", PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, hostcp, PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, ".", PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, metric, PATHSIZE-strlen(graphite_msg));
+ }
+
+ /* finish off with the value and date (space separated) */
+ strncat(graphite_msg, " ", PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, sum, PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, " ", PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, s_process_time, PATHSIZE-strlen(graphite_msg));
+ strncat(graphite_msg, "\n", PATHSIZE-strlen(graphite_msg));
+
+ graphite_msg[strlen(graphite_msg)+1] = 0;
+ return push_data_to_carbon( graphite_msg );
+}
View
@@ -0,0 +1,24 @@
+#include "ganglia.h"
+
+#ifdef WITH_MEMCACHED
+#include <libmemcached-1.0/memcached.h>
+#include <libmemcachedutil-1.0/util.h>
+#endif /* WITH_MEMCACHED */
+
+/* Tracks the macros we need to interpret in the graphite_path */
+typedef struct
+{
+ char *torepl;
+ char *replwith;
+}
+graphite_path_macro;
+
+#ifdef WITH_MEMCACHED
+int
+write_data_to_memcached ( const char *cluster, const char *host, const char *metric,
+ const char *sum, unsigned int process_time, unsigned int expiry );
+#endif /* WITH_MEMCACHED */
+
+int
+write_data_to_carbon ( const char *source, const char *host, const char *metric,
+ const char *sum, unsigned int process_time);
View
@@ -16,6 +16,7 @@
#include "update_pidfile.h"
#include "rrd_helpers.h"
+#include "export_helpers.h"
#define METADATA_SLEEP_RANDOMIZE 5.0
#define METADATA_MINIMUM_SLEEP 1
Oops, something went wrong.

0 comments on commit 1783bac

Please sign in to comment.