Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

FLUME-1748: HDFS Sink should check if the thread is interrupted befor…

…e performing any HDFS operations

(Hari Shreedharan via Brock Noland)
  • Loading branch information...
commit aa549c4f27db848cb8900533fd0f16562d971aa2 1 parent 97ed09e
Brock Noland authored
View
31 flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -165,6 +165,7 @@ private void resetCounters() {
/**
* open() is called by append()
* @throws IOException
+ * @throws InterruptedException
*/
private void open() throws IOException, InterruptedException {
runPrivileged(new PrivilegedExceptionAction<Void>() {
@@ -178,8 +179,9 @@ public Void run() throws Exception {
/**
* doOpen() must only be called by open()
* @throws IOException
+ * @throws InterruptedException
*/
- private void doOpen() throws IOException {
+ private void doOpen() throws IOException, InterruptedException {
if ((filePath == null) || (writer == null) || (formatter == null)) {
throw new IOException("Invalid file settings");
}
@@ -194,6 +196,7 @@ private void doOpen() throws IOException {
// NOTE: tried synchronizing on the underlying Kerberos principal previously
// which caused deadlocks. See FLUME-1231.
synchronized (staticLock) {
+ checkAndThrowInterruptedException();
try {
long counter = fileExtensionCounter.incrementAndGet();
if (codeC == null) {
@@ -252,8 +255,10 @@ public Void call() throws Exception {
* Close the file handle and rename the temp file to the permanent filename.
* Safe to call multiple times. Logs HDFSWriter.close() exceptions.
* @throws IOException On failure to rename if temp file exists.
+ * @throws InterruptedException
*/
public synchronized void close() throws IOException, InterruptedException {
+ checkAndThrowInterruptedException();
flush();
runPrivileged(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
@@ -302,8 +307,11 @@ private void doClose() throws IOException {
/**
* flush the data
+ * @throws IOException
+ * @throws InterruptedException
*/
public synchronized void flush() throws IOException, InterruptedException {
+ checkAndThrowInterruptedException();
if (!isBatchComplete()) {
runPrivileged(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
@@ -354,8 +362,13 @@ private void doFlush() throws IOException {
* We rotate before append, and not after, so that the active file rolling
* mechanism will never roll an empty file. This also ensures that the file
* creation time reflects when the first event was written.
+ *
+ * @throws IOException
+ * @throws InterruptedException
*/
- public synchronized void append(Event event) throws IOException, InterruptedException {
+ public synchronized void append(Event event)
+ throws IOException, InterruptedException {
+ checkAndThrowInterruptedException();
if (!isOpen) {
if(idleClosed) {
throw new IOException("This bucket writer was closed due to idling and this handle " +
@@ -442,4 +455,18 @@ private boolean isBatchComplete() {
void setClock(Clock clock) {
this.clock = clock;
}
+
+ /**
+ * This method if the current thread has been interrupted and throws an
+ * exception.
+ * @throws InterruptedException
+ */
+ private static void checkAndThrowInterruptedException()
+ throws InterruptedException {
+ if (Thread.currentThread().interrupted()) {
+ throw new InterruptedException("Timed out before HDFS call was made. "
+ + "Your hdfs.callTimeout might be set too low or HDFS calls are "
+ + "taking too long.");
+ }
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.