Skip to content

Commit

Permalink
cassandra: Added metrics=path connect setting.
Browse files Browse the repository at this point in the history
Cassandra's metrics are written to the path in JSON format. It can be a file
or a FIFO. The path supports expanding the standard global %variables, such
as %{pid}
  • Loading branch information
sirainen committed May 26, 2016
1 parent 17aed59 commit a2f4214
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions src/lib-sql/driver-cassandra.c
Expand Up @@ -8,10 +8,12 @@
#include "net.h"
#include "write-full.h"
#include "time-util.h"
#include "var-expand.h"
#include "settings-parser.h"
#include "sql-api-private.h"

#ifdef BUILD_CASSANDRA
#include <fcntl.h>
#include <unistd.h>
#include <cassandra.h>

Expand Down Expand Up @@ -66,6 +68,9 @@ struct cassandra_db {
ARRAY(struct cassandra_result *) results;
unsigned int callback_ids;

char *metrics_path;
struct timeout *to_metrics;

struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT];
time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
Expand Down Expand Up @@ -442,6 +447,9 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
} else if (strcmp(key, "request_timeout") == 0) {
if (settings_get_time(value, &db->request_timeout_secs, &error) < 0)
i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
} else if (strcmp(key, "metrics") == 0) {
i_free(db->metrics_path);
db->metrics_path = i_strdup(value);
} else {
i_fatal("cassandra: Unknown connect string: %s", key);
}
Expand All @@ -461,6 +469,69 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
db->hosts = i_strdup(str_c(hosts));
}

static void
driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
{
#define ADD_UINT64(_struct, _field) \
str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
#define ADD_DOUBLE(_struct, _field) \
str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
CassMetrics metrics;

cass_session_get_metrics(db->session, &metrics);
str_append(dest, "{ \"requests\": {");
ADD_UINT64(requests, min);
ADD_UINT64(requests, max);
ADD_UINT64(requests, mean);
ADD_UINT64(requests, stddev);
ADD_UINT64(requests, median);
ADD_UINT64(requests, percentile_75th);
ADD_UINT64(requests, percentile_95th);
ADD_UINT64(requests, percentile_98th);
ADD_UINT64(requests, percentile_99th);
ADD_UINT64(requests, percentile_999th);
ADD_DOUBLE(requests, mean_rate);
ADD_DOUBLE(requests, one_minute_rate);
ADD_DOUBLE(requests, five_minute_rate);
ADD_DOUBLE(requests, fifteen_minute_rate);
str_truncate(dest, str_len(dest)-1);
str_append(dest, "}, \"stats\": {");
ADD_UINT64(stats, total_connections);
ADD_UINT64(stats, available_connections);
ADD_UINT64(stats, exceeded_pending_requests_water_mark);
ADD_UINT64(stats, exceeded_write_bytes_water_mark);
str_truncate(dest, str_len(dest)-1);
str_append(dest, "}, \"errors\": {");
ADD_UINT64(errors, connection_timeouts);
ADD_UINT64(errors, pending_request_timeouts);
ADD_UINT64(errors, request_timeouts);
str_truncate(dest, str_len(dest)-1);
str_append(dest, "}}");
}

static void driver_cassandra_metrics_write(struct cassandra_db *db)
{
struct var_expand_table tab[] = {
{ '\0', NULL, NULL }
};
string_t *path = t_str_new(64);
string_t *data;
int fd;

var_expand(path, db->metrics_path, tab);

fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
if (fd == -1) {
i_error("creat(%s) failed: %m", str_c(path));
return;
}
data = t_str_new(1024);
driver_cassandra_get_metrics_json(db, data);
if (write_full(fd, str_data(data), str_len(data)) < 0)
i_error("write(%s) failed: %m", str_c(path));
i_close_fd(&fd);
}

static struct sql_db *driver_cassandra_init_v(const char *connect_string)
{
struct cassandra_db *db;
Expand Down Expand Up @@ -489,6 +560,8 @@ static struct sql_db *driver_cassandra_init_v(const char *connect_string)
if (db->num_threads != 0)
cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
db->session = cass_session_new();
if (db->metrics_path != NULL)
db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
i_array_init(&db->results, 16);
i_array_init(&db->callbacks, 16);
return &db->api;
Expand All @@ -508,6 +581,9 @@ static void driver_cassandra_deinit_v(struct sql_db *_db)
cass_session_free(db->session);
cass_cluster_free(db->cluster);
cass_timestamp_gen_free(db->timestamp_gen);
if (db->to_metrics != NULL)
timeout_remove(&db->to_metrics);
i_free(db->metrics_path);
i_free(db->hosts);
i_free(db->error);
i_free(db->keyspace);
Expand Down

0 comments on commit a2f4214

Please sign in to comment.