Skip to content

Commit

Permalink
[FLINK-4672] [taskmanager] Do not decorate Actor Kill messages
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Sep 26, 2016
1 parent 28ff5a3 commit 6f237cf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
Expand Up @@ -1379,7 +1379,7 @@ class TaskManager(
"\n" +
"A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)

self ! decorateMessage(Kill)
self ! Kill
}

override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
Expand Down
Expand Up @@ -20,6 +20,7 @@

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
Expand Down Expand Up @@ -1367,6 +1369,28 @@ protected void run() {
}};
}

@Test
public void testTerminationOnFatalError() {
new JavaTestKit(system){{

final ActorGateway taskManager = TestingUtils.createTaskManager(
system,
system.deadLetters(), // no jobmanager
new Configuration(),
true,
false);

try {
watch(taskManager.actor());
taskManager.tell(new FatalError("test fatal error", new Exception("something super bad")));
expectTerminated(d, taskManager.actor());
}
finally {
taskManager.tell(Kill.getInstance());
}
}};
}

// --------------------------------------------------------------------------------------------

public static class SimpleJobManager extends FlinkUntypedActor {
Expand Down Expand Up @@ -1547,11 +1571,14 @@ public static final class TestInvokableBlockingCancelable extends AbstractInvoka

@Override
public void invoke() throws Exception {
Object o = new Object();
final Object o = new Object();
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (o) {
o.wait();
//noinspection InfiniteLoopStatement
while (true) {
o.wait();
}
}
}
}

}

0 comments on commit 6f237cf

Please sign in to comment.