Skip to content
This repository has been archived by the owner on Nov 14, 2019. It is now read-only.

Lone metrics stuck in rcvmmsg call #38

Closed
deanbittner opened this issue May 3, 2016 · 4 comments
Closed

Lone metrics stuck in rcvmmsg call #38

deanbittner opened this issue May 3, 2016 · 4 comments

Comments

@deanbittner
Copy link

In the brubeck source, at /brubeck-master/src/samplers/statsd.c, line 37, the rcvmmsg call should have the MSG_WAITFORONE flag set in the 4th argument, which is currently zero. With this flag NOT set, a single udp msg does not receive, and without a timeout, the call blocks indefinitely, waiting for > 1 message. This can result in the loss of lone messages, like sparse calls with a single metric.

Most of the time, the metrics flow is significant, and these single messages are flushed with additional messages, and the rcvmmsg will work. If, however, a single metric is sent, the call blocks indefinitely, leaving the message stranded.

This can be reproduced by sending a single metric to a running instance of brubeck. The single metric fails to emit in the line or pickle protocol to graphite. When the MSG_WAITFORONE flag is added, the metric flows through as expected.

The workaround is to change the brubeck configuration by setting the value of the "multimsg" setting in /etc/brubeck/config.json to 1, and then restart brubeck. This configuration change deactivates the rcvmmsg call detailed above, falling back to rcvmsg. It's not clear that this will have any performance implication with multiple worker threads in the receive pool; however, it seems likely that there will be some impact on a busy metrics flow.

I have built and tested with the flag above, and it works fine. (I also have a Mac build for anyone who wants it).

@vmg
Copy link
Contributor

vmg commented May 4, 2016

Thanks for the report! I've cherry picked and pushed your proposed change. :)

@vmg vmg closed this as completed May 4, 2016
@deanbittner
Copy link
Author

I’m getting some of these now in the brubeck log … after #38

instance=orwell.sls.inf.brubeck.log01 sampler=statsd event=packet_drop
instance=orwell.sls.inf.brubeck.log01 sampler=statsd event=packet_drop
instance=orwell.sls.inf.brubeck.log01 sampler=statsd event=packet_drop

On May 4, 2016, at 3:06 AM, Vicent Marti notifications@github.com wrote:

Closed #38 #38.


You are receiving this because you authored the thread.
Reply to this email directly or view it on GitHub #38 (comment)

@deanbittner
Copy link
Author

The recvmmsg call returns the number of messages read, which will often be less than SIM_PACKETS. It looks like if there are less messages received than SIM_PACKETS, a dropped_packet is logged. I think the change is to test the number of packets received against the positive return result from the recvmmsg call instead of SIM_PACKETS. I will test that.

On May 4, 2016, at 3:06 AM, Vicent Marti notifications@github.com wrote:

Closed #38 #38.


You are receiving this because you authored the thread.
Reply to this email directly or view it on GitHub #38 (comment)

@deanbittner
Copy link
Author

deanbittner commented May 5, 2016

diff --git a/src/samplers/statsd.c b/src/samplers/statsd.c
index 7f828f0..f1f49de 100644
--- a/src/samplers/statsd.c
+++ b/src/samplers/statsd.c
@@ -4,6 +4,7 @@
 #include <sys/socket.h>
 #include "brubeck.h"

+
 #ifdef __GLIBC__
 #      if ((__GLIBC__ > 2) || ((__GLIBC__ == 2) && (__GLIBC_MINOR__ >= 12)))
 #              define HAVE_RECVMMSG 1
@@ -34,7 +35,10 @@ static void statsd_run_recvmmsg(struct brubeck_statsd *statsd, int sock)
        log_splunk("sampler=statsd event=worker_online syscall=recvmmsg socket=%d", sock);

        for (;;) {
-               int res = recvmmsg(sock, msgs, SIM_PACKETS, 0, NULL);
+
+    //MSG_WAITFORONE
+    //         int res = recvmmsg(sock, msgs, SIM_PACKETS, 0, NULL);
+               int res = recvmmsg(sock, msgs, SIM_PACKETS, MSG_WAITFORONE, NULL);

                if (res < 0) {
                        if (errno == EAGAIN || errno == EINTR)
@@ -44,11 +48,18 @@ static void statsd_run_recvmmsg(struct brubeck_statsd *statsd, int sock)
                        brubeck_stats_inc(server, errors);
                        continue;
                }
+               if (res == 0)
+                 {
+                       log_splunk_errno("sampler=statsd event=no_read");
+                       brubeck_stats_inc(server, errors);
+                       continue;
+                 }

                /* store stats */
-               brubeck_atomic_add(&statsd->sampler.inflow, SIM_PACKETS);
+               //              brubeck_atomic_add(&statsd->sampler.inflow, SIM_PACKETS);
+               brubeck_atomic_add(&statsd->sampler.inflow, res);

-               for (i = 0; i < SIM_PACKETS; ++i) {
+               for (i = 0; i < res; ++i) {
                        char *buf = msgs[i].msg_hdr.msg_iov->iov_base;
                        char *end = buf + msgs[i].msg_len;
                        brubeck_statsd_packet_parse(server, buf, end);

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants