diff --git a/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java b/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java index 544b5f8..3dd054c 100644 --- a/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java +++ b/lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java @@ -25,9 +25,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Queue; +import java.util.TreeMap; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.avro.file.CodecFactory; @@ -44,13 +43,15 @@ */ public class FileSpanStorage implements SpanStorage { /* - * We use rolling Avro DataFiles that store Span data associated with ten - * minute chunks. Because we enforce an upper limit on the number of spans - * stored, simply drop oldest file if and when the next write causes us to - * exceed that limit. + * We use rolling Avro data files that store Span data associated with ten + * minute chunks (this provides a simple way to index on time). Because we + * enforce an upper limit on the number of spans stored, simply drop + * oldest file if and when the next write causes us to exceed that limit. This + * approximates a FIFO queue of spans, which is basically what we want to + * maintain. * * Focus is on efficiency since most logic occurs every - * time a span is recorded (that is, every RPC call!). + * time a span is recorded (that is, every RPC call). * * We never want to block on span adding operations, which occur in the same * thread as the Requestor. We are okay to block on span retrieving @@ -60,12 +61,12 @@ public class FileSpanStorage implements SpanStorage { */ private class DiskWriterThread implements Runnable { - + /** Shared Span queue. Read-only for this thread. */ private BlockingQueue outstanding; /** Shared queue of files currently in view. Read/write for this thread. 
*/ - private Queue files; + private TreeMap files; /** How many Spans already written to each file. */ private HashMap spansPerFile = new HashMap(); @@ -82,16 +83,22 @@ private class DiskWriterThread implements Runnable { /** Whether to buffer file writes.*/ private boolean doBuffer; + /** Compression level for files. */ + private int compressionLevel; + /** * Thread that runs continuously and writes outstanding requests to * Avro files. This thread also deals with rolling files over and dropping * old files when the span limit is reached. + * @param compressionLevel */ public DiskWriterThread(BlockingQueue outstanding, - Queue files, boolean buffer) { + TreeMap files, boolean buffer, + int compressionLevel) { this.outstanding = outstanding; this.files = files; this.doBuffer = buffer; + this.compressionLevel = compressionLevel; } public void run() { @@ -100,15 +107,16 @@ public void run() { try { s = this.outstanding.take(); } catch (InterruptedException e1) { - continue; + continue; // should not be interrupted } try { assureCurrentWriter(); this.currentWriter.append(s); if (!this.doBuffer) this.currentWriter.flush(); this.spansSoFar += 1; - long fileSpans = this.spansPerFile.get(this.files.peek()); - this.spansPerFile.put(this.files.peek(),fileSpans + 1); + File latest = this.files.lastEntry().getValue(); + long fileSpans = this.spansPerFile.get(latest); + this.spansPerFile.put(latest, fileSpans + 1); } catch (IOException e) { // Fail silently if can't write } @@ -127,11 +135,9 @@ private void assureCurrentWriter() throws IOException { // Will we overshoot policy? 
while (this.spansSoFar >= maxSpans) { File oldest = null; + // If spansSoFar is positive, there must be at least one file synchronized (this.files) { - oldest = this.files.poll(); - } - if (oldest == null) { - throw new RuntimeException("Bad state."); + oldest = this.files.remove(this.files.firstKey()); } this.spansSoFar -= spansPerFile.get(oldest); spansPerFile.remove(oldest); @@ -143,33 +149,37 @@ private void assureCurrentWriter() throws IOException { currentTimestamp = (long) 0; currentWriter = null; } + long rightNow = System.currentTimeMillis() / 1000L; + + // What file should we be in + long cutOff = floorSecond(rightNow); - long currentStamp = System.currentTimeMillis() / 1000L; - long cutOff = currentStamp - (currentStamp % SECONDS_PER_FILE); if (currentWriter == null) { createNewFile = true; } - else if (cutOff > (currentTimestamp + SECONDS_PER_FILE)) { + // Test for roll-over. + else if (cutOff >= (currentTimestamp + SECONDS_PER_FILE)) { currentWriter.close(); createNewFile = true; } + if (createNewFile) { File newFile = new File(TRACE_FILE_DIR + "/" + - Thread.currentThread().getId() + "_" + currentStamp + FILE_SUFFIX); + Thread.currentThread().getId() + "_" + cutOff + FILE_SUFFIX); synchronized (this.files) { - this.files.add(newFile); + this.files.put(cutOff, newFile); } this.spansPerFile.put(newFile, (long) 0); this.currentWriter = new DataFileWriter(SPAN_WRITER); - this.currentWriter.setCodec(CodecFactory.deflateCodec(9)); + this.currentWriter.setCodec(CodecFactory.deflateCodec(compressionLevel)); this.currentWriter.create(Span.SCHEMA$, newFile); - currentTimestamp = cutOff; + this.currentTimestamp = cutOff; } } } /** Granularity of file chunks. 
*/ - private final static int SECONDS_PER_FILE = 60 * 10; // ten minute chunks + private static int SECONDS_PER_FILE = 60 * 10; // ten minute chunks /** Directory of data files */ private static String TRACE_FILE_DIR = "/tmp"; @@ -180,13 +190,10 @@ else if (cutOff > (currentTimestamp + SECONDS_PER_FILE)) { private final static SpecificDatumReader SPAN_READER = new SpecificDatumReader(Span.class); - /** How frequently to poll for new Spans, in Milliseconds */ - public final static long POLL_FREQUNECY_MILLIS = 50; - private long maxSpans = DEFAULT_MAX_SPANS; /** Shared queue of files currently in view. This thread only reads.*/ - private Queue files = new LinkedList(); + private TreeMap files = new TreeMap(); /** Shared Span queue. This thread only writes. */ LinkedBlockingQueue outstanding = new LinkedBlockingQueue(); @@ -195,9 +202,66 @@ else if (cutOff > (currentTimestamp + SECONDS_PER_FILE)) { Thread writer; Boolean writerEnabled; - public FileSpanStorage(boolean buffer) { + /** + * Return the head of the time bucket associated with this specific time. + */ + private static long floorSecond(long currentSecond) { + return currentSecond - (currentSecond % SECONDS_PER_FILE); + } + + /** + * Given a path to a data file of Spans, extract all spans and add them + * to the provided list. + */ + private static void addFileSpans(File f, List list) throws IOException { + DataFileReader reader = new DataFileReader(f, SPAN_READER); + Iterator it = reader.iterator(); + ArrayList spans = new ArrayList(); + while (it.hasNext()) { + spans.add(it.next()); + } + list.addAll(spans); + } + + /** + * Given a path to a data file of Spans, extract spans within a time period + * bounded by start and end. 
+ */ + private static void addFileSpans(File f, List list, + long start, long end) throws IOException { + DataFileReader reader = new DataFileReader(f, SPAN_READER); + Iterator it = reader.iterator(); + ArrayList spans = new ArrayList(); + while (it.hasNext()) { + // See if this span occurred entirely in range + long startTime = 0; + long endTime = 0; + + Span test = it.next(); + for (TimestampedEvent e: test.events) { + if (e.event instanceof SpanEvent) { + SpanEvent ev = (SpanEvent) e.event; + switch (ev) { + case CLIENT_SEND: startTime = e.timeStamp; + case SERVER_RECV: startTime = e.timeStamp; + case CLIENT_RECV: endTime = e.timeStamp; + case SERVER_SEND: endTime = e.timeStamp; + } + } + } + if (startTime > start && endTime < end) { spans.add(test); } + } + list.addAll(spans); + } + + public static void setFileGranularityForTesting(int granularity) { + SECONDS_PER_FILE = granularity; + } + + public FileSpanStorage(boolean buffer, int compressionLevel) { this.writerEnabled = true; - this.writer = new Thread(new DiskWriterThread(outstanding, files, buffer)); + this.writer = new Thread(new DiskWriterThread( + outstanding, files, buffer, compressionLevel)); this.writer.start(); } @@ -214,26 +278,90 @@ public void addSpan(Span s) { public List getAllSpans() { ArrayList out = new ArrayList(); synchronized (this.files) { - for (File f: this.files) { - DataFileReader reader; + for (File f: this.files.values()) { try { - reader = new DataFileReader(f, SPAN_READER); + addFileSpans(f, out); } catch (IOException e) { - continue; // Skip if there is a problem with this file - } - Iterator it = reader.iterator(); - ArrayList spans = new ArrayList(); - while (it.hasNext()) { - spans.add(it.next()); + continue; } - out.addAll(spans); } } return out; } + + /** + * Clear all Span data stored by this plugin. 
+ */ + public void clear() { + ArrayList out = new ArrayList(); + synchronized (this.files) { + for (Long l: new LinkedList(this.files.keySet())) { + File f = this.files.remove(l); + f.delete(); + } + } + } @Override public void setMaxSpans(long maxSpans) { this.maxSpans = maxSpans; } + + @Override + public List getSpansInRange(long start, long end) { + /* + * We first find the book-end files (first and last) whose Spans may + * or may not fit in the the range. Intermediary files can be directly + * passed, since they are completely within the time range. + * + * [ ] <-- Time range + * + * |-----++|+++++++|+++++++|+++++++|+++----|-------|---> + * \ / \ / + * start end + * file file + * + */ + List out = new ArrayList(); + List middleFiles = new LinkedList(); + + long startSecond = start / SpanStorage.NANOS_PER_SECOND; + long endSecond = end / SpanStorage.NANOS_PER_SECOND; + + int numFiles = (int) (endSecond - startSecond) / SECONDS_PER_FILE; + for (int i = 1; i < (numFiles); i++) { + middleFiles.add(startSecond + i * SECONDS_PER_FILE); + } + + synchronized (this.files) { + for (Long l: middleFiles) { + if (files.containsKey(l)) { + try { + addFileSpans(files.get(l), out); + } catch (IOException e) { + continue; + } + } + } + + // Start file + if (files.containsKey(startSecond)) { + try { + addFileSpans(files.get(startSecond), out, start, end); + } catch (IOException e) { + // Give up silently + } + } + + // End file + if (files.containsKey(endSecond)) { + try { + addFileSpans(files.get(endSecond), out, start, end); + } catch (IOException e) { + // Give up silently + } + } + } + return out; + } } diff --git a/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java b/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java index 7820088..7efb400 100644 --- a/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java +++ b/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java @@ -64,4 +64,10 @@ public void 
setMaxSpans(long maxSpans) { public List getAllSpans() { return (LinkedList) this.spans.clone(); } + + @Override + public List getSpansInRange(long start, long end) { + // TODO Auto-generated method stub + return null; + } } diff --git a/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java b/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java index a3eaf2a..f08d7c6 100644 --- a/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java +++ b/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java @@ -28,7 +28,9 @@ * */ public interface SpanStorage { - public static final long DEFAULT_MAX_SPANS = 10000; + public static final long DEFAULT_MAX_SPANS = 10000L; + public final static long MILLIS_PER_SECOND = 1000L; + public final static long NANOS_PER_SECOND = 1000000000L; /** * Add a span. @@ -45,4 +47,11 @@ public interface SpanStorage { * Return a list of all spans currently stored. For testing. */ List getAllSpans(); + + /** + * Return a list of all spans that fall within the given time range. 
+ * @param start UNIX time (in nanoseconds) as a long + * @param end UNIX time (in nanoseconds) as a long + */ + List getSpansInRange(long start, long end); } diff --git a/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java b/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java index 6ddc51c..532717c 100644 --- a/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java +++ b/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java @@ -84,12 +84,23 @@ public TraceResponder(SpanStorage spanStorage) { public GenericArray getAllSpans() throws AvroRemoteException { List spans = this.spanStorage.getAllSpans(); GenericData.Array out; - synchronized (spans) { - out = new GenericData.Array(spans.size(), - Schema.createArray(Span.SCHEMA$)); - for (Span s: spans) { - out.add(s); - } + out = new GenericData.Array(spans.size(), + Schema.createArray(Span.SCHEMA$)); + for (Span s: spans) { + out.add(s); + } + return out; + } + + @Override + public GenericArray getSpansInRange(long start, long end) + throws AvroRemoteException { + List spans = this.spanStorage.getSpansInRange(start, end); + GenericData.Array out; + out = new GenericData.Array(spans.size(), + Schema.createArray(Span.SCHEMA$)); + for (Span s: spans) { + out.add(s); } return out; } @@ -143,7 +154,7 @@ public TracePlugin(TracePluginConfiguration conf) throws IOException { this.storage = new InMemorySpanStorage(); } else if (storageType == StorageType.DISK) { - this.storage = new FileSpanStorage(false); + this.storage = new FileSpanStorage(false, conf.compressionLevel); } else { // default this.storage = new InMemorySpanStorage(); diff --git a/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java b/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java index 2f4b4d5..073f32e 100644 --- a/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java +++ b/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java @@ -31,6 
+31,8 @@ public class TracePluginConfiguration { public long maxSpans; // Max number of spans to store public boolean enabled; // Whether or not we are active public boolean buffer; // If disk storage, whether to buffer writes + public int compressionLevel; // If using file storage, what compression + // level (0-9). /** * Return a TracePluginConfiguration with default options. @@ -43,5 +45,6 @@ public TracePluginConfiguration() { this.maxSpans = 10000; this.enabled = true; this.buffer = true; + this.compressionLevel = 9; } } diff --git a/lang/java/src/java/org/apache/avro/ipc/trace/Util.java b/lang/java/src/java/org/apache/avro/ipc/trace/Util.java index 7bed89c..e3557c5 100644 --- a/lang/java/src/java/org/apache/avro/ipc/trace/Util.java +++ b/lang/java/src/java/org/apache/avro/ipc/trace/Util.java @@ -53,7 +53,6 @@ public static EnumSet getAllEvents(Span s) { return foundEvents; } - /** * Get the size of an RPC payload. */ diff --git a/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java b/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java index 65d04c0..82db581 100644 --- a/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java +++ b/lang/java/src/test/java/org/apache/avro/ipc/trace/TestFileSpanStorage.java @@ -19,6 +19,7 @@ package org.apache.avro.ipc.trace; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -35,8 +36,8 @@ public class TestFileSpanStorage { @Test public void testBasicStorage() { - SpanStorage test = new FileSpanStorage(false); - Span s = Util.createEventlessSpan(Util.IDValue(1), Util.IDValue(1), null); + FileSpanStorage test = new FileSpanStorage(false, 9); + Span s = Util.createEventlessSpan(Util.idValue(1), Util.idValue(1), null); s.messageName = new Utf8("message"); test.addSpan(s); try { @@ -45,15 +46,16 @@ public void testBasicStorage() { e.printStackTrace(); } 
assertTrue(test.getAllSpans().contains(s)); + test.clear(); } @Test public void testTonsOfSpans() { - SpanStorage test = new FileSpanStorage(false); + FileSpanStorage test = new FileSpanStorage(false, 9); test.setMaxSpans(100000); List spans = new ArrayList(50000); for (int i = 0; i < 50000; i++) { - Span s = Util.createEventlessSpan(Util.IDValue(i), Util.IDValue(i), null); + Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null); s.messageName = new Utf8("message"); test.addSpan(s); spans.add(s); @@ -64,13 +66,13 @@ public void testTonsOfSpans() { e.printStackTrace(); } assertEquals(50000, test.getAllSpans().size()); - + // Test fewer spans but explicitly call containsAll - SpanStorage test2 = new FileSpanStorage(false); + FileSpanStorage test2 = new FileSpanStorage(false, 9); test.setMaxSpans(100000); spans.clear(); for (int i = 0; i < 5000; i++) { - Span s = Util.createEventlessSpan(Util.IDValue(i), Util.IDValue(i), null); + Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null); s.messageName = new Utf8("message"); test2.addSpan(s); spans.add(s); @@ -81,24 +83,25 @@ public void testTonsOfSpans() { e.printStackTrace(); } assertTrue(test.getAllSpans().containsAll(spans)); - + test.clear(); + test2.clear(); } @Test public void testBasicMaxSpans() { - SpanStorage test = new FileSpanStorage(false); + FileSpanStorage test = new FileSpanStorage(false, 9); test.setMaxSpans(10); // Add a bunch of spans for (int i = 0; i < 100; i++) { - Span s = Util.createEventlessSpan(Util.IDValue(i), Util.IDValue(i), null); + Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null); s.messageName = new Utf8("message"); test.addSpan(s); } List lastNine = new LinkedList(); for (int i = 0; i < 9; i++) { - Span s = Util.createEventlessSpan(Util.IDValue(100 + i), Util.IDValue(100 + i), null); + Span s = Util.createEventlessSpan(Util.idValue(100 + i), Util.idValue(100 + i), null); s.messageName = new Utf8("message"); lastNine.add(s); 
test.addSpan(s); @@ -109,7 +112,67 @@ public void testBasicMaxSpans() { e.printStackTrace(); } List retreived = test.getAllSpans(); - assertTrue(retreived.size() == 9); + assertEquals(9, retreived.size()); assertTrue(retreived.containsAll(lastNine)); + + test.clear(); + } + + @Test + public void testRangeQuery1() { + FileSpanStorage test = new FileSpanStorage(false, 9); + test.setMaxSpans(10000); + FileSpanStorage.setFileGranularityForTesting(1); // Files every second + + long cutOff1 = 0; + long cutOff2 = 0; + + int numSpans = 10; + + Span[] spans = new Span[numSpans]; + // Add some spans + for (int i = 0; i < numSpans; i++) { + if (i == 1) { cutOff1 = (System.currentTimeMillis() - 20) * 1000000; } + + Span s = Util.createEventlessSpan(Util.idValue(i), Util.idValue(i), null); + TimestampedEvent te1 = new TimestampedEvent(); + te1.timeStamp = System.currentTimeMillis() * 1000000; + te1.event = SpanEvent.CLIENT_SEND; + + TimestampedEvent te2 = new TimestampedEvent(); + te2.timeStamp = System.currentTimeMillis() * 1000000; + te2.event = SpanEvent.CLIENT_RECV; + s.events.add(te1); + s.events.add(te2); + + s.messageName = new Utf8("message"); + test.addSpan(s); + spans[i] = s; + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + + } + if (i == numSpans - 2) { + cutOff2 = (System.currentTimeMillis() - 20) * 1000000; + } + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + + List retrieved = test.getSpansInRange(cutOff1, cutOff2); + assertEquals(numSpans - 2, retrieved.size()); + + assertFalse(retrieved.contains(spans[0])); + for (int j=1; j < numSpans - 2; j++) { + assertTrue(retrieved.contains(spans[j])); + } + assertFalse(retrieved.contains((spans[spans.length - 1]))); + + test.clear(); } } \ No newline at end of file diff --git a/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl b/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl index 7598929..6492c57 100644 --- 
a/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl +++ b/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl @@ -33,5 +33,14 @@ protocol AvroTrace { boolean complete; // Whether includes data from both sides } + /** + * Get all spans stored on this host. + */ array getAllSpans(); + + /** + * Get spans occurring between start and end. Each is a unix timestamp + * in nanosecond units (for consistency with TimestampedEvent). + */ + array getSpansInRange(long start, long end); } diff --git a/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr b/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr index 474d37d..041f3e8 100644 --- a/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr +++ b/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr @@ -64,6 +64,19 @@ "type" : "array", "items" : "Span" } + }, + "getSpansInRange" : { + "request" : [ { + "name" : "start", + "type" : "long" + }, { + "name" : "end", + "type" : "long" + } ], + "response" : { + "type" : "array", + "items" : "Span" + } } } } \ No newline at end of file