Skip to content
This repository has been archived by the owner on Jul 28, 2019. It is now read-only.

Commit

Permalink
FLUME-305: Fix code that causes TestCollectorSink to fail intermittently
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Hsieh committed May 2, 2011
1 parent e19dcc9 commit 1a633ee
Showing 1 changed file with 54 additions and 4 deletions.
Expand Up @@ -29,6 +29,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.log4j.Level;
import org.junit.Before;
Expand Down Expand Up @@ -474,32 +475,81 @@ public void testHdfsDownInterruptAfterOpen() throws FlumeSpecException,

final CountDownLatch started = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);

final AtomicReference<Exception> are = new AtomicReference(null);
Thread t = new Thread("append thread") {
public void run() {
Event e = new EventImpl("foo".getBytes());
try {
snk.open();
started.countDown();
snk.append(e);
} catch (IOException e1) {
} catch (Exception e1) {
// could be an exception but we don't care.
LOG.info("don't care about this exception: ", e1);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
are.set(e1);
}
done.countDown();
}
};
t.start();
boolean begun = started.await(60, TimeUnit.SECONDS);
assertTrue("took too long to start", begun);

// there is a race between this close call and the append call inside the
// thread. In this test we only want to verify that this exits in a
// reasonable amount of time.

snk.close();
LOG.info("Interrupting appending thread");
t.interrupt();
boolean completed = done.await(60, TimeUnit.SECONDS);
assertTrue("Timed out when attempting to shutdown", completed);
}

/**
* This tests close() and interrupt on a collectorSink in such a way that
* close happens while a append call is blocked.
*/
@Test
public void testHdfsDownInterruptBlockedAppend() throws FlumeSpecException,
IOException, InterruptedException {
final EventSink snk = FlumeBuilder.buildSink(new Context(),
"collectorSink(\"hdfs://nonexistant/user/foo\", \"foo\")");

final CountDownLatch started = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);

final AtomicReference<Exception> are = new AtomicReference(null);
Thread t = new Thread("append thread") {
public void run() {
Event e = new EventImpl("foo".getBytes());
try {
snk.open();
started.countDown();
snk.append(e);
} catch (Exception e1) {
e1.printStackTrace();
are.set(e1);
}
done.countDown();
}
};
t.start();
boolean begun = started.await(60, TimeUnit.SECONDS);
assertTrue("took too long to start", begun);

// there is a race between this close call and the append call inside the
// thread. This sleep call should give enough to cause the append to get
// stuck.
Clock.sleep(1000);

snk.close();
LOG.info("Interrupting appending thread");
t.interrupt();
boolean completed = done.await(60, TimeUnit.SECONDS);
assertTrue("Timed out when attempting to shutdown", completed);
assertTrue("Expected exit due to interrupted exception",
are.get() instanceof InterruptedException);
}

/**
Expand Down

0 comments on commit 1a633ee

Please sign in to comment.