From e5b31bb0e9bc386c0b03c610ef988e243d5f2a8f Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 20 Mar 2017 18:41:26 +0100 Subject: [PATCH 1/4] [refactor] Move netty setup into separate class --- .../runtime/webmonitor/WebRuntimeMonitor.java | 93 +---------- .../utils/WebFrontendBootstrap.java | 146 ++++++++++++++++++ 2 files changed, 152 insertions(+), 87 deletions(-) create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index d88fdcf218fba..39bca71004662 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -20,18 +20,7 @@ import akka.actor.ActorSystem; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.router.Handler; import io.netty.handler.codec.http.router.Router; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedWriteHandler; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; @@ -84,6 +73,7 @@ import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.FileUtils; import org.slf4j.Logger; @@ -95,10 +85,8 @@ import scala.concurrent.duration.FiniteDuration; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -137,13 +125,10 @@ public class WebRuntimeMonitor implements WebMonitor { private final SSLContext serverSSLContext; - private final ServerBootstrap bootstrap; - private final Promise jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); private final FiniteDuration timeout; - - private Channel serverChannel; + private final WebFrontendBootstrap netty; private final File webRootDir; @@ -380,52 +365,7 @@ public void run() { LOG.warn("Error while adding shutdown hook", t); } - final Configuration sslConfig = config; - ChannelInitializer initializer = new ChannelInitializer() { - - @Override - protected void initChannel(SocketChannel ch) { - Handler handler = new Handler(router); - - // SSL should be the first handler in the pipeline - if (serverSSLContext != null) { - SSLEngine sslEngine = serverSSLContext.createSSLEngine(); - SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig); - sslEngine.setUseClientMode(false); - ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); - } - - ch.pipeline() - .addLast(new HttpServerCodec()) - .addLast(new ChunkedWriteHandler()) - .addLast(new HttpRequestHandler(uploadDir)) - .addLast(handler.name(), handler) - .addLast(new PipelineErrorHandler(LOG)); - } - }; - - NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); - NioEventLoopGroup workerGroup = new NioEventLoopGroup(); - - this.bootstrap = new ServerBootstrap(); - this.bootstrap - .group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(initializer); - - ChannelFuture ch; - if (configuredAddress == null) { - ch = this.bootstrap.bind(configuredPort); - } else { - ch = this.bootstrap.bind(configuredAddress, configuredPort); - } - this.serverChannel = ch.sync().channel(); - - InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); - String address = bindAddress.getAddress().getHostAddress(); - int port = bindAddress.getPort(); - - LOG.info("Web frontend listening at " + address + ':' + port); + this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config); } /** @@ -482,7 +422,7 @@ public void start(String jobManagerAkkaUrl) throws Exception { // this here repeatedly, because cache clean up only happens on // interactions with the cache. We need it to make sure that we // don't leak memory after completed jobs or long ago accessed stats. - bootstrap.childGroup().scheduleWithFixedDelay(new Runnable() { + netty.getBootstrap().childGroup().scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { @@ -500,18 +440,7 @@ public void stop() throws Exception { synchronized (startupShutdownLock) { leaderRetrievalService.stop(); - if (this.serverChannel != null) { - this.serverChannel.close().awaitUninterruptibly(); - this.serverChannel = null; - } - if (bootstrap != null) { - if (bootstrap.group() != null) { - bootstrap.group().shutdownGracefully(); - } - if (bootstrap.childGroup() != null) { - bootstrap.childGroup().shutdownGracefully(); - } - } + netty.shutdown(); stackTraceSamples.shutDown(); @@ -525,17 +454,7 @@ public void stop() throws Exception { @Override public int getServerPort() { - Channel server = this.serverChannel; - if (server != null) { - try { - return ((InetSocketAddress) server.localAddress()).getPort(); - } - catch (Exception e) { - LOG.error("Cannot access local server port", e); - } - } - - return -1; + return netty.getServerPort(); } private void cleanup() { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java new file mode 100644 index 0000000000000..19ec08ad316cf --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Router; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.webmonitor.HttpRequestHandler; +import org.apache.flink.runtime.webmonitor.PipelineErrorHandler; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.File; +import java.net.InetSocketAddress; + +/** + * This classes encapsulates the boot-strapping of netty for the web-frontend. + */ +public class WebFrontendBootstrap { + private final Router router; + private final Logger log; + private final File uploadDir; + private final SSLContext serverSSLContext; + private final ServerBootstrap bootstrap; + private final Channel serverChannel; + + public WebFrontendBootstrap( + Router router, + Logger log, + File directory, + SSLContext sslContext, + String configuredAddress, + int configuredPort, + final Configuration config) throws InterruptedException { + this.router = Preconditions.checkNotNull(router); + this.log = Preconditions.checkNotNull(log); + this.uploadDir = Preconditions.checkNotNull(directory); + this.serverSSLContext = sslContext; + + ChannelInitializer initializer = new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { + Handler handler = new Handler(WebFrontendBootstrap.this.router); + + // SSL should be the first handler in the pipeline + if (serverSSLContext != null) { + SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + SSLUtils.setSSLVerAndCipherSuites(sslEngine, config); + sslEngine.setUseClientMode(false); + ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + } + + ch.pipeline() + .addLast(new HttpServerCodec()) + .addLast(new ChunkedWriteHandler()) + .addLast(new HttpRequestHandler(uploadDir)) + .addLast(handler.name(), handler) + .addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log)); + } + }; + + NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + + this.bootstrap = new ServerBootstrap(); + this.bootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(initializer); + + ChannelFuture ch; + if (configuredAddress == null) { + ch = this.bootstrap.bind(configuredPort); + } else { + ch = this.bootstrap.bind(configuredAddress, configuredPort); + } + this.serverChannel = ch.sync().channel(); + + InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); + String address = bindAddress.getAddress().getHostAddress(); + int port = bindAddress.getPort(); + + this.log.info("Web frontend listening at {}" + ':' + "{}", address, port); + } + + public ServerBootstrap getBootstrap() { + return bootstrap; + } + + public int getServerPort() { + Channel server = this.serverChannel; + if (server != null) { + try { + return ((InetSocketAddress) server.localAddress()).getPort(); + } + catch (Exception e) { + log.error("Cannot access local server port", e); + } + } + + return -1; + } + + public void shutdown() { + if (this.serverChannel != null) { + this.serverChannel.close().awaitUninterruptibly(); + } + if (bootstrap != null) { + if (bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(); + } + if (bootstrap.childGroup() != null) { + bootstrap.childGroup().shutdownGracefully(); + } + } + } +} From 130078bf3f158196866aa96c0e0e4323497251b6 Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 20 Mar 2017 18:41:54 +0100 Subject: [PATCH 2/4] [FLINK-1579] Implement History Server --- docs/monitoring/back_pressure.md | 2 +- docs/monitoring/checkpoint_monitoring.md | 2 +- docs/monitoring/debugging_classloading.md | 2 +- docs/monitoring/debugging_event_time.md | 2 +- docs/monitoring/historyserver.md | 98 ++++++ docs/monitoring/large_state_tuning.md | 2 +- docs/setup/config.md | 17 + .../configuration/HistoryServerOptions.java | 82 +++++ .../src/main/flink-bin/bin/flink-daemon.sh | 6 +- .../src/main/flink-bin/bin/historyserver.sh | 34 ++ flink-dist/src/main/resources/flink-conf.yaml | 21 ++ .../runtime/webmonitor/WebRuntimeMonitor.java | 6 +- .../files/StaticFileServerHandler.java | 12 +- .../webmonitor/history/HistoryServer.java | 298 ++++++++++++++++++ .../history/HistoryServerArchiveFetcher.java | 252 +++++++++++++++ .../HistoryServerStaticFileServerHandler.java | 249 +++++++++++++++ .../webmonitor/WebMonitorUtilsTest.java | 36 +++ .../history/FsJobArchivistTest.java | 84 +++++ ...toryServerStaticFileServerHandlerTest.java | 77 +++++ .../webmonitor/history/HistoryServerTest.java | 122 +++++++ .../web-dashboard/app/index_hs.jade | 60 ++++ .../web-dashboard/app/scripts/index_hs.coffee | 208 ++++++++++++ flink-runtime-web/web-dashboard/gulpfile.js | 29 +- .../flink/runtime/history/FsJobArchivist.java | 121 +++++++ .../runtime/jobmanager/JobManagerOptions.java | 7 + .../runtime/webmonitor/WebMonitorUtils.java | 48 +++ .../webmonitor/history/ArchivedJson.java | 16 + .../flink/runtime/jobmanager/JobManager.scala | 29 +- .../runtime/jobmanager/MemoryArchivist.scala | 37 ++- .../minicluster/LocalFlinkMiniCluster.scala | 14 +- .../jobmanager/JobManagerHARecoveryTest.java | 3 +- .../webmonitor/history/ArchivedJsonTest.java | 37 +++ .../testingUtils/TestingMemoryArchivist.scala | 4 +- 33 files changed, 1986 insertions(+), 31 deletions(-) create mode 100644 docs/monitoring/historyserver.md create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java create mode 100644 flink-dist/src/main/flink-bin/bin/historyserver.sh create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java create mode 100644 flink-runtime-web/web-dashboard/app/index_hs.jade create mode 100644 flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java diff --git a/docs/monitoring/back_pressure.md b/docs/monitoring/back_pressure.md index 4a4bd22265a99..f047066831c97 100644 --- a/docs/monitoring/back_pressure.md +++ b/docs/monitoring/back_pressure.md @@ -1,7 +1,7 @@ --- title: "Monitoring Back Pressure" nav-parent_id: monitoring -nav-pos: 4 +nav-pos: 5 --- + +Flink has a history server that can be used to query the statistics of completed jobs after the corresponding Flink cluster has been shut down. + +Furthermore, it exposes a REST API that accepts HTTP requests and responds with JSON data. + +* This will be replaced by the TOC +{:toc} + +## Overview + +The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager. + +You start and stop the HistoryServer via its corresponding startup script: + +```sh +# Start or stop the HistoryServer +bin/historyserver.sh (start|stop) +``` + +By default, this server binds to `localhost` and listens at port `8082`. + +Currently, you can only run it as a standalone process. + +## Configuration + +The configuration keys `jobmanager.archive.fs.dir` and `historyserver.archive.fs.refresh-interval` need to be adjusted for archiving and displaying archived jobs. + +**JobManager** + +The archiving of completed jobs happens on the JobManager, which uploads the archived job information to a file system directory. You can configure the directory to archive completed jobs in `flink-conf.yaml` by setting a directory via `jobmanager.archive.fs.dir`. + +```sh +# Directory to upload completed job information +jobmanager.archive.fs.dir: hdfs:///completed-jobs +``` + +**HistoryServer** + +The HistoryServer can be configured to monitor a comma-separated list of directories in via `historyserver.archive.fs.dir`. The configured directories are regularly polled for new archives; the polling interval can be configured via `historyserver.archive.fs.refresh-interval`. + +```sh +# Monitor the following directories for completed jobs +historyserver.archive.fs.dir: hdfs:///completed-jobs + +# Refresh every 10 seconds +historyserver.archive.fs.refresh-interval: 10000 +``` + +The contained archives are downloaded and cached in the local filesystem. The local directory for this is configured via `historyserver.web.tmpdir`. + +Check out the configuration page for a [complete list of configuration options]({{ site.baseurl }}/setup/config.html#history-server). + +## Available Requests + +Below is a list of available requests, with a sample JSON response. All requests are of the sample form `http://hostname:8082/jobs`, below we list only the *path* part of the URLs. + +Values in angle brackets are variables, for example `http://hostname:port/jobs//exceptions` will have to requested for example as `http://hostname:port/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions`. + + - `/config` + - `/jobs` + - `/joboverview` + - `/jobs/` + - `/jobs//vertices` + - `/jobs//config` + - `/jobs//exceptions` + - `/jobs//accumulators` + - `/jobs//vertices/` + - `/jobs//vertices//subtasktimes` + - `/jobs//vertices//taskmanagers` + - `/jobs//vertices//accumulators` + - `/jobs//vertices//subtasks/accumulators` + - `/jobs//vertices//subtasks/` + - `/jobs//vertices//subtasks//attempts/` + - `/jobs//vertices//subtasks//attempts//accumulators` + - `/jobs//plan` diff --git a/docs/monitoring/large_state_tuning.md b/docs/monitoring/large_state_tuning.md index 78a5cb4814776..a5201063313c6 100644 --- a/docs/monitoring/large_state_tuning.md +++ b/docs/monitoring/large_state_tuning.md @@ -1,7 +1,7 @@ --- title: "Debugging and Tuning Checkpoints and Large State" nav-parent_id: monitoring -nav-pos: 11 +nav-pos: 12 --- + + + + + + Apache Flink History Server + + + + + + + + + + + + + + + +
+
+
+ + \ No newline at end of file diff --git a/flink-runtime-web/web-dashboard/web/js/hs/index.js b/flink-runtime-web/web-dashboard/web/js/hs/index.js new file mode 100644 index 0000000000000..374c0c1986575 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/js/hs/index.js @@ -0,0 +1,2 @@ +angular.module("flinkApp",["ui.router","angularMoment","dndLists"]).run(["$rootScope",function(e){return e.sidebarVisible=!1,e.showSidebar=function(){return e.sidebarVisible=!e.sidebarVisible,e.sidebarClass="force-show"}}]).value("flinkConfig",{jobServer:"","refresh-interval":1e4}).value("watermarksConfig",{noWatermark:-0x8000000000000000}).run(["JobsService","MainService","flinkConfig","$interval",function(e,t,r,n){return t.loadConfig().then(function(t){return angular.extend(r,t),e.listJobs(),n(function(){return e.listJobs()},r["refresh-interval"])})}]).config(["$uiViewScrollProvider",function(e){return e.useAnchorScroll()}]).run(["$rootScope","$state",function(e,t){return e.$on("$stateChangeStart",function(e,r,n,o){if(r.redirectTo)return e.preventDefault(),t.go(r.redirectTo,n)})}]).config(["$stateProvider","$urlRouterProvider",function(e,t){return e.state("completed-jobs",{url:"/completed-jobs",views:{main:{templateUrl:"partials/jobs/completed-jobs.html",controller:"CompletedJobsController"}}}).state("single-job",{url:"/jobs/{jobid}","abstract":!0,views:{main:{templateUrl:"partials/jobs/job.html",controller:"SingleJobController"}}}).state("single-job.plan",{url:"",redirectTo:"single-job.plan.subtasks",views:{details:{templateUrl:"partials/jobs/job.plan.html",controller:"JobPlanController"}}}).state("single-job.plan.subtasks",{url:"",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.subtasks.html",controller:"JobPlanSubtasksController"}}}).state("single-job.plan.metrics",{url:"/metrics",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.metrics.html",controller:"JobPlanMetricsController"}}}).state("single-job.plan.watermarks",{url:"/watermarks",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.watermarks.html"}}}).state("single-job.plan.taskmanagers",{url:"/taskmanagers",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.taskmanagers.html",controller:"JobPlanTaskManagersController"}}}).state("single-job.plan.accumulators",{url:"/accumulators",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.accumulators.html",controller:"JobPlanAccumulatorsController"}}}).state("single-job.plan.checkpoints",{url:"/checkpoints",redirectTo:"single-job.plan.checkpoints.overview",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.checkpoints.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.overview",{url:"/overview",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.overview.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.summary",{url:"/summary",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.summary.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.history",{url:"/history",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.history.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.config",{url:"/config",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.config.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.details",{url:"/details/{checkpointId}",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.details.html",controller:"JobPlanCheckpointDetailsController"}}}).state("single-job.plan.backpressure",{url:"/backpressure",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.backpressure.html",controller:"JobPlanBackPressureController"}}}).state("single-job.timeline",{url:"/timeline",views:{details:{templateUrl:"partials/jobs/job.timeline.html"}}}).state("single-job.timeline.vertex",{url:"/{vertexId}",views:{vertex:{templateUrl:"partials/jobs/job.timeline.vertex.html",controller:"JobTimelineVertexController"}}}).state("single-job.exceptions",{url:"/exceptions",views:{details:{templateUrl:"partials/jobs/job.exceptions.html",controller:"JobExceptionsController"}}}).state("single-job.config",{url:"/config",views:{details:{templateUrl:"partials/jobs/job.config.html"}}}),t.otherwise("/completed-jobs")}]),angular.module("flinkApp").directive("bsLabel",["JobsService",function(e){return{transclude:!0,replace:!0,scope:{getLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getLabelClass=function(){return"label label-"+e.translateLabelState(n.status)}}}}]).directive("bpLabel",["JobsService",function(e){return{transclude:!0,replace:!0,scope:{getBackPressureLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getBackPressureLabelClass=function(){return"label label-"+e.translateBackPressureLabelState(n.status)}}}}]).directive("indicatorPrimary",["JobsService",function(e){return{replace:!0,scope:{getLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getLabelClass=function(){return"fa fa-circle indicator indicator-"+e.translateLabelState(n.status)}}}}]).directive("tableProperty",function(){return{replace:!0,scope:{value:"="},template:"{{value || 'None'}}"}}),angular.module("flinkApp").filter("amDurationFormatExtended",["angularMomentConfig",function(e){var t;return t=function(e,t,r){return"undefined"==typeof e||null===e?"":moment.duration(e,t).format(r,{trim:!1})},t.$stateful=e.statefulFilters,t}]).filter("humanizeDuration",function(){return function(e,t){var r,n,o,i,s,a;return"undefined"==typeof e||null===e?"":(i=e%1e3,a=Math.floor(e/1e3),s=a%60,a=Math.floor(a/60),o=a%60,a=Math.floor(a/60),n=a%24,a=Math.floor(a/24),r=a,0===r?0===n?0===o?0===s?i+"ms":s+"s ":o+"m "+s+"s":t?n+"h "+o+"m":n+"h "+o+"m "+s+"s":t?r+"d "+n+"h":r+"d "+n+"h "+o+"m "+s+"s")}}).filter("limit",function(){return function(e){return e.length>73&&(e=e.substring(0,35)+"..."+e.substring(e.length-35,e.length)),e}}).filter("humanizeText",function(){return function(e){return e?e.replace(/>/g,">").replace(//g,""):""}}).filter("humanizeBytes",function(){return function(e){var t,r;return r=["B","KB","MB","GB","TB","PB","EB"],t=function(e,n){var o;return o=Math.pow(1024,n),e=r;n=0<=r?++e:--e)o.push(n+".currentLowWatermark");return o}(),o.getMetrics(i,t.id,s).then(function(e){var t,n,o,i,s,a,l;o=NaN,l={},i=e.values;for(t in i)a=i[t],s=t.replace(".currentLowWatermark",""),l[s]=a,(isNaN(o)||au.noWatermark?o:NaN,r.resolve({lowWatermark:n,watermarks:l})}),r.promise}}(this),r=l.defer(),s={},n=t.length,angular.forEach(t,function(e){return function(e,t){var o;return o=e.id,i(e).then(function(e){if(s[o]=e,t>=n-1)return r.resolve(s)})}}(this)),r.promise},e.hasWatermark=function(t){return e.watermarks[t]&&!isNaN(e.watermarks[t].lowWatermark)},e.$watch("plan",function(t){if(t)return c(t.nodes).then(function(t){return e.watermarks=t})}),e.$on("reload",function(){if(e.plan)return c(e.plan.nodes).then(function(t){return e.watermarks=t})})}]).controller("JobPlanController",["$scope","$state","$stateParams","$window","JobsService",function(e,t,r,n,o){return e.nodeid=null,e.nodeUnfolded=!1,e.stateList=o.stateList(),e.changeNode=function(t){return t!==e.nodeid?(e.nodeid=t,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null,e.$broadcast("reload"),e.$broadcast("node:change",e.nodeid)):(e.nodeid=null,e.nodeUnfolded=!1,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null)},e.deactivateNode=function(){return e.nodeid=null,e.nodeUnfolded=!1,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null},e.toggleFold=function(){return e.nodeUnfolded=!e.nodeUnfolded}}]).controller("JobPlanSubtasksController",["$scope","JobsService",function(e,t){var r;return r=function(){return t.getSubtasks(e.nodeid).then(function(t){return e.subtasks=t})},!e.nodeid||e.vertex&&e.vertex.st||r(),e.$on("reload",function(t){if(e.nodeid)return r()})}]).controller("JobPlanTaskManagersController",["$scope","JobsService",function(e,t){var r;return r=function(){return t.getTaskManagers(e.nodeid).then(function(t){return e.taskmanagers=t})},!e.nodeid||e.vertex&&e.vertex.st||r(),e.$on("reload",function(t){if(e.nodeid)return r()})}]).controller("JobPlanAccumulatorsController",["$scope","JobsService",function(e,t){var r;return r=function(){return t.getAccumulators(e.nodeid).then(function(t){return e.accumulators=t.main,e.subtaskAccumulators=t.subtasks})},!e.nodeid||e.vertex&&e.vertex.accumulators||r(),e.$on("reload",function(t){if(e.nodeid)return r()})}]).controller("JobPlanCheckpointsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var o;return e.checkpointDetails={},e.checkpointDetails.id=-1,n.getCheckpointConfig().then(function(t){return e.checkpointConfig=t}),o=function(){return n.getCheckpointStats().then(function(t){if(null!==t)return e.checkpointStats=t})},o(),e.$on("reload",function(e){return o()})}]).controller("JobPlanCheckpointDetailsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var o,i;return e.subtaskDetails={},e.checkpointDetails.id=r.checkpointId,o=function(t){return n.getCheckpointDetails(t).then(function(t){return null!==t?e.checkpoint=t:e.unknown_checkpoint=!0})},i=function(t,r){return n.getCheckpointSubtaskDetails(t,r).then(function(t){if(null!==t)return e.subtaskDetails[r]=t})},o(r.checkpointId),e.nodeid&&i(r.checkpointId,e.nodeid),e.$on("reload",function(t){if(o(r.checkpointId),e.nodeid)return i(r.checkpointId,e.nodeid)}),e.$on("$destroy",function(){return e.checkpointDetails.id=-1})}]).controller("JobPlanBackPressureController",["$scope","JobsService",function(e,t){var r;return r=function(){if(e.now=Date.now(),e.nodeid)return t.getOperatorBackPressure(e.nodeid).then(function(t){return e.backPressureOperatorStats[e.nodeid]=t})},r(),e.$on("reload",function(e){return r()})}]).controller("JobTimelineVertexController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var o;return o=function(){return n.getVertex(r.vertexId).then(function(t){return e.vertex=t})},o(),e.$on("reload",function(e){return o()})}]).controller("JobExceptionsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){return n.loadExceptions().then(function(t){return e.exceptions=t})}]).controller("JobPropertiesController",["$scope","JobsService",function(e,t){return e.changeNode=function(r){return r!==e.nodeid?(e.nodeid=r,t.getNode(r).then(function(t){return e.node=t})):(e.nodeid=null,e.node=null)}}]).controller("JobPlanMetricsController",["$scope","JobsService","MetricsService",function(e,t,r){var n;if(e.dragging=!1,e.window=r.getWindow(),e.availableMetrics=null,e.$on("$destroy",function(){return r.unRegisterObserver()}),n=function(){return t.getVertex(e.nodeid).then(function(t){return e.vertex=t}),r.getAvailableMetrics(e.jobid,e.nodeid).then(function(t){return e.availableMetrics=t,e.metrics=r.getMetricsSetup(e.jobid,e.nodeid).names,r.registerObserver(e.jobid,e.nodeid,function(t){return e.$broadcast("metrics:data:update",t.timestamp,t.values)})})},e.dropped=function(t,o,i,s,a){return r.orderMetrics(e.jobid,e.nodeid,i,o),e.$broadcast("metrics:refresh",i),n(),!1},e.dragStart=function(){return e.dragging=!0},e.dragEnd=function(){return e.dragging=!1},e.addMetric=function(t){return r.addMetric(e.jobid,e.nodeid,t.id),n()},e.removeMetric=function(t){return r.removeMetric(e.jobid,e.nodeid,t),n()},e.setMetricSize=function(t,o){return r.setMetricSize(e.jobid,e.nodeid,t,o),n()},e.getValues=function(t){return r.getValues(e.jobid,e.nodeid,t)},e.$on("node:change",function(t,r){if(!e.dragging)return n()}),e.nodeid)return n()}]),angular.module("flinkApp").directive("vertex",["$state",function(e){return{template:"",scope:{data:"="},link:function(e,t,r){var n,o,i;i=t.children()[0],o=t.width(),angular.element(i).attr("width",o),(n=function(e){var t,r,n;return d3.select(i).selectAll("*").remove(),n=[],angular.forEach(e.subtasks,function(e,t){var r;return r=[{label:"Scheduled",color:"#666",borderColor:"#555",starting_time:e.timestamps.SCHEDULED,ending_time:e.timestamps.DEPLOYING,type:"regular"},{label:"Deploying",color:"#aaa",borderColor:"#555",starting_time:e.timestamps.DEPLOYING,ending_time:e.timestamps.RUNNING,type:"regular"}],e.timestamps.FINISHED>0&&r.push({label:"Running",color:"#ddd",borderColor:"#555",starting_time:e.timestamps.RUNNING,ending_time:e.timestamps.FINISHED,type:"regular"}),n.push({label:"("+e.subtask+") "+e.host,times:r})}),t=d3.timeline().stack().tickFormat({format:d3.time.format("%L"),tickSize:1}).prefix("single").labelFormat(function(e){return e}).margin({left:100,right:0,top:0,bottom:0}).itemHeight(30).relativeTime(),r=d3.select(i).datum(n).call(t)})(e.data)}}}]).directive("timeline",["$state",function(e){return{template:"",scope:{vertices:"=",jobid:"="},link:function(t,r,n){var o,i,s,a;s=r.children()[0],i=r.width(),angular.element(s).attr("width",i),a=function(e){return e.replace(">",">")},o=function(r){var n,o,i;return d3.select(s).selectAll("*").remove(),i=[],angular.forEach(r,function(e){if(e["start-time"]>-1)return"scheduled"===e.type?i.push({times:[{label:a(e.name),color:"#cccccc",borderColor:"#555555",starting_time:e["start-time"],ending_time:e["end-time"],type:e.type}]}):i.push({times:[{label:a(e.name),color:"#d9f1f7",borderColor:"#62cdea",starting_time:e["start-time"],ending_time:e["end-time"],link:e.id,type:e.type}]})}),n=d3.timeline().stack().click(function(r,n,o){if(r.link)return e.go("single-job.timeline.vertex",{jobid:t.jobid,vertexId:r.link})}).tickFormat({format:d3.time.format("%L"),tickSize:1}).prefix("main").margin({left:0,right:0,top:0,bottom:0}).itemHeight(30).showBorderLine().showHourTimeline(),o=d3.select(s).datum(i).call(n)},t.$watch(n.vertices,function(e){if(e)return o(e)})}}}]).directive("split",function(){return{compile:function(e,t){return Split(e.children(),{sizes:[50,50],direction:"vertical"})}}}).directive("jobPlan",["$timeout",function(e){return{template:"
",scope:{plan:"=",watermarks:"=",setNode:"&"},link:function(e,t,r){var n,o,i,s,a,l,u,c,d,f,p,g,m,h,b,v,k,j,S,w,C,$,J,M,y;p=null,C=d3.behavior.zoom(),y=[],h=r.jobid,S=t.children()[0],j=t.children().children()[0],w=t.children()[1],l=d3.select(S),u=d3.select(j),c=d3.select(w),n=t.width(),angular.element(t.children()[0]).width(n),v=0,b=0,e.zoomIn=function(){var e,t,r;if(C.scale()<2.99)return e=C.translate(),t=e[0]*(C.scale()+.1/C.scale()),r=e[1]*(C.scale()+.1/C.scale()),C.scale(C.scale()+.1),C.translate([t,r]),u.attr("transform","translate("+t+","+r+") scale("+C.scale()+")"),v=C.scale(),b=C.translate()},e.zoomOut=function(){var e,t,r;if(C.scale()>.31)return C.scale(C.scale()-.1),e=C.translate(),t=e[0]*(C.scale()-.1/C.scale()),r=e[1]*(C.scale()-.1/C.scale()),C.translate([t,r]),u.attr("transform","translate("+t+","+r+") scale("+C.scale()+")"),v=C.scale(),b=C.translate()},i=function(e){var t;return t="",null==e.ship_strategy&&null==e.local_strategy||(t+="
",null!=e.ship_strategy&&(t+=e.ship_strategy),void 0!==e.temp_mode&&(t+=" ("+e.temp_mode+")"),void 0!==e.local_strategy&&(t+=",
"+e.local_strategy),t+="
"),t},m=function(e){return"partialSolution"===e||"nextPartialSolution"===e||"workset"===e||"nextWorkset"===e||"solutionSet"===e||"solutionDelta"===e},g=function(e,t){return"mirror"===t?"node-mirror":m(t)?"node-iteration":"node-normal"},s=function(e,t,r,n){var o,i;return o="
",o+="mirror"===t?"

Mirror of "+e.operator+"

":"

"+e.operator+"

",""===e.description?o+="":(i=e.description,i=M(i),o+="

"+i+"

"),null!=e.step_function?o+=f(e.id,r,n):(m(t)&&(o+="
"+t+" Node
"),""!==e.parallelism&&(o+="
Parallelism: "+e.parallelism+"
"),void 0!==e.lowWatermark&&(o+="
Low Watermark: "+e.lowWatermark+"
"),void 0!==e.operator&&e.operator_strategy&&(o+="
Operation: "+M(e.operator_strategy)+"
")),o+="
"},f=function(e,t,r){var n,o;return o="svg-"+e,n=""},M=function(e){var t;for("<"===e.charAt(0)&&(e=e.replace("<","<"),e=e.replace(">",">")),t="";e.length>30;)t=t+e.substring(0,30)+"
",e=e.substring(30,e.length);return t+=e},a=function(e,t,r,n,o,i){return null==n&&(n=!1),r.id===t.partial_solution?e.setNode(r.id,{label:s(r,"partialSolution",o,i),labelType:"html","class":g(r,"partialSolution")}):r.id===t.next_partial_solution?e.setNode(r.id,{label:s(r,"nextPartialSolution",o,i),labelType:"html","class":g(r,"nextPartialSolution")}):r.id===t.workset?e.setNode(r.id,{label:s(r,"workset",o,i),labelType:"html","class":g(r,"workset")}):r.id===t.next_workset?e.setNode(r.id,{label:s(r,"nextWorkset",o,i),labelType:"html","class":g(r,"nextWorkset")}):r.id===t.solution_set?e.setNode(r.id,{label:s(r,"solutionSet",o,i),labelType:"html","class":g(r,"solutionSet")}):r.id===t.solution_delta?e.setNode(r.id,{label:s(r,"solutionDelta",o,i),labelType:"html","class":g(r,"solutionDelta")}):e.setNode(r.id,{label:s(r,"",o,i),labelType:"html","class":g(r,"")})},o=function(e,t,r,n,o){return e.setEdge(o.id,r.id,{label:i(o),labelType:"html",arrowhead:"normal"})},k=function(e,t){var r,n,i,s,l,u,d,f,p,g,m,h,b,v;for(n=[],null!=t.nodes?v=t.nodes:(v=t.step_function,i=!0),s=0,u=v.length;s-1))return e["end-time"]=e["start-time"]+e.duration})},this.processVertices=function(e){return angular.forEach(e.vertices,function(e,t){return e.type="regular"}),e.vertices.unshift({name:"Scheduled","start-time":e.timestamps.CREATED,"end-time":e.timestamps.CREATED+1,type:"scheduled"})},this.listJobs=function(){var r;return r=o.defer(),e.get(t.jobServer+"joboverview").success(function(e){return function(t,n,o,i){return angular.forEach(t,function(t,r){switch(r){case"running":return c.running=e.setEndTimes(t);case"finished":return c.finished=e.setEndTimes(t);case"cancelled":return c.cancelled=e.setEndTimes(t);case"failed":return c.failed=e.setEndTimes(t)}}),r.resolve(c),d()}}(this)),r.promise},this.getJobs=function(e){return c[e]},this.getAllJobs=function(){return c},this.loadJob=function(r){return s=null,l.job=o.defer(),e.get(t.jobServer+"jobs/"+r).success(function(n){return function(o,i,a,u){return n.setEndTimes(o.vertices),n.processVertices(o),e.get(t.jobServer+"jobs/"+r+"/config").success(function(e){return o=angular.extend(o,e),s=o,l.job.resolve(s)})}}(this)),l.job.promise},this.getNode=function(e){var t,r;return r=function(e,t){var n,o,i,s;for(n=0,o=t.length;n
{{metric.id}}
',replace:!0,scope:{metric:"=",window:"=",removeMetric:"&",setMetricSize:"=",getValues:"&"},link:function(e,t,r){return e.btnClasses=["btn","btn-default","btn-xs"],e.value=null,e.data=[{values:e.getValues()}],e.options={x:function(e,t){return e.x},y:function(e,t){return e.y},xTickFormat:function(e){return d3.time.format("%H:%M:%S")(new Date(e))},yTickFormat:function(e){var t,r,n,o;for(r=!1,n=0,o=1,t=Math.abs(e);!r&&n<50;)Math.pow(10,n)<=t&&t6?e/Math.pow(10,n)+"E"+n:""+e}},e.showChart=function(){return d3.select(t.find("svg")[0]).datum(e.data).transition().duration(250).call(e.chart)},e.chart=nv.models.lineChart().options(e.options).showLegend(!1).margin({top:15,left:60,bottom:30,right:30}),e.chart.yAxis.showMaxMin(!1),e.chart.tooltip.hideDelay(0),e.chart.tooltip.contentGenerator(function(e){return"

"+d3.time.format("%H:%M:%S")(new Date(e.point.x))+" | "+e.point.y+"

"}),nv.utils.windowResize(e.chart.update),e.setSize=function(t){return e.setMetricSize(e.metric,t)},e.showChart(),e.$on("metrics:data:update",function(t,r,n){return e.value=parseFloat(n[e.metric.id]),e.data[0].values.push({x:r,y:e.value}),e.data[0].values.length>e.window&&e.data[0].values.shift(),e.showChart(),e.chart.clearHighlights(),e.chart.tooltip.hidden(!0)}),t.find(".metric-title").qtip({content:{text:e.metric.id},position:{my:"bottom left",at:"top left"},style:{classes:"qtip-light qtip-timeline-bar"}})}}}),angular.module("flinkApp").service("MetricsService",["$http","$q","flinkConfig","$interval",function(e,t,r,n){return this.metrics={},this.values={},this.watched={},this.observer={jobid:null,nodeid:null,callback:null +},this.refresh=n(function(e){return function(){return angular.forEach(e.metrics,function(t,r){return angular.forEach(t,function(t,n){var o;if(o=[],angular.forEach(t,function(e,t){return o.push(e.id)}),o.length>0)return e.getMetrics(r,n,o).then(function(t){if(r===e.observer.jobid&&n===e.observer.nodeid&&e.observer.callback)return e.observer.callback(t)})})})}}(this),r["refresh-interval"]),this.registerObserver=function(e,t,r){return this.observer.jobid=e,this.observer.nodeid=t,this.observer.callback=r},this.unRegisterObserver=function(){return this.observer={jobid:null,nodeid:null,callback:null}},this.setupMetrics=function(e,t){return this.setupLS(),this.watched[e]=[],angular.forEach(t,function(t){return function(r,n){if(r.id)return t.watched[e].push(r.id)}}(this))},this.getWindow=function(){return 100},this.setupLS=function(){return null==sessionStorage.flinkMetrics&&this.saveSetup(),this.metrics=JSON.parse(sessionStorage.flinkMetrics)},this.saveSetup=function(){return sessionStorage.flinkMetrics=JSON.stringify(this.metrics)},this.saveValue=function(e,t,r){if(null==this.values[e]&&(this.values[e]={}),null==this.values[e][t]&&(this.values[e][t]=[]),this.values[e][t].push(r),this.values[e][t].length>this.getWindow())return this.values[e][t].shift()},this.getValues=function(e,t,r){var n;return null==this.values[e]?[]:null==this.values[e][t]?[]:(n=[],angular.forEach(this.values[e][t],function(e){return function(e,t){if(null!=e.values[r])return n.push({x:e.timestamp,y:e.values[r]})}}(this)),n)},this.setupLSFor=function(e,t){if(null==this.metrics[e]&&(this.metrics[e]={}),null==this.metrics[e][t])return this.metrics[e][t]=[]},this.addMetric=function(e,t,r){return this.setupLSFor(e,t),this.metrics[e][t].push({id:r,size:"small"}),this.saveSetup()},this.removeMetric=function(e){return function(t,r,n){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n})),o!==-1&&e.metrics[t][r].splice(o,1),e.saveSetup()}}(this),this.setMetricSize=function(e){return function(t,r,n,o){var i;if(null!=e.metrics[t][r])return i=e.metrics[t][r].indexOf(n.id),i===-1&&(i=_.findIndex(e.metrics[t][r],{id:n.id})),i!==-1&&(e.metrics[t][r][i]={id:n.id,size:o}),e.saveSetup()}}(this),this.orderMetrics=function(e,t,r,n){return this.setupLSFor(e,t),angular.forEach(this.metrics[e][t],function(o){return function(i,s){if(i.id===r.id&&(o.metrics[e][t].splice(s,1),s Date: Tue, 21 Mar 2017 10:35:54 +0100 Subject: [PATCH 4/4] fix caching --- .../history/HistoryServerStaticFileServerHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java index 77b69edc59619..31e9bbcd355b9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java @@ -211,7 +211,7 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str StaticFileServerHandler.setContentTypeHeader(response, file); // the job overview should be updated as soon as possible - if (!requestPath.equals("/joboverview")) { + if (!requestPath.equals("/joboverview.json")) { StaticFileServerHandler.setDateAndCacheHeaders(response, file); } if (HttpHeaders.isKeepAlive(request)) {