Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,11 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration, TaskManagerConfiguration}
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.NetUtils

import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Success}

/**
* The TaskManager is responsible for executing the individual tasks of a Flink job. It is
Expand Down Expand Up @@ -423,7 +421,7 @@ class TaskManager(
futureResponse.mapTo[Boolean].onComplete {
// IMPORTANT: In the future callback, we cannot directly modify state
// but only send messages to the TaskManager to do those changes
case Success(result) =>
case scala.util.Success(result) =>
if (!result) {
self ! decorateMessage(
FailTask(
Expand All @@ -432,7 +430,7 @@ class TaskManager(
)
}

case Failure(t) =>
case scala.util.Failure(t) =>
self ! decorateMessage(
FailTask(
executionID,
Expand Down Expand Up @@ -470,7 +468,7 @@ class TaskManager(
sender ! decorateMessage(Acknowledge.get())
} catch {
case t: Throwable =>
sender ! decorateMessage(Failure(t))
sender ! decorateMessage(Status.Failure(t))
}
} else {
log.debug(s"Cannot find task to stop for execution ${executionID})")
Expand Down Expand Up @@ -762,8 +760,6 @@ class TaskManager(
// ---- Done ----
log.debug(s"Done with stack trace sample $sampleId.")



sender ! new StackTraceSampleResponse(sampleId, executionId, currentTraces)
}

Expand All @@ -781,7 +777,7 @@ class TaskManager(
}
} catch {
case e: Exception =>
sender ! Failure(e)
sender ! decorateMessage(Status.Failure(e))
}

case _ => unhandled(message)
Expand Down Expand Up @@ -841,10 +837,10 @@ class TaskManager(
client.put(fis);
}(context.dispatcher)
.onComplete {
case Success(value) =>
case scala.util.Success(value) =>
sender ! value
fis.close()
case Failure(e) =>
case scala.util.Failure(e) =>
sender ! akka.actor.Status.Failure(e)
fis.close()
}(context.dispatcher)
Expand Down Expand Up @@ -1209,7 +1205,7 @@ class TaskManager(
catch {
case t: Throwable =>
log.error("SubmitTask failed", t)
sender ! decorateMessage(Failure(t))
sender ! decorateMessage(Status.Failure(t))
}
}

Expand Down Expand Up @@ -1263,7 +1259,7 @@ class TaskManager(
if (errors.isEmpty) {
sender ! decorateMessage(Acknowledge.get())
} else {
sender ! decorateMessage(Failure(new Exception(errors.mkString("\n"))))
sender ! decorateMessage(Status.Failure(new Exception(errors.mkString("\n"))))
}

case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ protected void run() {
expectMsgEquals(Acknowledge.get());

tm.tell(new StopTask(eid2), testActorGateway);
expectMsgClass(Failure.class);
expectMsgClass(Status.Failure.class);

assertEquals(ExecutionState.RUNNING, t2.getExecutionState());

Expand Down Expand Up @@ -1227,13 +1227,13 @@ protected void run() {

// Receive the expected message (heartbeat races possible)
Object[] msg = receiveN(1);
while (!(msg[0] instanceof Failure)) {
while (!(msg[0] instanceof Status.Failure)) {
msg = receiveN(1);
}

Failure response = (Failure) msg[0];
Status.Failure response = (Status.Failure) msg[0];

assertEquals(IllegalStateException.class, response.exception().getClass());
assertEquals(IllegalStateException.class, response.cause().getClass());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down Expand Up @@ -1525,7 +1525,234 @@ public void testFailingScheduleOrUpdateConsumersMessage() throws Exception {
}
}};
}


/**
 * Verifies that a failing {@code SubmitTask} request is answered with an exception.
 *
 * <p>The deployment descriptor is built with a key-group count of 0, which is invalid, and
 * awaiting the reply of the ask is expected to throw an {@link IllegalArgumentException}.
 */
@Test
public void testSubmitTaskFailure() throws Exception {
	ActorGateway jobManager = null;
	ActorGateway taskManager = null;

	try {
		final ActorRef jobManagerActor =
			system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
		jobManager = new AkkaActorGateway(jobManagerActor, leaderSessionID);

		taskManager = TestingUtils.createTaskManager(
			system,
			jobManager,
			new Configuration(),
			true,
			true);

		// The number of key groups must be >= 1, so this value makes the submission fail.
		final int invalidNumberOfKeyGroups = 0;

		final TaskDeploymentDescriptor descriptor = createTaskDeploymentDescriptor(
			new JobID(),
			"test job",
			new JobVertexID(),
			new ExecutionAttemptID(),
			new SerializedValue<>(new ExecutionConfig()),
			"test task",
			invalidNumberOfKeyGroups,
			0,
			1,
			0,
			new Configuration(),
			new Configuration(),
			"Foobar",
			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
			Collections.<InputGateDeploymentDescriptor>emptyList(),
			Collections.<BlobKey>emptyList(),
			Collections.<URL>emptyList(),
			0);

		final Future<Object> submitFuture = taskManager.ask(new SubmitTask(descriptor), timeout);

		try {
			Await.result(submitFuture, timeout);

			// Only reached if the TaskManager did not report the failure to the asker.
			fail("The submit task message should have failed.");
		} catch (IllegalArgumentException expected) {
			// expected: the failure was propagated back through the ask future
		}
	} finally {
		// Both actors are stopped regardless of the test outcome.
		TestingUtils.stopActor(jobManager);
		TestingUtils.stopActor(taskManager);
	}
}

/**
 * Verifies that a failing {@code StopTask} request is answered with an exception.
 *
 * <p>A task running {@code BlockingNoOpInvokable} is submitted first; the subsequent stop
 * request for that execution attempt is expected to fail the ask with an
 * {@link UnsupportedOperationException}.
 */
@Test
public void testStopTaskFailure() throws Exception {
	ActorGateway jobManager = null;
	ActorGateway taskManager = null;

	try {
		final ActorRef jobManagerActor =
			system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
		jobManager = new AkkaActorGateway(jobManagerActor, leaderSessionID);

		taskManager = TestingUtils.createTaskManager(
			system,
			jobManager,
			new Configuration(),
			true,
			true);

		// The same attempt id is used to submit the task and to address the stop request.
		final ExecutionAttemptID attemptId = new ExecutionAttemptID();

		final TaskDeploymentDescriptor descriptor = createTaskDeploymentDescriptor(
			new JobID(),
			"test job",
			new JobVertexID(),
			attemptId,
			new SerializedValue<>(new ExecutionConfig()),
			"test task",
			1,
			0,
			1,
			0,
			new Configuration(),
			new Configuration(),
			BlockingNoOpInvokable.class.getName(),
			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
			Collections.<InputGateDeploymentDescriptor>emptyList(),
			Collections.<BlobKey>emptyList(),
			Collections.<URL>emptyList(),
			0);

		// The submission itself must go through; a failure here propagates out of the test.
		Await.result(taskManager.ask(new SubmitTask(descriptor), timeout), timeout);

		final Future<Object> stopFuture = taskManager.ask(new StopTask(attemptId), timeout);

		try {
			Await.result(stopFuture, timeout);

			// Only reached if the TaskManager did not report the failure to the asker.
			fail("The stop task message should have failed.");
		} catch (UnsupportedOperationException expected) {
			// expected: the failure was propagated back through the ask future
		}
	} finally {
		// Both actors are stopped regardless of the test outcome.
		TestingUtils.stopActor(jobManager);
		TestingUtils.stopActor(taskManager);
	}
}

/**
 * Verifies that a failing {@code TriggerStackTraceSample} request is answered with an
 * exception.
 *
 * <p>No task is submitted, so the sample is requested for a freshly generated execution
 * attempt id; awaiting the reply is expected to throw an {@link IllegalStateException}.
 */
@Test
public void testStackTraceSampleFailure() throws Exception {
	ActorGateway jobManager = null;
	ActorGateway taskManager = null;

	try {
		final ActorRef jobManagerActor =
			system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
		jobManager = new AkkaActorGateway(jobManagerActor, leaderSessionID);

		taskManager = TestingUtils.createTaskManager(
			system,
			jobManager,
			new Configuration(),
			true,
			true);

		final TriggerStackTraceSample sampleRequest = new TriggerStackTraceSample(
			0,
			new ExecutionAttemptID(),
			0,
			Time.milliseconds(1L),
			0);

		final Future<Object> sampleFuture = taskManager.ask(sampleRequest, timeout);

		try {
			Await.result(sampleFuture, timeout);

			// Only reached if the TaskManager did not report the failure to the asker.
			fail("The trigger stack trace message should have failed.");
		} catch (IllegalStateException expected) {
			// expected: the failure was propagated back through the ask future
		}
	} finally {
		// Both actors are stopped regardless of the test outcome.
		TestingUtils.stopActor(jobManager);
		TestingUtils.stopActor(taskManager);
	}
}

/**
 * Tests that the TaskManager sends a proper exception back to the sender if the update task
 * input partitions message fails.
 *
 * <p>A task is deployed without any input gates, and then a single-partition update carrying
 * a freshly generated intermediate data set id is sent for it. Awaiting the reply of that
 * update is expected to throw.
 */
@Test
public void testUpdateTaskInputPartitionsFailure() throws Exception {
	ActorGateway jobManager = null;
	ActorGateway taskManager = null;

	try {
		// The same attempt id is used to submit the task and to address the partition update.
		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();

		ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
		jobManager = new AkkaActorGateway(jm, leaderSessionID);

		taskManager = TestingUtils.createTaskManager(
			system,
			jobManager,
			new Configuration(),
			true,
			true);

		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
			new JobID(),
			"test job",
			new JobVertexID(),
			executionAttemptId,
			new SerializedValue<>(new ExecutionConfig()),
			"test task",
			1,
			0,
			1,
			0,
			new Configuration(),
			new Configuration(),
			BlockingNoOpInvokable.class.getName(),
			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
			// no input gates are deployed for this task
			Collections.<InputGateDeploymentDescriptor>emptyList(),
			Collections.<BlobKey>emptyList(),
			Collections.<URL>emptyList(),
			0);

		Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);

		// The submission itself must go through; a failure here propagates out of the test.
		Await.result(submitResponse, timeout);

		Future<Object> partitionUpdateResponse = taskManager.ask(
			new TaskMessages.UpdateTaskSinglePartitionInfo(
				executionAttemptId,
				new IntermediateDataSetID(),
				new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())),
			timeout);

		try {
			Await.result(partitionUpdateResponse, timeout);

			// fail() raises an AssertionError, which the catch (Exception) below does not
			// swallow, so a missing failure still fails the test.
			fail("The update task input partitions message should have failed.");
		} catch (Exception e) {
			// expected
		}
	} finally {
		// Both actors are stopped regardless of the test outcome.
		TestingUtils.stopActor(jobManager);
		TestingUtils.stopActor(taskManager);
	}
}

// --------------------------------------------------------------------------------------------

public static class SimpleJobManager extends FlinkUntypedActor {
Expand Down