
Parallel Libtrace HOWTO: Ticks

Shane Alcock edited this page Sep 21, 2015 · 3 revisions

Our example program is complete, but it isn't overly useful in real-time situations as we'll only get results when our program finishes. It would be much better if we could periodically trigger the processing threads to publish their current tallies to the reporter thread so that we can get regular updates printed to the terminal.

Parallel libtrace has the concept of ticks: special messages that can be configured to be sent automatically to all processing threads whenever a certain amount of time has passed or a certain number of packets have been read. The main benefit of ticks is that a thread will see them even if the hasher has not assigned it any packets, so you never have to worry about a thread becoming inactive and no longer publishing results.

In our case, we want to use a time-based tick to get our processing threads to publish their results. A packet-count-based tick works in much the same way, except that the API and callback functions have slightly different names. There is one major difference: time-based ticks are delivered on a best-effort basis, so a processing thread may see packets that were captured slightly after the tick's timestamp before it sees the tick itself. Packet count ticks are delivered in-band with the packets, so a packet count tick will never arrive out of order.

First, let's write the callback function that we want the processing threads to run when they receive a tick message. This is going to look a lot like our stopping callback, since we're going to force a result to be published whenever we get a tick.

static void per_tick(libtrace_t *trace, libtrace_thread_t *thread,
         void *global, void *tls, uint64_t tick) {

    struct udpresult *u;

    /* Cast our local storage to the right structure */
    struct counters *c = (struct counters *)tls;

    /* Create a new result object to pass into the reporter. Note
     * this is malloc'd memory so the reporter must free it. */
    u = (struct udpresult *)malloc(sizeof(struct udpresult));
    if (u == NULL) {
        /* Out of memory: keep counting and try again on the next tick */
        return;
    }

    /* Populate our result structure */
    u->packets = c->packets;
    u->payload = c->payload;

    /* Publish the result -- use the tick timestamp as a key so we
     * can combine results for the same period. */
    trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=u}, 
            RESULT_USER);

    /* Reset our counters */
    c->packets = 0;
    c->payload = 0;
}

The big change here is that when we publish the result, we use the timestamp from the tick (which is passed into our callback as an argument) as the key instead of the 0 we were using before. This allows our combiner to group together results for the same time period and push them to the reporter thread before any results for later time periods.
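
To make the ordering idea concrete, here is a minimal sketch of a key-ordered merge across per-thread result queues. It is not libtrace's combiner implementation: the `sketch_result` and `sketch_queue` types and the `sketch_merge_ordered` function are hypothetical names invented for this illustration, and the sketch assumes every queue is already complete and sorted by key, whereas a live combiner has to cope with results still arriving.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a published result: a key plus a value. */
struct sketch_result {
    uint64_t key;   /* e.g. the tick timestamp used when publishing */
    int value;
};

/* Hypothetical per-thread queue of results, already sorted by key. */
struct sketch_queue {
    const struct sketch_result *items;
    size_t len;
    size_t next;    /* index of the next unread item */
};

/* Copy results from all queues into 'out' in ascending key order.
 * Ties go to the lowest-numbered queue. Returns the number of results
 * written, which is at most 'outcap'. */
static size_t sketch_merge_ordered(struct sketch_queue *queues,
        size_t nqueues, struct sketch_result *out, size_t outcap) {
    size_t written = 0;

    while (written < outcap) {
        struct sketch_queue *best = NULL;
        size_t i;

        /* Find the queue whose next unread result has the smallest key */
        for (i = 0; i < nqueues; i++) {
            struct sketch_queue *q = &queues[i];
            if (q->next >= q->len)
                continue;   /* this queue is exhausted */
            if (best == NULL ||
                    q->items[q->next].key < best->items[best->next].key)
                best = q;
        }

        if (best == NULL)
            break;          /* every queue is exhausted */

        out[written++] = best->items[best->next++];
    }
    return written;
}
```

With two threads that each published results for keys 10 and 20, the merged output contains both key-10 results before either key-20 result, which is the property the reporter relies on when it treats a change of key as the end of a time period.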

We can now remove the publishing of results from the stopping callback, so it becomes something like:

static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread,
        void *global, void *tls) {
    struct counters *c = (struct counters *)tls;
    free(c);
}

Next, we should configure our tick interval inside our main function. Like all configuration, you need to do this after you've created the trace but before you start it. We also need to change our combiner to an ordered combiner and add our new callback to the processing callback set.

int main(int argc, char *argv[]) {
    /* Removed everything prior to trace_create() */
    input = trace_create(uri);
    if (trace_is_err(input)) {
        trace_perror(input, "Creating trace");
        return 1;
    }

    /* Add our new callback to the processing callback set */
    trace_set_tick_interval_cb(processing, per_tick);

    /* Setting a tick interval of 10 seconds -- note this is
     * specified in milliseconds. */
    trace_set_tick_interval(input, 10000);

    /* Our results now have an inherent timestamp order, so we
     * need to switch to an ordered combiner.
     */
    trace_set_combiner(input, &combiner_ordered,
             (libtrace_generic_t){0});

    /* Configure the rest of our options here, e.g. thread count */
    
    /* Start the trace */
    if (trace_pstart(input, NULL, processing, reporter)) {
        trace_perror(input, "Starting parallel trace");
        return 1;
    }
    
    trace_join(input);
    
    /* XXX Insert cleanup stuff down here */
    return 0;
}

These changes will ensure that our results get published every 10 seconds, but we still need to update the reporter thread callbacks to print our periodic tallies as they arrive rather than when the reporter thread ends.

static void per_result(libtrace_t *trace, libtrace_thread_t *sender,
        void *global, void *tls, libtrace_result_t *result) {

    struct udpresult *u;
    uint64_t key;

    /* Grab our tally out of thread local storage first, because we need
     * it to decide whether a new time period has started.
     */
    struct udptally *tally = (struct udptally *)tls;

    if (result->type != RESULT_USER)
        return;

    /* The key is the same as the key value that was passed into
     * trace_publish_result() when the processing thread published this
     * result. This is the timestamp of the result we received.
     */
    key = result->key;
    u = (struct udpresult *)result->value.ptr;

    /* If the timestamp is new, we've seen all the results that we are
     * going to get for the previous timestamp. Print them to the
     * terminal, along with the timestamp (in seconds since the epoch).
     */
    if (key > tally->lastkey) {
        double avgsize = 0.0;
        if (tally->packets != 0)
            avgsize = tally->payload / (double)tally->packets;
        printf("%u Average UDP payload size: %.2f\n",
                (unsigned int)(tally->lastkey >> 32), avgsize);

        /* Reset the tally */
        tally->payload = 0;
        tally->packets = 0;
    }

    /* Update the tally based on this new result. */
    tally->packets += u->packets;
    tally->payload += u->payload;
    tally->lastkey = key;

    /* Remember that 'u' was malloced by the processing thread, so make sure
     * we free it here.
     */
    free(u);
}

static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, 
        void *global, void *tls) {

    /* The reporter's local storage is the tally, not a result */
    struct udptally *tally;
    tally = (struct udptally *)tls;
    free(tally);
}
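
One thing to be aware of with the reporter above: a period's tally is only printed when a result with a newer key arrives, so the tally for the final period is freed without ever being printed. The sketch below models the same rollover logic in plain C, without any libtrace calls, and adds a separate summary step that a stopping callback could invoke before freeing the tally. All names here (`sketch_tally`, `sketch_summary`, `sketch_summarise`, `sketch_add_result`) are hypothetical, and the sketch assumes the key carries whole seconds in its upper 32 bits, as the `>> 32` in `per_result` implies. Unlike the code above, it also skips the summary for the initial all-zero key.

```c
#include <stdint.h>

/* Hypothetical stand-in for the reporter's tally structure. */
struct sketch_tally {
    uint64_t packets;
    uint64_t payload;
    uint64_t lastkey;
};

/* Hypothetical summary of one completed time period. */
struct sketch_summary {
    unsigned int seconds;   /* assumed: upper 32 bits of the key */
    double avgsize;         /* average payload bytes per packet */
};

/* Summarise whatever period is currently held in the tally. */
static struct sketch_summary sketch_summarise(
        const struct sketch_tally *tally) {
    struct sketch_summary s;

    s.seconds = (unsigned int)(tally->lastkey >> 32);
    s.avgsize = 0.0;
    if (tally->packets != 0)
        s.avgsize = tally->payload / (double)tally->packets;
    return s;
}

/* Fold one result into the tally. If the key starts a new period and
 * a previous period exists, write that period's summary to 'done' and
 * return 1; otherwise return 0 and leave 'done' untouched. */
static int sketch_add_result(struct sketch_tally *tally, uint64_t key,
        uint64_t packets, uint64_t payload,
        struct sketch_summary *done) {
    int finished = 0;

    if (key > tally->lastkey) {
        /* A lastkey of zero means no period has been tallied yet */
        if (tally->lastkey != 0) {
            *done = sketch_summarise(tally);
            finished = 1;
        }
        tally->packets = 0;
        tally->payload = 0;
    }

    tally->packets += packets;
    tally->payload += payload;
    tally->lastkey = key;
    return finished;
}
```

In this model, a stopping callback would call `sketch_summarise()` once more and print the result before freeing the tally, so the last period is not silently dropped.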

This completes the program for our example scenario. However, there are still some callbacks in parallel libtrace that are worth being familiar with. The first of these is the custom message callback.
