Skip to content

Commit

Permalink
[FLINK-4735] [cluster management] Implements some job execution relat…
Browse files Browse the repository at this point in the history
…ed RPC calls on the JobManager
  • Loading branch information
KurtYoung authored and StephanEwen committed Dec 23, 2016
1 parent 66f1529 commit 4e5f423
Show file tree
Hide file tree
Showing 5 changed files with 457 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
Expand All @@ -41,8 +42,8 @@
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand All @@ -65,22 +66,32 @@
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;

import org.slf4j.Logger;

import java.io.IOException;
Expand Down Expand Up @@ -528,20 +539,6 @@ public Acknowledge updateTaskExecutionState(final TaskExecutionState taskExecuti

}

//----------------------------------------------------------------------------------------------

// Internal methods

// ----------------------------------------------------------------------------------------------



private void handleFatalError(final Throwable cause) {
runAsync(new Runnable() {
@Override
public void run() {
log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
shutDown();
jobCompletionActions.onFatalError(cause);
}
});
}

@RpcMethod
public SerializedInputSplit requestNextInputSplit(
Expand Down Expand Up @@ -594,9 +591,9 @@ public SerializedInputSplit requestNextInputSplit(

@RpcMethod
public ExecutionState requestPartitionState(
final JobID ignored, // redundant parameter from when a JobManager would handle multiple jobs
final IntermediateDataSetID intermediateResultId,
final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
final JobID ignored,
final IntermediateDataSetID intermediateResultId,
final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {

final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
if (execution != null) {
Expand Down Expand Up @@ -660,6 +657,169 @@ public void declineCheckpoint(
// Internal methods
//----------------------------------------------------------------------------------------------

@RpcMethod
public void resourceRemoved(final ResourceID resourceId, final String message) {
// TODO: remove resource from slot pool
}

@RpcMethod
public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) {
if (executionGraph != null) {
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
if (checkpointCoordinator != null) {
getRpcService().execute(new Runnable() {
@Override
public void run() {
try {
if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
log.info("Received message for non-existing checkpoint {}.",
acknowledge.getCheckpointId());
}
} catch (Exception e) {
log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e);
}
}
});
}
else {
log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
jobGraph.getJobID());
}
} else {
log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
}
}

@RpcMethod
public void declineCheckpoint(final DeclineCheckpoint decline) {
if (executionGraph != null) {
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
if (checkpointCoordinator != null) {
getRpcService().execute(new Runnable() {
@Override
public void run() {
try {
log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId());
} catch (Exception e) {
log.error("Error in CheckpointCoordinator while processing {}", decline, e);
}
}
});
} else {
log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
jobGraph.getJobID());
}
} else {
log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
}
}

@RpcMethod
public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception {
if (executionGraph != null) {
if (log.isDebugEnabled()) {
log.debug("Lookup key-value state for job {} with registration " +
"name {}.", jobGraph.getJobID(), registrationName);
}

final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
final KvStateLocation location = registry.getKvStateLocation(registrationName);
if (location != null) {
return location;
} else {
throw new UnknownKvStateLocation(registrationName);
}
} else {
throw new IllegalStateException("Received lookup KvState location request for unavailable job " +
jobGraph.getJobID());
}
}

@RpcMethod
public void notifyKvStateRegistered(
final JobVertexID jobVertexId,
final KeyGroupRange keyGroupRange,
final String registrationName,
final KvStateID kvStateId,
final KvStateServerAddress kvStateServerAddress)
{
if (executionGraph != null) {
if (log.isDebugEnabled()) {
log.debug("Key value state registered for job {} under name {}.",
jobGraph.getJobID(), registrationName);
}
try {
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress
);
} catch (Exception e) {
log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
}
} else {
log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID());
}
}

@RpcMethod
public void notifyKvStateUnregistered(
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName)
{
if (executionGraph != null) {
if (log.isDebugEnabled()) {
log.debug("Key value state unregistered for job {} under name {}.",
jobGraph.getJobID(), registrationName);
}
try {
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
jobVertexId, keyGroupRange, registrationName
);
} catch (Exception e) {
log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
}
} else {
log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID());
}
}

@RpcMethod
public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception {
return null;
}

@RpcMethod
public DisposeSavepointResponse disposeSavepoint(final String savepointPath) {
// TODO
return null;
}

@RpcMethod
public ClassloadingProps requestClassloadingProps() throws Exception {
if (executionGraph != null) {
return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
executionGraph.getRequiredJarFiles(),
executionGraph.getRequiredClasspaths());
} else {
throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID());
}
}

//----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------

private void handleFatalError(final Throwable cause) {
runAsync(new Runnable() {
@Override
public void run() {
log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
shutDown();
jobCompletionActions.onFatalError(cause);
}
});
}

// TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
final JobID jobID = executionGraph.getJobID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,18 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;

import java.util.UUID;
Expand Down Expand Up @@ -111,4 +121,80 @@ Future<ExecutionState> requestPartitionState(
* @param resourceID identifying the TaskManager to disconnect
*/
void disconnectTaskManager(ResourceID resourceID);

/**
* Notifies the JobManager about the removal of a resource.
*
* @param resourceId The ID under which the resource is registered.
* @param message Optional message with details, for logging and debugging.
*/

void resourceRemoved(final ResourceID resourceId, final String message);

/**
* Notifies the JobManager that the checkpoint of an individual task is completed.
*
* @param acknowledge The acknowledge message of the checkpoint
*/
void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);

/**
* Notifies the JobManager that a checkpoint request could not be heeded.
* This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints.
*
* @param decline The decline message of the checkpoint
*/
void declineCheckpoint(final DeclineCheckpoint decline);

/**
* Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
*
* @param registrationName Name under which the KvState has been registered.
* @return Future of the requested {@link KvState} location
*/
Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception;

/**
* @param jobVertexId JobVertexID the KvState instance belongs to.
* @param keyGroupRange Key group range the KvState instance belongs to.
* @param registrationName Name under which the KvState has been registered.
* @param kvStateId ID of the registered KvState instance.
* @param kvStateServerAddress Server address where to find the KvState instance.
*/
void notifyKvStateRegistered(
final JobVertexID jobVertexId,
final KeyGroupRange keyGroupRange,
final String registrationName,
final KvStateID kvStateId,
final KvStateServerAddress kvStateServerAddress);

/**
* @param jobVertexId JobVertexID the KvState instance belongs to.
* @param keyGroupRange Key group index the KvState instance belongs to.
* @param registrationName Name under which the KvState has been registered.
*/
void notifyKvStateUnregistered(
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName);

/**
* Notifies the JobManager to trigger a savepoint for this job.
*
* @return Future of the savepoint trigger response.
*/
Future<TriggerSavepointResponse> triggerSavepoint();

/**
* Notifies the Jobmanager to dispose specified savepoint.
*
* @param savepointPath The path of the savepoint.
* @return The future of the savepoint disponse response.
*/
Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath);

/**
* Request the classloading props of this job.
*/
Future<ClassloadingProps> requestClassloadingProps();
}

0 comments on commit 4e5f423

Please sign in to comment.