diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..043f8e9 --- /dev/null +++ b/Makefile @@ -0,0 +1,25 @@ +ZMQ_ROOT=/hb +COLLECTD_SRC=../collectd-4.10.2 +COLLECTD_ROOT=/opt/collectd + +LIBTOOL=$(COLLECTD_SRC)/libtool + + +all: .INIT write_zmq.la + +install: all + $(LIBTOOL) --mode=install /usr/bin/install -c write_zmq.la $(COLLECTD_ROOT)/lib/collectd + +clean: + rm -rf .libs + rm -rf build + rm -f write_zmq.la + +.INIT: + mkdir -p build + +write_zmq.la: build/write_zmq.lo build/utils_value_json.lo + $(LIBTOOL) --tag=CC --mode=link gcc -Wall -Werror -O2 -module -avoid-version -L$(ZMQ_ROOT)/lib -o $@ -rpath $(COLLECTD_ROOT)/lib/collectd -lpthread -ldl -lzmq $^ + +%.lo: ../src/%.c + $(LIBTOOL) --mode=compile gcc -DHAVE_CONFIG_H -I src -I $(COLLECTD_SRC)/src -I$(ZMQ_ROOT)/include -Wall -Werror -g -O2 -MD -MP -c -o $@ $< \ No newline at end of file diff --git a/src/utils_value_json.c b/src/utils_value_json.c new file mode 100644 index 0000000..2aec676 --- /dev/null +++ b/src/utils_value_json.c @@ -0,0 +1,291 @@ +/** + * utils_value_json.c + * Copyright (C) 2009 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" + +#include "utils_cache.h" +#include "utils_format_json.h" + +static int escape_string (char *buffer, size_t buffer_size, /* {{{ */ + const char *string) +{ + size_t src_pos; + size_t dst_pos; + + if ((buffer == NULL) || (string == NULL)) + return (-EINVAL); + + if (buffer_size < 3) + return (-ENOMEM); + + dst_pos = 0; + +#define BUFFER_ADD(c) do { \ + if (dst_pos >= (buffer_size - 1)) { \ + buffer[buffer_size - 1] = 0; \ + return (-ENOMEM); \ + } \ + buffer[dst_pos] = (c); \ + dst_pos++; \ +} while (0) + + /* Escape special characters */ + BUFFER_ADD ('"'); + for (src_pos = 0; string[src_pos] != 0; src_pos++) + { + if ((string[src_pos] == '"') + || (string[src_pos] == '\\')) + { + BUFFER_ADD ('\\'); + BUFFER_ADD (string[src_pos]); + } + else if (string[src_pos] <= 0x001F) + BUFFER_ADD ('?'); + else + BUFFER_ADD (string[src_pos]); + } /* for */ + BUFFER_ADD ('"'); + buffer[dst_pos] = 0; + +#undef BUFFER_ADD + + return (0); +} /* }}} int buffer_add_string */ + +static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */ + const data_set_t *ds, const value_list_t *vl, int store_rates) +{ + size_t offset = 0; + int i; + gauge_t *rates = NULL; + + memset (buffer, 0, buffer_size); + +#define BUFFER_ADD(...) do { \ + int status; \ + status = ssnprintf (buffer + offset, buffer_size - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + { \ + sfree(rates); \ + return (-1); \ + } \ + else if (((size_t) status) >= (buffer_size - offset)) \ + { \ + sfree(rates); \ + return (-ENOMEM); \ + } \ + else \ + offset += ((size_t) status); \ +} while (0) + + BUFFER_ADD ("["); + for (i = 0; i < ds->ds_num; i++) + { + if (i > 0) + BUFFER_ADD (","); + + if (ds->ds[i].type == DS_TYPE_GAUGE) + { + if(isfinite (vl->values[i].gauge)) + BUFFER_ADD ("%g", vl->values[i].gauge); + else + BUFFER_ADD ("null"); + } + else if (store_rates) + { + if (rates == NULL) + rates = uc_get_rate (ds, vl); + if (rates == NULL) + { + WARNING ("utils_format_json: uc_get_rate failed."); + sfree(rates); + return (-1); + } + + if(isfinite (rates[i])) + BUFFER_ADD ("%g", rates[i]); + else + BUFFER_ADD ("null"); + } + else if (ds->ds[i].type == DS_TYPE_COUNTER) + BUFFER_ADD ("%llu", vl->values[i].counter); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + BUFFER_ADD ("%"PRIi64, vl->values[i].derive); + else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) + BUFFER_ADD ("%"PRIu64, vl->values[i].absolute); + else + { + ERROR ("format_json: Unknown data source type: %i", + ds->ds[i].type); + sfree (rates); + return (-1); + } + } /* for ds->ds_num */ + BUFFER_ADD ("]"); + +#undef BUFFER_ADD + + DEBUG ("format_json: values_to_json: buffer = %s;", buffer); + sfree(rates); + return (0); +} /* }}} int values_to_json */ + +static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */ + const data_set_t *ds, const value_list_t *vl) +{ + size_t offset = 0; + int i; + + memset (buffer, 0, buffer_size); + +#define BUFFER_ADD(...) do { \ + int status; \ + status = ssnprintf (buffer + offset, buffer_size - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + return (-1); \ + else if (((size_t) status) >= (buffer_size - offset)) \ + return (-ENOMEM); \ + else \ + offset += ((size_t) status); \ +} while (0) + + BUFFER_ADD ("["); + for (i = 0; i < ds->ds_num; i++) + { + if (i > 0) + BUFFER_ADD (","); + + BUFFER_ADD ("\"%s\"", DS_TYPE_TO_STRING (ds->ds[i].type)); + } /* for ds->ds_num */ + BUFFER_ADD ("]"); + +#undef BUFFER_ADD + + DEBUG ("format_json: dstypes_to_json: buffer = %s;", buffer); + + return (0); +} /* }}} int dstypes_to_json */ + +static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */ + const data_set_t *ds, const value_list_t *vl) +{ + size_t offset = 0; + int i; + + memset (buffer, 0, buffer_size); + +#define BUFFER_ADD(...) do { \ + int status; \ + status = ssnprintf (buffer + offset, buffer_size - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + return (-1); \ + else if (((size_t) status) >= (buffer_size - offset)) \ + return (-ENOMEM); \ + else \ + offset += ((size_t) status); \ +} while (0) + + BUFFER_ADD ("["); + for (i = 0; i < ds->ds_num; i++) + { + if (i > 0) + BUFFER_ADD (","); + + BUFFER_ADD ("\"%s\"", ds->ds[i].name); + } /* for ds->ds_num */ + BUFFER_ADD ("]"); + +#undef BUFFER_ADD + + DEBUG ("format_json: dsnames_to_json: buffer = %s;", buffer); + + return (0); +} /* }}} int dsnames_to_json */ + +int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */ + const data_set_t *ds, const value_list_t *vl, int store_rates) +{ + char temp[512]; + size_t offset = 0; + int status; + + memset (buffer, 0, buffer_size); + +#define BUFFER_ADD(...) do { \ + status = ssnprintf (buffer + offset, buffer_size - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + return (-1); \ + else if (((size_t) status) >= (buffer_size - offset)) \ + return (-ENOMEM); \ + else \ + offset += ((size_t) status); \ +} while (0) + + BUFFER_ADD ("{"); + + status = values_to_json (temp, sizeof (temp), ds, vl, store_rates); + if (status != 0) + return (status); + BUFFER_ADD ("\"values\":%s", temp); + + status = dstypes_to_json (temp, sizeof (temp), ds, vl); + if (status != 0) + return (status); + BUFFER_ADD (",\"dstypes\":%s", temp); + + status = dsnames_to_json (temp, sizeof (temp), ds, vl); + if (status != 0) + return (status); + BUFFER_ADD (",\"dsnames\":%s", temp); + + BUFFER_ADD (",\"time\":%lu", (unsigned long) vl->time); + BUFFER_ADD (",\"interval\":%i", vl->interval); + +#define BUFFER_ADD_KEYVAL(key, value) do { \ + status = escape_string (temp, sizeof (temp), (value)); \ + if (status != 0) \ + return (status); \ + BUFFER_ADD (",\"%s\":%s", (key), temp); \ +} while (0) + + BUFFER_ADD_KEYVAL ("host", vl->host); + BUFFER_ADD_KEYVAL ("plugin", vl->plugin); + BUFFER_ADD_KEYVAL ("plugin_instance", vl->plugin_instance); + BUFFER_ADD_KEYVAL ("type", vl->type); + BUFFER_ADD_KEYVAL ("type_instance", vl->type_instance); + + BUFFER_ADD ("}"); + +#undef BUFFER_ADD_KEYVAL +#undef BUFFER_ADD + + DEBUG ("format_json: value_list_to_json: buffer = %s;", buffer); + + return (0); +} /* }}} int value_list_to_json */ + +/* vim: set sw=2 sts=2 et fdm=marker : */ diff --git a/src/utils_value_json.h b/src/utils_value_json.h new file mode 100644 index 0000000..6469523 --- /dev/null +++ b/src/utils_value_json.h @@ -0,0 +1,31 @@ +/** + * collectd - src/utils_format_json.c + * Copyright (C) 2009 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#ifndef UTILS_VALUE_JSON_H +#define UTILS_VALUE_JSON_H 1 + +#include "collectd.h" +#include "plugin.h" + +int value_list_to_json (char *buffer, size_t buffer_size, + const data_set_t *ds, const value_list_t *vl, int store_rates); + +#endif /* UTILS_VALUE_JSON_H */ diff --git a/src/write_zmq.c b/src/write_zmq.c new file mode 100644 index 0000000..00547e6 --- /dev/null +++ b/src/write_zmq.c @@ -0,0 +1,165 @@ +/** + * Basically copied from write_http.c + */ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" +#include "utils_cache.h" +#include "utils_parse_option.h" + +#include +#include + +#include "utils_value_json.h" + +typedef struct { + void *zmq_socket; + pthread_mutex_t send_lock; +} wz_callback_t; + + +static void *zmq_ctx; + + +static void wz_callback_free (void *data) { + wz_callback_t *cb; + if (data == NULL) + return; + + cb = data; + zmq_close(cb->zmq_socket); + sfree (cb); +} + + +static int wz_write(const data_set_t *ds, const value_list_t *vl, + user_data_t *user_data) { + wz_callback_t *cb; + int status; + zmq_msg_t msg; + size_t json_size; + + char json_buf[4096]; + + if (user_data == NULL) + return (-EINVAL); + cb = user_data->data; + + // Format JSON + status = value_list_to_json(json_buf, sizeof (json_buf), ds, vl, 0); + if (status != 0) { + WARNING("write_zmq_plugin: failed to format JSON."); + return -1; + } + + // Init ZMQ message + json_size = strlen(json_buf); + status = zmq_msg_init_size(&msg, json_size); + if (status != 0) { + WARNING("write_zmq_plugin: failed to allocate zmq message: %s.", + zmq_strerror(errno)); + return -1; + } + memcpy(zmq_msg_data(&msg), json_buf, json_size); + + // Need mutex to let the ZMQ socket migrate threads + pthread_mutex_lock (&cb->send_lock); + status = zmq_send(cb->zmq_socket, &msg, 0); + if (status != 0) { + WARNING("write_zmq_plugin: failed to send message: %s.", + zmq_strerror(errno)); + return -1; + } + pthread_mutex_unlock (&cb->send_lock); + + // Discard message + zmq_msg_close(&msg); + + if (status != 0) { + WARNING("write_zmq plugin: zmq_send failed: %s.", + zmq_strerror(errno)); + return -1; + } + return 0; +} + + +int wz_config_url(oconfig_item_t *ci) { + wz_callback_t *cb; + user_data_t user_data; + int status; + + cb = malloc (sizeof (*cb)); + if (cb == NULL) { + ERROR("write_zmq plugin: malloc failed."); + return -1; + } + + memset (cb, 0, sizeof (*cb)); + pthread_mutex_init (&cb->send_lock, NULL); + + if (ci->values_num != 1 || + ci->values[0].type != OCONFIG_TYPE_STRING) { + ERROR("write_zmq plugin: The `%s' config option " + "needs to be a string.", ci->key); + return -1; + } + + cb->zmq_socket = zmq_socket(zmq_ctx, ZMQ_PUB); + if (cb->zmq_socket == NULL) { + ERROR("write_zmq plugin: zmq_socket failed: %s.", + zmq_strerror(errno)); + return -1; + } + + status = zmq_bind(cb->zmq_socket, + ci->values[0].value.string); + if (status != 0) { + ERROR("write_zmq plugin: zmq_bind failed: %s.", + zmq_strerror(errno)); + return -1; + } + + memset (&user_data, 0, sizeof (user_data)); + user_data.data = cb; + user_data.free_func = wz_callback_free; + plugin_register_write ("write_zmq", wz_write, &user_data); + + return 0; +} + + +static int wz_config(oconfig_item_t *ci) { + int i; + + zmq_ctx = zmq_init(1); + if (zmq_ctx == NULL) { + return -1; + } + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("URL", child->key) == 0) + wz_config_url (child); + else { + ERROR("write_zmq plugin: Invalid configuration " + "option: %s.", child->key); + } + } + + return 0; +} + + +static int wz_shutdown() { + // zmq_term(zmq_ctx); + return 0; +} + + +void module_register (void) { + plugin_register_shutdown ("write_zmq", wz_shutdown); + plugin_register_complex_config("write_zmq", wz_config); +}