Skip to content

Commit

Permalink
[hotfix] [tests] Increase resilience of ProcessFailureRecoveryTests
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Sep 17, 2015
1 parent 77989d3 commit d15fd33
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,6 +50,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
Expand All @@ -68,7 +70,7 @@
* guaranteed to remain empty (all tasks are already deployed) and kills one of
* the original task managers. The recovery should restart the tasks on the new TaskManager.
*/
public abstract class AbstractProcessFailureRecoveryTest {
public abstract class AbstractProcessFailureRecoveryTest extends TestLogger {

protected static final String READY_MARKER_FILE_PREFIX = "ready_";
protected static final String PROCEED_MARKER_FILE = "proceed";
Expand Down Expand Up @@ -147,7 +149,7 @@ public void testTaskManagerProcessFailure() {
// the program will very slowly consume elements until the marker file (later created by the
// test driver code) is present
final File coordinateDirClosure = coordinateTempDir;
final Throwable[] errorRef = new Throwable[1];
final AtomicReference<Throwable> errorRef = new AtomicReference<>();

// we trigger program execution in a separate thread
Thread programTrigger = new Thread("Program Trigger") {
Expand All @@ -158,7 +160,7 @@ public void run() {
}
catch (Throwable t) {
t.printStackTrace();
errorRef[0] = t;
errorRef.set(t);
}
}
};
Expand All @@ -168,7 +170,18 @@ public void run() {

// wait until all marker files are in place, indicating that all tasks have started
// max 20 seconds
waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
if (!waitForMarkerFiles(coordinateTempDir, PARALLELISM, 120000)) {
// check if the program failed for some reason
if (errorRef.get() != null) {
Throwable error = errorRef.get();
error.printStackTrace();
fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
}
else {
// no error occurred, simply a timeout
fail("The tasks were not started within time (" + 120000 + "msecs)");
}
}

// start the third TaskManager
taskManagerProcess3 = new ProcessBuilder(command).start();
Expand All @@ -192,8 +205,8 @@ public void run() {
assertFalse("The program did not finish in time", programTrigger.isAlive());

// check whether the program encountered an error
if (errorRef[0] != null) {
Throwable error = errorRef[0];
if (errorRef.get() != null) {
Throwable error = errorRef.get();
error.printStackTrace();
fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
}
Expand Down Expand Up @@ -316,7 +329,7 @@ protected static void touchFile(File file) throws IOException {
}
}

protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
protected static boolean waitForMarkerFiles(File basedir, int num, long timeout) {
long now = System.currentTimeMillis();
final long deadline = now + timeout;

Expand All @@ -333,7 +346,7 @@ protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
}

if (allFound) {
return;
return true;
}
else {
// not all found, wait for a bit
Expand All @@ -348,7 +361,7 @@ protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
}
}

fail("The tasks were not started within time (" + timeout + "msecs)");
return false;
}

// --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -83,7 +83,7 @@ public Long map(Long value) throws Exception {
}
}).startNewChain()
// populate the coordinate directory so we can proceed to TaskManager failure
.map(new Mapper(coordinateDir));
.map(new Mapper(coordinateDir));

//write result to temporary file
result.addSink(new CheckpointedSink(DATA_COUNT));
Expand Down

0 comments on commit d15fd33

Please sign in to comment.