Skip to content

Commit

Permalink
[hotfix] [tests] Code cleanups in ExecutionGraphRestartTest
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jul 23, 2017
1 parent 605319b commit d80ba4d
Showing 1 changed file with 18 additions and 21 deletions.
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.core.testutils.OneShotLatch;
Expand Down Expand Up @@ -63,11 +64,8 @@
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;


import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline; import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;


import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
Expand Down Expand Up @@ -531,8 +529,7 @@ public void testFailExecutionGraphAfterCancel() throws Exception {
*/ */
@Test @Test
public void testSuspendWhileRestarting() throws Exception { public void testSuspendWhileRestarting() throws Exception {
FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES); final Time timeout = Time.of(1, TimeUnit.MINUTES);
Deadline deadline = timeout.fromNow();


Instance instance = ExecutionGraphTestUtils.getInstance( Instance instance = ExecutionGraphTestUtils.getInstance(
new ActorTaskManagerGateway( new ActorTaskManagerGateway(
Expand Down Expand Up @@ -571,7 +568,7 @@ public void testSuspendWhileRestarting() throws Exception {


instance.markDead(); instance.markDead();


Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft()); controllableRestartStrategy.getReachedCanRestart().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);


assertEquals(JobStatus.RESTARTING, eg.getState()); assertEquals(JobStatus.RESTARTING, eg.getState());


Expand All @@ -581,7 +578,7 @@ public void testSuspendWhileRestarting() throws Exception {


controllableRestartStrategy.unlockRestart(); controllableRestartStrategy.unlockRestart();


Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft()); controllableRestartStrategy.getRestartDone().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);


assertEquals(JobStatus.SUSPENDED, eg.getState()); assertEquals(JobStatus.SUSPENDED, eg.getState());
} }
Expand Down Expand Up @@ -795,37 +792,37 @@ private static Instance createInstance(int port) {


private static class ControllableRestartStrategy implements RestartStrategy { private static class ControllableRestartStrategy implements RestartStrategy {


private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>(); private final OneShotLatch reachedCanRestart = new OneShotLatch();
private Promise<Boolean> doRestart = new Promise.DefaultPromise<>(); private final OneShotLatch doRestart = new OneShotLatch();
private Promise<Boolean> restartDone = new Promise.DefaultPromise<>(); private final OneShotLatch restartDone = new OneShotLatch();


private volatile Exception exception = null; private final Time timeout;


private FiniteDuration timeout; private volatile Exception exception;


public ControllableRestartStrategy(FiniteDuration timeout) { public ControllableRestartStrategy(Time timeout) {
this.timeout = timeout; this.timeout = timeout;
} }


public void unlockRestart() { public void unlockRestart() {
doRestart.success(true); doRestart.trigger();
} }


public Exception getException() { public Exception getException() {
return exception; return exception;
} }


public Future<Boolean> getReachedCanRestart() { public OneShotLatch getReachedCanRestart() {
return reachedCanRestart.future(); return reachedCanRestart;
} }


public Future<Boolean> getRestartDone() { public OneShotLatch getRestartDone() {
return restartDone.future(); return restartDone;
} }


@Override @Override
public boolean canRestart() { public boolean canRestart() {
reachedCanRestart.success(true); reachedCanRestart.trigger();
return true; return true;
} }


Expand All @@ -835,13 +832,13 @@ public void restart(final RestartCallback restarter, ScheduledExecutor executor)
@Override @Override
public void run() { public void run() {
try { try {
Await.ready(doRestart.future(), timeout); doRestart.await(timeout.getSize(), timeout.getUnit());
restarter.triggerFullRecovery(); restarter.triggerFullRecovery();
} catch (Exception e) { } catch (Exception e) {
exception = e; exception = e;
} }


restartDone.success(true); restartDone.trigger();
} }
}); });
} }
Expand Down

0 comments on commit d80ba4d

Please sign in to comment.