Skip to content

Commit

Permalink
[FLINK-2392][yarn tests] Use log event which is guaranteed to show up
Browse files Browse the repository at this point in the history
to test whether the job was successful
  • Loading branch information
rmetzger committed Sep 22, 2015
1 parent 5e6175d commit 6d1656a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 17 deletions.
Expand Up @@ -63,8 +63,17 @@ public static void addTestAppender(Class target, Level level) {
}

public static void checkForLogString(String expected) {
LoggingEvent found = getEventContainingString(expected);
if(found != null) {
LOG.info("Found expected string '"+expected+"' in log message "+found);
return;
}
Assert.fail("Unable to find expected string '" + expected + "' in log messages");
}

public static LoggingEvent getEventContainingString(String expected) {
if(testAppender == null) {
throw new NullPointerException("Initialize it first");
throw new NullPointerException("Initialize test appender first");
}
LoggingEvent found = null;
for(LoggingEvent event: testAppender.events) {
Expand All @@ -73,11 +82,7 @@ public static void checkForLogString(String expected) {
break;
}
}
if(found != null) {
LOG.info("Found expected string '"+expected+"' in log message "+found);
return;
}
Assert.fail("Unable to find expected string '" + expected + "' in log messages");
return found;
}

public static class TestAppender extends AppenderSkeleton {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
Expand Down Expand Up @@ -408,6 +409,7 @@ public void testfullAlloc() {
@Test
public void perJobYarnCluster() {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
Expand All @@ -417,10 +419,10 @@ public void perJobYarnCluster() {
"-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution switched to status FINISHED.",
"Job execution complete",
/* prohibited strings: (we want to see (2/2)) */
new String[]{"System.out)(1/1) switched to FINISHED "},
RunTypes.CLI_FRONTEND, 0);
RunTypes.CLI_FRONTEND, 0, true);
LOG.info("Finished perJobYarnCluster()");
}

Expand All @@ -430,6 +432,9 @@ public void perJobYarnCluster() {
@Test
public void perJobYarnClusterWithParallelism() {
LOG.info("Starting perJobYarnClusterWithParallelism()");
// write log messages to stdout as well, so that the runWithArgs() method
// is catching the log output
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run",
Expand All @@ -440,10 +445,10 @@ public void perJobYarnClusterWithParallelism() {
"-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution switched to status FINISHED.",
"Job execution complete",
/* prohibited strings: (we want to see (2/2)) */
new String[]{"System.out)(1/1) switched to FINISHED "},
RunTypes.CLI_FRONTEND, 0);
RunTypes.CLI_FRONTEND, 0, true);
LOG.info("Finished perJobYarnClusterWithParallelism()");
}

Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand Down Expand Up @@ -240,8 +241,8 @@ public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOExcep
*/
public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
File cwd = new File("target/"+yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());

File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
@Override
Expand Down Expand Up @@ -390,7 +391,7 @@ protected Runner startWithArgs(String[] args, String startedAfterString, RunType
final int START_TIMEOUT_SECONDS = 60;

Runner runner = new Runner(args, type);
runner.setName("Frontend (CLI/YARN Client) runner thread (runWithArgs()).");
runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
runner.start();

for(int second = 0; second < START_TIMEOUT_SECONDS; second++) {
Expand All @@ -414,10 +415,19 @@ protected Runner startWithArgs(String[] args, String startedAfterString, RunType
return null;
}

protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) {
runWithArgs(args,terminateAfterString, failOnStrings, type, returnCode, false);
}
/**
* The test has been passed once the "terminateAfterString" has been seen.
* @param args Command line arguments for the runner
* @param terminateAfterString the runner is searching the stdout and stderr for this string. as soon as it appears, the test has passed
* @param failOnStrings The runner is searching stdout and stderr for the strings specified here. If one appears, the test has failed
* @param type Set the type of the runner
* @param returnCode Expected return code from the runner.
* @param checkLogForTerminateString If true, the runner checks also the log4j logger for the terminate string
*/
protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) {
protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode, boolean checkLogForTerminateString) {
LOG.info("Running with args {}", Arrays.toString(args));

outContent = new ByteArrayOutputStream();
Expand All @@ -434,6 +444,7 @@ protected void runWithArgs(String[] args, String terminateAfterString, String[]
runner.start();

boolean expectedStringSeen = false;
boolean testPassedFromLog4j = false;
do {
sleep(1000);
String outContentString = outContent.toString();
Expand All @@ -450,8 +461,17 @@ protected void runWithArgs(String[] args, String terminateAfterString, String[]
}
}
}
// check output for correct TaskManager startup.
if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) ) {
// check output for the expected terminateAfterString.
if(checkLogForTerminateString) {
LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString);
if(matchedEvent != null) {
testPassedFromLog4j = true;
LOG.info("Found expected output in logging event {}", matchedEvent);
}

}

if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) || testPassedFromLog4j ) {
expectedStringSeen = true;
LOG.info("Found expected output in redirected streams");
// send "stop" command to command line interface
Expand All @@ -469,13 +489,14 @@ protected void runWithArgs(String[] args, String terminateAfterString, String[]
else {
// check if thread died
if (!runner.isAlive()) {
sendOutput();
if (runner.getReturnValue() != 0) {
Assert.fail("Runner thread died before the test was finished. Return value = "
+ runner.getReturnValue());
} else {
LOG.info("Runner stopped earlier than expected with return value = 0");
}
// leave loop: the runner died, so we can not expect new strings to show up.
break;
}
}
}
Expand Down

0 comments on commit 6d1656a

Please sign in to comment.