
Parallel Libtrace HOWTO: Custom Messages

Shane Alcock edited this page Sep 21, 2015 · 3 revisions

Some applications need to send a custom message to one or more libtrace threads. The message may originate from the main thread, the reporter thread or even another processing thread.

Let's consider an example where we are developing a packet counting application that has a "reset counters" button. If the user clicks on that button, we want all of our processing threads to reset their packet counter to zero.

First, let's write some code to create the custom message and send it to all of the processing threads.

#include <sys/time.h>   /* gettimeofday() */

#define RESET_MESSAGE (MESSAGE_USER + 10)

static void reset_button(libtrace_t *trace) {
    struct timeval tv;
    libtrace_message_t msg;

    /* Include the current time with the message */
    gettimeofday(&tv, NULL);
   
    /* Any message with a code >= MESSAGE_USER is a custom
     * message. Use the code to distinguish between different
     * types of custom message.
     */
    msg.code = RESET_MESSAGE;

    /* data is a libtrace_generic_t that can be used to
     * include some data along with the message. data is
     * optional -- the code should indicate how to interpret
     * any data that is present.
     */
    msg.data.uint32 = (uint32_t)tv.tv_sec;
    
    /* If a libtrace thread triggered this message, sender should
     * be set to point to that thread so the recipient of the 
     * message can respond if necessary. If no response is required
     * or the sender is not a libtrace thread, set this to NULL.
     */
    msg.sender = NULL;

    /* This will send the message to all processing threads. See
     * also trace_message_reporter() and trace_message_thread().
     */
    trace_message_perpkts(trace, &msg);
}
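
If an application ends up with more than one custom message, it can help to keep all of the codes in one place. The following is a minimal, self-contained sketch of that idea. `DEMO_MESSAGE_USER` is only a stand-in for libtrace's `MESSAGE_USER` (with an arbitrary value) so that the example compiles without libtrace, and `DEMO_PAUSE_COUNTING_MESSAGE`, `is_custom_message()` and `describe_message()` are hypothetical names that are not part of libtrace.

```c
/* Stand-in for libtrace's MESSAGE_USER so this sketch compiles on its
 * own; the value is arbitrary. In a real program, use MESSAGE_USER
 * from the libtrace headers instead.
 */
#define DEMO_MESSAGE_USER 1000

/* Hypothetical application message codes, kept together in one enum
 * so that it is easy to see that no two codes collide.
 */
enum demo_message_code {
    DEMO_RESET_MESSAGE = DEMO_MESSAGE_USER + 10,
    DEMO_PAUSE_COUNTING_MESSAGE = DEMO_MESSAGE_USER + 11
};

/* Returns 1 if the code falls in the custom message range, else 0. */
static int is_custom_message(int code) {
    return code >= DEMO_MESSAGE_USER;
}

/* Maps a message code to a printable name, e.g. for debug logging. */
static const char *describe_message(int code) {
    switch (code) {
    case DEMO_RESET_MESSAGE:
        return "reset";
    case DEMO_PAUSE_COUNTING_MESSAGE:
        return "pause-counting";
    default:
        return is_custom_message(code) ? "unknown-custom" : "not-custom";
    }
}
```

A `switch` on the code scales a little better than a chain of `if` statements once there are several message types, and the `default` branch gives one obvious place to notice codes that were never handled.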

Now we need to add a callback for our processing threads to handle this message. Assuming we are using a similar 'counters' structure to the one we had in our earlier examples, the callback might look something like:

static void message_cb(libtrace_t *trace, libtrace_thread_t *thread,
        void *global, void *tls, int code, libtrace_generic_t data,
        libtrace_thread_t *sender) {

    struct counters *c = (struct counters *)tls;

    /* All custom messages will reach this callback so if we have
     * more than one, we need to check the message code.
     */
    if (code == RESET_MESSAGE) {
        struct udpresult *u;

        /* We know that the message data contains the timestamp when
         * the reset was triggered.
         */
        uint32_t ts = data.uint32;
        
        /* Publish our last counts before the reset, just in case */
        u = (struct udpresult *)malloc(sizeof(struct udpresult));
        if (u == NULL) {
            /* Out of memory: skip this message rather than crash */
            return;
        }
        u->packets = c->packets;
        u->payload = c->payload;

        /* Remember to convert our timestamp into a 64 bit ERF
         * timestamp. Widen to 64 bits before shifting, as shifting
         * a 32 bit value by 32 bits is undefined in C.
         */
        trace_publish_result(trace, thread, ((uint64_t)ts << 32),
                (libtrace_generic_t){.ptr=u}, RESULT_USER);

        /* Now we can reset our counter */
        c->packets = 0;
        c->payload = 0;
        c->lastkey = ((uint64_t)ts << 32);
    }
}
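
The shift used to build the result key deserves some care: in C, shifting a 32 bit value left by 32 bits is undefined, so the seconds value has to be widened to 64 bits before it is shifted. The sketch below shows the conversion in isolation. `erf_from_seconds()` and `erf_to_seconds()` are hypothetical helper names rather than libtrace functions, and the sketch assumes the usual ERF timestamp layout, where the upper 32 bits hold whole seconds and the lower 32 bits hold the fraction of a second.

```c
#include <stdint.h>

/* Builds a 64 bit ERF-style timestamp from whole seconds. The cast
 * happens before the shift so that the shift is performed on a 64 bit
 * value; the fractional (lower) 32 bits are left as zero.
 */
static uint64_t erf_from_seconds(uint32_t secs) {
    return ((uint64_t)secs) << 32;
}

/* Recovers the whole seconds from a 64 bit ERF-style timestamp by
 * discarding the fractional (lower) 32 bits.
 */
static uint32_t erf_to_seconds(uint64_t erf) {
    return (uint32_t)(erf >> 32);
}
```

With helpers like these, the callback could pass `erf_from_seconds(ts)` as the key when publishing the result and store the same value in `c->lastkey`, which keeps the two uses consistent.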

The final step is to make sure this callback is registered with our processing threads. Just as with the other callbacks, we need to do this before starting the trace.

    /* somewhere inside main, before calling trace_pstart() */
    trace_set_user_message_cb(processing, message_cb);

Next up, pausing and resuming traces.
