From 533e952c86cb3ca93a72638cece860fa690002e2 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 15 Feb 2017 14:16:26 +0100 Subject: [PATCH 1/2] [FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport Akka's AskSupport trait requires that failures are wrapped in a akka.actor.Status.Failure to be recognized. Internally the trait will unwrap the failure and wrap it in a scala.util.Failure instance. However, it does not recognize the scala Failure when given to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a scala.util.Success instance. --- .../runtime/taskmanager/TaskManager.scala | 20 +- .../runtime/taskmanager/TaskManagerTest.java | 230 +++++++++++++++++- 2 files changed, 237 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7cb19026a0717..a70454bd2407f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -69,13 +69,11 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration, TaskManagerConfiguration} import org.apache.flink.runtime.util._ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} -import org.apache.flink.util.NetUtils import scala.collection.JavaConverters._ import scala.concurrent._ import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Failure, Success} /** * The TaskManager is responsible for executing the individual tasks of a Flink job. It is @@ -423,7 +421,7 @@ class TaskManager( futureResponse.mapTo[Boolean].onComplete { // IMPORTANT: In the future callback, we cannot directly modify state // but only send messages to the TaskManager to do those changes - case Success(result) => + case scala.util.Success(result) => if (!result) { self ! decorateMessage( FailTask( @@ -432,7 +430,7 @@ class TaskManager( ) } - case Failure(t) => + case scala.util.Failure(t) => self ! decorateMessage( FailTask( executionID, @@ -470,7 +468,7 @@ class TaskManager( sender ! decorateMessage(Acknowledge.get()) } catch { case t: Throwable => - sender ! decorateMessage(Failure(t)) + sender ! decorateMessage(Status.Failure(t)) } } else { log.debug(s"Cannot find task to stop for execution ${executionID})") @@ -762,8 +760,6 @@ class TaskManager( // ---- Done ---- log.debug(s"Done with stack trace sample $sampleId.") - - sender ! new StackTraceSampleResponse(sampleId, executionId, currentTraces) } @@ -781,7 +777,7 @@ class TaskManager( } } catch { case e: Exception => - sender ! Failure(e) + sender ! decorateMessage(Status.Failure(e)) } case _ => unhandled(message) @@ -841,10 +837,10 @@ class TaskManager( client.put(fis); }(context.dispatcher) .onComplete { - case Success(value) => + case scala.util.Success(value) => sender ! value fis.close() - case Failure(e) => + case scala.util.Failure(e) => sender ! akka.actor.Status.Failure(e) fis.close() }(context.dispatcher) @@ -1209,7 +1205,7 @@ class TaskManager( catch { case t: Throwable => log.error("SubmitTask failed", t) - sender ! decorateMessage(Failure(t)) + sender ! decorateMessage(Status.Failure(t)) } } @@ -1263,7 +1259,7 @@ class TaskManager( if (errors.isEmpty) { sender ! decorateMessage(Acknowledge.get()) } else { - sender ! decorateMessage(Failure(new Exception(errors.mkString("\n")))) + sender ! decorateMessage(Status.Failure(new Exception(errors.mkString("\n")))) } case None => diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 770aa35efde61..91fbf2aa0a65c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -69,6 +69,7 @@ import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.messages.TaskMessages.StopTask; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -1525,7 +1526,234 @@ public void testFailingScheduleOrUpdateConsumersMessage() throws Exception { } }}; } - + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the submit task + * message fails. + */ + @Test + public void testSubmitTaskFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "test task", + 0, // this will make the submission fail because the number of key groups must be >= 1 + 0, + 1, + 0, + new Configuration(), + new Configuration(), + "Foobar", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + Future submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + try { + Await.result(submitResponse, timeout); + + fail("The submit task message should have failed."); + } catch (IllegalArgumentException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the stop task + * message fails. + */ + @Test + public void testStopTaskFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + executionAttemptId, + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + DummyInvokable.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + Future submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + Await.result(submitResponse, timeout); + + Future stopResponse = taskManager.ask(new StopTask(executionAttemptId), timeout); + + try { + Await.result(stopResponse, timeout); + + fail("The stop task message should have failed."); + } catch (UnsupportedOperationException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack + * trace message fails. + */ + @Test + public void testStackTraceSampleFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + Future stackTraceResponse = taskManager.ask( + new TriggerStackTraceSample( + 0, + new ExecutionAttemptID(), + 0, + Time.milliseconds(1L), + 0), + timeout); + + try { + Await.result(stackTraceResponse, timeout); + + fail("The trigger stack trace message should have failed."); + } catch (IllegalStateException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack + * trace message fails. + */ + @Test + public void testUpdateTaskInputPartitionsFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + executionAttemptId, + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + DummyInvokable.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + Future submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + Await.result(submitResponse, timeout); + + Future partitionUpdateResponse = taskManager.ask( + new TaskMessages.UpdateTaskSinglePartitionInfo( + executionAttemptId, + new IntermediateDataSetID(), + new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())), + timeout); + + try { + Await.result(partitionUpdateResponse, timeout); + + fail("The update task input partitions message should have failed."); + } catch (Exception e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + // -------------------------------------------------------------------------------------------- public static class SimpleJobManager extends FlinkUntypedActor { From 9a024adf50c21fad0abdb6fe9fc602fb5bfd48c5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 15 Feb 2017 15:28:12 +0100 Subject: [PATCH 2/2] Fix failing test case The unwrapping only takes place if one asks an actor. In case of tell, the original message will be transmitted. --- .../flink/runtime/taskmanager/TaskManagerTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 91fbf2aa0a65c..356d693d861fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -69,7 +69,6 @@ import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.messages.TaskMessages.StopTask; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -484,7 +483,7 @@ protected void run() { expectMsgEquals(Acknowledge.get()); tm.tell(new StopTask(eid2), testActorGateway); - expectMsgClass(Failure.class); + expectMsgClass(Status.Failure.class); assertEquals(ExecutionState.RUNNING, t2.getExecutionState()); @@ -1228,13 +1227,13 @@ protected void run() { // Receive the expected message (heartbeat races possible) Object[] msg = receiveN(1); - while (!(msg[0] instanceof Failure)) { + while (!(msg[0] instanceof Status.Failure)) { msg = receiveN(1); } - Failure response = (Failure) msg[0]; + Status.Failure response = (Status.Failure) msg[0]; - assertEquals(IllegalStateException.class, response.exception().getClass()); + assertEquals(IllegalStateException.class, response.cause().getClass()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -1618,7 +1617,7 @@ public void testStopTaskFailure() throws Exception { 0, new Configuration(), new Configuration(), - DummyInvokable.class.getName(), + BlockingNoOpInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), @@ -1723,7 +1722,7 @@ public void testUpdateTaskInputPartitionsFailure() throws Exception { 0, new Configuration(), new Configuration(), - DummyInvokable.class.getName(), + BlockingNoOpInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),