From 137d94b220a852f2124886a618ccf179ffe8cb46 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 21 Apr 2015 19:05:34 +0200 Subject: [PATCH] Improve error messages on Task deployment --- .../org/apache/flink/runtime/executiongraph/Execution.java | 5 +++-- .../org/apache/flink/runtime/taskmanager/TaskManager.scala | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 3ba378c5f4d6d..f77605547ab88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -328,7 +328,7 @@ public void deployToSlot(final SimpleSlot slot) throws JobException { // register this execution at the execution graph, to receive call backs vertex.getExecutionGraph().registerExecution(this); - Instance instance = slot.getInstance(); + final Instance instance = slot.getInstance(); Future deployAction = Patterns.ask(instance.getTaskManager(), new SubmitTask(deployment), new Timeout(timeout)); @@ -338,7 +338,8 @@ public void deployToSlot(final SimpleSlot slot) throws JobException { public void onComplete(Throwable failure, Object success) throws Throwable { if (failure != null) { if (failure instanceof TimeoutException) { - markFailed(new Exception("Cannot deploy task - TaskManager not responding.", failure)); + markFailed(new Exception("Cannot deploy task (" + deployment.toString() + "): " + + "TaskManager (" + instance.getTaskManager().toString() + ") not responding after a timeout of " + timeout, failure)); } else { markFailed(failure); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index d6b91ec9c0310..16fed5b9064a8 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -802,8 +802,9 @@ extends Actor with ActorLogMessages with ActorLogging { throw new IllegalStateException("TaskManager is not associated with a JobManager") } if (slot < 0 || slot >= numberOfSlots) { - throw new Exception(s"Target slot ${slot} does not exist on TaskManager.") + throw new Exception(s"Target slot $slot does not exist on TaskManager.") } + LOG.info("Submitting task {}", executionID) val userCodeClassLoader = libraryCacheManager match { case Some(manager) =>