From 57b5c9cbc1b266cbc21027ea93b821e5eb6d0756 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 15 Jan 2024 07:10:37 +0900 Subject: [PATCH] prometheus_remote_write: Add cutoff for outdated metrics (#183) This is because the official Prometheus node_exporter should remove the outdated metrics because it creates the storages which should be handling/storing metrics' information on every cycles. At least, we should cut off for 1 hour or older metrics to prevent metric too old errors on prometheus remote write mechanism. Signed-off-by: Hiroshi Hatake --- .../cmt_encode_prometheus_remote_write.h | 2 + src/cmt_encode_prometheus_remote_write.c | 58 ++++++++++++++++++- tests/encoding.c | 43 ++++++++++++-- 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/include/cmetrics/cmt_encode_prometheus_remote_write.h b/include/cmetrics/cmt_encode_prometheus_remote_write.h index c160c27..9351ed6 100644 --- a/include/cmetrics/cmt_encode_prometheus_remote_write.h +++ b/include/cmetrics/cmt_encode_prometheus_remote_write.h @@ -25,12 +25,14 @@ #include #define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ADD_METADATA CMT_FALSE +#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD 60L*60L*1000000000L #define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS 0 #define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR 1 #define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_UNEXPECTED_ERROR 2 #define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_INVALID_ARGUMENT_ERROR 3 #define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_UNEXPECTED_METRIC_TYPE 4 +#define CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR 5 struct cmt_prometheus_metric_metadata { Prometheus__MetricMetadata data; diff --git a/src/cmt_encode_prometheus_remote_write.c b/src/cmt_encode_prometheus_remote_write.c index 2aede25..b5421de 100644 --- a/src/cmt_encode_prometheus_remote_write.c +++ b/src/cmt_encode_prometheus_remote_write.c @@ -644,6 +644,17 @@ int pack_basic_metric_sample(struct cmt_prometheus_remote_write_context *context return append_metric_to_timeseries(time_series, metric); } +static int check_staled_timestamp(struct cmt_metric *metric, uint64_t now, uint64_t cutoff) +{ + uint64_t ts; + uint64_t diff; + + ts = cmt_metric_get_timestamp(metric); + diff = now - ts; + + return diff > cutoff; +} + int pack_basic_type(struct cmt_prometheus_remote_write_context *context, struct cmt_map *map) { @@ -651,11 +662,20 @@ int pack_basic_type(struct cmt_prometheus_remote_write_context *context, struct cmt_metric *metric; int result; struct cfl_list *head; + uint64_t now; context->sequence_number++; add_metadata = CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ADD_METADATA; + now = cfl_time_now(); + if (map->metric_static_set == CMT_TRUE) { + if (check_staled_timestamp(&map->metric, now, + CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD)) { + /* Skip processing metrics which are staled over the threshold */ + return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR; + } + result = pack_basic_metric_sample(context, map, &map->metric, add_metadata); if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { @@ -666,6 +686,12 @@ int pack_basic_type(struct cmt_prometheus_remote_write_context *context, cfl_list_foreach(head, &map->metrics) { metric = cfl_list_entry(head, struct cmt_metric, _head); + if (check_staled_timestamp(metric, now, + CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD)) { + /* Skip processing metrics which are staled over over the threshold */ + return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR; + } + result = pack_basic_metric_sample(context, map, metric, add_metadata); if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { @@ -699,6 +725,15 @@ int pack_complex_metric_sample(struct cmt_prometheus_remote_write_context *conte struct cmt_summary *summary; int result; size_t index; + uint64_t now; + + now = cfl_time_now(); + + if (check_staled_timestamp(metric, now, + CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD)) { + /* Skip processing metrics which are staled over the threshold */ + return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR; + } additional_label_caption = cfl_sds_create_len(NULL, 128); @@ -1067,6 +1102,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt) counter = cfl_list_entry(head, struct cmt_counter, _head); result = pack_basic_type(&context, counter->map); + if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { break; } @@ -1078,6 +1117,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt) gauge = cfl_list_entry(head, struct cmt_gauge, _head); result = pack_basic_type(&context, gauge->map); + if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { break; } @@ -1089,6 +1132,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt) cfl_list_foreach(head, &cmt->untypeds) { untyped = cfl_list_entry(head, struct cmt_untyped, _head); pack_basic_type(&context, untyped->map); + + if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) { + continue; + } } } @@ -1098,6 +1145,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt) summary = cfl_list_entry(head, struct cmt_summary, _head); result = pack_complex_type(&context, summary->map); + if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { break; } @@ -1110,13 +1161,18 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt) histogram = cfl_list_entry(head, struct cmt_histogram, _head); result = pack_complex_type(&context, histogram->map); + if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { break; } } } - if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { + if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS || + result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR) { buf = render_remote_write_context_to_sds(&context); } diff --git a/tests/encoding.c b/tests/encoding.c index 093adce..9d4478e 100644 --- a/tests/encoding.c +++ b/tests/encoding.c @@ -66,19 +66,17 @@ static struct cmt *generate_simple_encoder_test_data() return cmt; } -static struct cmt *generate_encoder_test_data() +static struct cmt *generate_encoder_test_data_with_timestamp(uint64_t ts) { double quantiles[5]; struct cmt_histogram_buckets *buckets; double val; struct cmt *cmt; - uint64_t ts; struct cmt_gauge *g1; struct cmt_counter *c1; struct cmt_summary *s1; struct cmt_histogram *h1; - ts = 0; cmt = cmt_create(); c1 = cmt_counter_create(cmt, "kubernetes", "network", "load_counter", "Network load counter", @@ -158,6 +156,11 @@ static struct cmt *generate_encoder_test_data() return cmt; } +static struct cmt *generate_encoder_test_data() +{ + return generate_encoder_test_data_with_timestamp(0); +} + /* * perform the following data encoding and compare msgpack buffsers * @@ -511,10 +514,13 @@ void test_prometheus_remote_write() struct cmt *cmt; cfl_sds_t payload; FILE *sample_file; + uint64_t ts; + + ts = cfl_time_now(); cmt_initialize(); - cmt = generate_encoder_test_data(); + cmt = generate_encoder_test_data_with_timestamp(ts); payload = cmt_encode_prometheus_remote_write_create(cmt); TEST_CHECK(NULL != payload); @@ -544,6 +550,34 @@ curl -v 'http://localhost:9090/receive' -H 'Content-Type: application/x-protobuf cmt_destroy(cmt); } +void test_prometheus_remote_write_with_outdated_timestamps() +{ + struct cmt *cmt; + cfl_sds_t payload; + uint64_t ts; + + ts = cfl_time_now() - CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD * 1.5; + + cmt_initialize(); + + cmt = generate_encoder_test_data_with_timestamp(ts); + + payload = cmt_encode_prometheus_remote_write_create(cmt); + TEST_CHECK(NULL != payload); + + if (payload == NULL) { + cmt_destroy(cmt); + + return; + } + + TEST_CHECK(0 == cfl_sds_len(payload)); + + cmt_encode_prometheus_remote_write_destroy(payload); + + cmt_destroy(cmt); +} + void test_opentelemetry() { cfl_sds_t payload; @@ -1081,6 +1115,7 @@ TEST_LIST = { {"cmt_msgpack_cleanup_on_error", test_cmt_to_msgpack_cleanup_on_error}, {"cmt_msgpack_partial_processing", test_cmt_msgpack_partial_processing}, {"prometheus_remote_write", test_prometheus_remote_write}, + {"prometheus_remote_write_old_cmt",test_prometheus_remote_write_with_outdated_timestamps}, {"cmt_msgpack_stability", test_cmt_to_msgpack_stability}, {"cmt_msgpack_integrity", test_cmt_to_msgpack_integrity}, {"cmt_msgpack_labels", test_cmt_to_msgpack_labels},