Skip to content

Commit

Permalink
[FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager.
Browse files Browse the repository at this point in the history

Replaces AkkaUtils.globalExecutionContext with instance dependent ExecutionContext.

This closes #893
  • Loading branch information
tillrohrmann authored and StephanEwen committed Jul 13, 2015
1 parent aa5e5b3 commit 2ccb5fd
Show file tree
Hide file tree
Showing 50 changed files with 1,399 additions and 981 deletions.
Expand Up @@ -18,20 +18,17 @@


package org.apache.flink.runtime.executiongraph; package org.apache.flink.runtime.executiongraph;


import akka.actor.ActorRef;
import akka.dispatch.OnComplete; import akka.dispatch.OnComplete;
import akka.dispatch.OnFailure; import akka.dispatch.OnFailure;
import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceGateway;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand All @@ -50,6 +47,7 @@
import org.apache.flink.runtime.util.SerializedValue; import org.apache.flink.runtime.util.SerializedValue;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;


Expand Down Expand Up @@ -131,9 +129,20 @@ public class Execution implements Serializable {


private SerializedValue<StateHandle<?>> operatorState; private SerializedValue<StateHandle<?>> operatorState;


/**
 * The execution context which is used to execute futures.
 *
 * <p>It is assigned (non-null) from the constructor argument and is passed as the executor for
 * the future callbacks registered by this execution. It is reset to {@code null} when the
 * execution is prepared for archiving, so it must not be relied upon after that point.
 *
 * <p>NOTE(review): the suppressed inspection below suggests the type is not serializable while
 * the enclosing class is; presumably nulling the field before archiving is what keeps
 * serialization safe - confirm.
 */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private ExecutionContext executionContext;

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) { public Execution(
ExecutionContext executionContext,
ExecutionVertex vertex,
int attemptNumber,
long startTimestamp,
FiniteDuration timeout) {
this.executionContext = checkNotNull(executionContext);

this.vertex = checkNotNull(vertex); this.vertex = checkNotNull(vertex);
this.attemptId = new ExecutionAttemptID(); this.attemptId = new ExecutionAttemptID();


Expand Down Expand Up @@ -200,6 +209,8 @@ public void prepareForArchiving() {
} }
assignedResource = null; assignedResource = null;


executionContext = null;

partialInputChannelDeploymentDescriptors.clear(); partialInputChannelDeploymentDescriptors.clear();
partialInputChannelDeploymentDescriptors = null; partialInputChannelDeploymentDescriptors = null;
} }
Expand Down Expand Up @@ -338,8 +349,9 @@ public void deployToSlot(final SimpleSlot slot) throws JobException {
vertex.getExecutionGraph().registerExecution(this); vertex.getExecutionGraph().registerExecution(this);


final Instance instance = slot.getInstance(); final Instance instance = slot.getInstance();
final Future<Object> deployAction = Patterns.ask(instance.getTaskManager(), final InstanceGateway gateway = instance.getInstanceGateway();
new SubmitTask(deployment), new Timeout(timeout));
final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);


deployAction.onComplete(new OnComplete<Object>(){ deployAction.onComplete(new OnComplete<Object>(){


Expand All @@ -366,7 +378,7 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
} }
} }
} }
}, AkkaUtils.globalExecutionContext()); }, executionContext);
} }
catch (Throwable t) { catch (Throwable t) {
markFailed(t); markFailed(t);
Expand Down Expand Up @@ -402,7 +414,7 @@ else if (current == RUNNING || current == DEPLOYING) {
else if (current == FINISHED || current == FAILED) { else if (current == FINISHED || current == FAILED) {
// nothing to do any more. finished failed before it could be cancelled. // nothing to do any more. finished failed before it could be cancelled.
// in any case, the task is removed from the TaskManager already // in any case, the task is removed from the TaskManager already
sendFailIntermediateResultPartitionsRPCCall(); sendFailIntermediateResultPartitionsRpcCall();


return; return;
} }
Expand Down Expand Up @@ -485,7 +497,7 @@ public Boolean call() throws Exception {


return true; return true;
} }
}, AkkaUtils.globalExecutionContext()); }, executionContext);


// double check to resolve race conditions // double check to resolve race conditions
if(consumerVertex.getExecutionState() == RUNNING){ if(consumerVertex.getExecutionState() == RUNNING){
Expand Down Expand Up @@ -533,7 +545,7 @@ public Boolean call() throws Exception {
final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo( final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor); consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);


sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage); sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
} }
// ---------------------------------------------------------------- // ----------------------------------------------------------------
// Consumer is scheduled or deploying => cache input channel // Consumer is scheduled or deploying => cache input channel
Expand Down Expand Up @@ -689,11 +701,12 @@ void sendPartitionInfos() {
inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this)); inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
} }


UpdatePartitionInfo updateTaskMessage = UpdatePartitionInfo updateTaskMessage = createUpdateTaskMultiplePartitionInfos(
createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs, attemptId,
inputChannelDeploymentDescriptors); resultIDs,
inputChannelDeploymentDescriptors);


sendUpdateTaskRpcCall(assignedResource, updateTaskMessage); sendUpdatePartitionInfoRpcCall(assignedResource, updateTaskMessage);
} }
} }


Expand Down Expand Up @@ -804,14 +817,23 @@ else if (currentState == CANCELING || currentState == FAILED) {
} }
} }


/**
* This method sends a CancelTask message to the instance of the assigned slot.
*
* The sending is tried up to NUM_CANCEL_CALL_TRIES times.
*/
private void sendCancelRpcCall() { private void sendCancelRpcCall() {
final SimpleSlot slot = this.assignedResource; final SimpleSlot slot = this.assignedResource;


if (slot != null) { if (slot != null) {


Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new final InstanceGateway gateway = slot.getInstance().getInstanceGateway();
CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
AkkaUtils.globalExecutionContext(), timeout); Future<Object> cancelResult = gateway.retry(
new CancelTask(attemptId),
NUM_CANCEL_CALL_TRIES,
timeout,
executionContext);


cancelResult.onComplete(new OnComplete<Object>() { cancelResult.onComplete(new OnComplete<Object>() {


Expand All @@ -827,43 +849,48 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
} }
} }
} }
}, AkkaUtils.globalExecutionContext()); }, executionContext);
} }
} }


private void sendFailIntermediateResultPartitionsRPCCall() { private void sendFailIntermediateResultPartitionsRpcCall() {
final SimpleSlot slot = this.assignedResource; final SimpleSlot slot = this.assignedResource;


if (slot != null) { if (slot != null) {
final Instance instance = slot.getInstance(); final Instance instance = slot.getInstance();


if (instance.isAlive()) { if (instance.isAlive()) {
try { final InstanceGateway gateway = instance.getInstanceGateway();
// TODO For some tests this could be a problem when querying too early if all resources were released
instance.getTaskManager().tell(new FailIntermediateResultPartitions(attemptId), ActorRef.noSender()); // TODO For some tests this could be a problem when querying too early if all resources were released
} catch (Throwable t) { gateway.tell(new FailIntermediateResultPartitions(attemptId));
fail(new Exception("Intermediate result partition could not be failed.", t));
}
} }
} }
} }


private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot, /**
final UpdatePartitionInfo updateTaskMsg) { * Sends an UpdatePartitionInfo message to the instance of the consumerSlot.
*
* @param consumerSlot Slot to whose instance the message will be sent
* @param updatePartitionInfo UpdatePartitionInfo message
*/
private void sendUpdatePartitionInfoRpcCall(
final SimpleSlot consumerSlot,
final UpdatePartitionInfo updatePartitionInfo) {


if (consumerSlot != null) { if (consumerSlot != null) {
final Instance instance = consumerSlot.getInstance(); final Instance instance = consumerSlot.getInstance();
final InstanceGateway gateway = instance.getInstanceGateway();


Future<Object> futureUpdate = Patterns.ask(instance.getTaskManager(), updateTaskMsg, Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
new Timeout(timeout));


futureUpdate.onFailure(new OnFailure() { futureUpdate.onFailure(new OnFailure() {
@Override @Override
public void onFailure(Throwable failure) throws Throwable { public void onFailure(Throwable failure) throws Throwable {
fail(new IllegalStateException("Update task on instance " + instance + fail(new IllegalStateException("Update task on instance " + instance +
" failed due to:", failure)); " failed due to:", failure));
} }
}, AkkaUtils.globalExecutionContext()); }, executionContext);
} }
} }


Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
Expand All @@ -45,6 +44,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;


import java.io.Serializable; import java.io.Serializable;
Expand Down Expand Up @@ -197,6 +197,10 @@ public class ExecutionGraph implements Serializable {
@SuppressWarnings("NonSerializableFieldInSerializableClass") @SuppressWarnings("NonSerializableFieldInSerializableClass")
private CheckpointCoordinator checkpointCoordinator; private CheckpointCoordinator checkpointCoordinator;


/**
 * The execution context which is used to execute futures.
 *
 * <p>It is set from the constructor argument (which is rejected when {@code null}), is exposed
 * through {@code getExecutionContext()}, and is reset to {@code null} when the graph is
 * prepared for archiving.
 *
 * <p>NOTE(review): the suppressed inspection below suggests the type is not serializable while
 * the enclosing class is; presumably nulling the field before archiving is what keeps
 * serialization safe - confirm.
 */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private ExecutionContext executionContext;

// ------ Fields that are only relevant for archived execution graphs ------------ // ------ Fields that are only relevant for archived execution graphs ------------
private ExecutionConfig executionConfig; private ExecutionConfig executionConfig;


Expand All @@ -207,17 +211,38 @@ public class ExecutionGraph implements Serializable {
/** /**
* This constructor is for tests only, because it does not include class loading information. * This constructor is for tests only, because it does not include class loading information.
*/ */
ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) { ExecutionGraph(
this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>(), ExecutionGraph.class.getClassLoader()); ExecutionContext executionContext,
} JobID jobId,

String jobName,
public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout, Configuration jobConfig,
List<BlobKey> requiredJarFiles, ClassLoader userClassLoader) { FiniteDuration timeout) {

this(
if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) { executionContext,
jobId,
jobName,
jobConfig,
timeout,
new ArrayList<BlobKey>(),
ExecutionGraph.class.getClassLoader()
);
}

public ExecutionGraph(
ExecutionContext executionContext,
JobID jobId,
String jobName,
Configuration jobConfig,
FiniteDuration timeout,
List<BlobKey> requiredJarFiles,
ClassLoader userClassLoader) {

if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
throw new NullPointerException(); throw new NullPointerException();
} }


this.executionContext = executionContext;

this.jobID = jobId; this.jobID = jobId;
this.jobName = jobName; this.jobName = jobName;
this.jobConfiguration = jobConfig; this.jobConfiguration = jobConfig;
Expand Down Expand Up @@ -451,6 +476,15 @@ public long getStatusTimestamp(JobStatus status) {
return this.stateTimestamps[status.ordinal()]; return this.stateTimestamps[status.ordinal()];
} }


/**
 * Gets the {@link ExecutionContext} that this ExecutionGraph uses to run its futures.
 *
 * <p>The context is the one handed to the constructor. Once the graph has been prepared for
 * archiving the field is cleared, so this method may return {@code null} afterwards.
 *
 * @return the ExecutionContext of this ExecutionGraph, or {@code null} after archiving
 */
public ExecutionContext getExecutionContext() {
return this.executionContext;
}

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Actions // Actions
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -629,6 +663,7 @@ public void prepareForArchiving() {
userClassLoader = null; userClassLoader = null;
scheduler = null; scheduler = null;
checkpointCoordinator = null; checkpointCoordinator = null;
executionContext = null;


for (ExecutionJobVertex vertex : verticesInCreationOrder) { for (ExecutionJobVertex vertex : verticesInCreationOrder) {
vertex.prepareForArchiving(); vertex.prepareForArchiving();
Expand Down Expand Up @@ -719,7 +754,7 @@ public Object call() throws Exception {
restart(); restart();
return null; return null;
} }
}, AkkaUtils.globalExecutionContext()); }, executionContext);
break; break;
} }
else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
Expand Down

0 comments on commit 2ccb5fd

Please sign in to comment.