From 365ae7e0398765c75dcb5a906292426584b67df0 Mon Sep 17 00:00:00 2001 From: Sachin Goel Date: Sun, 1 Nov 2015 15:54:13 +0530 Subject: [PATCH 1/2] [FLINK-2939][web-dashboard] Add support for cancelling jobs from the dashboard --- .../runtime/webmonitor/WebRuntimeMonitor.java | 6 ++- .../handlers/JobCancellationHandler.java | 46 +++++++++++++++++++ .../web-dashboard/app/partials/jobs/job.jade | 6 ++- .../app/scripts/modules/jobs/jobs.ctrl.coffee | 4 ++ .../app/scripts/modules/jobs/jobs.svc.coffee | 3 ++ .../web-dashboard/app/styles/index.styl | 5 +- .../web-dashboard/web/css/index.css | 2 +- .../web-dashboard/web/js/index.js | 13 +++++- .../web-dashboard/web/partials/jobs/job.html | 1 + 9 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index e69165d2d03f2..eed781e61982c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler; +import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler; import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler; import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler; import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler; @@ -186,7 +187,10 @@ public WebRuntimeMonitor( .GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile)) // this handler serves all the static contents - .GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir)); + .GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir)) + + // cancel a job + .DELETE("/jobs/:jobid", handler(new JobCancellationHandler())); synchronized (startupShutdownLock) { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java new file mode 100644 index 0000000000000..20f28bbe24eba --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.util.StringUtils; + +import java.util.Map; + +public class JobCancellationHandler implements RequestHandler, RequestHandler.JsonResponse { + + @Override + public String handleRequest(Map params, ActorGateway jobManager) throws Exception { + try { + JobID jobid = new JobID(StringUtils.hexStringToByte(params.get("jobid"))); + if (jobManager != null) { + jobManager.tell(new JobManagerMessages.CancelJob(jobid)); + return ""; + } + else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + throw new RuntimeException("Failed to cancel the job with id: " + params.get("jobid") + e.getMessage(), e); + } + } +} diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade index d0291a4d9b89e..bdfd7eaf580bf 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade @@ -35,10 +35,14 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job") span(ng-if="job['end-time'] > -1") | - | {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }} - + .navbar-info.last.first(ng-if="job.duration > -1") | {{job.duration}} ms + .navbar-info.last.first(ng-if="job.state=='RUNNING' || job.state=='CREATED'") + span.show-pointer.label.label-danger(ng-click="cancelJob($event)") + | Cancel + nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job") ul.nav.nav-tabs li(ui-sref-active='active') diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee index 037a7e8c6b7d3..e18a5fcf1f5ba 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee @@ -70,6 +70,10 @@ angular.module('flinkApp') $interval.cancel(refresher) + $scope.cancelJob = (cancelEvent) -> + angular.element(cancelEvent.currentTarget).removeClass('label-danger').addClass('label-info').html('Cancelling...') + JobsService.cancelJob($stateParams.jobid).then (data) -> + {} # -------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee index 2e45cd99f1947..698437f3650cd 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee @@ -211,4 +211,7 @@ angular.module('flinkApp') deferred.promise + @cancelJob = (jobid) -> + $http.delete "jobs/" + jobid + @ diff --git a/flink-runtime-web/web-dashboard/app/styles/index.styl b/flink-runtime-web/web-dashboard/app/styles/index.styl index 25e1170569499..7d97efc6bd137 100644 --- a/flink-runtime-web/web-dashboard/app/styles/index.styl +++ b/flink-runtime-web/web-dashboard/app/styles/index.styl @@ -420,8 +420,5 @@ livechart #non-heap-mem background-color: #90ed7d -a.show-pointer +.show-pointer cursor: pointer - - - diff --git a/flink-runtime-web/web-dashboard/web/css/index.css b/flink-runtime-web/web-dashboard/web/css/index.css index 43f7d980a59cd..101f4cb8ef0c0 100644 --- a/flink-runtime-web/web-dashboard/web/css/index.css +++ b/flink-runtime-web/web-dashboard/web/css/index.css @@ -591,6 +591,6 @@ svg.graph .node-label { #non-heap-mem { background-color: #90ed7d; } -a.show-pointer { +.show-pointer { cursor: pointer; } diff --git a/flink-runtime-web/web-dashboard/web/js/index.js b/flink-runtime-web/web-dashboard/web/js/index.js index 2554a3b43121b..32c608a1f51e6 100644 --- a/flink-runtime-web/web-dashboard/web/js/index.js +++ b/flink-runtime-web/web-dashboard/web/js/index.js @@ -432,12 +432,18 @@ angular.module('flinkApp').controller('RunningJobsController', ["$scope", "$stat return $scope.$broadcast('reload'); }); }, flinkConfig["refresh-interval"]); - return $scope.$on('$destroy', function() { + $scope.$on('$destroy', function() { $scope.job = null; $scope.plan = null; $scope.vertices = null; return $interval.cancel(refresher); }); + return $scope.cancelJob = function(cancelEvent) { + angular.element(cancelEvent.currentTarget).removeClass('label-danger').addClass('label-info').html('Cancelling...'); + return JobsService.cancelJob($stateParams.jobid).then(function(data) { + return {}; + }); + }; }]).controller('JobPlanController', ["$scope", "$state", "$stateParams", "JobsService", function($scope, $state, $stateParams, JobsService) { console.log('JobPlanController'); $scope.nodeid = null; @@ -1191,6 +1197,9 @@ angular.module('flinkApp').service('JobsService', ["$http", "flinkConfig", "$log })(this)); return deferred.promise; }; + this.cancelJob = function(jobid) { + return $http["delete"]("jobs/" + jobid); + }; return this; }]); @@ -1396,4 +1405,4 @@ angular.module('flinkApp').service('TaskManagersService', ["$http", "flinkConfig return this; }]); -//# sourceMappingURL=data:application/json;base64, +//# sourceMappingURL=data:application/json;base64, diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html index 4134274c114f2..d53bfc9e9420a 100644 --- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html @@ -32,6 +32,7 @@ - {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }} +