Skip to content

Commit

Permalink
More stuff for file storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Wendell committed Aug 4, 2010
1 parent 70150e3 commit c3127da
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 62 deletions.
210 changes: 169 additions & 41 deletions lang/java/src/java/org/apache/avro/ipc/trace/FileSpanStorage.java
Expand Up @@ -25,9 +25,8 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.avro.file.CodecFactory;
Expand All @@ -44,13 +43,15 @@
*/
public class FileSpanStorage implements SpanStorage {
/*
* We use rolling Avro DataFiles that store Span data associated with ten
* minute chunks. Because we enforce an upper limit on the number of spans
* stored, simply drop oldest file if and when the next write causes us to
* exceed that limit.
* We use rolling Avro data files that store Span data associated with ten
* minute chunks (this provides a simple way to index on time). Because we
* enforce an upper limit on the number of spans stored, simply drop
* oldest file if and when the next write causes us to exceed that limit. This
* approximates a FIFO queue of spans, which is basically what we want to
* maintain.
*
* Focus is on efficiency since most logic occurs every
* time a span is recorded (that is, every RPC call!).
* time a span is recorded (that is, every RPC call).
*
* We never want to block on span adding operations, which occur in the same
* thread as the Requestor. We are okay to block on span retrieving
Expand All @@ -60,12 +61,12 @@ public class FileSpanStorage implements SpanStorage {
*/

private class DiskWriterThread implements Runnable {

/** Shared Span queue. Read-only for this thread. */
private BlockingQueue<Span> outstanding;

/** Shared queue of files currently in view. Read/write for this thread. */
private Queue<File> files;
private TreeMap<Long, File> files;

/** How many Spans already written to each file. */
private HashMap<File, Long> spansPerFile = new HashMap<File, Long>();
Expand All @@ -82,16 +83,22 @@ private class DiskWriterThread implements Runnable {
/** Whether to buffer file writes.*/
private boolean doBuffer;

/** Compression level for files. */
private int compressionLevel;

/**
* Thread that runs continuously and writes outstanding requests to
* Avro files. This thread also deals with rolling files over and dropping
* old files when the span limit is reached.
* @param compressionLevel
*/
public DiskWriterThread(BlockingQueue<Span> outstanding,
Queue<File> files, boolean buffer) {
TreeMap<Long, File> files, boolean buffer,
int compressionLevel) {
this.outstanding = outstanding;
this.files = files;
this.doBuffer = buffer;
this.compressionLevel = compressionLevel;
}

public void run() {
Expand All @@ -100,15 +107,16 @@ public void run() {
try {
s = this.outstanding.take();
} catch (InterruptedException e1) {
continue;
continue; // should not be interrupted
}
try {
assureCurrentWriter();
this.currentWriter.append(s);
if (!this.doBuffer) this.currentWriter.flush();
this.spansSoFar += 1;
long fileSpans = this.spansPerFile.get(this.files.peek());
this.spansPerFile.put(this.files.peek(),fileSpans + 1);
File latest = this.files.lastEntry().getValue();
long fileSpans = this.spansPerFile.get(latest);
this.spansPerFile.put(latest, fileSpans + 1);
} catch (IOException e) {
// Fail silently if can't write
}
Expand All @@ -127,11 +135,9 @@ private void assureCurrentWriter() throws IOException {
// Will we overshoot policy?
while (this.spansSoFar >= maxSpans) {
File oldest = null;
// If spansSoFar is positive, there must be at least one file
synchronized (this.files) {
oldest = this.files.poll();
}
if (oldest == null) {
throw new RuntimeException("Bad state.");
oldest = this.files.remove(this.files.firstKey());
}
this.spansSoFar -= spansPerFile.get(oldest);
spansPerFile.remove(oldest);
Expand All @@ -143,33 +149,37 @@ private void assureCurrentWriter() throws IOException {
currentTimestamp = (long) 0;
currentWriter = null;
}
long rightNow = System.currentTimeMillis() / 1000L;

// What file should we be in
long cutOff = floorSecond(rightNow);

long currentStamp = System.currentTimeMillis() / 1000L;
long cutOff = currentStamp - (currentStamp % SECONDS_PER_FILE);
if (currentWriter == null) {
createNewFile = true;
}
else if (cutOff > (currentTimestamp + SECONDS_PER_FILE)) {
// Test for roll-over.
else if (cutOff >= (currentTimestamp + SECONDS_PER_FILE)) {
currentWriter.close();
createNewFile = true;
}

if (createNewFile) {
File newFile = new File(TRACE_FILE_DIR + "/" +
Thread.currentThread().getId() + "_" + currentStamp + FILE_SUFFIX);
Thread.currentThread().getId() + "_" + cutOff + FILE_SUFFIX);
synchronized (this.files) {
this.files.add(newFile);
this.files.put(cutOff, newFile);
}
this.spansPerFile.put(newFile, (long) 0);
this.currentWriter = new DataFileWriter<Span>(SPAN_WRITER);
this.currentWriter.setCodec(CodecFactory.deflateCodec(9));
this.currentWriter.setCodec(CodecFactory.deflateCodec(compressionLevel));
this.currentWriter.create(Span.SCHEMA$, newFile);
currentTimestamp = cutOff;
this.currentTimestamp = cutOff;
}
}
}

/** Granularity of file chunks. */
private final static int SECONDS_PER_FILE = 60 * 10; // ten minute chunks
private static int SECONDS_PER_FILE = 60 * 10; // ten minute chunks

/** Directory of data files */
private static String TRACE_FILE_DIR = "/tmp";
Expand All @@ -180,13 +190,10 @@ else if (cutOff > (currentTimestamp + SECONDS_PER_FILE)) {
private final static SpecificDatumReader<Span> SPAN_READER =
new SpecificDatumReader<Span>(Span.class);

/** How frequently to poll for new Spans, in Milliseconds */
public final static long POLL_FREQUNECY_MILLIS = 50;

private long maxSpans = DEFAULT_MAX_SPANS;

/** Shared queue of files currently in view. This thread only reads.*/
private Queue<File> files = new LinkedList<File>();
private TreeMap<Long, File> files = new TreeMap<Long, File>();

/** Shared Span queue. This thread only writes. */
LinkedBlockingQueue<Span> outstanding = new LinkedBlockingQueue<Span>();
Expand All @@ -195,9 +202,66 @@ else if (cutOff > (currentTimestamp + SECONDS_PER_FILE)) {
Thread writer;
Boolean writerEnabled;

public FileSpanStorage(boolean buffer) {
/**
 * Map a time, expressed in seconds, onto the first second of the
 * SECONDS_PER_FILE-wide bucket that contains it.
 */
private static long floorSecond(long currentSecond) {
  // Integer division discards the offset into the bucket; scaling back up
  // yields the bucket's starting second.
  long bucketIndex = currentSecond / SECONDS_PER_FILE;
  return bucketIndex * SECONDS_PER_FILE;
}

/**
 * Given a path to a data file of Spans, extract all spans and add them
 * to the provided list.
 *
 * The underlying reader is always closed before returning so that repeated
 * queries do not leak file handles.
 *
 * @param f    Avro data file containing Span records
 * @param list destination list; only modified if the whole file was read
 * @throws IOException if the file cannot be opened or closed
 */
private static void addFileSpans(File f, List<Span> list) throws IOException {
  DataFileReader<Span> reader = new DataFileReader<Span>(f, SPAN_READER);
  try {
    // Collect into a scratch list first so that a failure part-way through
    // the file leaves the caller's list untouched.
    List<Span> spans = new ArrayList<Span>();
    Iterator<Span> it = reader.iterator();
    while (it.hasNext()) {
      spans.add(it.next());
    }
    list.addAll(spans);
  } finally {
    reader.close();
  }
}

/**
 * Given a path to a data file of Spans, extract spans within a time period
 * bounded by start and end.
 *
 * A span is kept only if its start event (CLIENT_SEND or SERVER_RECV) is
 * strictly after {@code start} and its end event (CLIENT_RECV or
 * SERVER_SEND) is strictly before {@code end}. Timestamps default to 0 when
 * the corresponding event is absent from the span.
 *
 * @param f     Avro data file containing Span records
 * @param list  destination list; only modified if the whole file was read
 * @param start exclusive lower bound, in the same unit as event timestamps
 * @param end   exclusive upper bound, in the same unit as event timestamps
 * @throws IOException if the file cannot be opened or closed
 */
private static void addFileSpans(File f, List<Span> list,
    long start, long end) throws IOException {
  DataFileReader<Span> reader = new DataFileReader<Span>(f, SPAN_READER);
  try {
    // Collect into a scratch list first so that a failure part-way through
    // the file leaves the caller's list untouched.
    List<Span> spans = new ArrayList<Span>();
    Iterator<Span> it = reader.iterator();
    while (it.hasNext()) {
      // See if this span occurred entirely in range
      long startTime = 0;
      long endTime = 0;

      Span test = it.next();
      for (TimestampedEvent e: test.events) {
        if (e.event instanceof SpanEvent) {
          SpanEvent ev = (SpanEvent) e.event;
          // Each group must end in a break: without it every case falls
          // through and any event would overwrite endTime.
          switch (ev) {
            case CLIENT_SEND:
            case SERVER_RECV:
              startTime = e.timeStamp;
              break;
            case CLIENT_RECV:
            case SERVER_SEND:
              endTime = e.timeStamp;
              break;
            default:
              break;
          }
        }
      }
      if (startTime > start && endTime < end) { spans.add(test); }
    }
    list.addAll(spans);
  } finally {
    reader.close();
  }
}

/**
 * Override the width, in seconds, of the time bucket covered by each data
 * file. Intended for tests only.
 *
 * @param granularity bucket width in seconds; must be positive
 * @throws IllegalArgumentException if granularity is zero or negative
 */
public static void setFileGranularityForTesting(int granularity) {
  // The bucket width is used as a modulus when computing bucket heads, so a
  // value of zero would raise ArithmeticException later, away from the caller.
  if (granularity <= 0) {
    throw new IllegalArgumentException(
        "File granularity must be positive, got: " + granularity);
  }
  SECONDS_PER_FILE = granularity;
}

public FileSpanStorage(boolean buffer, int compressionLevel) {
this.writerEnabled = true;
this.writer = new Thread(new DiskWriterThread(outstanding, files, buffer));
this.writer = new Thread(new DiskWriterThread(
outstanding, files, buffer, compressionLevel));
this.writer.start();
}

Expand All @@ -214,26 +278,90 @@ public void addSpan(Span s) {
public List<Span> getAllSpans() {
ArrayList<Span> out = new ArrayList<Span>();
synchronized (this.files) {
for (File f: this.files) {
DataFileReader<Span> reader;
for (File f: this.files.values()) {
try {
reader = new DataFileReader<Span>(f, SPAN_READER);
addFileSpans(f, out);
} catch (IOException e) {
continue; // Skip if there is a problem with this file
}
Iterator<Span> it = reader.iterator();
ArrayList<Span> spans = new ArrayList<Span>();
while (it.hasNext()) {
spans.add(it.next());
continue;
}
out.addAll(spans);
}
}
return out;
}

/**
 * Clear all Span data stored by this plugin.
 *
 * Deletes every data file currently tracked in the shared file map and then
 * empties the map. Deletion is best-effort: the result of each delete is not
 * checked. This method touches only the shared file map.
 */
public void clear() {
  synchronized (this.files) {
    for (File f : this.files.values()) {
      f.delete();
    }
    this.files.clear();
  }
}

/**
 * Set the upper limit on the number of spans kept by this storage. The
 * writer thread compares its running span count against this value when
 * deciding whether to drop the oldest data file.
 *
 * NOTE(review): the field is a plain (non-volatile) long that is also read
 * by the writer thread; confirm whether cross-thread visibility matters here.
 *
 * @param maxSpans new span limit
 */
@Override
public void setMaxSpans(long maxSpans) {
this.maxSpans = maxSpans;
}

/**
 * Return the spans stored on disk that fall within the given time range.
 *
 * @param start UNIX time (in nanoseconds) as a long
 * @param end UNIX time (in nanoseconds) as a long
 * @return spans from the files covering the range; empty if end precedes
 *         start or no file covers the range
 */
@Override
public List<Span> getSpansInRange(long start, long end) {
  /*
   * We first find the book-end files (first and last) whose Spans may
   * or may not fit in the range. Intermediary files can be directly
   * passed, since they are completely within the time range.
   *
   *       [                              ]   <-- Time range
   *
   * |-----++|+++++++|+++++++|+++++++|+++----|-------|--->
   *  \     /                         \     /
   *   start                            end
   *   file                             file
   *
   */
  List<Span> out = new ArrayList<Span>();
  if (end < start) {
    return out; // Inverted range: nothing can match
  }

  // Files are keyed by the first second of their bucket, so both ends of
  // the range must be floored to a bucket head before any lookup.
  long startBucket = floorSecond(start / SpanStorage.NANOS_PER_SECOND);
  long endBucket = floorSecond(end / SpanStorage.NANOS_PER_SECOND);

  synchronized (this.files) {
    // Middle files: every existing bucket strictly between the book-ends.
    // A sub-map view visits only files that exist, however wide the range.
    if (startBucket < endBucket) {
      for (File middle : files.subMap(startBucket + 1, endBucket).values()) {
        try {
          addFileSpans(middle, out);
        } catch (IOException e) {
          continue; // Skip if there is a problem with this file
        }
      }
    }

    // Start file
    File startFile = files.get(startBucket);
    if (startFile != null) {
      try {
        addFileSpans(startFile, out, start, end);
      } catch (IOException e) {
        // Give up silently
      }
    }

    // End file (only when distinct from the start file, otherwise its
    // spans would be returned twice)
    if (endBucket != startBucket) {
      File endFile = files.get(endBucket);
      if (endFile != null) {
        try {
          addFileSpans(endFile, out, start, end);
        } catch (IOException e) {
          // Give up silently
        }
      }
    }
  }
  return out;
}
}
Expand Up @@ -64,4 +64,10 @@ public void setMaxSpans(long maxSpans) {
public List<Span> getAllSpans() {
// Return a copy produced by clone() rather than the live collection.
// The result is cast to LinkedList<Span>, so this.spans is presumably a
// LinkedList<Span> (its declaration is not visible here; confirm).
return (LinkedList<Span>) this.spans.clone();
}

/**
 * Return the stored spans that fall within the given time range.
 *
 * A span is kept only if its start event (CLIENT_SEND or SERVER_RECV) is
 * strictly after {@code start} and its end event (CLIENT_RECV or
 * SERVER_SEND) is strictly before {@code end}. Works on the copy returned
 * by {@link #getAllSpans()}, so the backing collection is not iterated
 * directly.
 *
 * @param start exclusive lower bound, in the same unit as event timestamps
 * @param end   exclusive upper bound, in the same unit as event timestamps
 * @return the matching spans; never null
 */
@Override
public List<Span> getSpansInRange(long start, long end) {
  List<Span> inRange = new LinkedList<Span>();
  for (Span candidate : getAllSpans()) {
    long startTime = 0;
    long endTime = 0;
    for (TimestampedEvent e : candidate.events) {
      if (e.event instanceof SpanEvent) {
        switch ((SpanEvent) e.event) {
          case CLIENT_SEND:
          case SERVER_RECV:
            startTime = e.timeStamp;
            break;
          case CLIENT_RECV:
          case SERVER_SEND:
            endTime = e.timeStamp;
            break;
          default:
            break;
        }
      }
    }
    if (startTime > start && endTime < end) {
      inRange.add(candidate);
    }
  }
  return inRange;
}
}
11 changes: 10 additions & 1 deletion lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java
Expand Up @@ -28,7 +28,9 @@
*
*/
public interface SpanStorage {
public static final long DEFAULT_MAX_SPANS = 10000;
public static final long DEFAULT_MAX_SPANS = 10000L;
public final static long MILLIS_PER_SECOND = 1000L;
public final static long NANOS_PER_SECOND = 1000000000L;

/**
* Add a span.
Expand All @@ -45,4 +47,11 @@ public interface SpanStorage {
* Return a list of all spans currently stored. For testing.
*/
List<Span> getAllSpans();

/**
* Return a list of all spans that fall within the given time range.
* @param start UNIX time (in nanoseconds) as a long
* @param end UNIX time (in nanoseconds) as a long
*/
List<Span> getSpansInRange(long start, long end);
}

0 comments on commit c3127da

Please sign in to comment.