Flume-871. Background HDFS append with configurable wait timeout
pmujumdar committed Dec 7, 2011
1 parent fde0a69 commit 66f3078
Showing 4 changed files with 291 additions and 2 deletions.
@@ -25,6 +25,14 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -59,6 +67,7 @@ public class HDFSEventSink extends AbstractSink implements PollableSink,
static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
static final int defaultMaxOpenFiles = 5000;
static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
static final long defaultAppendTimeout = 1000;

private long rollInterval;
private long rollSize;
@@ -71,7 +80,9 @@ public class HDFSEventSink extends AbstractSink implements PollableSink,
private String path;
private int maxOpenFiles;
private String writeFormat;
private HDFSWriterFactory myWriterFactory;
private ExecutorService executor;
private long appendTimeout;

/*
* Extended Java LinkedHashMap for open file handle LRU queue. We want to clear
@@ -128,6 +139,7 @@ public void configure(Context context) {
String fileType = context.get("hdfs.fileType", String.class);
String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
String writeFormat = context.get("hdfs.writeFormat", String.class);
String appendTimeout = context.get("hdfs.appendTimeout", String.class);

if (fileName == null)
fileName = defaultFileName;
@@ -190,6 +202,12 @@ public void configure(Context context) {
} else {
this.writeFormat = writeFormat;
}

if (appendTimeout == null) {
this.appendTimeout = defaultAppendTimeout;
} else {
this.appendTimeout = Long.parseLong(appendTimeout);
}
}
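
For reference, a minimal configuration sketch. Only the "hdfs.appendTimeout" key and its 1000 ms default come from this diff; the Context.put call and the rest of the wiring are assumptions about the surrounding API, not part of the commit:

Context context = new Context();
context.put("hdfs.appendTimeout", "500"); // wait at most 500 ms per append
// A value of 0 (or less) makes backgroundAppend() wait indefinitely.

HDFSEventSink sink = new HDFSEventSink();
sink.configure(context); // if the key is unset, defaultAppendTimeout (1000 ms) applies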

private static boolean codecMatches(Class<? extends CompressionCodec> cls,
@@ -246,6 +264,50 @@ private static CompressionCodec getCodec(String codecName) {
return codec;
}

/*
 * Execute the append on a separate thread and wait for completion for the
 * specified amount of time. If the append times out, cancel it and throw an
 * IOException.
 */
private BucketFlushStatus backgroundAppend(final BucketWriter bw, final Event e) throws IOException, InterruptedException {
Future<BucketFlushStatus> future = executor.submit(new Callable<BucketFlushStatus>() {
public BucketFlushStatus call() throws Exception {
return bw.append(e);
}
});

try {
if (appendTimeout > 0) {
return future.get(appendTimeout, TimeUnit.MILLISECONDS);
} else {
return future.get();
}
} catch (TimeoutException eT) {
future.cancel(true);
throw new IOException("Append timed out", eT);
} catch (ExecutionException e1) {
Throwable cause = e1.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof InterruptedException) {
throw (InterruptedException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
// we have a throwable that is not an exception. (such as a
// NoClassDefFoundError)
LOG.error("Got a throwable that is not an exception! Bailing out!",
e1.getCause());
throw new RuntimeException(e1.getCause());
}
} catch (CancellationException ce) {
throw new InterruptedException(
"Blocked append interrupted by rotation event");
} catch (InterruptedException ex) {
LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
throw (InterruptedException) ex;
}
}
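
As a self-contained illustration of the bounded-wait pattern the method above uses, here is a hedged sketch; the class, method, and constant names are invented for the demo and appear nowhere in the commit:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SlowAppendDemo {
  private static final long APPEND_TIMEOUT_MS = 1000; // mirrors defaultAppendTimeout

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // Stand-in for BucketWriter.append(): blocks longer than the timeout.
    Future<String> future = executor.submit(new Callable<String>() {
      public String call() throws Exception {
        Thread.sleep(5000);
        return "appended";
      }
    });
    try {
      System.out.println(future.get(APPEND_TIMEOUT_MS, TimeUnit.MILLISECONDS));
    } catch (TimeoutException te) {
      future.cancel(true); // interrupt the stuck append, as backgroundAppend does
      System.out.println("append timed out after " + APPEND_TIMEOUT_MS + " ms");
    } finally {
      executor.shutdownNow();
    }
  }
}

One caveat worth noting: future.cancel(true) only delivers an interrupt, so if the underlying HDFS client call never checks its interrupt status, the worker thread can remain blocked even after the caller has given up.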

/**
* Pull events out of channel and send it to HDFS - take at the most
* txnEventMax, that's the maximum #events to hold in channel for a given
@@ -281,7 +343,7 @@ public Status process() throws EventDeliveryException {
}

// Write the data to HDFS
-        syncedUp = bw.append(event);
+        syncedUp = backgroundAppend(bw, event);

// keep track of the files in current batch that are not flushed
// we need to flush all those at the end of the transaction
@@ -326,11 +388,22 @@ public void stop() {
} catch (IOException eIO) {
LOG.warn("IOException in opening file", eIO);
}
executor.shutdown();
try {
while (!executor.isTerminated()) {
executor.awaitTermination(defaultAppendTimeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException ex) {
LOG.warn("shutdown interrupted" + ex.getMessage(), ex);
}

executor = null;
super.stop();
}
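
The loop above retries awaitTermination in defaultAppendTimeout slices until the executor drains, so a wedged append can stall shutdown. A common variant, shown here only as a hedged alternative and not what the commit does, bounds the wait and then forces termination:

executor.shutdown(); // stop accepting new appends
try {
  if (!executor.awaitTermination(defaultAppendTimeout, TimeUnit.MILLISECONDS)) {
    executor.shutdownNow(); // interrupt any append still in flight
  }
} catch (InterruptedException ex) {
  executor.shutdownNow();
  Thread.currentThread().interrupt(); // preserve the interrupt status
}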

@Override
public void start() {
executor = Executors.newFixedThreadPool(1);
for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
try {
e.getValue().open();
@@ -33,6 +33,13 @@ public void append(Event e, FlumeFormatter fmt) throws IOException {

if (e.getHeaders().containsKey("fault")) {
throw new IOException("Injected fault");
} else if (e.getHeaders().containsKey("slow")) {
long waitTime = Long.parseLong(e.getHeaders().get("slow"));
try {
Thread.sleep(waitTime);
} catch (InterruptedException eT) {
throw new IOException("append interrupted", eT);
}
}
super.append(e, fmt);
}
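
The "slow" header turns this fault-injection writer into a controllable stall. A hypothetical test fragment might drive the timeout path as below; SimpleEvent, the channel wiring, and the header plumbing are assumptions about the test harness, not code from this commit:

Map<String, String> headers = new HashMap<String, String>();
headers.put("slow", "2000"); // writer sleeps 2000 ms before appending
Event event = new SimpleEvent(); // assumed Event implementation
event.setHeaders(headers);
event.setBody("payload".getBytes());
channel.put(event);
// With hdfs.appendTimeout below 2000 ms, the sink's process() call should
// now surface the "Append timed out" IOException from backgroundAppend().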
@@ -34,7 +34,15 @@ public void append(Event e, FlumeFormatter fmt) throws IOException {
} else if (e.getHeaders().containsKey("fault-once")) {
e.getHeaders().remove("fault-once");
throw new IOException("Injected fault");
} else if (e.getHeaders().containsKey("slow")) {
long waitTime = Long.parseLong(e.getHeaders().get("slow"));
try {
Thread.sleep(waitTime);
} catch (InterruptedException eT) {
throw new IOException("append interrupted", eT);
}
}

super.append(e, fmt);
}

[Diff of the fourth changed file not rendered.]
