From d611489d0184ed1358dec965fa4015b1d1f3f503 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 1 Mar 2017 12:23:15 +0100 Subject: [PATCH] [FLINK-5870] Handlers define REST URLs --- .../webmonitor/RuntimeMonitorHandler.java | 5 + .../webmonitor/RuntimeMonitorHandlerBase.java | 7 + .../runtime/webmonitor/WebRuntimeMonitor.java | 193 ++++++++++-------- .../handlers/ClusterOverviewHandler.java | 7 + .../handlers/CurrentJobIdsHandler.java | 7 + .../handlers/CurrentJobsOverviewHandler.java | 16 ++ .../handlers/DashboardConfigHandler.java | 7 + .../handlers/JarAccessDeniedHandler.java | 11 + .../webmonitor/handlers/JarDeleteHandler.java | 7 + .../webmonitor/handlers/JarListHandler.java | 7 + .../webmonitor/handlers/JarPlanHandler.java | 7 + .../webmonitor/handlers/JarRunHandler.java | 7 + .../webmonitor/handlers/JarUploadHandler.java | 7 + .../handlers/JobAccumulatorsHandler.java | 7 + .../handlers/JobCancellationHandler.java | 8 + .../JobCancellationWithSavepointHandlers.java | 17 +- .../webmonitor/handlers/JobConfigHandler.java | 7 + .../handlers/JobDetailsHandler.java | 8 + .../handlers/JobExceptionsHandler.java | 7 + .../handlers/JobManagerConfigHandler.java | 7 + .../webmonitor/handlers/JobPlanHandler.java | 6 + .../handlers/JobStoppingHandler.java | 8 + .../JobVertexAccumulatorsHandler.java | 7 + .../JobVertexBackPressureHandler.java | 7 + .../handlers/JobVertexDetailsHandler.java | 7 + .../JobVertexTaskManagersHandler.java | 7 + .../webmonitor/handlers/RequestHandler.java | 7 + .../SubtaskCurrentAttemptDetailsHandler.java | 7 + ...skExecutionAttemptAccumulatorsHandler.java | 7 + ...SubtaskExecutionAttemptDetailsHandler.java | 7 + .../SubtasksAllAccumulatorsHandler.java | 7 + .../handlers/SubtasksTimesHandler.java | 6 + .../handlers/TaskManagerLogHandler.java | 12 ++ .../handlers/TaskManagersHandler.java | 8 + .../checkpoints/CheckpointConfigHandler.java | 7 + .../CheckpointStatsDetailsHandler.java | 7 + ...CheckpointStatsDetailsSubtasksHandler.java | 7 + .../checkpoints/CheckpointStatsHandler.java | 7 + .../metrics/JobManagerMetricsHandler.java | 8 + .../webmonitor/metrics/JobMetricsHandler.java | 6 + .../metrics/JobVertexMetricsHandler.java | 6 + .../metrics/TaskManagerMetricsHandler.java | 8 + .../handlers/ClusterOverviewHandlerTest.java | 34 +++ .../handlers/CurrentJobIdsHandlerTest.java | 34 +++ .../CurrentJobsOverviewHandlerTest.java | 44 ++++ .../handlers/DashboardConfigHandlerTest.java | 31 +++ .../handlers/JarAccessDeniedHandlerTest.java | 39 ++++ .../handlers/JarDeleteHandlerTest.java | 31 +++ .../handlers/JarListHandlerTest.java | 31 +++ .../handlers/JarPlanHandlerTest.java | 31 +++ .../handlers/JarRunHandlerTest.java | 31 +++ .../handlers/JarUploadHandlerTest.java | 31 +++ .../handlers/JobAccumulatorsHandlerTest.java | 31 +++ .../handlers/JobCancellationHandlerTest.java | 36 ++++ ...CancellationWithSavepointHandlersTest.java | 20 ++ .../handlers/JobConfigHandlerTest.java | 31 +++ .../handlers/JobDetailsHandlerTest.java | 36 ++++ .../handlers/JobExceptionsHandlerTest.java | 31 +++ .../handlers/JobManagerConfigHandlerTest.java | 31 +++ .../handlers/JobPlanHandlerTest.java | 31 +++ .../handlers/JobStoppingHandlerTest.java | 36 ++++ .../JobVertexAccumulatorsHandlerTest.java | 31 +++ .../JobVertexBackPressureHandlerTest.java | 8 + .../handlers/JobVertexDetailsHandlerTest.java | 31 +++ .../JobVertexTaskManagersHandlerTest.java | 31 +++ ...btaskCurrentAttemptDetailsHandlerTest.java | 31 +++ ...ecutionAttemptAccumulatorsHandlerTest.java | 31 +++ ...askExecutionAttemptDetailsHandlerTest.java | 31 +++ .../handlers/SubtaskTimesHandlerTest.java | 31 +++ .../SubtasksAllAccumulatorsHandlerTest.java | 31 +++ .../handlers/TaskManagerLogHandlerTest.java | 28 +++ .../handlers/TaskManagersHandlerTest.java | 38 ++++ .../CheckpointConfigHandlerTest.java | 9 + .../CheckpointStatsDetailsHandlerTest.java | 9 + .../CheckpointStatsHandlerTest.java | 9 + ...ckpointStatsSubtaskDetailsHandlerTest.java | 9 + .../metrics/JobManagerMetricsHandlerTest.java | 10 + .../metrics/JobMetricsHandlerTest.java | 10 + .../metrics/JobVertexMetricsHandlerTest.java | 10 + .../TaskManagerMetricsHandlerTest.java | 10 + 80 files changed, 1439 insertions(+), 87 deletions(-) create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskTimesHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index 8dbd13566ff58..8bd58a3f029d7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -75,6 +75,11 @@ public RuntimeMonitorHandler( this.handler = checkNotNull(handler); } + @Override + public String[] getPaths() { + return handler.getPaths(); + } + @Override protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) { FullHttpResponse response; diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java index 9442867609404..3c1dcb63f7015 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java @@ -67,6 +67,13 @@ public RuntimeMonitorHandlerBase( this.httpsEnabled = httpsEnabled; } + /** + * Returns an array of REST URL's under which this handler can be registered. + * + * @return array containing REST URL's under which this handler can be registered. + */ + public abstract String[] getPaths(); + @Override protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { if (localJobManagerAddressFuture.isCompleted()) { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index a9cb630eabe72..dddc69dacbdf8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -256,56 +256,50 @@ public WebRuntimeMonitor( RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler()); RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler()); - router = new Router() - // config how to interact with this web server - .GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval()))) - - // the overview - how many task managers, slots, free slots, ... - .GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT))) - - // job manager configuration - .GET("/jobmanager/config", handler(new JobManagerConfigHandler(config))) - - // overview over jobs - .GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true))) - .GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false))) - .GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true))) - - .GET("/jobs", handler(new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT))) - - .GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs, metricFetcher))) - - .GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler( - currentGraphs, - backPressureStatsTracker, - refreshInterval))) - .GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs))) - - .GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs))) - .GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs))) - .GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs))) - .GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs))) - .GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher))) - - .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher))) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher))) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", - new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.LOG, config, enableSSL)) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", - new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL)) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagerMetricsHandler(metricFetcher))) + router = new Router(); + // config how to interact with this web server + GET(router, new DashboardConfigHandler(cfg.getRefreshInterval())); + + // the overview - how many task managers, slots, free slots, ... + GET(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)); + + // job manager configuration + GET(router, new JobManagerConfigHandler(config)); + + // overview over jobs + GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)); + GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)); + GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)); + + GET(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)); + + GET(router, new JobDetailsHandler(currentGraphs, metricFetcher)); + + GET(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher)); + GET(router, new SubtasksTimesHandler(currentGraphs)); + GET(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)); + GET(router, new JobVertexAccumulatorsHandler(currentGraphs)); + GET(router, new JobVertexBackPressureHandler(currentGraphs, backPressureStatsTracker, refreshInterval)); + GET(router, new JobVertexMetricsHandler(metricFetcher)); + GET(router, new SubtasksAllAccumulatorsHandler(currentGraphs)); + GET(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)); + GET(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher)); + GET(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)); + + GET(router, new JobPlanHandler(currentGraphs)); + GET(router, new JobConfigHandler(currentGraphs)); + GET(router, new JobExceptionsHandler(currentGraphs)); + GET(router, new JobAccumulatorsHandler(currentGraphs)); + GET(router, new JobMetricsHandler(metricFetcher)); + + GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)); + GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, + TaskManagerLogHandler.FileMode.LOG, config, enableSSL)); + GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, + TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL)); + GET(router, new TaskManagerMetricsHandler(metricFetcher)); + router // log and stdout .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") : new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile, @@ -313,25 +307,22 @@ public WebRuntimeMonitor( .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile, - enableSSL)) - - .GET("/jobmanager/metrics", handler(new JobManagerMetricsHandler(metricFetcher))) + enableSSL)); - // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) - .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) + GET(router, new JobManagerMetricsHandler(metricFetcher)); - // DELETE is the preferred way of canceling a job (Rest-conform) - .DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler())) + // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) + GET(router, new JobCancellationHandler()); + // DELETE is the preferred way of canceling a job (Rest-conform) + DELETE(router, new JobCancellationHandler()); - .GET("/jobs/:jobid/cancel-with-savepoint", triggerHandler) - .GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", triggerHandler) - .GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler) + GET(router, triggerHandler); + GET(router, inProgressHandler); - // stop a job via GET (for proper integration with YARN this has to be performed via GET) - .GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler())) - - // DELETE is the preferred way of stopping a job (Rest-conform) - .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())); + // stop a job via GET (for proper integration with YARN this has to be performed via GET) + GET(router, new JobStoppingHandler()); + // DELETE is the preferred way of stopping a job (Rest-conform) + DELETE(router, new JobStoppingHandler()); int maxCachedEntries = config.getInteger( ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, @@ -339,34 +330,32 @@ public WebRuntimeMonitor( CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); // Register the checkpoint stats handlers - router - .GET("/jobs/:jobid/checkpoints", handler(new CheckpointStatsHandler(currentGraphs))) - .GET("/jobs/:jobid/checkpoints/config", handler(new CheckpointConfigHandler(currentGraphs))) - .GET("/jobs/:jobid/checkpoints/details/:checkpointid", handler(new CheckpointStatsDetailsHandler(currentGraphs, cache))) - .GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache))); + GET(router, new CheckpointStatsHandler(currentGraphs)); + GET(router, new CheckpointConfigHandler(currentGraphs)); + GET(router, new CheckpointStatsDetailsHandler(currentGraphs, cache)); + GET(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)); if (webSubmitAllow) { - router - // fetch the list of uploaded jars. - .GET("/jars", handler(new JarListHandler(uploadDir))) + // fetch the list of uploaded jars. + GET(router, new JarListHandler(uploadDir)); - // get plan for an uploaded jar - .GET("/jars/:jarid/plan", handler(new JarPlanHandler(uploadDir))) + // get plan for an uploaded jar + GET(router, new JarPlanHandler(uploadDir)); - // run a jar - .POST("/jars/:jarid/run", handler(new JarRunHandler(uploadDir, timeout, config))) + // run a jar + POST(router, new JarRunHandler(uploadDir, timeout, config)); - // upload a jar - .POST("/jars/upload", handler(new JarUploadHandler(uploadDir))) + // upload a jar + POST(router, new JarUploadHandler(uploadDir)); - // delete an uploaded jar from submission interface - .DELETE("/jars/:jarid", handler(new JarDeleteHandler(uploadDir))); + // delete an uploaded jar from submission interface + DELETE(router, new JarDeleteHandler(uploadDir)); } else { - router - // send an Access Denied message (sort of) - // Every other GET request will go to the File Server, which will not provide - // access to the jar directory anyway, because it doesn't exist in webRootDir. - .GET("/jars", handler(new JarAccessDeniedHandler())); + // send an Access Denied message + JarAccessDeniedHandler jad = new JarAccessDeniedHandler(); + GET(router, jad); + POST(router, jad); + DELETE(router, jad); } // this handler serves all the static contents @@ -526,6 +515,40 @@ private void cleanup() { } } + /** These methods are used in the route path setup. They register the given {@link RequestHandler} or + * {@link RuntimeMonitorHandlerBase} with the given {@link Router} for the respective REST method. + * The REST paths under which they are registered are defined by the handlers. **/ + + private void GET(Router router, RequestHandler handler) { + GET(router, handler(handler)); + } + + private void GET(Router router, RuntimeMonitorHandlerBase handler) { + for (String path : handler.getPaths()) { + router.GET(path, handler); + } + } + + private void DELETE(Router router, RequestHandler handler) { + DELETE(router, handler(handler)); + } + + private void DELETE(Router router, RuntimeMonitorHandlerBase handler) { + for (String path : handler.getPaths()) { + router.DELETE(path, handler); + } + } + + private void POST(Router router, RequestHandler handler) { + POST(router, handler(handler)); + } + + private void POST(Router router, RuntimeMonitorHandlerBase handler) { + for (String path : handler.getPaths()) { + router.POST(path, handler); + } + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java index 99ef3d9e27071..2bd055d8add66 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java @@ -38,6 +38,8 @@ */ public class ClusterOverviewHandler extends AbstractJsonRequestHandler { + private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview"; + private static final String version = EnvironmentInformation.getVersion(); private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId; @@ -48,6 +50,11 @@ public ClusterOverviewHandler(FiniteDuration timeout) { this.timeout = checkNotNull(timeout); } + @Override + public String[] getPaths() { + return new String[]{CLUSTER_OVERVIEW_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { // we need no parameters, get all requests diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java index b690c5666fc1b..94b1c16c275a7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java @@ -39,11 +39,18 @@ */ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler { + private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs"; + private final FiniteDuration timeout; public CurrentJobIdsHandler(FiniteDuration timeout) { this.timeout = requireNonNull(timeout); } + + @Override + public String[] getPaths() { + return new String[]{CURRENT_JOB_IDS_REST_PATH}; + } @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java index 07064da0cfb77..8486a9cd6b71b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java @@ -38,6 +38,10 @@ */ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { + private static final String ALL_JOBS_REST_PATH = "/joboverview"; + private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running"; + private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed"; + private final FiniteDuration timeout; private final boolean includeRunningJobs; @@ -54,6 +58,18 @@ public CurrentJobsOverviewHandler( this.includeFinishedJobs = includeFinishedJobs; } + @Override + public String[] getPaths() { + if (includeRunningJobs && includeFinishedJobs) { + return new String[]{ALL_JOBS_REST_PATH}; + } + if (includeRunningJobs) { + return new String[]{RUNNING_JOBS_REST_PATH}; + } else { + return new String[]{COMPLETED_JOBS_REST_PATH}; + } + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { try { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java index 6fe072b12e532..49f4c265c7a3b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java @@ -32,6 +32,8 @@ * and time zone of the server timestamps. */ public class DashboardConfigHandler extends AbstractJsonRequestHandler { + + private static String DASHBOARD_CONFIG_REST_PATH = "/config"; private final String configString; @@ -65,6 +67,11 @@ public DashboardConfigHandler(long refreshInterval) { throw new RuntimeException(e.getMessage(), e); } } + + @Override + public String[] getPaths() { + return new String[]{DASHBOARD_CONFIG_REST_PATH}; + } @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java index ba32d0db6c20f..f0e3faf905058 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java @@ -27,6 +27,17 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler { private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " + "available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}"; + @Override + public String[] getPaths() { + return new String[]{ + JarListHandler.JAR_LIST_REST_PATH, + JarPlanHandler.JAR_PLAN_REST_PATH, + JarRunHandler.JAR_RUN_REST_PATH, + JarUploadHandler.JAR_UPLOAD_REST_PATH, + JarDeleteHandler.JAR_DELETE_REST_PATH + }; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { return ERROR_MESSAGE; diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java index ae959a5a3c4d0..69869bedea908 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java @@ -31,12 +31,19 @@ */ public class JarDeleteHandler extends AbstractJsonRequestHandler { + public static final String JAR_DELETE_REST_PATH = "/jars/:jarid"; + private final File jarDir; public JarDeleteHandler(File jarDirectory) { jarDir = jarDirectory; } + @Override + public String[] getPaths() { + return new String[]{JAR_DELETE_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { final String file = pathParams.get("jarid"); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index f3cdc30c5ba48..1af8e39025601 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -33,12 +33,19 @@ public class JarListHandler extends AbstractJsonRequestHandler { + public static final String JAR_LIST_REST_PATH = "/jars"; + private final File jarDir; public JarListHandler(File jarDirectory) { jarDir = jarDirectory; } + @Override + public String[] getPaths() { + return new String[]{JAR_LIST_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { try { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index bd0a6afd1723d..893e2b8cf90c3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -32,10 +32,17 @@ */ public class JarPlanHandler extends JarActionHandler { + public static final String JAR_PLAN_REST_PATH = "/jars/:jarid/plan"; + public JarPlanHandler(File jarDirectory) { super(jarDirectory); } + @Override + public String[] getPaths() { + return new String[]{JAR_PLAN_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { try { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 474be33b54e9d..a4a073e46eb2d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -38,6 +38,8 @@ */ public class JarRunHandler extends JarActionHandler { + public static final String JAR_RUN_REST_PATH = "/jars/:jarid/run"; + private final FiniteDuration timeout; private final Configuration clientConfig; @@ -47,6 +49,11 @@ public JarRunHandler(File jarDirectory, FiniteDuration timeout, Configuration cl this.clientConfig = clientConfig; } + @Override + public String[] getPaths() { + return new String[]{JAR_RUN_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { try { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index 9a3b0e15ab09b..e7a2769efb852 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -29,12 +29,19 @@ */ public class JarUploadHandler extends AbstractJsonRequestHandler { + public static final String JAR_UPLOAD_REST_PATH = "/jars/upload"; + private final File jarDir; public JarUploadHandler(File jarDir) { this.jarDir = jarDir; } + @Override + public String[] getPaths() { + return new String[]{JAR_UPLOAD_REST_PATH}; + } + @Override public String handleJsonRequest( Map pathParams, diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java index 29613a079d39b..7664153496787 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java @@ -30,11 +30,18 @@ * Request handler that returns the aggregated user accumulators of a job. */ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler { + + private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators"; public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{JOB_ACCUMULATORS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java index 9f35719d98695..d9de7d7e6252e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java @@ -30,6 +30,14 @@ */ public class JobCancellationHandler extends AbstractJsonRequestHandler { + private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel"; + private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel"; + + @Override + public String[] getPaths() { + return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { try { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java index 492ce76c40912..b618d857fa9b3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java @@ -55,8 +55,11 @@ */ public class JobCancellationWithSavepointHandlers { + private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint"; + private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"; + /** URL for in-progress cancellations. */ - public static final String IN_PROGRESS_URL = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId"; + private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId"; /** Encodings for String. */ private static final Charset ENCODING = Charset.forName("UTF-8"); @@ -126,6 +129,11 @@ public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext execu this.executionContext = checkNotNull(executionContext); } + @Override + public String[] getPaths() { + return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH}; + } + @Override @SuppressWarnings("unchecked") public FullHttpResponse handleRequest( @@ -230,7 +238,7 @@ public void onComplete(Throwable failure, Object resp) throws Throwable { } // In-progress location - String location = IN_PROGRESS_URL + String location = CANCELLATION_IN_PROGRESS_REST_PATH .replace(":jobid", jobId.toString()) .replace(":requestId", Long.toString(requestId)); @@ -278,6 +286,11 @@ class InProgressHandler implements RequestHandler { /** Remember some recently completed */ private final ArrayDeque> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS); + @Override + public String[] getPaths() { + return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH}; + } + @Override @SuppressWarnings("unchecked") public FullHttpResponse handleRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 21639efce7e92..459ca2a8b38c6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -31,10 +31,17 @@ */ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config"; + public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{JOB_CONFIG_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 35e6ca7a71e1f..7780e66a09d17 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -47,6 +47,9 @@ */ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid"; + private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices"; + private final MetricFetcher fetcher; public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -54,6 +57,11 @@ public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetche this.fetcher = fetcher; } + @Override + public String[] getPaths() { + return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { final StringWriter writer = new StringWriter(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java index 376cca4f79309..3720dacab80fc 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java @@ -33,12 +33,19 @@ */ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions"; + private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{JOB_EXCEPTIONS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { StringWriter writer = new StringWriter(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java index 11ca931b5c2db..5fcf0102cb477 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java @@ -30,12 +30,19 @@ */ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { + private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config"; + private final Configuration config; public JobManagerConfigHandler(Configuration config) { this.config = config; } + @Override + public String[] getPaths() { + return new String[]{JOBMANAGER_CONFIG_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { StringWriter writer = new StringWriter(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java index 64f70004f9bce..becc2e1cce840 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java @@ -28,11 +28,17 @@ */ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { + private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{JOB_PLAN_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { return graph.getJsonPlan(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java index 0f8c958015e09..c8ec689dad4c5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java @@ -30,6 +30,14 @@ */ public class JobStoppingHandler extends AbstractJsonRequestHandler { + private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop"; + private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop"; + + @Override + public String[] getPaths() { + return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { try { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java index ad4e2079d5a12..ccfcbba53e215 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java @@ -29,11 +29,18 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler { + + private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map params) throws Exception { StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java index c5bacf295ceae..52167e1c57be0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java @@ -39,6 +39,8 @@ */ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler { + private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = "/jobs/:jobid/vertices/:vertexid/backpressure"; + /** Back pressure stats tracker. */ private final BackPressureStatsTracker backPressureStatsTracker; @@ -56,6 +58,11 @@ public JobVertexBackPressureHandler( this.refreshInterval = refreshInterval; } + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH}; + } + @Override public String handleRequest( AccessExecutionJobVertex accessJobVertex, diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index 32626ba7e4125..0a07896fe5dec 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -39,6 +39,8 @@ */ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { + private static String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid"; + private final MetricFetcher fetcher; public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -46,6 +48,11 @@ public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Metric this.fetcher = fetcher; } + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_DETAILS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map params) throws Exception { final long now = System.currentTimeMillis(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index f468d35c6312f..b3dabead8f709 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -43,6 +43,8 @@ */ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler { + private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/taskmanagers"; + private final MetricFetcher fetcher; public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -50,6 +52,11 @@ public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, M this.fetcher = fetcher; } + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map params) throws Exception { // Build a map that groups tasks by TaskManager diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java index c56cfc3f26471..b6246e66efe85 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java @@ -49,4 +49,11 @@ public interface RequestHandler { * with the exception stack trace. */ FullHttpResponse handleRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception; + + /** + * Returns an array of REST URL's under which this handler can be registered. + * + * @return array containing REST URL's under which this handler can be registered. + */ + String[] getPaths(); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java index 6d0951359ca05..4cf5f0ff2d7cb 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java @@ -28,11 +28,18 @@ * Request handler providing details about a single task execution attempt. */ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler { + + public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { super(executionGraphHolder, fetcher); } + @Override + public String[] getPaths() { + return new String[]{SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionVertex vertex, Map params) throws Exception { return handleRequest(vertex.getCurrentExecutionAttempt(), params); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index e613efb199277..ba3a5ee5bbfc9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -31,11 +31,18 @@ * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). */ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler { + + private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH}; + } + @Override public String handleRequest(AccessExecution execAttempt, Map params) throws Exception { final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index da8db02df8033..b753b6e1ea056 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -37,6 +37,8 @@ */ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler { + private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt"; + private final MetricFetcher fetcher; public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { @@ -44,6 +46,11 @@ public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraph this.fetcher = fetcher; } + @Override + public String[] getPaths() { + return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH}; + } + @Override public String handleRequest(AccessExecution execAttempt, Map params) throws Exception { final ExecutionState status = execAttempt.getState(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java index 892a6063c6ec9..222d4746fa760 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java @@ -33,11 +33,18 @@ * Request handler that returns the accumulators for all subtasks of job vertex. */ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler { + + private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map params) throws Exception { StringWriter writer = new StringWriter(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java index 76349eec0480c..e2e35e30227ac 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java @@ -35,11 +35,17 @@ */ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { + private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{SUBTASK_TIMES_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map params) throws Exception { final long now = System.currentTimeMillis(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 6583d3b92657d..1002bf3055e0d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -94,6 +94,9 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class); + private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log"; + private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout"; + /** Keep track of last transmitted log, to clean up old ones */ private final HashMap lastSubmittedLog = new HashMap<>(); private final HashMap lastSubmittedStdout = new HashMap<>(); @@ -141,6 +144,15 @@ public TaskManagerLogHandler( timeTimeout = Time.milliseconds(timeout.toMillis()); } + @Override + public String[] getPaths() { + if (serveLogFile) { + return new String[]{TASKMANAGER_LOG_REST_PATH}; + } else { + return new String[]{TASKMANAGER_OUT_REST_PATH}; + } + } + /** * Response when running with leading JobManager. */ diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index c757f5c2e7f2f..a23e983c087f8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -41,6 +41,9 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { + private static final String TASKMANAGERS_REST_PATH = "/taskmanagers"; + private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid"; + public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; private final FiniteDuration timeout; @@ -52,6 +55,11 @@ public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) { this.fetcher = fetcher; } + @Override + public String[] getPaths() { + return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH}; + } + @Override public String handleJsonRequest(Map pathParams, Map queryParams, ActorGateway jobManager) throws Exception { try { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index be0d283781f0b..de40a4a185d5d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -34,10 +34,17 @@ */ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config"; + public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_CONFIG_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { StringWriter writer = new StringWriter(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index 33d6cf7e6cf83..d0c251d587e50 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -39,6 +39,8 @@ */ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid"; + private final CheckpointStatsCache cache; public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) { @@ -46,6 +48,11 @@ public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, this.cache = cache; } + @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { long checkpointId = parseCheckpointId(params); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index d55467f74638d..15dd911ae4ec8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -44,6 +44,8 @@ */ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid"; + private final CheckpointStatsCache cache; public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) { @@ -51,6 +53,11 @@ public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraph this.cache = checkNotNull(cache); } + @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH}; + } + @Override public String handleJsonRequest( Map pathParams, diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index 8aab5fa54ce0d..529b38f3ff33e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -44,10 +44,17 @@ */ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler { + private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints"; + public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); } + @Override + public String[] getPaths() { + return new String[]{CHECKPOINT_STATS_REST_PATH}; + } + @Override public String handleRequest(AccessExecutionGraph graph, Map params) throws Exception { StringWriter writer = new StringWriter(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java index 7452c71d3d8b7..f667ce502da58 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java @@ -31,10 +31,18 @@ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ public class JobManagerMetricsHandler extends AbstractMetricsHandler { + + private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics"; + public JobManagerMetricsHandler(MetricFetcher fetcher) { super(fetcher); } + @Override + public String[] getPaths() { + return new String[]{JOBMANAGER_METRICS_REST_PATH}; + } + @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java index d66c954661279..26c9fa9a4c05d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java @@ -32,11 +32,17 @@ */ public class JobMetricsHandler extends AbstractMetricsHandler { public static final String PARAMETER_JOB_ID = "jobid"; + private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics"; public JobMetricsHandler(MetricFetcher fetcher) { super(fetcher); } + @Override + public String[] getPaths() { + return new String[]{JOB_METRICS_REST_PATH}; + } + @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java index 6fca77106e21d..3e838d755756d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java @@ -32,11 +32,17 @@ */ public class JobVertexMetricsHandler extends AbstractMetricsHandler { public static final String PARAMETER_VERTEX_ID = "vertexid"; + private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics"; public JobVertexMetricsHandler(MetricFetcher fetcher) { super(fetcher); } + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_METRICS_REST_PATH}; + } + @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { MetricStore.TaskMetricStore task = metrics.getTaskMetricStore( diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java index f1b2e72077a0a..a74f5f25fdee1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java @@ -33,10 +33,18 @@ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ public class TaskManagerMetricsHandler extends AbstractMetricsHandler { + + private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics"; + public TaskManagerMetricsHandler(MetricFetcher fetcher) { super(fetcher); } + @Override + public String[] getPaths() { + return new String[]{TASKMANAGER_METRICS_REST_PATH}; + } + @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY)); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java new file mode 100644 index 0000000000000..018ffdd869f68 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class ClusterOverviewHandlerTest { + @Test + public void testGetPaths() { + ClusterOverviewHandler handler = new ClusterOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/overview", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java new file mode 100644 index 0000000000000..e22564800e1bf --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class CurrentJobIdsHandlerTest { + @Test + public void testGetPaths() { + CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new FiniteDuration(0, TimeUnit.SECONDS)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java new file mode 100644 index 0000000000000..3207feca15bae --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class CurrentJobsOverviewHandlerTest { + @Test + public void testGetPaths() { + CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true); + String[] pathsAll = handlerAll.getPaths(); + Assert.assertEquals(1, pathsAll.length); + Assert.assertEquals("/joboverview", pathsAll[0]); + + CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, false); + String[] pathsRunning = handlerRunning.getPaths(); + Assert.assertEquals(1, pathsRunning.length); + Assert.assertEquals("/joboverview/running", pathsRunning[0]); + + CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, true); + String[] pathsCompleted = handlerCompleted.getPaths(); + Assert.assertEquals(1, pathsCompleted.length); + Assert.assertEquals("/joboverview/completed", pathsCompleted[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java new file mode 100644 index 0000000000000..aa2d55244d9aa --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class DashboardConfigHandlerTest { + @Test + public void testGetPaths() { + DashboardConfigHandler handler = new DashboardConfigHandler(10000L); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/config", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java new file mode 100644 index 0000000000000..e84926e1dfeaa --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class JarAccessDeniedHandlerTest { + @Test + public void testGetPaths() { + JarAccessDeniedHandler handler = new JarAccessDeniedHandler(); + String[] paths = handler.getPaths(); + Assert.assertEquals(5, paths.length); + List pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/jars")); + Assert.assertTrue(pathsList.contains("/jars/upload")); + Assert.assertTrue(pathsList.contains("/jars/:jarid")); + Assert.assertTrue(pathsList.contains("/jars/:jarid/plan")); + Assert.assertTrue(pathsList.contains("/jars/:jarid/run")); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java new file mode 100644 index 0000000000000..f354e59c5708b --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JarDeleteHandlerTest { + @Test + public void testGetPaths() { + JarDeleteHandler handler = new JarDeleteHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jars/:jarid", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java new file mode 100644 index 0000000000000..8e97f655dfef9 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JarListHandlerTest { + @Test + public void testGetPaths() { + JarListHandler handler = new JarListHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jars", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java new file mode 100644 index 0000000000000..2e6a5a2ce0a25 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JarPlanHandlerTest { + @Test + public void testGetPaths() { + JarPlanHandler handler = new JarPlanHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jars/:jarid/plan", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java new file mode 100644 index 0000000000000..73c80eba973be --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JarRunHandlerTest { + @Test + public void testGetPaths() { + JarRunHandler handler = new JarRunHandler(null, null, null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jars/:jarid/run", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java new file mode 100644 index 0000000000000..bd8c01726f4bb --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JarUploadHandlerTest { + @Test + public void testGetPaths() { + JarUploadHandler handler = new JarUploadHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jars/upload", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java new file mode 100644 index 0000000000000..96c7dd5f4ff25 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobAccumulatorsHandlerTest { + @Test + public void testGetPaths() { + JobAccumulatorsHandler handler = new JobAccumulatorsHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java new file mode 100644 index 0000000000000..ea2d1d649159b --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class JobCancellationHandlerTest { + @Test + public void testGetPaths() { + JobCancellationHandler handler = new JobCancellationHandler(); + String[] paths = handler.getPaths(); + Assert.assertEquals(2, paths.length); + List pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/cancel")); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-cancel")); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java index cebb14e952462..8480497c4f26e 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java @@ -20,6 +20,7 @@ import akka.dispatch.ExecutionContexts$; import akka.dispatch.Futures; +import com.google.common.collect.Lists; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; @@ -34,6 +35,7 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Assert; import org.junit.Test; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; @@ -43,6 +45,7 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -57,6 +60,23 @@ public class JobCancellationWithSavepointHandlersTest { private static final ExecutionContext EC = ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor()); + @Test + public void testGetPaths() { + JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), EC); + + JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler(); + String[] triggerPaths = triggerHandler.getPaths(); + Assert.assertEquals(2, triggerPaths.length); + List triggerPathsList = Lists.newArrayList(triggerPaths); + Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint")); + Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory")); + + JobCancellationWithSavepointHandlers.InProgressHandler progressHandler = handler.getInProgressHandler(); + String[] progressPaths = progressHandler.getPaths(); + Assert.assertEquals(1, progressPaths.length); + Assert.assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId", progressPaths[0]); + } + /** * Tests that the cancellation ask timeout respects the checkpoint timeout. * Otherwise, AskTimeoutExceptions are bound to happen for large state. diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java new file mode 100644 index 0000000000000..47ea6bff4ca6d --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobConfigHandlerTest { + @Test + public void testGetPaths() { + JobConfigHandler handler = new JobConfigHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/config", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java new file mode 100644 index 0000000000000..b56bd648e24d8 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class JobDetailsHandlerTest { + @Test + public void testGetPaths() { + JobDetailsHandler handler = new JobDetailsHandler(null, null); + String[] paths = handler.getPaths(); + Assert.assertEquals(2, paths.length); + List pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/jobs/:jobid")); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices")); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java new file mode 100644 index 0000000000000..850971a901d11 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobExceptionsHandlerTest { + @Test + public void testGetPaths() { + JobExceptionsHandler handler = new JobExceptionsHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java new file mode 100644 index 0000000000000..cfb45c392c5d2 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobManagerConfigHandlerTest { + @Test + public void testGetPaths() { + JobManagerConfigHandler handler = new JobManagerConfigHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobmanager/config", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java new file mode 100644 index 0000000000000..42808ed82c12d --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobPlanHandlerTest { + @Test + public void testGetPaths() { + JobPlanHandler handler = new JobPlanHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/plan", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java new file mode 100644 index 0000000000000..81b4528f81bc8 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class JobStoppingHandlerTest { + @Test + public void testGetPaths() { + JobStoppingHandler handler = new JobStoppingHandler(); + String[] paths = handler.getPaths(); + Assert.assertEquals(2, paths.length); + List pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/stop")); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-stop")); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java new file mode 100644 index 0000000000000..d5138360e119f --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobVertexAccumulatorsHandlerTest { + @Test + public void testGetPaths() { + JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java index 2b8f804d23c35..f524e0a6cbafa 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats; +import org.junit.Assert; import org.junit.Test; import scala.Option; @@ -41,6 +42,13 @@ * Tests for back pressure handler responses. */ public class JobVertexBackPressureHandlerTest { + @Test + public void testGetPaths() { + JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(null, mock(BackPressureStatsTracker.class), 0); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]); + } /** Tests the response when no stats are available */ @Test diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java new file mode 100644 index 0000000000000..d20d7360680a3 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobVertexDetailsHandlerTest { + @Test + public void testGetPaths() { + JobVertexDetailsHandler handler = new JobVertexDetailsHandler(null, null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java new file mode 100644 index 0000000000000..e56a517ca1e12 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class JobVertexTaskManagersHandlerTest { + @Test + public void testGetPaths() { + JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(null, null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java new file mode 100644 index 0000000000000..277696fc6c98a --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class SubtaskCurrentAttemptDetailsHandlerTest { + @Test + public void testGetPaths() { + SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(null, null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java new file mode 100644 index 0000000000000..0b6038fb6e2fd --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class SubtaskExecutionAttemptAccumulatorsHandlerTest { + @Test + public void testGetPaths() { + SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java new file mode 100644 index 0000000000000..e9c9f84d80a54 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class SubtaskExecutionAttemptDetailsHandlerTest { + @Test + public void testGetPaths() { + SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(null, null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskTimesHandlerTest.java new file mode 100644 index 0000000000000..f425e18c2337f --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskTimesHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class SubtaskTimesHandlerTest { + @Test + public void testGetPaths() { + SubtasksTimesHandler handler = new SubtasksTimesHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java new file mode 100644 index 0000000000000..1efb2607f1f7b --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import org.junit.Assert; +import org.junit.Test; + +public class SubtasksAllAccumulatorsHandlerTest { + @Test + public void testGetPaths() { + SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java index c1e963ea61518..1db13691437af 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java @@ -44,6 +44,7 @@ import scala.Option; import scala.collection.JavaConverters; import scala.concurrent.ExecutionContext$; +import scala.concurrent.ExecutionContextExecutor; import scala.concurrent.Future$; import scala.concurrent.duration.FiniteDuration; @@ -60,6 +61,33 @@ import static org.powermock.api.mockito.PowerMockito.when; public class TaskManagerLogHandlerTest { + @Test + public void testGetPaths() { + TaskManagerLogHandler handlerLog = new TaskManagerLogHandler( + mock(JobManagerRetriever.class), + mock(ExecutionContextExecutor.class), + Future$.MODULE$.successful("/jm/address"), + AkkaUtils.getDefaultClientTimeout(), + TaskManagerLogHandler.FileMode.LOG, + new Configuration(), + false); + String[] pathsLog = handlerLog.getPaths(); + Assert.assertEquals(1, pathsLog.length); + Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]); + + TaskManagerLogHandler handlerOut = new TaskManagerLogHandler( + mock(JobManagerRetriever.class), + mock(ExecutionContextExecutor.class), + Future$.MODULE$.successful("/jm/address"), + AkkaUtils.getDefaultClientTimeout(), + TaskManagerLogHandler.FileMode.STDOUT, + new Configuration(), + false); + String[] pathsOut = handlerOut.getPaths(); + Assert.assertEquals(1, pathsOut.length); + Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]); + } + @Test public void testLogFetchingFailure() throws Exception { // ========= setup TaskManager ================================================================================= diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java new file mode 100644 index 0000000000000..5818954b18031 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.handlers; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class TaskManagersHandlerTest { + @Test + public void testGetPaths() { + TaskManagersHandler handler = new TaskManagersHandler(new FiniteDuration(0, TimeUnit.SECONDS), null); + String[] paths = handler.getPaths(); + Assert.assertEquals(2, paths.length); + List pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/taskmanagers")); + Assert.assertTrue(pathsList.contains("/taskmanagers/:taskmanagerid")); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java index e517c3c84b56f..f866e3ca15bc5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -36,6 +37,14 @@ public class CheckpointConfigHandlerTest { + @Test + public void testGetPaths() { + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]); + } + /** * Tests a simple config. */ diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java index fb5cfc51ccc0a..a485d41e859c8 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -50,6 +51,14 @@ public class CheckpointStatsDetailsHandlerTest { + @Test + public void testGetPaths() { + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]); + } + /** * Tests request with illegal checkpoint ID param. */ diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java index 23a190065c512..89eac08707379 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -49,6 +50,14 @@ public class CheckpointStatsHandlerTest { + @Test + public void testGetPaths() { + CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]); + } + /** * Tests a complete checkpoint stats snapshot. */ diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java index 571adad400ef2..26433fac7f875 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -53,6 +54,14 @@ public class CheckpointStatsSubtaskDetailsHandlerTest { + @Test + public void testGetPaths() { + CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]); + } + /** * Tests a subtask details request. */ diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java index d0ffc81e48651..89df2d9866296 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java @@ -19,7 +19,9 @@ import akka.actor.ActorSystem; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import scala.concurrent.ExecutionContext; @@ -31,6 +33,14 @@ import static org.powermock.api.mockito.PowerMockito.mock; public class JobManagerMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + JobManagerMetricsHandler handler = new JobManagerMetricsHandler(mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobmanager/metrics", paths[0]); + } + @Test public void getMapFor() { MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java index 9391dc0625920..d02470db786bc 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java @@ -19,7 +19,9 @@ import akka.actor.ActorSystem; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import scala.concurrent.ExecutionContext; @@ -32,6 +34,14 @@ import static org.powermock.api.mockito.PowerMockito.mock; public class JobMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + JobMetricsHandler handler = new JobMetricsHandler(mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/metrics", paths[0]); + } + @Test public void getMapFor() throws Exception { MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java index a7f90844e1df1..fb82482f6a382 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java @@ -19,7 +19,9 @@ import akka.actor.ActorSystem; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import scala.concurrent.ExecutionContext; @@ -33,6 +35,14 @@ import static org.powermock.api.mockito.PowerMockito.mock; public class JobVertexMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + JobVertexMetricsHandler handler = new JobVertexMetricsHandler(mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]); + } + @Test public void getMapFor() throws Exception { MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java index 12c9f3f7ca999..48b34722d5d9d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java @@ -19,7 +19,9 @@ import akka.actor.ActorSystem; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import scala.concurrent.ExecutionContext; @@ -32,6 +34,14 @@ import static org.powermock.api.mockito.PowerMockito.mock; public class TaskManagerMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]); + } + @Test public void getMapFor() throws Exception { MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));