diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 27c4e16ec3..4938bf9d36 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -8184,6 +8184,85 @@ more than one DS. =back +=head3 Note about rules and metadata + +The C plugin recognizes metadata : + +=over 4 + +=item B B + +If set, metric name will be prefixed with B. + +Note : B also applies when you define B. + +=item B B + +When you define B, the metric name is not calculated but is B (with prefix if you defined B). + +=item B B + +=item B B + +=item B B + +=item B B + +=item B B + +If a B is defined, the related item is removed from the metric name. If B is not empty, +a tag will be added with key B and value the item. + +=item B B + +When you define a B, a B=B key-value pair tag is added. The +key comes from B and would be B here. Note that you cannot define the same tag twice. +Examples: + + MetaDataSet "tsdb_tag_add_k1" "v1" + MetaDataSet "tsdb_tag_add_k2" "v2" + MetaDataSet "tsdb_tag_add_k3" "v3" + +=back + +Example without B : + + cpu.1.cpu.user + +Same example with : + tsdb_tag_pluginInstance = "cpu" + tsdb_tag_type = "" + +We get : + cpu.user cpu=1 + +In this example, pluginInstance (named "1") is removed from the metric name and is set as a tag named "cpu". +The type (named "cpu") is removed because B is defined but is empty. + +You can define tags with filter rules and target_set : + + + + + Plugin "^cpu$" + + + MetaDataSet "tsdb_tag_pluginInstance" "cpu" + MetaDataSet "tsdb_tag_type" "" + + + + + Host "^" + + + MetaDataSet "tsdb_prefix" "sys." + + + + +This example will replace (for example) "cpu.1.cpu.user" with "sys.cpu.user cpu=1" + =head2 Plugin C The I will send values to I, a schema-less diff --git a/src/write_tsdb.c b/src/write_tsdb.c index 0c87c473d7..fd26e72440 100644 --- a/src/write_tsdb.c +++ b/src/write_tsdb.c @@ -28,9 +28,11 @@ * write_tsdb Authors: * Brett Hawn * Kevin Bowling + * Yves Mettier **/ /* write_tsdb plugin configuation example + * -------------------------------------- * * * @@ -39,6 +41,92 @@ * HostTags "status=production deviceclass=www" * * + * + * write_tsdb meta_data + * -------------------- + * - tsdb_prefix : Will prefix the OpenTSDB (also prefix tsdb_id if + * defined) + * - tsdb_id : Replace the metric with this tag + * + * - tsdb_tag_plugin : When defined, tsdb_tag_* removes the related + * - tsdb_tag_pluginInstance : item from metric id. + * - tsdb_tag_type : If it is not empty, it will be the key of an + * - tsdb_tag_typeInstance : opentsdb tag (the value is the item itself) + * - tsdb_tag_dsname : If it is empty, no tag is defined. + * + * - tsdb_tag_add_* : Should contain "tagv". Il will add a tag. + * : The key tagk comes from the tsdb_tag_add_* + * : tag. Example : tsdb_tag_add_status adds a tag + * : named 'status'. + * : It will be sent as is to the TSDB server. + * + * write_tsdb plugin filter rules example + * -------------------------------------- + * + * + * + * + * Plugin "^cpu$" + * + * + * MetaDataSet "tsdb_tag_pluginInstance" "cpu" + * MetaDataSet "tsdb_tag_type" "" + * MetaDataSet "tsdb_prefix" "sys." + * + * + * + * + * Plugin "^df$" + * + * + * MetaDataSet "tsdb_tag_pluginInstance" "mount" + * MetaDataSet "tsdb_tag_type" "" + * MetaDataSet "tsdb_prefix" "sys." + * + * + * + * + * Plugin "^disk$" + * + * + * MetaDataSet "tsdb_tag_pluginInstance" "disk" + * MetaDataSet "tsdb_prefix" "sys." + * + * + * + * + * Plugin "^interface$" + * + * + * MetaDataSet "tsdb_tag_pluginInstance" "iface" + * MetaDataSet "tsdb_prefix" "sys." + * + * + * + * + * Plugin "^loac$" + * + * + * MetaDataSet "tsdb_tag_type" "" + * MetaDataSet "tsdb_prefix" "sys." + * + * + * + * + * Plugin "^swap$" + * + * + * MetaDataSet "tsdb_prefix" "sys." + * + * + * + * + * IMPORTANT WARNING + * ----------------- + * OpenTSDB allows no more than 8 tags. + * Collectd admins should be aware of this when defining filter rules and host + * tags. + * */ #include "collectd.h" @@ -67,6 +155,16 @@ #define WT_SEND_BUF_SIZE 1428 #endif +/* Meta data definitions about tsdb tags */ +#define TSDB_TAG_PLUGIN 0 +#define TSDB_TAG_PLUGININSTANCE 1 +#define TSDB_TAG_TYPE 2 +#define TSDB_TAG_TYPEINSTANCE 3 +#define TSDB_TAG_DSNAME 4 +static const char *meta_tag_metric_id[] = { + "tsdb_tag_plugin", "tsdb_tag_pluginInstance", "tsdb_tag_type", + "tsdb_tag_typeInstance", "tsdb_tag_dsname"}; + /* * Private variables */ @@ -86,6 +184,8 @@ struct wt_callback { cdtime_t send_buf_init_time; pthread_mutex_t send_lock; + + _Bool connect_failed_log_enabled; }; /* @@ -160,8 +260,11 @@ static int wt_callback_init(struct wt_callback *cb) { status = getaddrinfo(node, service, &ai_hints, &ai_list); if (status != 0) { - ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", node, service, - gai_strerror(status)); + if (cb->connect_failed_log_enabled) { + ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", node, service, + gai_strerror(status)); + cb->connect_failed_log_enabled = 0; + } return -1; } @@ -188,13 +291,20 @@ static int wt_callback_init(struct wt_callback *cb) { freeaddrinfo(ai_list); if (cb->sock_fd < 0) { - char errbuf[1024]; - ERROR("write_tsdb plugin: Connecting to %s:%s failed. " - "The last error was: %s", - node, service, sstrerror(errno, errbuf, sizeof(errbuf))); + if (cb->connect_failed_log_enabled) { + char errbuf[1024]; + ERROR("write_tsdb plugin: Connecting to %s:%s failed. " + "The last error was: %s", + node, service, sstrerror(errno, errbuf, sizeof(errbuf))); + cb->connect_failed_log_enabled = 0; + } return -1; } + if (0 == cb->connect_failed_log_enabled) { + WARNING("write_tsdb plugin: Connecting to %s:%s succeeded.", node, service); + cb->connect_failed_log_enabled = 1; + } wt_reset_buffer(cb); return 0; @@ -240,7 +350,11 @@ static int wt_flush(cdtime_t timeout, if (cb->sock_fd < 0) { status = wt_callback_init(cb); if (status != 0) { - ERROR("write_tsdb plugin: wt_callback_init failed."); + if (cb->connect_failed_log_enabled || cb->sock_fd >= 0) { + /* Do not log if socket is not enabled : it was logged already + * in wt_callback_init(). */ + ERROR("write_tsdb plugin: wt_callback_init failed."); + } pthread_mutex_unlock(&cb->send_lock); return -1; } @@ -306,12 +420,145 @@ static int wt_format_values(char *ret, size_t ret_len, int ds_num, return 0; } +static int wt_format_tags(char *ret, int ret_len, const value_list_t *vl, + const struct wt_callback *cb, const char *ds_name) { + int status; + char *temp = NULL; + char *ptr = ret; + size_t remaining_len = ret_len; + char **meta_toc; + int i, n; +#define TSDB_META_TAG_ADD_PREFIX "tsdb_tag_add_" + +#define TSDB_META_DATA_GET_STRING(tag) \ + do { \ + temp = NULL; \ + status = meta_data_get_string(vl->meta, tag, &temp); \ + if (status == -ENOENT) { \ + temp = NULL; \ + /* defaults to empty string */ \ + } else if (status < 0) { \ + sfree(temp); \ + return status; \ + } \ + } while (0) + +#define TSDB_STRING_APPEND_SPRINTF(key, value) \ + do { \ + int n; \ + const char *k = (key); \ + const char *v = (value); \ + if (k[0] != '\0' && v[0] != '\0') { \ + n = ssnprintf(ptr, remaining_len, " %s=%s", k, v); \ + if (n >= remaining_len) { \ + ptr[0] = '\0'; \ + } else { \ + char *ptr2 = ptr + 1; \ + while (NULL != (ptr2 = strchr(ptr2, ' '))) \ + ptr2[0] = '_'; \ + ptr += n; \ + remaining_len -= n; \ + } \ + } \ + } while (0) + + if (vl->meta) { + TSDB_META_DATA_GET_STRING(meta_tag_metric_id[TSDB_TAG_PLUGIN]); + if (temp) { + TSDB_STRING_APPEND_SPRINTF(temp, vl->plugin); + sfree(temp); + } + + TSDB_META_DATA_GET_STRING(meta_tag_metric_id[TSDB_TAG_PLUGININSTANCE]); + if (temp) { + TSDB_STRING_APPEND_SPRINTF(temp, vl->plugin_instance); + sfree(temp); + } + + TSDB_META_DATA_GET_STRING(meta_tag_metric_id[TSDB_TAG_TYPE]); + if (temp) { + TSDB_STRING_APPEND_SPRINTF(temp, vl->type); + sfree(temp); + } + + TSDB_META_DATA_GET_STRING(meta_tag_metric_id[TSDB_TAG_TYPEINSTANCE]); + if (temp) { + TSDB_STRING_APPEND_SPRINTF(temp, vl->type_instance); + sfree(temp); + } + + if (ds_name) { + TSDB_META_DATA_GET_STRING(meta_tag_metric_id[TSDB_TAG_DSNAME]); + if (temp) { + TSDB_STRING_APPEND_SPRINTF(temp, ds_name); + sfree(temp); + } + } + + n = meta_data_toc(vl->meta, &meta_toc); + for (i = 0; i < n; i++) { + if (strncmp(meta_toc[i], TSDB_META_TAG_ADD_PREFIX, + sizeof(TSDB_META_TAG_ADD_PREFIX) - 1)) { + free(meta_toc[i]); + continue; + } + if ('\0' == meta_toc[i][sizeof(TSDB_META_TAG_ADD_PREFIX) - 1]) { + ERROR("write_tsdb plugin: meta_data tag '%s' is unknown (host=%s, " + "plugin=%s, type=%s)", + temp, vl->host, vl->plugin, vl->type); + free(meta_toc[i]); + continue; + } + + TSDB_META_DATA_GET_STRING(meta_toc[i]); + if (temp && temp[0]) { + int n; + char *key = meta_toc[i] + sizeof(TSDB_META_TAG_ADD_PREFIX) - 1; + + n = ssnprintf(ptr, remaining_len, " %s=%s", key, temp); + if (n >= remaining_len) { + ptr[0] = '\0'; + } else { + /* We do not check the tags syntax here. It should have + * been done earlier. + */ + ptr += n; + remaining_len -= n; + } + } + if (temp) + sfree(temp); + free(meta_toc[i]); + } + if (meta_toc) + free(meta_toc); + + } else { + ret[0] = '\0'; + } + +#undef TSDB_META_DATA_GET_STRING +#undef TSDB_STRING_APPEND_SPRINTF + + return 0; +} + static int wt_format_name(char *ret, int ret_len, const value_list_t *vl, const struct wt_callback *cb, const char *ds_name) { int status; + int i; char *temp = NULL; - const char *prefix = ""; + char *prefix = NULL; const char *meta_prefix = "tsdb_prefix"; + char *tsdb_id = NULL; + const char *meta_id = "tsdb_id"; + + _Bool include_in_id[] = { + /* plugin = */ 1, + /* plugin instance = */ (vl->plugin_instance[0] == '\0') ? 0 : 1, + /* type = */ 1, + /* type instance = */ (vl->type_instance[0] == '\0') ? 0 : 1, + /* ds_name = */ (ds_name == NULL) ? 0 : 1}; if (vl->meta) { status = meta_data_get_string(vl->meta, meta_prefix, &temp); @@ -323,52 +570,94 @@ static int wt_format_name(char *ret, int ret_len, const value_list_t *vl, } else { prefix = temp; } - } - if (ds_name != NULL) { - if (vl->plugin_instance[0] == '\0') { - if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type, - ds_name); - } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type, - vl->type_instance, ds_name); - } - } else { /* vl->plugin_instance != "" */ - if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, ds_name); - } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, vl->type_instance, ds_name); - } + status = meta_data_get_string(vl->meta, meta_id, &temp); + if (status == -ENOENT) { + /* defaults to empty string */ + } else if (status < 0) { + sfree(temp); + return status; + } else { + tsdb_id = temp; } - } else { /* ds_name == NULL */ - if (vl->plugin_instance[0] == '\0') { - if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type); - } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, - vl->type_instance, vl->type); - } - } else { /* vl->plugin_instance != "" */ - if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type); + + for (i = 0; i < (sizeof(meta_tag_metric_id) / sizeof(*meta_tag_metric_id)); + i++) { + if (meta_data_exists(vl->meta, meta_tag_metric_id[i]) == 0) { + /* defaults to already initialized format */ } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, vl->type_instance); + include_in_id[i] = 0; } } } + if (tsdb_id) { + ssnprintf(ret, ret_len, "%s%s", prefix ? prefix : "", tsdb_id); + } else { +#define TSDB_STRING_APPEND_STRING(string) \ + do { \ + const char *str = (string); \ + size_t len = strlen(str); \ + if (len > (remaining_len - 1)) { \ + ptr[0] = '\0'; \ + return (-ENOSPC); \ + } \ + if (len > 0) { \ + memcpy(ptr, str, len); \ + ptr += len; \ + remaining_len -= len; \ + } \ + } while (0) - sfree(temp); +#define TSDB_STRING_APPEND_DOT \ + do { \ + if (remaining_len > 2) { \ + ptr[0] = '.'; \ + ptr++; \ + remaining_len--; \ + } else { \ + ptr[0] = '\0'; \ + return (-ENOSPC); \ + } \ + } while (0) + + char *ptr = ret; + size_t remaining_len = ret_len; + if (prefix) { + TSDB_STRING_APPEND_STRING(prefix); + } + if (include_in_id[TSDB_TAG_PLUGIN]) { + TSDB_STRING_APPEND_STRING(vl->plugin); + } + + if (include_in_id[TSDB_TAG_PLUGININSTANCE]) { + TSDB_STRING_APPEND_DOT; + TSDB_STRING_APPEND_STRING(vl->plugin_instance); + } + if (include_in_id[TSDB_TAG_TYPE]) { + TSDB_STRING_APPEND_DOT; + TSDB_STRING_APPEND_STRING(vl->type); + } + if (include_in_id[TSDB_TAG_TYPEINSTANCE]) { + TSDB_STRING_APPEND_DOT; + TSDB_STRING_APPEND_STRING(vl->type_instance); + } + if (include_in_id[TSDB_TAG_DSNAME]) { + TSDB_STRING_APPEND_DOT; + TSDB_STRING_APPEND_STRING(ds_name); + } + ptr[0] = '\0'; +#undef TSDB_STRING_APPEND_STRING +#undef TSDB_STRING_APPEND_DOT + } + + sfree(tsdb_id); + sfree(prefix); return 0; } -static int wt_send_message(const char *key, const char *value, cdtime_t time, - struct wt_callback *cb, const char *host, - meta_data_t *md) { +static int wt_send_message(const char *key, const char *value, + const char *value_tags, cdtime_t time, + struct wt_callback *cb, const value_list_t *vl) { int status; size_t message_len; char *temp = NULL; @@ -376,6 +665,11 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, char message[1024]; const char *host_tags = cb->host_tags ? cb->host_tags : ""; const char *meta_tsdb = "tsdb_tags"; + const char *host = vl->host; + meta_data_t *md = vl->meta; + + const char *node = cb->node ? cb->node : WT_DEFAULT_NODE; + const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE; /* skip if value is NaN */ if (value[0] == 'n') @@ -386,27 +680,27 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, if (status == -ENOENT) { /* defaults to empty string */ } else if (status < 0) { - ERROR("write_tsdb plugin: tags metadata get failure"); + ERROR("write_tsdb plugin (%s:%s): tags metadata get failure", node, + service); sfree(temp); - pthread_mutex_unlock(&cb->send_lock); return status; } else { tags = temp; } } - status = - ssnprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n", - key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags); + status = ssnprintf( + message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s %s\r\n", key, + CDTIME_T_TO_DOUBLE(time), value, host, value_tags, tags, host_tags); sfree(temp); if (status < 0) return -1; message_len = (size_t)status; if (message_len >= sizeof(message)) { - ERROR("write_tsdb plugin: message buffer too small: " + ERROR("write_tsdb plugin(%s:%s): message buffer too small: " "Need %zu bytes.", - message_len + 1); + node, service, message_len + 1); return -1; } @@ -415,7 +709,13 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, if (cb->sock_fd < 0) { status = wt_callback_init(cb); if (status != 0) { - ERROR("write_tsdb plugin: wt_callback_init failed."); + if (cb->connect_failed_log_enabled || cb->sock_fd >= 0) { + /* Do not log if socket is not enabled : it was logged already + * in wt_callback_init(). */ + ERROR("write_tsdb plugin (%s:%s): wt_callback_init failed.", node, + service); + cb->connect_failed_log_enabled = 0; + } pthread_mutex_unlock(&cb->send_lock); return -1; } @@ -438,8 +738,8 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, cb->send_buf_fill += message_len; cb->send_buf_free -= message_len; - DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", cb->node, - cb->service, cb->send_buf_fill, sizeof(cb->send_buf), + DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", node, + service, cb->send_buf_fill, sizeof(cb->send_buf), 100.0 * ((double)cb->send_buf_fill) / ((double)sizeof(cb->send_buf)), message); @@ -452,9 +752,13 @@ static int wt_write_messages(const data_set_t *ds, const value_list_t *vl, struct wt_callback *cb) { char key[10 * DATA_MAX_NAME_LEN]; char values[512]; + char tags[10 * DATA_MAX_NAME_LEN]; int status; + const char *node = cb->node ? cb->node : WT_DEFAULT_NODE; + const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE; + if (0 != strcmp(ds->type, vl->type)) { ERROR("write_tsdb plugin: DS type does not match " "value list type"); @@ -485,11 +789,24 @@ static int wt_write_messages(const data_set_t *ds, const value_list_t *vl, return status; } + /* Copy tags from p-pi/t-ti ds notation into tags */ + tags[0] = '\0'; + status = wt_format_tags(tags, sizeof(tags), vl, cb, ds_name); + if (status != 0) { + ERROR("write_tsdb plugin: error with format_tags"); + return status; + } + /* Send the message to tsdb */ - status = wt_send_message(key, values, vl->time, cb, vl->host, vl->meta); + status = wt_send_message(key, values, tags, vl->time, cb, vl); if (status != 0) { - ERROR("write_tsdb plugin: error with " - "wt_send_message"); + if (cb->connect_failed_log_enabled) { + /* Do not log if socket is not enabled : it was logged already + * in wt_callback_init(). */ + ERROR("write_tsdb plugin (%s:%s): error with " + "wt_send_message", + node, service); + } return status; } } @@ -526,6 +843,7 @@ static int wt_config_tsd(oconfig_item_t *ci) { cb->service = NULL; cb->host_tags = NULL; cb->store_rates = 0; + cb->connect_failed_log_enabled = 1; pthread_mutex_init(&cb->send_lock, NULL);