From 6db95530fce62d5695a246232d11306aac005ef0 Mon Sep 17 00:00:00 2001 From: Kurt Young Date: Thu, 8 Sep 2016 12:00:13 +0800 Subject: [PATCH] [FLINK-4408][JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph --- .../HighAvailabilityServices.java | 12 + .../highavailability/NonHaServices.java | 14 + .../flink/runtime/jobmaster/JobMaster.java | 478 +++++++++++++----- .../runtime/jobmaster/JobMasterGateway.java | 24 + .../runtime/jobmaster/JobMasterRunner.java | 200 ++++++++ .../runtime/taskexecutor/TaskExecutor.java | 12 + .../TestingHighAvailabilityServices.java | 36 ++ 7 files changed, 663 insertions(+), 113 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRunner.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 298147cf1dce1..04b0ce772d8f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -52,4 +54,14 @@ public interface HighAvailabilityServices { * @param jobID The identifier of the job running the election. 
*/ LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception; + + /** + * Gets the checkpoint recovery factory for the job manager + */ + CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception; + + /** + * Gets the submitted job graph store for the job manager + */ + SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 292a404118665..2dbd393679466 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -19,6 +19,10 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -69,4 +73,14 @@ public LeaderElectionService getResourceManagerLeaderElectionService() throws Ex public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { return new StandaloneLeaderElectionService(); } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + return new StandaloneCheckpointRecoveryFactory(); + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + return new StandaloneSubmittedJobGraphStore(); + } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 0a6a7ef0938ec..8ce02b48a8267 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -18,21 +18,51 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.leaderelection.LeaderContender; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.util.Preconditions; -import java.util.UUID; +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * JobMaster implementation. 
The job master is responsible for the execution of a single @@ -41,7 +71,7 @@ * It offers the following methods as part of its rpc interface to interact with the JobMaster * remotely: * */ @@ -52,7 +82,6 @@ public class JobMaster extends RpcEndpoint { /** Logical representation of the job */ private final JobGraph jobGraph; - private final JobID jobID; /** Configuration of the job */ private final Configuration configuration; @@ -60,32 +89,62 @@ public class JobMaster extends RpcEndpoint { /** Service to contend for and retrieve the leadership of JM and RM */ private final HighAvailabilityServices highAvailabilityServices; - /** Leader Management */ - private LeaderElectionService leaderElectionService = null; - private UUID leaderSessionID; + /** Blob cache manager used across jobs */ + private final BlobLibraryCacheManager libraryCacheManager; + + /** Factory to create restart strategy for this job */ + private final RestartStrategyFactory restartStrategyFactory; + + /** Store for save points */ + private final SavepointStore savepointStore; + + /** The timeout for this job */ + private final FiniteDuration timeout; + + /** The scheduler to use for scheduling new tasks as they are needed */ + private final Scheduler scheduler; + + /** The metrics group used across jobs */ + private final JobManagerMetricGroup jobManagerMetricGroup; + + /** The execution context which is used to execute futures */ + private final ExecutionContext executionContext; + + /** The execution graph of this job */ + private volatile ExecutionGraph executionGraph; + + /** The checkpoint recovery factory used by this job */ + private CheckpointRecoveryFactory checkpointRecoveryFactory; + + /** Store for all submitted job graphs */ + private SubmittedJobGraphStore submittedJobGraphs; - /** - * The JM's Constructor - * - * @param jobGraph The representation of the job's execution plan - * @param configuration The job's configuration - * @param rpcService The RPC service at which the JM serves 
- * @param highAvailabilityService The cluster's HA service from the JM can elect and retrieve leaders. - */ public JobMaster( JobGraph jobGraph, Configuration configuration, RpcService rpcService, - HighAvailabilityServices highAvailabilityService) { - + HighAvailabilityServices highAvailabilityService, + BlobLibraryCacheManager libraryCacheManager, + RestartStrategyFactory restartStrategyFactory, + SavepointStore savepointStore, + FiniteDuration timeout, + Scheduler scheduler, + JobManagerMetricGroup jobManagerMetricGroup) + { super(rpcService); - - this.jobGraph = Preconditions.checkNotNull(jobGraph); - this.jobID = Preconditions.checkNotNull(jobGraph.getJobID()); - - this.configuration = Preconditions.checkNotNull(configuration); - - this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService); + this.jobGraph = checkNotNull(jobGraph); + this.configuration = checkNotNull(configuration); + this.highAvailabilityServices = checkNotNull(highAvailabilityService); + this.libraryCacheManager = checkNotNull(libraryCacheManager); + this.restartStrategyFactory = checkNotNull(restartStrategyFactory); + this.savepointStore = checkNotNull(savepointStore); + this.timeout = checkNotNull(timeout); + this.scheduler = checkNotNull(scheduler); + this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); + this.executionContext = checkNotNull(rpcService.getExecutionContext()); + this.executionGraph = null; + this.checkpointRecoveryFactory = null; + this.submittedJobGraphs = null; } public ResourceManagerGateway getResourceManager() { @@ -93,93 +152,297 @@ public ResourceManagerGateway getResourceManager() { } //---------------------------------------------------------------------------------------------- - // Initialization methods + // Lifecycle management //---------------------------------------------------------------------------------------------- + + @Override public void start() { super.start(); - // register at the election once the JM starts 
- registerAtElectionService(); + try { + checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); + } catch (Exception e) { + log.error("Could not get the checkpoint recovery factory.", e); + throw new RuntimeException("Could not get the checkpoint recovery factory.", e); + } + + try { + submittedJobGraphs = highAvailabilityServices.getSubmittedJobGraphStore(); + } catch (Exception e) { + log.error("Could not start the JobManager because we cannot get the job graph store.", e); + throw new RuntimeException("Could not get the job graph store.", e); + } + } + @Override + public void shutDown() { + super.shutDown(); + + suspendJob(new Exception("JobManager is shutting down.")); + } //---------------------------------------------------------------------------------------------- - // JobMaster Leadership methods + // RPC methods //---------------------------------------------------------------------------------------------- - /** - * Retrieves the election service and contend for the leadership. - */ - private void registerAtElectionService() { - try { - leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID); - leaderElectionService.start(new JobMasterLeaderContender()); - } catch (Exception e) { - throw new RuntimeException("Fail to register at the election of JobMaster", e); - } - } /** - * Start the execution when the leadership is granted. + * Submits a job to the job manager. The job is registered at the libraryCacheManager which + * creates the job's class loader. The job graph is appended to the corresponding execution + * graph and be prepared to run. 
* - * @param newLeaderSessionID The identifier of the new leadership session + * @param isRecovery Flag indicating whether this is a recovery or initial submission + * @return Flag indicating whether this job has been accepted */ - public void grantJobMasterLeadership(final UUID newLeaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID); + @RpcMethod + public boolean submitJob(final boolean isRecovery) { + log.info("Submitting job {} ({}) " + (isRecovery ? "(Recovery)" : ""), jobGraph.getJobID(), jobGraph.getName()); - // The operation may be blocking, but since JM is idle before it grants the leadership, it's okay that - // JM waits here for the operation's completeness. - leaderSessionID = newLeaderSessionID; - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); + try { + // IMPORTANT: We need to make sure that the library registration is the first action, + // because this makes sure that the uploaded jar files are removed in case of + // unsuccessful + try { + libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), + jobGraph.getClasspaths()); + } catch (Throwable t) { + throw new JobSubmissionException(jobGraph.getJobID(), + "Cannot set up the user code libraries: " + t.getMessage(), t); + } - // TODO:: execute the job when the leadership is granted. + final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID()); + if (userCodeLoader == null) { + throw new JobSubmissionException(jobGraph.getJobID(), + "The user code class loader could not be initialized."); } - }); - } - /** - * Stop the execution when the leadership is revoked. 
- */ - public void revokeJobMasterLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("JobManager {} was revoked leadership.", getAddress()); + if (jobGraph.getNumberOfVertices() == 0) { + throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty"); + } - // TODO:: cancel the job's execution and notify all listeners - cancelAndClearEverything(new Exception("JobManager is no longer the leader.")); + final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = + jobGraph.getSerializedExecutionConfig() + .deserializeValue(userCodeLoader) + .getRestartStrategy(); + final RestartStrategy restartStrategy; + if (restartStrategyConfiguration != null) { + restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration); + } else { + restartStrategy = restartStrategyFactory.createRestartStrategy(); + } + log.info("Using restart strategy {} for {}.", restartStrategy, jobGraph.getJobID()); - leaderSessionID = null; + MetricGroup jobMetrics = null; + if (jobManagerMetricGroup != null) { + jobMetrics = jobManagerMetricGroup.addJob(jobGraph); } - }); + if (jobMetrics == null) { + jobMetrics = new UnregisteredMetricsGroup(); + } + /* no execution graph yet (initial state, or reset to null by a failed submit / suspendJob): create it */ + if (executionGraph == null) { + executionGraph = new ExecutionGraph( + executionContext, + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getJobConfiguration(), + jobGraph.getSerializedExecutionConfig(), + timeout, + restartStrategy, + jobGraph.getUserJarBlobKeys(), + jobGraph.getClasspaths(), + userCodeLoader, + jobMetrics); + } else { + // TODO: update last active time in JobInfo + } + + executionGraph.setScheduleMode(jobGraph.getScheduleMode()); + executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()); + + try { + executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)); + } catch (Exception e) { + log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e); +
executionGraph.setJsonPlan("{}"); + } + + // initialize the vertices that have a master initialization hook + // file output formats create directories here, input formats create splits + if (log.isDebugEnabled()) { + log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); + } + for (JobVertex vertex : jobGraph.getVertices()) { + final String executableClass = vertex.getInvokableClassName(); + if (executableClass == null || executableClass.length() == 0) { + throw new JobSubmissionException(jobGraph.getJobID(), + "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class."); + } + if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { + vertex.setParallelism(scheduler.getTotalNumberOfSlots()); + } + + try { + vertex.initializeOnMaster(userCodeLoader); + } catch (Throwable t) { + throw new JobExecutionException(jobGraph.getJobID(), + "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t); + } + } + + // topologically sort the job vertices and attach the graph to the existing one + final List sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); + if (log.isDebugEnabled()) { + log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), + jobGraph.getJobID(), jobGraph.getName()); + } + executionGraph.attachJobGraph(sortedTopology); + + if (log.isDebugEnabled()) { + log.debug("Successfully created execution graph from job graph {} ({}).", + jobGraph.getJobID(), jobGraph.getName()); + } + + final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings(); + if (snapshotSettings != null) { + List triggerVertices = getExecutionJobVertexWithId( + executionGraph, snapshotSettings.getVerticesToTrigger()); + + List ackVertices = getExecutionJobVertexWithId( + executionGraph, snapshotSettings.getVerticesToAcknowledge()); + + List confirmVertices = getExecutionJobVertexWithId( + executionGraph, 
snapshotSettings.getVerticesToConfirm()); + + CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore( + jobGraph.getJobID(), userCodeLoader); + + CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter( + jobGraph.getJobID()); + + // Checkpoint stats tracker + boolean isStatsDisabled = configuration.getBoolean( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE); + + final CheckpointStatsTracker checkpointStatsTracker; + if (isStatsDisabled) { + checkpointStatsTracker = new DisabledCheckpointStatsTracker(); + } else { + int historySize = configuration.getInteger( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); + checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics); + } + + executionGraph.enableSnapshotCheckpointing( + snapshotSettings.getCheckpointInterval(), + snapshotSettings.getCheckpointTimeout(), + snapshotSettings.getMinPauseBetweenCheckpoints(), + snapshotSettings.getMaxConcurrentCheckpoints(), + triggerVertices, + ackVertices, + confirmVertices, + checkpointIdCounter, + completedCheckpoints, + savepointStore, + checkpointStatsTracker); + } + + // TODO: register job status change listeners + // TODO: register client listeners + + if (isRecovery) { + // this is a recovery of a master failure (this master takes over) + executionGraph.restoreLatestCheckpointedState(); + } else { + if (snapshotSettings != null) { + String savepointPath = snapshotSettings.getSavepointPath(); + if (savepointPath != null) { + // got a savepoint + log.info("Starting job from savepoint {}.", savepointPath); + + // load the savepoint as a checkpoint into the system + final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint( + jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, 
savepointPath); + executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint); + + // Reset the checkpoint ID counter + long nextCheckpointId = savepoint.getCheckpointID() + 1; + log.info("Reset the checkpoint ID to " + nextCheckpointId); + executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId); + + executionGraph.restoreLatestCheckpointedState(); + } + } + + // TODO: add this job to submitted job graph store + + } + + // TODO: notify client about job submit success + + return true; + } catch (Throwable t) { + log.error("Failed to submit job {} ({})", jobGraph.getJobID(), jobGraph.getName(), t); + + libraryCacheManager.unregisterJob(jobGraph.getJobID()); + + if (executionGraph != null) { + executionGraph.fail(t); + executionGraph = null; + } + + final Throwable rt; + if (t instanceof JobExecutionException) { + rt = (JobExecutionException) t; + } else { + rt = new JobExecutionException(jobGraph.getJobID(), + "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); + } + // TODO: notify client of this failure + + // Any error occurred during submission phase will make this job as rejected + return false; + } } /** - * Handles error occurring in the leader election service - * - * @param exception Exception thrown in the leader election service + * Making this job begins to run. 
+ */ - public void onJobMasterElectionError(final Exception exception) { - runAsync(new Runnable() { + @RpcMethod + public void startJob() { + log.info("Starting job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); + + // start scheduling job in another thread + getRpcService().getExecutionContext().execute(new Runnable() { @Override public void run() { - log.error("Received an error from the LeaderElectionService.", exception); - - // TODO:: cancel the job's execution and shutdown the JM - cancelAndClearEverything(exception); - - leaderSessionID = null; + if (executionGraph != null) { + try { + executionGraph.scheduleForExecution(scheduler); + } catch (Throwable t) { + executionGraph.fail(t); + } + } } }); - } - //---------------------------------------------------------------------------------------------- - // RPC methods - //---------------------------------------------------------------------------------------------- + /** + * Suspends the job: the execution graph is suspended with the given cause and then discarded. The job must be + * re-submitted before it can be started again. + * + * @param cause The reason why this job is being suspended. + */ + @RpcMethod + public void suspendJob(final Throwable cause) { + if (executionGraph != null) { + executionGraph.suspend(cause); + executionGraph = null; + } + } /** * Updates the task execution state for a given task. @@ -208,37 +471,26 @@ public void registerAtResourceManager(final String address) { //---------------------------------------------------------------------------------------------- /** - * Cancel the current job and notify all listeners the job's cancellation. + * Converts JobVertexIDs to corresponding ExecutionJobVertexes * - * @param cause Cause for the cancelling.
+ * @param executionGraph The execution graph that holds the relationship + * @param vertexIDs The vertexIDs need to be converted + * @return The corresponding ExecutionJobVertexes + * @throws JobSubmissionException */ - private void cancelAndClearEverything(Throwable cause) { - // currently, nothing to do here - } - - // ------------------------------------------------------------------------ - // Utility classes - // ------------------------------------------------------------------------ - private class JobMasterLeaderContender implements LeaderContender { - - @Override - public void grantLeadership(UUID leaderSessionID) { - JobMaster.this.grantJobMasterLeadership(leaderSessionID); - } - - @Override - public void revokeLeadership() { - JobMaster.this.revokeJobMasterLeadership(); - } - - @Override - public String getAddress() { - return JobMaster.this.getAddress(); - } - - @Override - public void handleError(Exception exception) { - onJobMasterElectionError(exception); + private static List getExecutionJobVertexWithId( + final ExecutionGraph executionGraph, final List vertexIDs) + throws JobSubmissionException + { + final List ret = new ArrayList<>(vertexIDs.size()); + for (JobVertexID vertexID : vertexIDs) { + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID); + if (executionJobVertex == null) { + throw new JobSubmissionException(executionGraph.getJobID(), + "The snapshot checkpointing settings refer to non-existent vertex " + vertexID); + } + ret.add(executionJobVertex); } + return ret; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index a53e383942744..2e496cb41a2f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -21,6 +21,7 @@ import 
org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; + import scala.concurrent.Future; /** @@ -28,6 +29,29 @@ */ public interface JobMasterGateway extends RpcGateway { + /** + * Submits a job to the job manager. The job is registered at the libraryCacheManager which + * creates the job's class loader. The job graph is appended to the corresponding execution + * graph and be prepared to run. + * + * @param isRecovery Flag indicating whether this is a recovery or initial submission + * @return Flag indicating whether this job has been accepted + */ + Future submitJob(final boolean isRecovery); + + /** + * Making this job begins to run. + */ + void startJob(); + + /** + * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. Should re-submit + * the job before restarting it. + * + * @param cause The reason of why this job been suspended. + */ + void suspendJob(final Throwable cause); + /** * Updates the task execution state for a given task. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRunner.java new file mode 100644 index 0000000000000..cb747b30299b0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRunner.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import akka.dispatch.OnComplete; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.StartStoppable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; + +/** + * The runner for the job manager. It deals with job level leader election and make underlying job manager + * properly reacted. Also this runner takes care of determining whether job manager should be recovered, + * until it's been fully disposed. 
+ */ +public class JobMasterRunner implements StartStoppable, LeaderContender { + + private final Logger log = LoggerFactory.getLogger(JobMasterRunner.class); + + /** The job graph to run */ + private final JobGraph jobGraph; + + /** Whether this job is a recovery rather than an initial submission */ + private volatile boolean isRecovery; + + /** Provides services needed by high availability */ + private final HighAvailabilityServices highAvailabilityServices; + + /** The manager of the job */ + private final JobMaster jobManager; + + /** The execution context which is used to execute futures */ + private final ExecutionContext executionContext; + + /** Leader election service for this job; null until start() obtains it */ + private LeaderElectionService leaderElectionService; + + /** Leader session id when granted leadership */ + private UUID leaderSessionID; + + public JobMasterRunner( + final JobGraph jobGraph, + final boolean isRecovery, + final Configuration configuration, + final RpcService rpcService, + final HighAvailabilityServices highAvailabilityServices, + final BlobLibraryCacheManager libraryCacheManager, + final RestartStrategyFactory restartStrategyFactory, + final SavepointStore savepointStore, + final FiniteDuration timeout, + final Scheduler scheduler, + final JobManagerMetricGroup jobManagerMetricGroup) + { + this.jobGraph = jobGraph; + this.isRecovery = isRecovery; + this.highAvailabilityServices = highAvailabilityServices; + this.jobManager = new JobMaster(jobGraph, configuration, rpcService, highAvailabilityServices, + libraryCacheManager, restartStrategyFactory, savepointStore, timeout, scheduler, jobManagerMetricGroup); + this.executionContext = rpcService.getExecutionContext(); + + this.leaderElectionService = null; + this.leaderSessionID = null; + } + + //---------------------------------------------------------------------------------------------- + // Lifecycle management + //---------------------------------------------------------------------------------------------- + + public
void start() { + jobManager.start(); + + try { + leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobGraph.getJobID()); + leaderElectionService.start(this); + } catch (Exception e) { + log.error("Could not start the JobManager because the leader election service did not start.", e); + throw new RuntimeException("Could not start the leader election service.", e); + } + } + + public void stop() { + if (leaderElectionService != null) { + try { + leaderElectionService.stop(); + } catch (Exception e) { + log.error("Could not properly shutdown the leader election service."); + } + } + + jobManager.shutDown(); + } + + public void done() { + // TODO: called when job is done + } + + //---------------------------------------------------------------------------------------------- + // Leadership methods + //---------------------------------------------------------------------------------------------- + + @Override + public void grantLeadership(UUID leaderSessionID) { + log.info("JobManager for job {} ({}) was granted leadership with session id {} at {}.", + jobGraph.getJobID(), jobGraph.getName(), leaderSessionID, getAddress()); + + // The operation may be blocking, but since this runner is idle before it been granted the leadership, + // it's okay that job manager wait for the operation complete + leaderElectionService.confirmLeaderSessionID(leaderSessionID); + this.leaderSessionID = leaderSessionID; + + Future submitResult = jobManager.getSelf().submitJob(isRecovery); + submitResult.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Boolean success) throws Throwable { + if (failure != null) { + log.error("Failed to submit job {} ({})", jobGraph.getJobID(), jobGraph.getName(), failure); + // TODO: notify JobMasterRunner holder to shutdown this runner. 
+ } else { + if (success) { + if (leaderElectionService.hasLeadership()) { + // There is a small chance that multiple job managers schedule the same job after if + // they try to recover at the same time. This will eventually be noticed, but can not be + // ruled out from the beginning. + + // NOTE: Scheduling the job for execution is a separate action from the job submission. + // The success of submitting the job must be independent from the success of scheduling + // the job. + jobManager.getSelf().startJob(); + + if (!isRecovery) { + // we have started a newly submitted job, after this, every time we want to restart + // this job again, we should treat it as recovery + isRecovery = true; + } + } else { + // Do nothing here, since revokeLeadership will either be called soon or + // has already been called + log.warn("Submitted job {} ({}), but not leader already, waiting to get leadership" + + "and then retry.", jobGraph.getJobID(), jobGraph.getName()); + } + } else { + // TODO: notify JobMasterRunner holder to shutdown this runner. 
+ } + } + } + }, executionContext); + + } + + @Override + public void revokeLeadership() { + log.info("JobManager for job {} ({}) was revoked leadership at {}.", + jobGraph.getJobID(), jobGraph.getName(), getAddress()); + leaderSessionID = null; + jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader.")); + } + + @Override + public String getAddress() { + return jobManager.getAddress(); + } + + @Override + public void handleError(Exception exception) { + log.error("Received an error from the leader election service.", exception); + // TODO: let the outside know and stop this runner + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index a455fe2dec72d..56b325f96109a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -21,12 +21,14 @@ import akka.actor.ActorSystem; import akka.util.Timeout; import com.typesafe.config.Config; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateServer; @@ -318,6 +320,16 @@ public LeaderElectionService getResourceManagerLeaderElectionService() throws Ex public 
LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
 				return null;
 			}
+
+			@Override
+			public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+				return null; // stub: this inline implementation supplies no checkpoint recovery factory
+			}
+
+			@Override
+			public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+				return null; // stub: this inline implementation supplies no submitted job graph store
+			}
 		};
 
 		// start all the TaskManager services (network stack, library cache, ...)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3162f40391ea5..52b5d72ff133c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
@@ -34,6 +36,9 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderElectionService resourceManagerLeaderElectionService;
 
+	private volatile CheckpointRecoveryFactory checkpointRecoveryFactory; // null until setCheckpointRecoveryFactory(...) is called
+
+	private volatile SubmittedJobGraphStore submittedJobGraphStore; // null until setSubmittedJobGraphStore(...) is called
 
 	// ------------------------------------------------------------------------
 	// Setters for mock / testing implementations
@@ -51,6 +56,14 @@ public void setResourceManagerLeaderElectionService(LeaderElectionService leader
 		this.resourceManagerLeaderElectionService = leaderElectionService;
 	}
 
+	public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
+		this.checkpointRecoveryFactory = checkpointRecoveryFactory; // value later handed out by getCheckpointRecoveryFactory()
+	}
+
+	public void setSubmittedJobGraphStore(SubmittedJobGraphStore submittedJobGraphStore) {
+		this.submittedJobGraphStore = submittedJobGraphStore; // value later handed out by getSubmittedJobGraphStore()
+	}
+
 	// ------------------------------------------------------------------------
 	// HA Services Methods
 	// ------------------------------------------------------------------------
@@ -86,4 +99,27 @@ public LeaderElectionService getResourceManagerLeaderElectionService() throws Ex
 			throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
 		}
 	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		CheckpointRecoveryFactory factory = checkpointRecoveryFactory; // read the volatile field once so the check and the return use the same value
+
+		if (factory != null) {
+			return factory;
+		} else {
+			throw new IllegalStateException("CheckpointRecoveryFactory has not been set"); // no factory was injected through the setter
+		}
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		SubmittedJobGraphStore store = submittedJobGraphStore; // read the volatile field once so the check and the return use the same value
+
+		if (store != null) {
+			return store;
+		} else {
+			throw new IllegalStateException("SubmittedJobGraphStore has not been set"); // no store was injected through the setter
+
+		}
+	}
 }