From c09e579b26752f2ae477d60e8fbb6ccdce315df9 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 25 Sep 2017 15:29:59 +0200 Subject: [PATCH] [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after a given time to live period. This will trigger requesting the AccessExecutionGraph again and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST handlers. In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic cleanup task which triggers ExecutionGraphCache.cleanup. This method releases all cache entries which have exceeded their time to live. Currently the time to live is set to 10 * refreshInterval of the web gui, and the cleanup task runs every 20 * refreshInterval. This closes #4728. --- .../MesosApplicationMasterRunner.java | 3 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 130 ++++--- .../webmonitor/WebRuntimeMonitorITCase.java | 4 +- .../runtime/akka/AkkaJobManagerGateway.java | 7 +- .../clusterframework/BootstrapTools.java | 24 +- .../flink/runtime/dispatcher/Dispatcher.java | 14 +- .../runtime/dispatcher/DispatcherGateway.java | 5 - .../runtime/jobmaster/JobManagerGateway.java | 34 -- .../flink/runtime/jobmaster/JobMaster.java | 6 + .../runtime/jobmaster/JobMasterGateway.java | 16 + .../messages/FlinkJobNotFoundException.java | 6 +- .../AbstractExecutionGraphRequestHandler.java | 24 +- .../AbstractJobVertexRequestHandler.java | 2 +- .../AbstractSubtaskAttemptRequestHandler.java | 2 +- .../legacy/AbstractSubtaskRequestHandler.java | 2 +- .../legacy/CurrentJobsOverviewHandler.java | 2 +- .../handler/legacy/ExecutionGraphCache.java | 187 ++++++++++ .../handler/legacy/ExecutionGraphHolder.java | 82 ---- .../legacy/JobAccumulatorsHandler.java | 2 +- .../JobCancellationWithSavepointHandlers.java 
| 62 +-- .../rest/handler/legacy/JobConfigHandler.java | 4 +- .../handler/legacy/JobDetailsHandler.java | 2 +- .../handler/legacy/JobExceptionsHandler.java | 2 +- .../rest/handler/legacy/JobPlanHandler.java | 2 +- .../legacy/JobVertexAccumulatorsHandler.java | 2 +- .../legacy/JobVertexBackPressureHandler.java | 2 +- .../legacy/JobVertexDetailsHandler.java | 2 +- .../legacy/JobVertexTaskManagersHandler.java | 2 +- .../SubtaskCurrentAttemptDetailsHandler.java | 2 +- ...skExecutionAttemptAccumulatorsHandler.java | 2 +- ...SubtaskExecutionAttemptDetailsHandler.java | 2 +- .../SubtasksAllAccumulatorsHandler.java | 2 +- .../handler/legacy/SubtasksTimesHandler.java | 2 +- .../checkpoints/CheckpointConfigHandler.java | 4 +- .../CheckpointStatsDetailsHandler.java | 4 +- ...CheckpointStatsDetailsSubtasksHandler.java | 4 +- .../checkpoints/CheckpointStatsHandler.java | 4 +- .../runtime/webmonitor/RestfulGateway.java | 39 +- .../runtime/webmonitor/WebMonitorUtils.java | 10 +- .../flink/runtime/jobmanager/JobManager.scala | 4 +- .../minicluster/FlinkMiniCluster.scala | 4 +- .../legacy/ExecutionGraphCacheTest.java | 353 ++++++++++++++++++ .../legacy/JobAccumulatorsHandlerTest.java | 2 +- ...CancellationWithSavepointHandlersTest.java | 19 +- .../handler/legacy/JobConfigHandlerTest.java | 2 +- .../handler/legacy/JobDetailsHandlerTest.java | 2 +- .../legacy/JobExceptionsHandlerTest.java | 2 +- .../handler/legacy/JobPlanHandlerTest.java | 2 +- .../JobVertexAccumulatorsHandlerTest.java | 2 +- .../JobVertexBackPressureHandlerTest.java | 8 +- .../legacy/JobVertexDetailsHandlerTest.java | 2 +- .../JobVertexTaskManagersHandlerTest.java | 2 +- ...btaskCurrentAttemptDetailsHandlerTest.java | 2 +- ...ecutionAttemptAccumulatorsHandlerTest.java | 2 +- ...askExecutionAttemptDetailsHandlerTest.java | 2 +- .../SubtasksAllAccumulatorsHandlerTest.java | 2 +- .../legacy/SubtasksTimesHandlerTest.java | 2 +- .../CheckpointConfigHandlerTest.java | 10 +- .../CheckpointStatsDetailsHandlerTest.java | 12 
+- .../CheckpointStatsHandlerTest.java | 6 +- ...ckpointStatsSubtaskDetailsHandlerTest.java | 18 +- .../yarn/YarnApplicationMasterRunner.java | 3 +- 62 files changed, 840 insertions(+), 330 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 6abdb2893042c..f465464b371d1 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; @@ -294,7 +295,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), webMonitorTimeout, - futureExecutor, + new ScheduledExecutorServiceAdapter(futureExecutor), LOG); if (webMonitor != null) { final URL webMonitorURL = new URL(webMonitor.getRestAddress()); diff 
--git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 1af6ab641eb47..a37ce2de8225c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobView; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -34,7 +35,7 @@ import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler; import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler; import org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers; @@ -93,7 +94,7 @@ import java.io.IOException; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -143,6 +144,10 @@ public class WebRuntimeMonitor implements WebMonitor { private final WebMonitorConfig cfg; + private final ExecutionGraphCache executionGraphCache; + + private final ScheduledFuture 
executionGraphCleanupTask; + private AtomicBoolean cleanedUp = new AtomicBoolean(); @@ -155,7 +160,7 @@ public WebRuntimeMonitor( LeaderGatewayRetriever jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, - Executor executor) throws IOException, InterruptedException { + ScheduledExecutor scheduledExecutor) throws IOException, InterruptedException { this.leaderRetrievalService = checkNotNull(leaderRetrievalService); this.retriever = Preconditions.checkNotNull(jobManagerRetriever); @@ -193,11 +198,23 @@ public WebRuntimeMonitor( this.uploadDir = null; } - ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(timeout); + final long timeToLive = cfg.getRefreshInterval() * 10L; + + this.executionGraphCache = new ExecutionGraphCache( + timeout, + Time.milliseconds(timeToLive)); + + final long cleanupInterval = timeToLive * 2L; + + this.executionGraphCleanupTask = scheduledExecutor.scheduleWithFixedDelay( + executionGraphCache::cleanup, + cleanupInterval, + cleanupInterval, + TimeUnit.MILLISECONDS); // - Back pressure stats ---------------------------------------------- - stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000); + stackTraceSamples = new StackTraceSampleCoordinator(scheduledExecutor, 60000); // Back pressure stats tracker config int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL); @@ -228,55 +245,55 @@ public WebRuntimeMonitor( } else { serverSSLContext = null; } - metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout); + metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout); String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY); - JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir); + JobCancellationWithSavepointHandlers cancelWithSavepoint = new 
JobCancellationWithSavepointHandlers(executionGraphCache, scheduledExecutor, defaultSavepointDir); RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler()); RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler()); Router router = new Router(); // config how to interact with this web server - get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval())); + get(router, new DashboardConfigHandler(scheduledExecutor, cfg.getRefreshInterval())); // the overview - how many task managers, slots, free slots, ... - get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT)); + get(router, new ClusterOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT)); // job manager configuration - get(router, new ClusterConfigHandler(executor, config)); + get(router, new ClusterConfigHandler(scheduledExecutor, config)); // overview over jobs - get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true)); - get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false)); - get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true)); - - get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT)); - - get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher)); - - get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher)); - get(router, new SubtasksTimesHandler(currentGraphs, executor)); - get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher)); - get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor)); - get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval)); - get(router, new JobVertexMetricsHandler(executor, metricFetcher)); - get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor)); - get(router, new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher)); - get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher)); - get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor)); - - get(router, new JobPlanHandler(currentGraphs, executor)); - get(router, new JobConfigHandler(currentGraphs, executor)); - get(router, new JobExceptionsHandler(currentGraphs, executor)); - get(router, new JobAccumulatorsHandler(currentGraphs, executor)); - get(router, new JobMetricsHandler(executor, metricFetcher)); - - get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); + get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, true)); + get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, false)); + get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, false, true)); + + get(router, new CurrentJobIdsHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT)); + + get(router, new JobDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + + get(router, new JobVertexDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new SubtasksTimesHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new JobVertexAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, backPressureStatsTracker, refreshInterval)); + get(router, new JobVertexMetricsHandler(scheduledExecutor, metricFetcher)); + get(router, new SubtasksAllAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + get(router, new SubtaskCurrentAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new 
SubtaskExecutionAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new SubtaskExecutionAttemptAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + + get(router, new JobPlanHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobConfigHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobExceptionsHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobMetricsHandler(scheduledExecutor, metricFetcher)); + + get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); get(router, new TaskManagerLogHandler( retriever, - executor, + scheduledExecutor, localRestAddress, timeout, TaskManagerLogHandler.FileMode.LOG, @@ -285,13 +302,13 @@ public WebRuntimeMonitor( get(router, new TaskManagerLogHandler( retriever, - executor, + scheduledExecutor, localRestAddress, timeout, TaskManagerLogHandler.FileMode.STDOUT, config, blobView)); - get(router, new TaskManagerMetricsHandler(executor, metricFetcher)); + get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher)); router // log and stdout @@ -305,48 +322,48 @@ public WebRuntimeMonitor( .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? 
new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile)); - get(router, new JobManagerMetricsHandler(executor, metricFetcher)); + get(router, new JobManagerMetricsHandler(scheduledExecutor, metricFetcher)); // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) - get(router, new JobCancellationHandler(executor, timeout)); + get(router, new JobCancellationHandler(scheduledExecutor, timeout)); // DELETE is the preferred way of canceling a job (Rest-conform) - delete(router, new JobCancellationHandler(executor, timeout)); + delete(router, new JobCancellationHandler(scheduledExecutor, timeout)); get(router, triggerHandler); get(router, inProgressHandler); // stop a job via GET (for proper integration with YARN this has to be performed via GET) - get(router, new JobStoppingHandler(executor, timeout)); + get(router, new JobStoppingHandler(scheduledExecutor, timeout)); // DELETE is the preferred way of stopping a job (Rest-conform) - delete(router, new JobStoppingHandler(executor, timeout)); + delete(router, new JobStoppingHandler(scheduledExecutor, timeout)); int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); // Register the checkpoint stats handlers - get(router, new CheckpointStatsHandler(currentGraphs, executor)); - get(router, new CheckpointConfigHandler(currentGraphs, executor)); - get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache)); - get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache)); + get(router, new CheckpointStatsHandler(executionGraphCache, scheduledExecutor)); + get(router, new CheckpointConfigHandler(executionGraphCache, scheduledExecutor)); + get(router, new CheckpointStatsDetailsHandler(executionGraphCache, scheduledExecutor, cache)); + get(router, new 
CheckpointStatsDetailsSubtasksHandler(executionGraphCache, scheduledExecutor, cache)); if (webSubmitAllow) { // fetch the list of uploaded jars. - get(router, new JarListHandler(executor, uploadDir)); + get(router, new JarListHandler(scheduledExecutor, uploadDir)); // get plan for an uploaded jar - get(router, new JarPlanHandler(executor, uploadDir)); + get(router, new JarPlanHandler(scheduledExecutor, uploadDir)); // run a jar - post(router, new JarRunHandler(executor, uploadDir, timeout, config)); + post(router, new JarRunHandler(scheduledExecutor, uploadDir, timeout, config)); // upload a jar - post(router, new JarUploadHandler(executor, uploadDir)); + post(router, new JarUploadHandler(scheduledExecutor, uploadDir)); // delete an uploaded jar from submission interface - delete(router, new JarDeleteHandler(executor, uploadDir)); + delete(router, new JarDeleteHandler(scheduledExecutor, uploadDir)); } else { // send an Access Denied message - JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor); + JarAccessDeniedHandler jad = new JarAccessDeniedHandler(scheduledExecutor); get(router, jad); post(router, jad); delete(router, jad); @@ -447,6 +464,11 @@ public void run() { @Override public void stop() throws Exception { synchronized (startupShutdownLock) { + + executionGraphCleanupTask.cancel(false); + + executionGraphCache.close(); + leaderRetrievalService.stop(); netty.shutdown(); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index e67c5ce6eda9b..95b5811a8a76a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -179,7 +179,7 @@ public void testRedirectToLeader() throws Exception { jobManagerRetrievers[i], new 
AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT), TIMEOUT, - TestingUtils.defaultExecutor()); + TestingUtils.defaultScheduledExecutor()); } ActorRef[] jobManager = new ActorRef[2]; @@ -323,7 +323,7 @@ public void testLeaderNotAvailable() throws Exception { new AkkaJobManagerRetriever(actorSystem, TIMEOUT, 0, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, TIMEOUT), TIMEOUT, - TestingUtils.defaultExecutor()); + TestingUtils.defaultScheduledExecutor()); webRuntimeMonitor.start(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java index 18025dd10b9c7..2a2c414a603c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -226,7 +227,7 @@ public CompletableFuture requestJobDetails(boolean includeR } @Override - public CompletableFuture> requestJob(JobID jobId, Time timeout) { + public CompletableFuture requestJob(JobID jobId, Time timeout) { CompletableFuture jobResponseFuture = FutureUtils.toJava( jobManagerGateway .ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout)) @@ -235,9 +236,9 @@ public CompletableFuture> requestJob(JobID jobId, return jobResponseFuture.thenApply( (JobManagerMessages.JobResponse jobResponse) -> { if (jobResponse instanceof JobManagerMessages.JobFound) { - return 
Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph()); + return ((JobManagerMessages.JobFound) jobResponse).executionGraph(); } else { - return Optional.empty(); + throw new CompletionException(new FlinkJobNotFoundException(jobId)); } }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 62cddb00b8f82..0a56963b1dd01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -18,13 +18,6 @@ package org.apache.flink.runtime.clusterframework; -import akka.actor.ActorSystem; -import com.typesafe.config.Config; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.lang3.StringUtils; - import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -32,6 +25,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.WebMonitor; @@ -40,10 +34,13 @@ import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.NetUtils; +import akka.actor.ActorSystem; +import com.typesafe.config.Config; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; - import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; import 
java.io.File; import java.io.FileWriter; @@ -55,7 +52,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.Executor; + +import scala.concurrent.duration.FiniteDuration; /** * Tools for starting JobManager and TaskManager processes, including the @@ -178,7 +176,7 @@ public static ActorSystem startActorSystem( * @param jobManagerRetriever to retrieve the leading JobManagerGateway * @param queryServiceRetriever to resolve a query service * @param timeout for asynchronous operations - * @param executor to run asynchronous operations + * @param scheduledExecutor to run asynchronous operations * @param logger Logger for log output * @return WebMonitor instance. * @throws Exception @@ -189,7 +187,7 @@ public static WebMonitor startWebMonitorIfConfigured( LeaderGatewayRetriever jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, - Executor executor, + ScheduledExecutor scheduledExecutor, Logger logger) throws Exception { if (config.getInteger(WebOptions.PORT, 0) >= 0) { @@ -203,7 +201,7 @@ public static WebMonitor startWebMonitorIfConfigured( jobManagerRetriever, queryServiceRetriever, timeout, - executor); + scheduledExecutor); // start the web monitor if (monitor != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 0396d520e61f7..4d89dc8781e58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import 
org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; @@ -284,7 +285,7 @@ public CompletableFuture requestStatusOverview(Time timeout) { } @Override - public CompletableFuture requestJobDetails(Time timeout) { + public CompletableFuture requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) { final int numberJobsRunning = jobManagerRunners.size(); ArrayList> individualJobDetails = new ArrayList<>(numberJobsRunning); @@ -300,6 +301,17 @@ public CompletableFuture requestJobDetails(Time timeout) { new MultipleJobsDetails(jobDetails, null)); } + @Override + public CompletableFuture requestJob(JobID jobId, Time timeout) { + final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); + + if (jobManagerRunner == null) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } else { + return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout); + } + } + /** * Cleans up the job related data from the dispatcher. If cleanupHA is true, then * the data will also be removed from HA. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java index 46b0cd9705a51..1d0a9dda75300 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.webmonitor.RestfulGateway; @@ -80,7 +78,4 @@ CompletableFuture> listJobs( * @param timeout of the operation * @return Future {@link StatusOverview} containing the cluster information */ - CompletableFuture requestStatusOverview(@RpcTimeout Time timeout); - - CompletableFuture requestJobDetails(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java index 0ff88848f0570..782d6d005dd8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java @@ -21,15 +21,12 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.ListeningBehaviour; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobgraph.JobGraph; import 
org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; -import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.webmonitor.RestfulGateway; import javax.annotation.Nullable; @@ -124,37 +121,6 @@ public interface JobManagerGateway extends RestfulGateway { */ CompletableFuture> requestTaskManagerInstances(Time timeout); - /** - * Requests job details currently being executed by the JobManager. - * - * @param includeRunning true if running jobs shall be included, otherwise false - * @param includeFinished true if finished jobs shall be included, otherwise false - * @param timeout for the asynchronous operation - * @return Future containing the job details - */ - CompletableFuture requestJobDetails( - boolean includeRunning, - boolean includeFinished, - Time timeout); - - /** - * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then - * {@link Optional#empty()} is returned. - * - * @param jobId identifying the job whose AccessExecutionGraph is requested - * @param timeout for the asynchronous operation - * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link Optional#empty()} - */ - CompletableFuture> requestJob(JobID jobId, Time timeout); - - /** - * Requests the status overview from the JobManager. - * - * @param timeout for the asynchronous operation - * @return Future containing the status overview - */ - CompletableFuture requestStatusOverview(Time timeout); - /** * Requests the job overview from the JobManager. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 0e534366c6ba0..7efcc0b30b2ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -744,6 +745,11 @@ public CompletableFuture requestJobDetails(Time timeout) { return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor); } + @Override + public CompletableFuture requestArchivedExecutionGraph(Time timeout) { + return CompletableFuture.completedFuture(executionGraph.archive()); + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index f3ca5be48717f..0628976cf270b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import 
org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -232,5 +234,19 @@ CompletableFuture registerTaskManager( */ void heartbeatFromResourceManager(final ResourceID resourceID); + /** + * Request the details of the executed job. + * + * @param timeout for the rpc call + * @return Future details of the executed job + */ CompletableFuture requestJobDetails(@RpcTimeout Time timeout); + + /** + * Request the {@link ArchivedExecutionGraph} of the currently executed job. + * + * @param timeout for the rpc call + * @return Future archived execution graph derived from the currently executed job + */ + CompletableFuture requestArchivedExecutionGraph(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java index 95686ac122158..f606071370ef6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java @@ -22,13 +22,13 @@ import org.apache.flink.util.FlinkException; /** - * Exception which is returned if a Flink job could not be found. + * Exception indicating that we could not find a Flink job with the given job ID. 
*/ public class FlinkJobNotFoundException extends FlinkException { - private static final long serialVersionUID = -7803390762010615384L; + private static final long serialVersionUID = 2294698055059659025L; public FlinkJobNotFoundException(JobID jobId) { - super("Could not find Flink job (" + jobId + ")."); + super("Could not find Flink job (" + jobId + ')'); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java index 88932b5c5b79a..7fbde15fb5c1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java @@ -27,7 +27,6 @@ import org.apache.flink.util.Preconditions; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -38,11 +37,11 @@ */ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler { - private final ExecutionGraphHolder executionGraphHolder; + private final ExecutionGraphCache executionGraphCache; - public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractExecutionGraphRequestHandler(ExecutionGraphCache executionGraphCache, Executor executor) { super(executor); - this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder); + this.executionGraphCache = Preconditions.checkNotNull(executionGraphCache); } @Override @@ -63,16 +62,15 @@ public CompletableFuture handleJsonRequest( return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e)); } - final CompletableFuture> graphFuture = 
executionGraphHolder.getExecutionGraph(jid, jobManagerGateway); + final CompletableFuture graphFuture = executionGraphCache.getExecutionGraph(jid, jobManagerGateway); - return graphFuture.thenComposeAsync( - (Optional optGraph) -> { - if (optGraph.isPresent()) { - return handleRequest(optGraph.get(), pathParams); - } else { - throw new CompletionException(new NotFoundException("Could not find job with jobId " + jid + '.')); - } - }, executor); + return graphFuture + .exceptionally( + throwable -> { + throw new CompletionException(new NotFoundException("Could not find job " + jid + '.')); + }) + .thenComposeAsync( + (AccessExecutionGraph executionGraph) -> handleRequest(executionGraph, pathParams)); } public abstract CompletableFuture handleRequest(AccessExecutionGraph graph, Map params); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java index e2e4484b288c3..70606e4229ebf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java @@ -32,7 +32,7 @@ */ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler { - public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractJobVertexRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java index ec277d8c311ea..9a225f4e9fc96 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java @@ -35,7 +35,7 @@ */ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler { - public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractSubtaskAttemptRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java index d69038a0a9d33..b1797b02a38f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java @@ -34,7 +34,7 @@ */ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler { - public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractSubtaskRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java index 669ef32906dbc..a6640a9370906 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java @@ -75,7 +75,7 @@ public 
CurrentJobsOverviewHandler( @Override public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { - return gateway.requestJobDetails(timeout); + return gateway.requestJobDetails(true, true, timeout); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java new file mode 100644 index 0000000000000..f63b042b3b45c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.util.Preconditions; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Cache for {@link AccessExecutionGraph} which are obtained from the Flink cluster. Every cache entry + * has an associated time to live after which a new request will trigger the reloading of the + * {@link AccessExecutionGraph} from the cluster. + */ +public class ExecutionGraphCache implements Closeable { + + private final Time timeout; + + private final Time timeToLive; + + private final ConcurrentHashMap cachedExecutionGraphs; + + private volatile boolean running = true; + + public ExecutionGraphCache( + Time timeout, + Time timeToLive) { + this.timeout = checkNotNull(timeout); + this.timeToLive = checkNotNull(timeToLive); + + cachedExecutionGraphs = new ConcurrentHashMap<>(4); + } + + @Override + public void close() { + running = false; + + // clear all cached AccessExecutionGraphs + cachedExecutionGraphs.clear(); + } + + /** + * Gets the number of cache entries. + */ + public int size() { + return cachedExecutionGraphs.size(); + } + + /** + * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The + * {@link AccessExecutionGraph} will be requested again after the refresh interval has passed + * or if the graph could not be retrieved from the given gateway. 
+ * + * @param jobId identifying the {@link AccessExecutionGraph} to get + * @param restfulGateway to request the {@link AccessExecutionGraph} from + * @return Future containing the requested {@link AccessExecutionGraph} + */ + public CompletableFuture getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) { + + Preconditions.checkState(running, "ExecutionGraphCache is no longer running"); + + while (true) { + final ExecutionGraphEntry oldEntry = cachedExecutionGraphs.get(jobId); + + final long currentTime = System.currentTimeMillis(); + + if (oldEntry != null) { + if (currentTime < oldEntry.getTTL()) { + if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) { + + // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph + try { + if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) { + return oldEntry.getExecutionGraphFuture(); + } + // send a new request to get the ExecutionGraph from the new leader + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed future. 
This should never happen.", e); + } + } else if (!oldEntry.getExecutionGraphFuture().isDone()) { + return oldEntry.getExecutionGraphFuture(); + } + // otherwise it must be completed exceptionally + } + } + + final ExecutionGraphEntry newEntry = new ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds()); + + final boolean successfulUpdate; + + if (oldEntry == null) { + successfulUpdate = cachedExecutionGraphs.putIfAbsent(jobId, newEntry) == null; + } else { + successfulUpdate = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry); + // cancel potentially outstanding futures + oldEntry.getExecutionGraphFuture().cancel(false); + } + + if (successfulUpdate) { + final CompletableFuture executionGraphFuture = restfulGateway.requestJob(jobId, timeout); + + executionGraphFuture.whenComplete( + (AccessExecutionGraph executionGraph, Throwable throwable) -> { + if (throwable != null) { + newEntry.getExecutionGraphFuture().completeExceptionally(throwable); + + // remove exceptionally completed entry because it doesn't help + cachedExecutionGraphs.remove(jobId, newEntry); + } else { + newEntry.getExecutionGraphFuture().complete(executionGraph); + + // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph + if (executionGraph.getState() == JobStatus.SUSPENDED) { + // remove the entry in case of suspension --> triggers new request when accessed next time + cachedExecutionGraphs.remove(jobId, newEntry); + } + } + }); + + if (!running) { + // delete newly added entry in case of a concurrent stopping operation + cachedExecutionGraphs.remove(jobId, newEntry); + } + + return newEntry.getExecutionGraphFuture(); + } + } + } + + /** + * Perform the cleanup of outdated {@link ExecutionGraphEntry}.
+ */ + public void cleanup() { + long currentTime = System.currentTimeMillis(); + + // remove entries which have exceeded their time to live + cachedExecutionGraphs.values().removeIf( + (ExecutionGraphEntry entry) -> currentTime >= entry.getTTL()); + } + + /** + * Wrapper containing the current execution graph and its time to live (TTL). + */ + private static final class ExecutionGraphEntry { + private final long ttl; + + private final CompletableFuture executionGraphFuture; + + ExecutionGraphEntry(long ttl) { + this.ttl = ttl; + this.executionGraphFuture = new CompletableFuture<>(); + } + + public long getTTL() { + return ttl; + } + + public CompletableFuture getExecutionGraphFuture() { + return executionGraphFuture; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java deleted file mode 100644 index 8a47e509b19be..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.runtime.rest.handler.legacy; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; -import java.util.WeakHashMap; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive. - * - *

The holder will cache the ExecutionGraph behind a weak reference, which will be cleared - * at some point once no one else is pointing to the ExecutionGraph. - * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should - * stay valid. - */ -public class ExecutionGraphHolder { - - private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class); - - private final Time timeout; - - private final WeakHashMap cache = new WeakHashMap<>(); - - public ExecutionGraphHolder(Time timeout) { - this.timeout = checkNotNull(timeout); - } - - /** - * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or - * {@link Optional#empty()} if it cannot be found. - * - * @param jid jobID of the execution graph to be retrieved - * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph - */ - public CompletableFuture> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) { - AccessExecutionGraph cached = cache.get(jid); - if (cached != null) { - if (cached.getState() == JobStatus.SUSPENDED) { - cache.remove(jid); - } else { - return CompletableFuture.completedFuture(Optional.of(cached)); - } - } - - CompletableFuture> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout); - - executionGraphFuture.thenAcceptAsync( - optExecutionGraph -> - optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph))); - - return executionGraphFuture; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java index 68810eb7e738a..b2a2488806782 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java @@ -42,7 
+42,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators"; - public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java index 2750c338739d8..d31af4cb4218e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java @@ -48,7 +48,6 @@ import java.util.ArrayDeque; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -91,13 +90,13 @@ public class JobCancellationWithSavepointHandlers { private final String defaultSavepointDirectory; public JobCancellationWithSavepointHandlers( - ExecutionGraphHolder currentGraphs, + ExecutionGraphCache currentGraphs, Executor executor) { this(currentGraphs, executor, null); } public JobCancellationWithSavepointHandlers( - ExecutionGraphHolder currentGraphs, + ExecutionGraphCache currentGraphs, Executor executor, @Nullable String defaultSavepointDirectory) { @@ -124,12 +123,12 @@ public InProgressHandler getInProgressHandler() { class TriggerHandler implements RequestHandler { /** Current execution graphs. */ - private final ExecutionGraphHolder currentGraphs; + private final ExecutionGraphCache currentGraphs; /** Execution context for futures. 
*/ private final Executor executor; - public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) { + public TriggerHandler(ExecutionGraphCache currentGraphs, Executor executor) { this.currentGraphs = checkNotNull(currentGraphs); this.executor = checkNotNull(executor); } @@ -148,39 +147,40 @@ public CompletableFuture handleRequest( if (jobManagerGateway != null) { JobID jobId = JobID.fromHexString(pathParams.get("jobid")); - final CompletableFuture> graphFuture; + final CompletableFuture graphFuture; graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway); - return graphFuture.thenApplyAsync( - (Optional optGraph) -> { - final AccessExecutionGraph graph = optGraph.orElseThrow( - () -> new CompletionException( - new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'))); - - CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration(); - if (jobCheckpointingConfiguration == null) { - throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job.")); - } + return graphFuture.handleAsync( + (AccessExecutionGraph graph, Throwable throwable) -> { + if (throwable != null) { + throw new CompletionException(new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')); + } else { + CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration(); + if (jobCheckpointingConfiguration == null) { + throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job.")); + } - String targetDirectory = pathParams.get("targetDirectory"); - if (targetDirectory == null) { - if (defaultSavepointDirectory == null) { - throw new IllegalStateException("No savepoint directory configured. 
" + - "You can either specify a directory when triggering this savepoint or " + - "configure a cluster-wide default via key '" + - CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); - } else { - targetDirectory = defaultSavepointDirectory; + String targetDirectory = pathParams.get("targetDirectory"); + if (targetDirectory == null) { + if (defaultSavepointDirectory == null) { + throw new IllegalStateException("No savepoint directory configured. " + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); + } else { + targetDirectory = defaultSavepointDirectory; + } } - } - try { - return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout()); - } catch (IOException e) { - throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e)); + try { + return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout()); + } catch (IOException e) { + throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e)); + } } - }, executor); + }, + executor); } else { return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java index 787217f5d4c4c..2d404961324bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java @@ -42,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config"; - public 
JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); + public JobConfigHandler(ExecutionGraphCache executionGraphCache, Executor executor) { + super(executionGraphCache, executor); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java index b9f812b715a0e..31b14781a4c0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java @@ -60,7 +60,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { private final MetricFetcher fetcher; - public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public JobDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java index 566631e4a1b19..62ee85c2ff11b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java @@ -48,7 +48,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; - public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobExceptionsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java index d9db1ff5480e1..ed8c702d8f9b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java @@ -36,7 +36,7 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; - public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobPlanHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java index d448027316c0b..90b1f8c048154 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java @@ -44,7 +44,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; - public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobVertexAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java index 
59bfc0ba48005..fb79f46073097 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java @@ -52,7 +52,7 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle private final int refreshInterval; public JobVertexBackPressureHandler( - ExecutionGraphHolder executionGraphHolder, + ExecutionGraphCache executionGraphHolder, Executor executor, BackPressureStatsTracker backPressureStatsTracker, int refreshInterval) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java index bfa7020ef578d..2ef5faac2b7c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java @@ -53,7 +53,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { private final MetricFetcher fetcher; - public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public JobVertexDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java index 985ea1ea013fd..d2d5985b335c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java @@ -55,7 +55,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle private final MetricFetcher fetcher; - public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public JobVertexTaskManagersHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java index ff4fb465df881..2abdeaf3c3e9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java @@ -32,7 +32,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; - public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor, fetcher); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java index 1570896611e0a..3749776349ef6 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -47,7 +47,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; - public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java index b0b22eef8621a..5aa83122b8531 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java @@ -55,7 +55,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp private final MetricFetcher fetcher; - public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java index 10d9e02cc111a..d1b607ab0b1f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java @@ -46,7 +46,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; - public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public SubtasksAllAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java index bf1d87e6a0df4..a968ab6f2ed78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java @@ -47,7 +47,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; - public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public SubtasksTimesHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java index 
6ab6676301d5c..69a59f5256322 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -46,7 +46,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config"; - public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public CheckpointConfigHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java index b61c5d0c7575c..e27797189c09c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats; import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -54,7 +54,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest private final CheckpointStatsCache cache; - public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { + public CheckpointStatsDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, CheckpointStatsCache cache) { super(executionGraphHolder, executor); this.cache = cache; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index 22a8db2ade986..5420cf4c22d3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import 
org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -59,7 +59,7 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap private final CheckpointStatsCache cache; - public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { + public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphCache executionGraphHolder, Executor executor, CheckpointStatsCache cache) { super(executionGraphHolder, executor); this.cache = checkNotNull(cache); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java index bea94f2c7d152..bbfcd8a962c5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -55,7 +55,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints"; - public 
CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public CheckpointStatsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index a5d52e5ff2ebc..d5d194daf883f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -18,9 +18,15 @@ package org.apache.flink.runtime.webmonitor; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcTimeout; import java.util.concurrent.CompletableFuture; @@ -38,5 +44,36 @@ public interface RestfulGateway extends RpcGateway { * @param timeout for this operation * @return Future REST endpoint address */ - CompletableFuture requestRestAddress(Time timeout); + CompletableFuture requestRestAddress(@RpcTimeout Time timeout); + + /** + * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then + * the future is completed with a {@link FlinkJobNotFoundException}. 
+ * + * @param jobId identifying the job whose AccessExecutionGraph is requested + * @param timeout for the asynchronous operation + * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link FlinkJobNotFoundException} + */ + CompletableFuture requestJob(JobID jobId, @RpcTimeout Time timeout); + + /** + * Requests the details of the running and/or finished jobs on the Flink cluster. + * + * @param includeRunning true if running jobs shall be included, otherwise false + * @param includeFinished true if finished jobs shall be included, otherwise false + * @param timeout for the asynchronous operation + * @return Future containing the job details + */ + CompletableFuture requestJobDetails( + boolean includeRunning, + boolean includeFinished, + @RpcTimeout Time timeout); + + /** + * Requests the cluster status overview. + * + * @param timeout for the asynchronous operation + * @return Future containing the status overview + */ + CompletableFuture requestStatusOverview(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 0accab755763a..1730bc8cd5268 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobView; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -55,7 +56,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import
java.util.concurrent.Executor; /** * Utilities for the web runtime monitor. This class contains for example methods to build @@ -131,7 +131,7 @@ private static File resolveFileLocation(String logFilePath) { * @param jobManagerRetriever which retrieves the currently leading JobManager * @param queryServiceRetriever which retrieves the query service * @param timeout for asynchronous operations - * @param executor to run asynchronous operations + * @param scheduledExecutor to run asynchronous operations */ public static WebMonitor startWebRuntimeMonitor( Configuration config, @@ -139,7 +139,7 @@ public static WebMonitor startWebRuntimeMonitor( LeaderGatewayRetriever jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, - Executor executor) { + ScheduledExecutor scheduledExecutor) { // try to load and instantiate the class try { String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; @@ -152,7 +152,7 @@ public static WebMonitor startWebRuntimeMonitor( LeaderGatewayRetriever.class, MetricQueryServiceRetriever.class, Time.class, - Executor.class); + ScheduledExecutor.class); return constructor.newInstance( config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), @@ -160,7 +160,7 @@ public static WebMonitor startWebRuntimeMonitor( jobManagerRetriever, queryServiceRetriever, timeout, - executor); + scheduledExecutor); } catch (ClassNotFoundException e) { LOG.error("Could not load web runtime monitor. 
" + "Probably reason: flink-runtime-web is not in the classpath"); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0445f19c29450..1c0d223b14670 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -46,7 +46,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} @@ -2232,7 +2232,7 @@ object JobManager { new AkkaJobManagerRetriever(jobManagerSystem, timeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(jobManagerSystem, timeout), timeout, - futureExecutor) + new ScheduledExecutorServiceAdapter(futureExecutor)) Option(webServer) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index cedf60776dd9c..5692863785313 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -32,7 +32,7 @@ import 
org.apache.flink.configuration._ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} -import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} @@ -406,7 +406,7 @@ abstract class FlinkMiniCluster( new AkkaJobManagerRetriever(actorSystem, flinkTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, flinkTimeout), flinkTimeout, - actorSystem.dispatcher) + new ScheduledExecutorServiceAdapter(futureExecutor)) ) webServer.foreach(_.start()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java new file mode 100644 index 0000000000000..1a8ea84de2b6e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static 
org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link ExecutionGraphCache}. + */ +public class ExecutionGraphCacheTest extends TestLogger { + + /** + * Tests that we can cache AccessExecutionGraphs over multiple accesses. + */ + @Test + public void testExecutionGraphCaching() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get()); + + CompletableFuture accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get()); + + // verify that we only issued a single request to the gateway + verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + } + } + + /** + * Tests that an AccessExecutionGraph is invalidated after its TTL expired. 
+ */ + @Test + public void testExecutionGraphEntryInvalidation() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.milliseconds(1L); + final JobID jobId = new JobID(); + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture.get()); + + // sleep for the TTL + Thread.sleep(timeToLive.toMilliseconds()); + + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + + verify(jobManagerGateway, times(2)).requestJob(eq(jobId), any(Time.class)); + } + } + + + /** + * Tests that a failure in requesting an AccessExecutionGraph from the gateway, will not create + * a cache entry --> another cache request will trigger a new gateway request. 
+ */ + @Test + public void testImmediateCacheInvalidationAfterFailure() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + // let's first answer with a JobNotFoundException and then only with the correct result + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( + FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)), + CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + try { + executionGraphFuture.get(); + + fail("The execution graph future should have been completed exceptionally."); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof FlinkException); + } + + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + } + } + + /** + * Tests that cache entries are cleaned up when their TTL has expired upon + * calling {@link ExecutionGraphCache#cleanup()}. 
+ */ + @Test + public void testCacheEntryCleanup() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.milliseconds(1L); + final JobID jobId1 = new JobID(); + final JobID jobId2 = new JobID(); + final AccessExecutionGraph accessExecutionGraph1 = mock(AccessExecutionGraph.class); + final AccessExecutionGraph accessExecutionGraph2 = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId1), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph1)); + when(jobManagerGateway.requestJob(eq(jobId2), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph2)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + + CompletableFuture executionGraph1Future = executionGraphCache.getExecutionGraph(jobId1, jobManagerGateway); + + CompletableFuture executionGraph2Future = executionGraphCache.getExecutionGraph(jobId2, jobManagerGateway); + + assertEquals(accessExecutionGraph1, executionGraph1Future.get()); + + assertEquals(accessExecutionGraph2, executionGraph2Future.get()); + + verify(jobManagerGateway, times(1)).requestJob(eq(jobId1), any(Time.class)); + verify(jobManagerGateway, times(1)).requestJob(eq(jobId2), any(Time.class)); + + Thread.sleep(timeToLive.toMilliseconds()); + + executionGraphCache.cleanup(); + + assertTrue(executionGraphCache.size() == 0); + } + } + + /** + * Tests that concurrent accesses only trigger a single AccessExecutionGraph request. 
+ */ + @Test + public void testConcurrentAccess() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + final int numConcurrentAccesses = 10; + + final ArrayList<CompletableFuture<AccessExecutionGraph>> executionGraphFutures = new ArrayList<>(numConcurrentAccesses); + + final ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(numConcurrentAccesses); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + for (int i = 0; i < numConcurrentAccesses; i++) { + CompletableFuture<AccessExecutionGraph> executionGraphFuture = CompletableFuture + .supplyAsync( + () -> executionGraphCache.getExecutionGraph(jobId, jobManagerGateway), + executor) + .thenCompose(Function.identity()); + + executionGraphFutures.add(executionGraphFuture); + } + + final CompletableFuture<Collection<AccessExecutionGraph>> allExecutionGraphFutures = FutureUtils.combineAll(executionGraphFutures); + + Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get(); + + for (AccessExecutionGraph executionGraph : allExecutionGraphs) { + assertEquals(accessExecutionGraph, executionGraph); + } + + verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + } finally { + Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor); + } + } + + /** + * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} is in + * state {@link JobStatus#SUSPENDED}. + * + *

This test can be removed once we no longer request the actual {@link ExecutionGraph} from the + * {@link JobManager}. + */ + @Test + public void testCacheInvalidationIfSuspended() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final AccessExecutionGraph suspendedExecutionGraph = mock(AccessExecutionGraph.class); + when(suspendedExecutionGraph.getState()).thenReturn(JobStatus.SUSPENDED); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + // let's first answer with a suspended ExecutionGraph and then only with the correct result + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( + CompletableFuture.completedFuture(suspendedExecutionGraph), + CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(suspendedExecutionGraph, executionGraphFuture.get()); + + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + } + } + + /** + * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} changes its + * state to {@link JobStatus#SUSPENDED}. + * + *

This test can be removed once we no longer request the actual {@link ExecutionGraph} from the + * {@link JobManager}. + */ + @Test + public void testCacheInvalidationIfSwitchToSuspended() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(jobId); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + // let's first answer with an ExecutionGraph which gets suspended later and then only with the correct result + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( + CompletableFuture.completedFuture(toBeSuspendedExecutionGraph), + CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get()); + + toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED); + + // request the same job again --> the cached entry is now suspended, so it should be invalidated and the job requested anew + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + + CompletableFuture executionGraphFuture3 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture3.get()); + + verify(jobManagerGateway, times(2)).requestJob(eq(jobId), any(Time.class)); + } + } + + private static final class SuspendableAccessExecutionGraph extends ArchivedExecutionGraph { + + private static final long serialVersionUID = -6796543726305778101L; + + private JobStatus
jobStatus; + + public SuspendableAccessExecutionGraph(JobID jobId) { + super( + jobId, + "ExecutionGraphCacheTest", + Collections.emptyMap(), + Collections.emptyList(), + new long[0], + JobStatus.RUNNING, + new ErrorInfo(new FlinkException("Test"), 42L), + "", + new StringifiedAccumulatorResult[0], + Collections.emptyMap(), + new ArchivedExecutionConfig(new ExecutionConfig()), + false, + null, + null); + + jobStatus = super.getState(); + } + + @Override + public JobStatus getState() { + return jobStatus; + } + + public void setJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java index e1736c113fbe8..0a502e4d2aafd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java @@ -54,7 +54,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java index 8bb11411ad36e..18a52f5958663 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java 
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java @@ -44,7 +44,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -66,7 +65,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger { @Test public void testGetPaths() { - JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor); + JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphCache.class), executor); JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler(); String[] triggerPaths = triggerHandler.getPaths(); @@ -89,9 +88,9 @@ public void testGetPaths() { public void testAskTimeoutEqualsCheckpointTimeout() throws Exception { long timeout = 128288238L; JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new CheckpointCoordinatorConfiguration( 1L, @@ -123,9 +122,9 @@ public void testAskTimeoutEqualsCheckpointTimeout() throws Exception { public void testSavepointDirectoryConfiguration() throws Exception { long timeout = 128288238L; JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - 
when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new CheckpointCoordinatorConfiguration( 1L, @@ -176,9 +175,9 @@ public void testSavepointDirectoryConfiguration() throws Exception { @Test public void testTriggerNewRequest() throws Exception { JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new CheckpointCoordinatorConfiguration( 1L, @@ -308,9 +307,9 @@ public void testTriggerNewRequest() throws Exception { @Test public void testFailedCancellation() throws Exception { JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new CheckpointCoordinatorConfiguration( 1L, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java index 567df8cd6e66b..efe9dc8191b6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java @@ -55,7 +55,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/config", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java index afd743e4c779a..028bf0dd7965f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java @@ -68,7 +68,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(2, paths.length); List pathsList = Lists.newArrayList(paths); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java index 6a20696ff7224..d20a156bc8bc4 100644 
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java @@ -58,7 +58,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java index 6d3b213768a3e..29e0819c61dea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java @@ -51,7 +51,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/plan", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java index feffe60b67ea2..d04345d0b915c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java @@ -57,7 +57,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java index bd6817f84fcd6..e02ca35d4fa70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java @@ -45,7 +45,7 @@ public class JobVertexBackPressureHandlerTest { @Test public void testGetPaths() { - JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0); + JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]); @@ -61,7 +61,7 @@ public void testResponseNoStatsAvailable() throws Exception { .thenReturn(Optional.empty()); JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), + 
mock(ExecutionGraphCache.class), Executors.directExecutor(), statsTracker, 9999); @@ -95,7 +95,7 @@ public void testResponseStatsAvailable() throws Exception { .thenReturn(Optional.of(stats)); JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), + mock(ExecutionGraphCache.class), Executors.directExecutor(), statsTracker, 9999); @@ -157,7 +157,7 @@ public void testResponsePassedRefreshInterval() throws Exception { .thenReturn(Optional.of(stats)); JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), + mock(ExecutionGraphCache.class), Executors.directExecutor(), statsTracker, 0); // <----- refresh interval should fire immediately diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java index 5af1d53fd35a0..c2d260333f54b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java @@ -59,7 +59,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java index 
2a027fd4a4a25..e3f3382c278af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java @@ -62,7 +62,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java index 9e0d54973e219..8082e3a1ae97c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -32,7 +32,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { @Test public void testGetPaths() { - SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index 49e54c0d5cf58..894d6596c4eb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -65,7 +65,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java index e1fe8b5d9050c..32ba09c1d84a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -74,7 +74,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + 
SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java index 1478f00343caf..e55776d7a0132 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java @@ -59,7 +59,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java index 3783b84b8c793..9f4a7dabeea82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java @@ -60,7 +60,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - 
SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java index 7050fa66e8c88..0e12e0fdbc931 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -79,7 +79,7 @@ public void testArchiver() throws IOException { @Test public void testGetPaths() { - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]); @@ -95,7 +95,7 @@ public void 
testSimpleConfig() throws Exception { AccessExecutionGraph graph = graphAndSettings.graph; CheckpointCoordinatorConfiguration chkConfig = graphAndSettings.jobCheckpointingConfiguration; - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String json = handler.handleRequest(graph, Collections.emptyMap()).get(); ObjectMapper mapper = new ObjectMapper(); @@ -121,7 +121,7 @@ public void testAtLeastOnce() throws Exception { AccessExecutionGraph graph = graphAndSettings.graph; - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String json = handler.handleRequest(graph, Collections.emptyMap()).get(); ObjectMapper mapper = new ObjectMapper(); @@ -140,7 +140,7 @@ public void testEnabledExternalizedCheckpointSettings() throws Exception { AccessExecutionGraph graph = graphAndSettings.graph; ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String json = handler.handleRequest(graph, Collections.emptyMap()).get(); ObjectMapper mapper = new ObjectMapper(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java index e614608850d14..1eac20b337e2f 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -102,7 +102,7 @@ public void testArchiver() throws IOException { @Test public void testGetPaths() { - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]); @@ -114,7 +114,7 @@ public void testGetPaths() { @Test public void testIllegalCheckpointId() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "illegal checkpoint"); String json = handler.handleRequest(graph, params).get(); @@ -128,7 +128,7 @@ public 
void testIllegalCheckpointId() throws Exception { @Test public void testNoCheckpointIdParam() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); String json = handler.handleRequest(graph, Collections.emptyMap()).get(); assertEquals("{}", json); @@ -148,7 +148,7 @@ public void testCheckpointNotFound() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "123"); String json = handler.handleRequest(graph, params).get(); @@ -319,7 +319,7 @@ private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throw AccessExecutionGraph graph = mock(AccessExecutionGraph.class); when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "123"); String json = handler.handleRequest(graph, params).get(); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java index 4be7840a536b3..4bb385c14298e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -87,7 +87,7 @@ public void testArchiver() throws IOException { @Test public void testGetPaths() { - CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]); @@ -100,7 +100,7 @@ public void testGetPaths() { public void testCheckpointStatsRequest() throws Exception { TestCheckpointStats testCheckpointStats = createTestCheckpointStats(); - CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String json = handler.handleRequest(testCheckpointStats.graph, 
Collections.emptyMap()).get(); ObjectMapper mapper = new ObjectMapper(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java index 4d9b3948de23b..b352baec98963 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -101,7 +101,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]); @@ -150,7 +150,7 @@ public void testSubtaskRequestNoSummary() throws Exception { @Test public void testIllegalCheckpointId() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - 
CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "illegal checkpoint"); String json = handler.handleRequest(graph, params).get(); @@ -164,7 +164,7 @@ public void testIllegalCheckpointId() throws Exception { @Test public void testNoCheckpointIdParam() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); String json = handler.handleRequest(graph, Collections.emptyMap()).get(); assertEquals("{}", json); @@ -184,7 +184,7 @@ public void testCheckpointNotFound() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "123"); params.put("vertexid", new JobVertexID().toString()); @@ -200,7 +200,7 @@ public void testCheckpointNotFound() throws Exception { @Test public void testIllegalJobVertexIdParam() throws 
Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "1"); params.put("vertexid", "illegal vertex id"); @@ -215,7 +215,7 @@ public void testIllegalJobVertexIdParam() throws Exception { @Test public void testNoJobVertexIdParam() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "1"); String json = handler.handleRequest(graph, params).get(); @@ -239,7 +239,7 @@ public void testJobVertexNotFound() throws Exception { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "123"); params.put("vertexid", new JobVertexID().toString()); @@ -260,7 +260,7 @@ private static JsonNode 
triggerRequest(AbstractCheckpointStats checkpoint) throw AccessExecutionGraph graph = mock(AccessExecutionGraph.class); when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0)); Map params = new HashMap<>(); params.put("checkpointid", "123"); params.put("vertexid", new JobVertexID().toString()); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 43641d274bfb2..c101b759d0676 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; @@ -353,7 +354,7 @@ protected int runApplicationMaster(Configuration config) { new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), webMonitorTimeout, - futureExecutor, + new ScheduledExecutorServiceAdapter(futureExecutor), LOG); // 2: the JobManager