Skip to content

Commit

Permalink
prometheus_remote_write: Add cutoff for outdated metrics (#183)
Browse files Browse the repository at this point in the history
This is because the official Prometheus node_exporter should remove the
outdated metrics because it creates the storages which should be
handling/storing metrics' information on every cycles.
At least, we should cut off for 1 hour or older metrics to prevent
metric too old errors on prometheus remote write mechanism.

Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed Jan 14, 2024
1 parent 82cf0ce commit 57b5c9c
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 5 deletions.
2 changes: 2 additions & 0 deletions include/cmetrics/cmt_encode_prometheus_remote_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
#include <prometheus_remote_write/remote.pb-c.h>

#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ADD_METADATA CMT_FALSE
#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD 60L*60L*1000000000L

#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS 0
#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR 1
#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_UNEXPECTED_ERROR 2
#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_INVALID_ARGUMENT_ERROR 3
#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_UNEXPECTED_METRIC_TYPE 4
#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR 5

struct cmt_prometheus_metric_metadata {
Prometheus__MetricMetadata data;
Expand Down
58 changes: 57 additions & 1 deletion src/cmt_encode_prometheus_remote_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -644,18 +644,38 @@ int pack_basic_metric_sample(struct cmt_prometheus_remote_write_context *context
return append_metric_to_timeseries(time_series, metric);
}

static int check_staled_timestamp(struct cmt_metric *metric, uint64_t now, uint64_t cutoff)
{
uint64_t ts;
uint64_t diff;

ts = cmt_metric_get_timestamp(metric);
diff = now - ts;

return diff > cutoff;
}

int pack_basic_type(struct cmt_prometheus_remote_write_context *context,
struct cmt_map *map)
{
int add_metadata;
struct cmt_metric *metric;
int result;
struct cfl_list *head;
uint64_t now;

context->sequence_number++;
add_metadata = CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ADD_METADATA;

now = cfl_time_now();

if (map->metric_static_set == CMT_TRUE) {
if (check_staled_timestamp(&map->metric, now,
CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD)) {
/* Skip processing metrics which are staled over the threshold */
return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR;
}

result = pack_basic_metric_sample(context, map, &map->metric, add_metadata);

if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
Expand All @@ -666,6 +686,12 @@ int pack_basic_type(struct cmt_prometheus_remote_write_context *context,
cfl_list_foreach(head, &map->metrics) {
metric = cfl_list_entry(head, struct cmt_metric, _head);

if (check_staled_timestamp(metric, now,
CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD)) {
/* Skip processing metrics which are staled over over the threshold */
return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR;
}

result = pack_basic_metric_sample(context, map, metric, add_metadata);

if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
Expand Down Expand Up @@ -699,6 +725,15 @@ int pack_complex_metric_sample(struct cmt_prometheus_remote_write_context *conte
struct cmt_summary *summary;
int result;
size_t index;
uint64_t now;

now = cfl_time_now();

if (check_staled_timestamp(metric, now,
CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD)) {
/* Skip processing metrics which are staled over the threshold */
return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR;
}

additional_label_caption = cfl_sds_create_len(NULL, 128);

Expand Down Expand Up @@ -1067,6 +1102,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
counter = cfl_list_entry(head, struct cmt_counter, _head);
result = pack_basic_type(&context, counter->map);

if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
break;
}
Expand All @@ -1078,6 +1117,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
gauge = cfl_list_entry(head, struct cmt_gauge, _head);
result = pack_basic_type(&context, gauge->map);

if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
break;
}
Expand All @@ -1089,6 +1132,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
cfl_list_foreach(head, &cmt->untypeds) {
untyped = cfl_list_entry(head, struct cmt_untyped, _head);
pack_basic_type(&context, untyped->map);

if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) {
continue;
}
}
}

Expand All @@ -1098,6 +1145,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
summary = cfl_list_entry(head, struct cmt_summary, _head);
result = pack_complex_type(&context, summary->map);

if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
break;
}
Expand All @@ -1110,13 +1161,18 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
histogram = cfl_list_entry(head, struct cmt_histogram, _head);
result = pack_complex_type(&context, histogram->map);

if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
break;
}
}
}

if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ||
result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) {
buf = render_remote_write_context_to_sds(&context);
}

Expand Down
43 changes: 39 additions & 4 deletions tests/encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,17 @@ static struct cmt *generate_simple_encoder_test_data()
return cmt;
}

static struct cmt *generate_encoder_test_data()
static struct cmt *generate_encoder_test_data_with_timestamp(uint64_t ts)
{
double quantiles[5];
struct cmt_histogram_buckets *buckets;
double val;
struct cmt *cmt;
uint64_t ts;
struct cmt_gauge *g1;
struct cmt_counter *c1;
struct cmt_summary *s1;
struct cmt_histogram *h1;

ts = 0;
cmt = cmt_create();

c1 = cmt_counter_create(cmt, "kubernetes", "network", "load_counter", "Network load counter",
Expand Down Expand Up @@ -158,6 +156,11 @@ static struct cmt *generate_encoder_test_data()
return cmt;
}

static struct cmt *generate_encoder_test_data()
{
return generate_encoder_test_data_with_timestamp(0);
}

/*
* perform the following data encoding and compare msgpack buffsers
*
Expand Down Expand Up @@ -511,10 +514,13 @@ void test_prometheus_remote_write()
struct cmt *cmt;
cfl_sds_t payload;
FILE *sample_file;
uint64_t ts;

ts = cfl_time_now();

cmt_initialize();

cmt = generate_encoder_test_data();
cmt = generate_encoder_test_data_with_timestamp(ts);

payload = cmt_encode_prometheus_remote_write_create(cmt);
TEST_CHECK(NULL != payload);
Expand Down Expand Up @@ -544,6 +550,34 @@ curl -v 'http://localhost:9090/receive' -H 'Content-Type: application/x-protobuf
cmt_destroy(cmt);
}

void test_prometheus_remote_write_with_outdated_timestamps()
{
struct cmt *cmt;
cfl_sds_t payload;
uint64_t ts;

ts = cfl_time_now() - CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD * 1.5;

cmt_initialize();

cmt = generate_encoder_test_data_with_timestamp(ts);

payload = cmt_encode_prometheus_remote_write_create(cmt);
TEST_CHECK(NULL != payload);

if (payload == NULL) {
cmt_destroy(cmt);

return;
}

TEST_CHECK(0 == cfl_sds_len(payload));

cmt_encode_prometheus_remote_write_destroy(payload);

cmt_destroy(cmt);
}

void test_opentelemetry()
{
cfl_sds_t payload;
Expand Down Expand Up @@ -1081,6 +1115,7 @@ TEST_LIST = {
{"cmt_msgpack_cleanup_on_error", test_cmt_to_msgpack_cleanup_on_error},
{"cmt_msgpack_partial_processing", test_cmt_msgpack_partial_processing},
{"prometheus_remote_write", test_prometheus_remote_write},
{"prometheus_remote_write_old_cmt",test_prometheus_remote_write_with_outdated_timestamps},
{"cmt_msgpack_stability", test_cmt_to_msgpack_stability},
{"cmt_msgpack_integrity", test_cmt_to_msgpack_integrity},
{"cmt_msgpack_labels", test_cmt_to_msgpack_labels},
Expand Down

0 comments on commit 57b5c9c

Please sign in to comment.