Skip to content

Commit

Permalink
[tests] Flix flakey SimpleRecoveryITCase
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 20, 2015
1 parent c46bdff commit 1f49926
Showing 1 changed file with 39 additions and 16 deletions.
Expand Up @@ -67,8 +67,6 @@ public static void teardownCluster() {
@Test
public void testFailedRunThenSuccessfulRun() {

FailOnceMapper.failuresBeforeSuccess = 1;

try {
List<Long> resultCollection = new ArrayList<Long>();

Expand All @@ -81,7 +79,7 @@ public void testFailedRunThenSuccessfulRun() {
env.setNumberOfExecutionRetries(0);

env.generateSequence(1, 10)
.map(new FailOnceMapper<Long>())
.map(new FailingMapper1<Long>())
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) {
Expand All @@ -91,8 +89,9 @@ public Long reduce(Long value1, Long value2) {
.output(new LocalCollectionOutputFormat<Long>(resultCollection));

try {
env.execute();
fail("The program should have failed");
JobExecutionResult res = env.execute();
String msg = res == null ? "null result" : "result in " + res.getNetRuntime();
fail("The program should have failed, but returned " + msg);
}
catch (ProgramInvocationException e) {
// expected
Expand All @@ -108,7 +107,7 @@ public Long reduce(Long value1, Long value2) {
env.setNumberOfExecutionRetries(0);

env.generateSequence(1, 10)
.map(new FailOnceMapper<Long>())
.map(new FailingMapper1<Long>())
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) {
Expand Down Expand Up @@ -143,9 +142,6 @@ public Long reduce(Long value1, Long value2) {

@Test
public void testRestart() {

FailOnceMapper.failuresBeforeSuccess = 1;

try {
List<Long> resultCollection = new ArrayList<Long>();

Expand All @@ -156,7 +152,7 @@ public void testRestart() {
env.setNumberOfExecutionRetries(1);

env.generateSequence(1, 10)
.map(new FailOnceMapper<Long>())
.map(new FailingMapper2<Long>())
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) {
Expand Down Expand Up @@ -189,9 +185,6 @@ public Long reduce(Long value1, Long value2) {

@Test
public void testRestartMultipleTimes() {

FailOnceMapper.failuresBeforeSuccess = 3;

try {
List<Long> resultCollection = new ArrayList<Long>();

Expand All @@ -202,7 +195,7 @@ public void testRestartMultipleTimes() {
env.setNumberOfExecutionRetries(3);

env.generateSequence(1, 10)
.map(new FailOnceMapper<Long>())
.map(new FailingMapper3<Long>())
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) {
Expand Down Expand Up @@ -235,9 +228,39 @@ public Long reduce(Long value1, Long value2) {

// ------------------------------------------------------------------------------------

private static class FailOnceMapper<T> extends RichMapFunction<T, T> {
private static class FailingMapper1<T> extends RichMapFunction<T, T> {

private static int failuresBeforeSuccess = 1;

@Override
public T map(T value) throws Exception {
if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
failuresBeforeSuccess--;
throw new Exception("Test Failure");
}

return value;
}
}

private static class FailingMapper2<T> extends RichMapFunction<T, T> {

private static int failuresBeforeSuccess = 1;

@Override
public T map(T value) throws Exception {
if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
failuresBeforeSuccess--;
throw new Exception("Test Failure");
}

return value;
}
}

private static class FailingMapper3<T> extends RichMapFunction<T, T> {

private static int failuresBeforeSuccess = 0;
private static int failuresBeforeSuccess = 3;

@Override
public T map(T value) throws Exception {
Expand Down

0 comments on commit 1f49926

Please sign in to comment.