Evan Summers edited this page Nov 24, 2013 · 9 revisions

We herewith continue the no-hit wonder "Timestamped: a trilogy in a few parts," this being the second part, where we introduce an interface for so-called Timestamped records, and use an ArrayDeque to gather them, with a time-based capacity.

We will be analysing logs in this unwinding series. Ultimately we're gonna hook up a remote Log4j appender to digest our logs, gather stats, and make some judgement calls as to the rapidly changing status of our app.

Without further ado, I give you the namesake interface of this series.

```java
public interface Timestamped {
    long getTimestamp();
}
```

which returns the timestamp in milliseconds since the epoch, à la System.currentTimeMillis().

We also introduce an adapter for Log4j's LoggingEvent.

```java
import org.apache.log4j.spi.LoggingEvent;

public class TimestampedLoggingEventAdapter implements Timestamped {
    private final LoggingEvent loggingEvent;

    public TimestampedLoggingEventAdapter(LoggingEvent loggingEvent) {
        this.loggingEvent = loggingEvent;
    }

    @Override
    public long getTimestamp() {
        return loggingEvent.getTimeStamp(); // Log4j 1.2 spells it with a capital S
    }
}
```

And a generic wrapped element.

```java
public class TimestampedElement<T> implements Timestamped, Comparable<Timestamped> {
    private final T element;
    private final long timestamp;

    public TimestampedElement(T element, long timestamp) {
        this.element = element;
        this.timestamp = timestamp;
    }

    public T getElement() {
        return element;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    // natural ordering: earlier timestamps sort first
    @Override
    public int compareTo(Timestamped other) {
        if (timestamp < other.getTimestamp()) return -1;
        if (timestamp > other.getTimestamp()) return 1;
        return 0;
    }
}
```

where we implement compareTo() for natural ordering by the timestamp.
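
Because TimestampedElement is Comparable, a plain List of them can be put in chronological order with Collections.sort(). That method is documented as stable, so two events from the same millisecond keep their arrival order. A minimal sketch, with trimmed copies of the classes above so that it compiles on its own; NaturalOrderingDemo and sortedValues() are hypothetical names for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

interface Timestamped {
    long getTimestamp();
}

// Trimmed copy of TimestampedElement from the text above.
class TimestampedElement<T> implements Timestamped, Comparable<Timestamped> {
    final T element;
    final long timestamp;

    TimestampedElement(T element, long timestamp) {
        this.element = element;
        this.timestamp = timestamp;
    }

    T getElement() { return element; }

    @Override
    public long getTimestamp() { return timestamp; }

    @Override
    public int compareTo(Timestamped other) {
        if (timestamp < other.getTimestamp()) return -1;
        if (timestamp > other.getTimestamp()) return 1;
        return 0;
    }
}

class NaturalOrderingDemo {
    // Hypothetical helper: sorts a copy by natural (timestamp) ordering, returns the values.
    static List<String> sortedValues(List<TimestampedElement<String>> elements) {
        List<TimestampedElement<String>> copy = new ArrayList<TimestampedElement<String>>(elements);
        Collections.sort(copy); // stable sort: ties keep their insertion order
        List<String> values = new ArrayList<String>();
        for (TimestampedElement<String> e : copy) {
            values.add(e.getElement());
        }
        return values;
    }

    public static void main(String[] args) {
        List<TimestampedElement<String>> events = new ArrayList<TimestampedElement<String>>();
        events.add(new TimestampedElement<String>("c", 3000L));
        events.add(new TimestampedElement<String>("a1", 1000L));
        events.add(new TimestampedElement<String>("a2", 1000L)); // same millisecond as "a1"
        events.add(new TimestampedElement<String>("b", 2000L));
        System.out.println(sortedValues(events)); // [a1, a2, b, c]
    }
}
```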

Since duplicate timestamps are possible, i.e. two or more events can occur in the same millisecond, and indeed identical log messages can occur at the same time, we forgo implementing hashCode() and equals() in terms of the timestamp or content. Otherwise distinct events would collapse into one if we added them to a Set, whose javadoc describes it thus:

Set - A collection that contains no duplicate elements. More formally, sets contain no pair of elements e1 and e2 such that e1.equals(e2), and at most one null element.

Therefore we defer to the default hashCode() and equals() inherited from Object, which are based on object identity, i.e. reference equality.
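
To illustrate the identity-based default: two wrappers holding the same message at the same millisecond remain two distinct Set members, while adding the very same reference twice is a no-op. A sketch with a stripped-down stand-in for TimestampedElement (no equals() or hashCode() overrides); IdentitySetDemo is a hypothetical name:

```java
import java.util.HashSet;
import java.util.Set;

// Stripped-down stand-in for TimestampedElement: equals() and hashCode()
// are inherited from Object, so they compare object identity.
class TimestampedElement<T> {
    final T element;
    final long timestamp;

    TimestampedElement(T element, long timestamp) {
        this.element = element;
        this.timestamp = timestamp;
    }
}

class IdentitySetDemo {
    // Adds two content-identical wrappers, plus one of them again, to a HashSet.
    static int sizeAfterAddingDuplicates() {
        Set<TimestampedElement<String>> set = new HashSet<TimestampedElement<String>>();
        TimestampedElement<String> first = new TimestampedElement<String>("disk full", 1000L);
        TimestampedElement<String> second = new TimestampedElement<String>("disk full", 1000L);
        set.add(first);
        set.add(second); // different reference, so it is kept
        set.add(first);  // same reference, so this is a no-op
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAddingDuplicates()); // 2
    }
}
```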

We might construct a SortedSet of Timestamped elements.

SortedSet - The elements are ordered using their natural ordering, or by a Comparator typically provided at sorted set creation time.

So if we have not implemented compareTo(), then we will need a comparator.

```java
import java.util.Comparator;

public class TimestampedComparator implements Comparator<Timestamped> {

    @Override
    public int compare(Timestamped o1, Timestamped o2) {
        if (o1.getTimestamp() < o2.getTimestamp()) return -1;
        if (o1.getTimestamp() > o2.getTimestamp()) return 1;
        return 0;
    }
}
```

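
A caveat worth spelling out: a SortedSet such as TreeSet decides duplicates with compareTo() or compare(), not equals(). The Comparable javadoc calls such an ordering "inconsistent with equals". So with either the natural ordering or TimestampedComparator above, two records logged in the same millisecond count as one, and the second add() is silently ignored. The sketch below demonstrates this; breaking ties on an insertion sequence number is one possible workaround, offered as an assumption rather than the approach taken in this series (LogRecord, its seq field and SequencedComparator are hypothetical):

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

interface Timestamped {
    long getTimestamp();
}

// Hypothetical record type; seq is an insertion counter used only for tie-breaking.
class LogRecord implements Timestamped {
    final String message;
    final long timestamp;
    final long seq;

    LogRecord(String message, long timestamp, long seq) {
        this.message = message;
        this.timestamp = timestamp;
        this.seq = seq;
    }

    @Override
    public long getTimestamp() { return timestamp; }
}

// Same logic as TimestampedComparator in the text.
class TimestampedComparator implements Comparator<Timestamped> {
    @Override
    public int compare(Timestamped o1, Timestamped o2) {
        if (o1.getTimestamp() < o2.getTimestamp()) return -1;
        if (o1.getTimestamp() > o2.getTimestamp()) return 1;
        return 0;
    }
}

// Hypothetical workaround: order by timestamp, then by insertion sequence.
class SequencedComparator implements Comparator<LogRecord> {
    private final TimestampedComparator byTime = new TimestampedComparator();

    @Override
    public int compare(LogRecord r1, LogRecord r2) {
        int result = byTime.compare(r1, r2);
        if (result != 0) return result;
        return r1.seq < r2.seq ? -1 : (r1.seq > r2.seq ? 1 : 0);
    }
}

class SortedSetDemo {
    static int sizeWith(Comparator<? super LogRecord> comparator) {
        SortedSet<LogRecord> set = new TreeSet<LogRecord>(comparator);
        set.add(new LogRecord("connect", 1000L, 1));
        set.add(new LogRecord("timeout", 1000L, 2)); // same millisecond as "connect"
        set.add(new LogRecord("retry", 1001L, 3));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeWith(new TimestampedComparator())); // 2: "timeout" was dropped
        System.out.println(sizeWith(new SequencedComparator()));   // 3
    }
}
```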
Our inclination might be to collect Timestamped elements in a List, or perhaps a Queue.
Queue - A collection designed for holding elements prior to processing.
That sounds rather appropriate for our digestive purposes, to find the ghost in the machine.

Deque collector

So let's introduce the namesake of this article, a collector of timestamped thingies - a circular buffer, some might call it - and impose a time-based capacity thereupon.

So we use the java.util.Deque found in Java 1.6, courtesy of those most excellent gentlemen, Doug Lea and Josh Bloch. Its javadoc describes it thus:

Deque - A linear collection that supports element insertion and removal at both ends. The name deque is short for "double ended queue" and is usually pronounced "deck."
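
To make "both ends" concrete: elements can be added and removed at the head (the ...First methods) and at the tail (the ...Last methods). A tiny sketch, using ArrayDeque as the concrete class; DequeEndsDemo and bothEnds() are hypothetical names:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DequeEndsDemo {
    // Inserts at both ends, removes from both ends, and reports what happened.
    static String bothEnds() {
        Deque<String> deque = new ArrayDeque<String>();
        deque.addLast("b");                // [b]
        deque.addLast("c");                // [b, c]
        deque.addFirst("a");               // [a, b, c]
        String head = deque.removeFirst(); // "a", leaving [b, c]
        String tail = deque.removeLast();  // "c", leaving [b]
        return head + " " + tail + " " + deque;
    }

    public static void main(String[] args) {
        System.out.println(bothEnds()); // a c [b]
    }
}
```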
We use the efficient ArrayDeque implementation.
ArrayDeque - This class is likely to be faster than Stack when used as a stack, and faster than LinkedList when used as a queue.
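Fantastic. Let's get us some of that.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public class TimestampedDequer<T extends Timestamped> {
    long capacityMillis;
    long lastTimestamp;
    ArrayDeque<T> deque = new ArrayDeque<T>();

    public TimestampedDequer(long capacityMillis) {
        this.capacityMillis = capacityMillis;
    }

    public synchronized void addLast(T element) {
        long timestamp = element.getTimestamp();
        if (timestamp == 0) {
            deque.clear(); // throw our toys out the cot exception
            return;
        }
        if (timestamp < lastTimestamp) {
            // clock went backwards, e.g. an NTP adjustment: start afresh from this element
            deque.clear();
        }
        lastTimestamp = timestamp;
        prune(lastTimestamp);
        deque.addLast(element);
    }

    // removes head elements that are capacityMillis or more older than lastTimestamp
    private void prune(long lastTimestamp) {
        while (!deque.isEmpty() &&
                deque.getFirst().getTimestamp() <= lastTimestamp - capacityMillis) {
            deque.removeFirst();
        }
    }

    // snapshot() and tail(), presented below, are further methods of this class
}
```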

<img src="http://jroller.com/evanx/resource/arcane-deck-white-back-300.jpg" align="left" hspace="16"/>
where we compose an <tt>ArrayDeque</tt>, which is not thread-safe, and <tt>synchronize</tt> access to it "externally" for our purposes, considering that it will be digesting log records continually, whilst being under interrogation by JMX, HTTP requests and what-not. 

When we add an element onto the tail, we <tt>prune()</tt> expired elements from the head. Observe that the above implementation assumes that elements are added in chronological order. However, we expect our host's time to be adjusted by <tt>NTP</tt> occasionally, hence we <tt>clear()</tt> the deque when we encounter such an eventuality that we don't play nicely with.
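
Note the <tt><=</tt> in <tt>prune()</tt>: an element exactly <tt>capacityMillis</tt> older than the reference time is already expired. A deterministic sketch of that rule, using explicit timestamps rather than the wall clock; <tt>PruningDemo</tt> and <tt>window()</tt> are hypothetical, a capacity of 1000 ms is assumed, and like the dequer it assumes timestamps arrive in chronological order:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PruningDemo {
    // Drops from the head every timestamp at or before (now - capacityMillis),
    // the same condition as prune() in TimestampedDequer.
    static void prune(Deque<Long> deque, long now, long capacityMillis) {
        while (!deque.isEmpty() && deque.peekFirst() <= now - capacityMillis) {
            deque.removeFirst();
        }
    }

    // Hypothetical helper: adds the (chronological) timestamps, prunes at 'now', returns the size.
    static int window(long capacityMillis, long now, long... timestamps) {
        Deque<Long> deque = new ArrayDeque<Long>();
        for (long timestamp : timestamps) {
            prune(deque, timestamp, capacityMillis);
            deque.addLast(timestamp);
        }
        prune(deque, now, capacityMillis);
        return deque.size();
    }

    public static void main(String[] args) {
        System.out.println(window(1000L, 10500L, 10000L, 10500L)); // 2: nothing expired yet
        System.out.println(window(1000L, 11000L, 10000L, 10500L)); // 1: 10000 is exactly 1000 ms old
        System.out.println(window(1000L, 11499L, 10000L, 10500L)); // 1
        System.out.println(window(1000L, 11500L, 10000L, 10500L)); // 0
    }
}
```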

If we are digesting logs from multiple servers or what-have-you, the above so-called "dequer" ain't gonna work, baby - it's gonna come up empty, baby. Don't shuffle this deque, baby. <i>(As Elvis might have said.)</i> We'll deal with such a handful another time.

Now, in order to analyse the contents for the desired interval, we take a <tt>snapshot()</tt>.
```java
    public synchronized Deque<T> snapshot(long lastTimestamp) {
        prune(lastTimestamp);
        return deque.clone();
    }
```

which returns a defensive copy, and so is a relatively costly operation. Perhaps you could recommend an alternative strategy? Perhaps we will implement a special concurrent deque implementation in a future episode, as an exercise - taking inspiration from that Disruptor thingymajig, perchance, as well as ArrayDeque itself - one that supports aggregating from multiple servers, methinks.
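
The defensive copy decouples the caller from the live deque: later additions and pruning don't disturb a snapshot already taken, so it can be analysed outside the lock. Note that clone() is shallow, so the elements themselves are shared, which is harmless as long as they are immutable; the copy costs O(n) per call. A sketch on a bare ArrayDeque of longs; SnapshotDemo is a hypothetical name:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SnapshotDemo {
    // Returns {snapshot size, live size} after the live deque changes post-snapshot.
    static int[] sizesAfterMutation() {
        ArrayDeque<Long> live = new ArrayDeque<Long>();
        live.addLast(1000L);
        live.addLast(2000L);
        Deque<Long> snapshot = live.clone(); // independent copy, as in snapshot()
        live.removeFirst();                  // e.g. a later prune()
        live.addLast(3000L);
        live.addLast(4000L);
        return new int[] {snapshot.size(), live.size()};
    }

    public static void main(String[] args) {
        int[] sizes = sizesAfterMutation();
        System.out.println(sizes[0] + " " + sizes[1]); // 2 3
    }
}
```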

Another use-case is to get the tail i.e. the latest so-many elements, for informational purposes e.g. to display via a servlet, or attach to an email alert.

```java
    public synchronized Deque<T> tail(int size) {
        Deque<T> tail = new ArrayDeque<T>();
        Iterator<T> it = deque.descendingIterator();
        for (int i = 0; i < size && it.hasNext(); i++) {
            tail.addFirst(it.next());
        }
        return tail;
    }
```

where we use descendingIterator() to read from the tail of the deque, and addFirst() to rectify the order.
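
The same descendingIterator()/addFirst() technique, lifted into a static helper over any Deque so it can be tried in isolation; TailDemo is a hypothetical name, and unlike tail() above it does no locking:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

class TailDemo {
    // Walks backwards from the newest element, prepending each one
    // so that the result stays in chronological order.
    static <T> Deque<T> tail(Deque<T> deque, int size) {
        Deque<T> tail = new ArrayDeque<T>();
        Iterator<T> it = deque.descendingIterator();
        for (int i = 0; i < size && it.hasNext(); i++) {
            tail.addFirst(it.next());
        }
        return tail;
    }

    public static void main(String[] args) {
        Deque<Integer> deque = new ArrayDeque<Integer>();
        for (int i = 1; i <= 5; i++) {
            deque.addLast(i);
        }
        System.out.println(tail(deque, 3));  // [3, 4, 5]
        System.out.println(tail(deque, 10)); // [1, 2, 3, 4, 5]
    }
}
```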

Let's test this thing.

```java
import org.junit.Assert;
import org.junit.Test;

public class TimestampedDequerTest {
    long capacityMillis = 1000; // assumed value for illustration
    TimestampedDequer<TimestampedElement<String>> dequer =
            new TimestampedDequer<TimestampedElement<String>>(capacityMillis);
    boolean verbose = false;

    private void addLast() {
        long timestamp = System.currentTimeMillis();
        String value = "record at " + timestamp;
        dequer.addLast(new TimestampedElement<String>(value, timestamp));
        if (verbose) {
            System.out.println(value);
        }
    }

    @Test
    public void test() throws Exception {
        check();
        check();
    }
```

where we check() twice... just to make sure. Of prune(), that is.

```java
    private void check() throws Exception {
        Thread.sleep(capacityMillis);
        Assert.assertEquals(0, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(0, dequer.tail(4).size());
        addLast();
        Assert.assertEquals(1, dequer.tail(4).size());
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Thread.sleep(capacityMillis/2);
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(1, dequer.tail(4).size());
        addLast();
        Assert.assertEquals(2, dequer.tail(4).size());
        Assert.assertEquals(2, dequer.snapshot(System.currentTimeMillis()).size());
        Thread.sleep(capacityMillis/2);
        Assert.assertEquals(2, dequer.tail(4).size());
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(1, dequer.tail(4).size());
    }
}
```

where we expect the final snapshot() to lose an element to prune()'ing, given the two half-capacityMillis sleeps since the first addLast().

Considering that the purpose of this Timestamped series is reducing information overload, we'll tail off here for now, and leave the "heavy-dropping" for next week, namely, testing with threads.

Thereafter, we'll see about using our TimestampedDequer to analyse the latest deque of logs e.g. every minute, to detect when our app might be coming down like a house of cards.

Credits

Thanks to my colleague Zach Visagie at ipay.co.za, for his kind reviews and indispensable input!

Resources

https://github.com/evanx/vellum/wiki
