From 0e41a6f759a1ad6ea19e56f9bc84b0c03f617780 Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Fri, 22 Aug 2014 17:18:09 +0200 Subject: [PATCH 1/7] Add first functionality to display profiling data in the web interface * a servlet for providing profiling events * an html page to profile the events * an ajax call to load the data from the servlet --- .../js/resourceUsageFrontend.js | 35 +++++ .../web-docs-infoserver/resource-usage.html | 104 +++++++++++++++ .../jobmanager/web/ResourceUsageServlet.java | 126 ++++++++++++++++++ .../runtime/jobmanager/web/WebInfoServer.java | 2 + 4 files changed, 267 insertions(+) create mode 100644 flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js create mode 100644 flink-runtime/resources/web-docs-infoserver/resource-usage.html create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java diff --git a/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js b/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js new file mode 100644 index 0000000000000..4f983ebe6d0ee --- /dev/null +++ b/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Polls the job history on page load + */ +(function pollResourceUsage() { + $.ajax({ + url : "resourceUsage", + cache : false, + type : "GET", + success : function(content) { + + // Fill Table + $.html("#profilingEvents", content) + + }, + dataType : "html", + }); +})(); diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage.html b/flink-runtime/resources/web-docs-infoserver/resource-usage.html new file mode 100644 index 0000000000000..ce2f758eacdd7 --- /dev/null +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage.html @@ -0,0 +1,104 @@ + + + + + + + + + Dashboard - Flink + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + +
+ +
+
+

History Overview about recent jobs

+ +
+
+
+

+ Profiling Events +

+
+
+
+
+
+
+ +
+ +
+ + + \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java new file mode 100644 index 0000000000000..6dd2438d8b8bc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.web; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flink.runtime.event.job.AbstractEvent; +import org.apache.flink.runtime.event.job.RecentJobEvent; +import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.profiling.types.ProfilingEvent; +import org.apache.flink.util.StringUtils; + +public class ResourceUsageServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + + /** + * The log for this class. + */ + private static final Log LOG = LogFactory.getLog(ResourceUsageServlet.class); + + private JobManager jobManager; + + public ResourceUsageServlet(JobManager jobManager) { + this.jobManager = jobManager; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + try { + JobID jobID = getJobID(req); + + if (jobID == null) { + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType("text/html"); + resp.getWriter().write("

No job found.

"); + return; + } + + List allJobEvents = this.jobManager.getEvents(jobID); + List profilingEvents = new ArrayList(allJobEvents.size()); + for (AbstractEvent jobEvent : allJobEvents) { + if (jobEvent instanceof ProfilingEvent) { + profilingEvents.add((ProfilingEvent) jobEvent); + } + } + + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType("text/html"); + PrintWriter writer = resp.getWriter(); + writer.write("

Profiling events

"); + writer.write("
    "); + for (ProfilingEvent profilingEvent : profilingEvents) { + writer.write("
  1. "); + writer.write(profilingEvent.getJobID().toString()); + writer.write(" - "); + writer.write(new Date(profilingEvent.getTimestamp()).toString()); + writer.write(" - "); + writer.write(profilingEvent.toString()); + writer.write("
  2. "); + } + writer.write("
"); + + } catch (Exception e) { + resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + resp.setContentType("text/html"); + resp.getWriter().print(e.getMessage()); + if (LOG.isWarnEnabled()) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + } + + /** Loads the job ID from the request or selects the latest submitted job. */ + private JobID getJobID(HttpServletRequest req) throws IOException { + String jobIdParameter = req.getParameter("jobid"); + JobID jobID = jobIdParameter == null ? loadLatestJobID() : JobID.fromHexString(jobIdParameter); + return jobID; + } + + /** + * @return the latest job ID from the {@link #jobManager}. + * @throws IOException + * if there is a problem with retrieving the job list + */ + private JobID loadLatestJobID() throws IOException { + List recentJobEvents = this.jobManager.getRecentJobs(); + RecentJobEvent mostRecentJobEvent = null; + for (RecentJobEvent jobEvent : recentJobEvents) { + if (mostRecentJobEvent == null + || mostRecentJobEvent.getSubmissionTimestamp() < jobEvent.getSubmissionTimestamp()) { + mostRecentJobEvent = jobEvent; + } + } + return mostRecentJobEvent == null ? null : mostRecentJobEvent.getJobID(); + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index 283fb83123160..8bdbf61616d91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -128,6 +128,8 @@ public WebInfoServer(Configuration nepheleConfig, int port, JobManager jobmanage servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo"); servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo"); servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu"); + // TODO: Add servlet only if profiling is enabled. + servletContext.addServlet(new ServletHolder(new ResourceUsageServlet(jobmanager)), "/resourceUsage"); // ----- the handler serving all the static files ----- From 7d94f3b5ffc6dd8b8fac54889486ff90c60bf51e Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Mon, 25 Aug 2014 17:13:55 +0200 Subject: [PATCH 2/7] [FLINK-964] Draw InstanceSummaryProfilingEvents with flot as line charts (prototype) --- .../js/resourceUsageFrontend.js | 130 ++++++++++- .../web-docs-infoserver/resource-usage.html | 217 +++++++++++------- .../runtime/jobmanager/web/MenuServlet.java | 19 +- .../jobmanager/web/ResourceUsageServlet.java | 146 +++++++++--- 4 files changed, 378 insertions(+), 134 deletions(-) diff --git a/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js b/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js index 4f983ebe6d0ee..d455e1193736a 100644 --- a/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js +++ b/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js @@ -16,20 +16,130 @@ * limitations under the License. */ -/* - * Polls the job history on page load +/** + * Template for profiling data. */ -(function pollResourceUsage() { +function ProfilingSeries(label, number) { + this.lastTimestamp = 0; + this.flotData = { + color : number, + label : label, + data : [] + }; +} + +/** Container for the different profiling series. */ +var collectedProfilingData = {}; +$(function initResourceUsage() { + collectedProfilingData.freeMemory = new ProfilingSeries("Free memory", 0); + collectedProfilingData.userCpu = new ProfilingSeries("User CPU", 1); + collectedProfilingData.transmittedBytes = new ProfilingSeries( + "Transmitted bytes", 2); +}); + + +/** Options to be used with flot. */ +function makeMemorySizeFormatter(baseScale) { + return function(val, axis) { + val *= baseScale + if (val > 1000000) + return (val / 1000000).toFixed(axis.tickDecimals) + " MB"; + else if (val > 1000) + return (val / 1000).toFixed(axis.tickDecimals) + " kB"; + else + return val.toFixed(axis.tickDecimals) + " B"; + } +} + +var mainMemoryFlotOptions = { + xaxis : { + mode : "time", + timeformat : "%H:%M:%S", + }, + yaxis : { + min : 0, + // /proc/meminfo delivers kiB + tickFormatter : makeMemorySizeFormatter(1024) + } +} + +var networkMemoryFlotOptions = { + xaxis : { + mode : "time", + timeformat : "%H:%M:%S", + }, + yaxis : { + min : 0, + tickFormatter : makeMemorySizeFormatter(1) + } + } + +var cpuFlotOptions = { + xaxis : { + mode : "time", + timeformat : "%H:%M:%S", + }, + yaxis : { + min : 0, + tickFormatter : function(val, axis) { + return val.toFixed(axis.tickDecimals) + " %" + } + } +} + + +/** + * Polls profiling events and updates the plot. + */ +function pollResourceUsage() { $.ajax({ url : "resourceUsage", cache : false, type : "GET", - success : function(content) { - - // Fill Table - $.html("#profilingEvents", content) - + success : function(serverResponse) { + updateData(serverResponse); + plotData(); }, - dataType : "html", + dataType : "json", }); -})(); +}; + +/** Updates a profiling series with the given profiling event. */ +function updateProfilingSeries(series, profilingEvent, property) { + if (profilingEvent.timestamp > series.lastTimestamp) { + series.lastTimestamp = profilingEvent.timestamp; + series.flotData.data.push([ profilingEvent.timestamp, + profilingEvent[property] ]); + } +} + +/** Updates all the profiling series from an array of profiling events. */ +function updateData(serverResponse) { + for ( var eventIndex in serverResponse) { + var profilingEvent = serverResponse[eventIndex]; + console.log(profilingEvent); + console.log(profilingEvent.type); + if (profilingEvent.type == "InstanceSummaryProfilingEvent") { + updateProfilingSeries(collectedProfilingData.userCpu, + profilingEvent, "userCpu"); + updateProfilingSeries(collectedProfilingData.transmittedBytes, + profilingEvent, "transmittedBytes"); + updateProfilingSeries(collectedProfilingData.freeMemory, + profilingEvent, "freeMemory"); + } + } +} + +function plotData() { + $.plot("#cpuChart", [ collectedProfilingData.userCpu.flotData ], + cpuFlotOptions); + $.plot("#memoryChart", [ collectedProfilingData.freeMemory.flotData ], + mainMemoryFlotOptions); + $.plot("#networkChart", + [ collectedProfilingData.transmittedBytes.flotData ], networkMemoryFlotOptions); +} + +$(function startPolling() { + pollResourceUsage(); + setInterval(pollResourceUsage, 1000); +}) diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage.html b/flink-runtime/resources/web-docs-infoserver/resource-usage.html index ce2f758eacdd7..64e41de98bdf3 100644 --- a/flink-runtime/resources/web-docs-infoserver/resource-usage.html +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage.html @@ -1,104 +1,143 @@ - - - - - - - Dashboard - Flink - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + - - - - -
- - - - -
- -
-
-

History Overview about recent jobs

- -
-
+ + + + + +
+ + + + +
+ +
+
+

+ Monitor Overview on resource usage +

+ +
+ +
+ +
+
+

+ CPU +

+
+
+
+
+
+
+
+

+ Memory +

+
+
+
+
+
+

- Profiling Events + Network

-
+
-
+
+ -
+
+ -
+
+ - - \ No newline at end of file + + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java index 441b64b4afe58..57d2cf0ddce8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java @@ -49,21 +49,21 @@ public class MenuServlet extends HttpServlet { * Array of possible menu entries on the left */ private static final String[] entries = { - "index", "history", "configuration", "taskmanagers" + "index", "history", "configuration", "taskmanagers", "resourceUsage" }; /** * The names of the menu entries shown in the browser */ private static final String[] names = { - "Dashboard", "History", "Configuration", "Task Managers" + "Dashboard", "History", "Configuration", "Task Managers", "Resource Usage" }; /** * The classes of the icons shown next to the names in the browser */ private static final String[] classes = { - "fa fa-dashboard", "fa fa-bar-chart-o", "fa fa-keyboard-o", "fa fa-building-o" + "fa fa-dashboard", "fa fa-archive", "fa fa-wrench", "fa fa-building-o", "fa fa-bar-chart-o" }; public MenuServlet() { @@ -79,16 +79,19 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) resp.setStatus(HttpServletResponse.SC_OK); resp.setContentType("application/json"); - if ("index".equals(req.getParameter("get"))) { + String getParameter = req.getParameter("get"); + if ("index".equals(getParameter)) { writeMenu("index", resp); - } else if ("analyze".equals(req.getParameter("get"))) { + } else if ("analyze".equals(getParameter)) { writeMenu("analyze", resp); - } else if ("history".equals(req.getParameter("get"))) { + } else if ("history".equals(getParameter)) { writeMenu("history", resp); - } else if ("configuration".equals(req.getParameter("get"))) { + } else if ("configuration".equals(getParameter)) { writeMenu("configuration", resp); - } else if ("taskmanagers".equals(req.getParameter("get"))) { + } else if ("taskmanagers".equals(getParameter)) { writeMenu("taskmanagers", resp); + } else if ("resourceUsage".equals(getParameter)) { + writeMenu("resourceUsage", resp); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java index 6dd2438d8b8bc..4a18f723317a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java @@ -20,9 +20,9 @@ import java.io.IOException; import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -35,6 +35,7 @@ import org.apache.flink.runtime.event.job.RecentJobEvent; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent; import org.apache.flink.runtime.profiling.types.ProfilingEvent; import org.apache.flink.util.StringUtils; @@ -45,49 +46,75 @@ public class ResourceUsageServlet extends HttpServlet { /** * The log for this class. */ - private static final Log LOG = LogFactory.getLog(ResourceUsageServlet.class); + private static final Log LOG = LogFactory + .getLog(ResourceUsageServlet.class); - private JobManager jobManager; + private final Map, ProfilingEventSerializer> jsonSerializers = new HashMap, ResourceUsageServlet.ProfilingEventSerializer>(); + + private final JobManager jobManager; public ResourceUsageServlet(JobManager jobManager) { this.jobManager = jobManager; + + this.jsonSerializers.put(InstanceSummaryProfilingEvent.class, + new InstanceSummaryProfilingEventSerializer()); } @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { try { JobID jobID = getJobID(req); if (jobID == null) { resp.setStatus(HttpServletResponse.SC_OK); - resp.setContentType("text/html"); - resp.getWriter().write("

No job found.

"); + resp.setContentType("application/json"); +// resp.getWriter().write("[{type:\"dummy\",timestamp:123456789,jobid:\"abcdef0123456789\"}]"); +// resp.getWriter().write(this.jobManager.getRecentJobs().toString()); + resp.getWriter().write("[]"); return; } + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType("application/json"); + resp.getWriter().write("["); + List allJobEvents = this.jobManager.getEvents(jobID); - List profilingEvents = new ArrayList(allJobEvents.size()); + String separator = ""; for (AbstractEvent jobEvent : allJobEvents) { if (jobEvent instanceof ProfilingEvent) { - profilingEvents.add((ProfilingEvent) jobEvent); + ProfilingEvent profilingEvent = (ProfilingEvent) jobEvent; + ProfilingEventSerializer jsonSerializer = getSerializer(profilingEvent); + if (jsonSerializer != null) { + resp.getWriter().write(separator); + jsonSerializer.write(profilingEvent, resp.getWriter()); + separator = ","; + } else { + resp.getWriter().write(separator); + new ProfilingEventSerializer().write(profilingEvent, resp.getWriter()); + separator = ","; + } } - } - resp.setStatus(HttpServletResponse.SC_OK); - resp.setContentType("text/html"); - PrintWriter writer = resp.getWriter(); - writer.write("

Profiling events

"); - writer.write("
    "); - for (ProfilingEvent profilingEvent : profilingEvents) { - writer.write("
  1. "); - writer.write(profilingEvent.getJobID().toString()); - writer.write(" - "); - writer.write(new Date(profilingEvent.getTimestamp()).toString()); - writer.write(" - "); - writer.write(profilingEvent.toString()); - writer.write("
  2. "); } - writer.write("
"); + + resp.getWriter().write("]"); + +// resp.setStatus(HttpServletResponse.SC_OK); +// resp.setContentType("text/html"); +// PrintWriter writer = resp.getWriter(); +// writer.write("

Profiling events

"); +// writer.write("
    "); +// for (ProfilingEvent profilingEvent : profilingEvents) { +// writer.write("
  1. "); +// writer.write(profilingEvent.getJobID().toString()); +// writer.write(" - "); +// writer.write(new Date(profilingEvent.getTimestamp()).toString()); +// writer.write(" - "); +// writer.write(profilingEvent.toString()); +// writer.write("
  2. "); +// } +// writer.write("
"); } catch (Exception e) { resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); @@ -99,10 +126,16 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se } } + @SuppressWarnings("unchecked") + private ProfilingEventSerializer getSerializer(ProfilingEvent profilingEvent) { + return (ProfilingEventSerializer) this.jsonSerializers.get(profilingEvent.getClass()); + } + /** Loads the job ID from the request or selects the latest submitted job. */ private JobID getJobID(HttpServletRequest req) throws IOException { String jobIdParameter = req.getParameter("jobid"); - JobID jobID = jobIdParameter == null ? loadLatestJobID() : JobID.fromHexString(jobIdParameter); + JobID jobID = jobIdParameter == null ? loadLatestJobID() : JobID + .fromHexString(jobIdParameter); return jobID; } @@ -116,11 +149,70 @@ private JobID loadLatestJobID() throws IOException { RecentJobEvent mostRecentJobEvent = null; for (RecentJobEvent jobEvent : recentJobEvents) { if (mostRecentJobEvent == null - || mostRecentJobEvent.getSubmissionTimestamp() < jobEvent.getSubmissionTimestamp()) { + || mostRecentJobEvent.getSubmissionTimestamp() < jobEvent + .getSubmissionTimestamp()) { mostRecentJobEvent = jobEvent; } } - return mostRecentJobEvent == null ? null : mostRecentJobEvent.getJobID(); + return mostRecentJobEvent == null ? null : mostRecentJobEvent + .getJobID(); + } + + private static class ProfilingEventSerializer { + + private PrintWriter writer; + private String separator; + + protected void writeField(String name, String value) { + this.writer.write(separator); + this.writer.write("\""); + this.writer.write(name); + this.writer.write("\":\""); + this.writer.write(value); + this.writer.write("\""); + this.separator = ","; + } + + protected void writeField(String name, long value) { + this.writer.write(separator); + this.writer.write("\""); + this.writer.write(name); + this.writer.write("\":"); + this.writer.write(Long.toString(value)); + this.separator = ","; + } + + protected void writeFields(T profilingEvent) { + writeField("type", profilingEvent.getClass().getSimpleName()); + writeField("jobID", profilingEvent.getJobID().toString()); + writeField("timestamp", profilingEvent.getTimestamp()); + } + + public synchronized void write(T profilingEvent, PrintWriter writer) + throws IOException { + this.writer = writer; // cache the writer for convenience -- we are + // synchronized here + this.writer.write("{"); + this.separator = ""; + writeFields(profilingEvent); + this.writer.write("}"); + this.writer = null; + } + + } + + private static class InstanceSummaryProfilingEventSerializer extends + ProfilingEventSerializer { + + @Override + protected void writeFields(InstanceSummaryProfilingEvent profilingEvent) { + super.writeFields(profilingEvent); + writeField("userCpu", profilingEvent.getUserCPU()); + writeField("totalMemory", profilingEvent.getTotalMemory()); + writeField("freeMemory", profilingEvent.getFreeMemory()); + writeField("transmittedBytes", profilingEvent.getTransmittedBytes()); + } + } } From 9b7fddfa62b55a3c832437d964d9d2afc280e27d Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Thu, 28 Aug 2014 13:28:38 +0200 Subject: [PATCH 3/7] [FLINK-964] generalize resource usage JS scripts, style charts --- .../js/jquery.flot.stack.min.js | 7 + .../js/resourceUsageFrontend.js | 234 ++++++++++++------ .../web-docs-infoserver/resource-usage.html | 1 + .../runtime/jobmanager/web/MenuServlet.java | 2 +- .../jobmanager/web/ResourceUsageServlet.java | 8 + 5 files changed, 182 insertions(+), 70 deletions(-) create mode 100644 flink-runtime/resources/web-docs-infoserver/js/jquery.flot.stack.min.js diff --git a/flink-runtime/resources/web-docs-infoserver/js/jquery.flot.stack.min.js b/flink-runtime/resources/web-docs-infoserver/js/jquery.flot.stack.min.js new file mode 100644 index 0000000000000..920764f5e7c62 --- /dev/null +++ b/flink-runtime/resources/web-docs-infoserver/js/jquery.flot.stack.min.js @@ -0,0 +1,7 @@ +/* Javascript plotting library for jQuery, version 0.8.3. + +Copyright (c) 2007-2014 IOLA and Ole Laursen. +Licensed under the MIT license. + +*/ +(function($){var options={series:{stack:null}};function init(plot){function findMatchingSeries(s,allseries){var res=null;for(var i=0;i2&&(horizontal?datapoints.format[2].x:datapoints.format[2].y),withsteps=withlines&&s.lines.steps,fromgap=true,keyOffset=horizontal?1:0,accumulateOffset=horizontal?0:1,i=0,j=0,l,m;while(true){if(i>=points.length)break;l=newpoints.length;if(points[i]==null){for(m=0;m=otherpoints.length){if(!withlines){for(m=0;mqx){if(withlines&&i>0&&points[i-ps]!=null){intery=py+(points[i-ps+accumulateOffset]-py)*(qx-px)/(points[i-ps+keyOffset]-px);newpoints.push(qx);newpoints.push(intery+qy);for(m=2;m0&&otherpoints[j-otherps]!=null)bottom=qy+(otherpoints[j-otherps+accumulateOffset]-qy)*(px-qx)/(otherpoints[j-otherps+keyOffset]-qx);newpoints[l+accumulateOffset]+=bottom;i+=ps}fromgap=false;if(l!=newpoints.length&&withbottom)newpoints[l+2]+=bottom}if(withsteps&&l!=newpoints.length&&l>0&&newpoints[l]!=null&&newpoints[l]!=newpoints[l-ps]&&newpoints[l+1]!=newpoints[l-ps+1]){for(m=0;m this.lastTimestamp) { + this.lastTimestamp = profilingEvent.timestamp; + this.flotData.data.push([ profilingEvent.timestamp, this.evaluate(profilingEvent) ]); + } + } + + this.cutOffBefore = function(minTimestamp) { + while (this.flotData.data.length > 0 && this.flotData.data[0][0] < minTimestamp) { + this.flotData.data.shift(); + } } } -var networkMemoryFlotOptions = { +function ProfilingGroup(name, options, eventType, plotId, windowSize) { + + this.name = name; + this.options = options; + this.eventType = eventType; + this.plotId = plotId; + this.series = []; + this.windowSize = windowSize; + + this.updateWith = function(event) { + if (event.type == this.eventType) { + var lastTimestamp = -1; + + // Update all contained series with the event. + for ( var i in this.series) { + var ser = this.series[i]; + ser.updateWith(event); + if (ser.lastTimestamp != undefined) { + lastTimestamp = Math.max(lastTimestamp, ser.lastTimestamp); + } + } + + // Cut off too old data. + if (lastTimestamp != -1 && this.windowSize >= 0) { + var minTimestamp = lastTimestamp - this.windowSize; + for ( var i in this.series) { + var ser = this.series[i]; + ser.cutOffBefore(minTimestamp); + } + } + } + + }; + + this.plot = function() { + var flotData = this.series.map(accessProperty("flotData")); + $.plot(this.plotId, flotData, this.options); + }; +} + +/** Container for the different profiling groups. */ +var profilingGroups = []; + +$(function initResourceUsage() { + var windowSize = 30*1000; // 30 sec + + // CPU profiling + var cpuFlotOptions = { + xaxis : { + mode : "time", + timeformat : "%H:%M:%S" + }, + yaxis : { + min : 0, + tickFormatter : function(val, axis) { + return val.toFixed(axis.tickDecimals) + " %"; + } + }, + series : { + lines : { + show : true, + fill : 0.4 + }, + stack : true + } + + }; + var cpuGroup = new ProfilingGroup("CPU", cpuFlotOptions, "InstanceSummaryProfilingEvent", "#cpuChart", windowSize); + // add series in reverse stacking order + cpuGroup.series.push(new ProfilingSeries("Interrupts", "#4682B4", function(e) { + return e.softIrqCpu + e.hardIrqCpu + })); + cpuGroup.series.push(new ProfilingSeries("I/O Wait", "#FFFF00", accessProperty("ioWaitCpu"))); + cpuGroup.series.push(new ProfilingSeries("System CPU", "#8B0000", accessProperty("systemCpu"))); + cpuGroup.series.push(new ProfilingSeries("User CPU", "#006400", accessProperty("userCpu"))); + + profilingGroups.push(cpuGroup); + + // Memory profiling + var mainMemoryFlotOptions = { xaxis : { mode : "time", timeformat : "%H:%M:%S", }, yaxis : { min : 0, - tickFormatter : makeMemorySizeFormatter(1) + // /proc/meminfo delivers kiB + tickFormatter : makeMemorySizeFormatter(1024) + }, + series : { + lines : { + show : true, + fill : 0.4 + }, + stack : true } - } + }; + var memGroup = new ProfilingGroup("Memory", mainMemoryFlotOptions, "InstanceSummaryProfilingEvent", "#memoryChart", windowSize); + memGroup.series.push(new ProfilingSeries("Used memory", "#FF8C00", function(e) { + return e.totalMemory - e.freeMemory; + })); + memGroup.series.push(new ProfilingSeries("Free memory", "#6495ED", accessProperty("freeMemory"))); + profilingGroups.push(memGroup); -var cpuFlotOptions = { - xaxis : { - mode : "time", - timeformat : "%H:%M:%S", - }, - yaxis : { - min : 0, - tickFormatter : function(val, axis) { - return val.toFixed(axis.tickDecimals) + " %" + // Network profiling + var networkMemoryFlotOptions = { + xaxis : { + mode : "time", + timeformat : "%H:%M:%S", + }, + yaxis : { + min : 0, + tickFormatter : makeMemorySizeFormatter(1) + }, + series : { + lines : { + show : true, + fill : 0.4 + } } - } -} + }; + var networkGroup = new ProfilingGroup("Network", networkMemoryFlotOptions, "InstanceSummaryProfilingEvent", "#networkChart", windowSize); + networkGroup.series.push(new ProfilingSeries("Transmitted", "#9370DB", accessProperty("transmittedBytes"))); + networkGroup.series.push(new ProfilingSeries("Received", "#BBBB00", accessProperty("receivedBytes"))); + profilingGroups.push(networkGroup) +}); + +// Logic for displaying the profiling groups /** * Polls profiling events and updates the plot. @@ -104,42 +215,27 @@ function pollResourceUsage() { }); }; -/** Updates a profiling series with the given profiling event. */ -function updateProfilingSeries(series, profilingEvent, property) { - if (profilingEvent.timestamp > series.lastTimestamp) { - series.lastTimestamp = profilingEvent.timestamp; - series.flotData.data.push([ profilingEvent.timestamp, - profilingEvent[property] ]); - } -} - /** Updates all the profiling series from an array of profiling events. */ function updateData(serverResponse) { for ( var eventIndex in serverResponse) { var profilingEvent = serverResponse[eventIndex]; - console.log(profilingEvent); - console.log(profilingEvent.type); - if (profilingEvent.type == "InstanceSummaryProfilingEvent") { - updateProfilingSeries(collectedProfilingData.userCpu, - profilingEvent, "userCpu"); - updateProfilingSeries(collectedProfilingData.transmittedBytes, - profilingEvent, "transmittedBytes"); - updateProfilingSeries(collectedProfilingData.freeMemory, - profilingEvent, "freeMemory"); + + for ( var groupIndex in profilingGroups) { + var profilingGroup = profilingGroups[groupIndex]; + profilingGroup.updateWith(profilingEvent); } } -} +}; +/** Plots all profiling groups. */ function plotData() { - $.plot("#cpuChart", [ collectedProfilingData.userCpu.flotData ], - cpuFlotOptions); - $.plot("#memoryChart", [ collectedProfilingData.freeMemory.flotData ], - mainMemoryFlotOptions); - $.plot("#networkChart", - [ collectedProfilingData.transmittedBytes.flotData ], networkMemoryFlotOptions); -} + for ( var groupIndex in profilingGroups) { + var profilingGroup = profilingGroups[groupIndex]; + profilingGroup.plot(); + } +}; $(function startPolling() { pollResourceUsage(); setInterval(pollResourceUsage, 1000); -}) +}); diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage.html b/flink-runtime/resources/web-docs-infoserver/resource-usage.html index 64e41de98bdf3..f4d5268f38f5e 100644 --- a/flink-runtime/resources/web-docs-infoserver/resource-usage.html +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage.html @@ -25,6 +25,7 @@ + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java index 57d2cf0ddce8c..822f04f487a22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java @@ -49,7 +49,7 @@ public class MenuServlet extends HttpServlet { * Array of possible menu entries on the left */ private static final String[] entries = { - "index", "history", "configuration", "taskmanagers", "resourceUsage" + "index", "history", "configuration", "taskmanagers", "resource-usage" }; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java index 4a18f723317a4..aa43d7d825fc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java @@ -208,9 +208,17 @@ private static class InstanceSummaryProfilingEventSerializer extends protected void writeFields(InstanceSummaryProfilingEvent profilingEvent) { super.writeFields(profilingEvent); writeField("userCpu", profilingEvent.getUserCPU()); + writeField("systemCpu", profilingEvent.getSystemCPU()); + writeField("ioWaitCpu", profilingEvent.getIOWaitCPU()); + writeField("softIrqCpu", profilingEvent.getSoftIrqCPU()); + writeField("hardIrqCpu", profilingEvent.getHardIrqCPU()); writeField("totalMemory", profilingEvent.getTotalMemory()); writeField("freeMemory", profilingEvent.getFreeMemory()); + writeField("bufferedMemory", profilingEvent.getBufferedMemory()); + writeField("cachedMemory", profilingEvent.getCachedMemory()); + writeField("cachedSwapMemory", profilingEvent.getCachedSwapMemory()); writeField("transmittedBytes", profilingEvent.getTransmittedBytes()); + writeField("receivedBytes", profilingEvent.getReceivedBytes()); } } From a57ea06c0b650dfa81e239c716e3f349aeef2b88 Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Thu, 28 Aug 2014 14:31:47 +0200 Subject: [PATCH 4/7] [FLINK-964] tear apart core plotting code and configuration --- .../web-docs-infoserver/js/res-usage.js | 167 ++++++++++++ .../js/resourceUsageFrontend.js | 241 ------------------ .../web-docs-infoserver/resource-usage.html | 93 ++++++- .../jobmanager/web/ResourceUsageServlet.java | 77 +++--- 4 files changed, 289 insertions(+), 289 deletions(-) create mode 100644 flink-runtime/resources/web-docs-infoserver/js/res-usage.js delete mode 100644 flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js diff --git a/flink-runtime/resources/web-docs-infoserver/js/res-usage.js b/flink-runtime/resources/web-docs-infoserver/js/res-usage.js new file mode 100644 index 0000000000000..64e34c991780d --- /dev/null +++ b/flink-runtime/resources/web-docs-infoserver/js/res-usage.js @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Declare global object as hook. +var flinkRU = {}; + +(function() { + + /** Container for the different profiling groups. */ + flinkRU.profilingGroups = []; + + // Helper functions + flinkRU.helpers = { + + /** Creates a function that reads out the given property of events. */ + accessProperty : function(property) { + return function(event) { + return event[property]; + }; + }, + + /** Options to be used with flot. */ + makeMemorySizeFormatter : function(baseScale) { + return function(val, axis) { + val *= baseScale + if (val > 1000000) + return (val / 1000000).toFixed(axis.tickDecimals) + " MB"; + else if (val > 1000) + return (val / 1000).toFixed(axis.tickDecimals) + " kB"; + else + return val.toFixed(axis.tickDecimals) + " B"; + } + } + } + + // Configuring the profiling data to be displayed + + /** + * Template for profiling data. + */ + flinkRU.ProfilingSeries = function(label, number, evaluationFunction) { + + this.lastTimestamp = 0; + + this.flotData = { + color : number, + label : label, + data : [] + }; + + this.evaluate = evaluationFunction; + + /** Updates a profiling series with the given profiling event. */ + this.updateWith = function(profilingEvent, property) { + if (profilingEvent.timestamp > this.lastTimestamp) { + this.lastTimestamp = profilingEvent.timestamp; + this.flotData.data.push([ profilingEvent.timestamp, this.evaluate(profilingEvent) ]); + } + } + + this.cutOffBefore = function(minTimestamp) { + while (this.flotData.data.length > 0 && this.flotData.data[0][0] < minTimestamp) { + this.flotData.data.shift(); + } + } + } + + /** Template for a set of profiling series to be plotted together. */ + flinkRU.ProfilingGroup = function(name, options, eventType, plotId, windowSize) { + + this.name = name; + this.options = options; + this.eventType = eventType; + this.plotId = plotId; + this.series = []; + this.windowSize = windowSize; + + this.updateWith = function(event) { + if (event.type == this.eventType) { + var lastTimestamp = -1; + + // Update all contained series with the event. + for ( var i in this.series) { + var ser = this.series[i]; + ser.updateWith(event); + if (ser.lastTimestamp != undefined) { + lastTimestamp = Math.max(lastTimestamp, ser.lastTimestamp); + } + } + + // Cut off too old data. + if (lastTimestamp != -1 && this.windowSize >= 0) { + var minTimestamp = lastTimestamp - this.windowSize; + for ( var i in this.series) { + var ser = this.series[i]; + ser.cutOffBefore(minTimestamp); + } + } + } + + }; + + this.plot = function() { + var flotData = this.series.map(flinkRU.helpers.accessProperty("flotData")); + $.plot(this.plotId, flotData, this.options); + }; + } + + + // Logic for displaying the profiling groups + + /** + * Polls profiling events and updates the plot. + */ + function pollResourceUsage() { + $.ajax({ + url : "resourceUsage", + cache : false, + type : "GET", + success : function(serverResponse) { + updateData(serverResponse); + plotData(); + }, + dataType : "json", + }); + } + + /** Updates all the profiling series from an array of profiling events. */ + function updateData(serverResponse) { + for ( var eventIndex in serverResponse) { + var profilingEvent = serverResponse[eventIndex]; + + for ( var groupIndex in flinkRU.profilingGroups) { + var profilingGroup = flinkRU.profilingGroups[groupIndex]; + profilingGroup.updateWith(profilingEvent); + } + } + } + + /** Plots all profiling groups. */ + function plotData() { + for ( var groupIndex in flinkRU.profilingGroups) { + var profilingGroup = flinkRU.profilingGroups[groupIndex]; + profilingGroup.plot(); + } + } + + flinkRU.startPolling = function (interval) { + pollResourceUsage(); + setInterval(pollResourceUsage, interval); + }; +})(); diff --git a/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js b/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js deleted file mode 100644 index e48d2d70e85c6..0000000000000 --- a/flink-runtime/resources/web-docs-infoserver/js/resourceUsageFrontend.js +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Helper functions -/** Creates a function that reads out the given property of events. */ -function accessProperty(property) { - return function(event) { - return event[property]; - }; -} - -/** Options to be used with flot. */ -function makeMemorySizeFormatter(baseScale) { - return function(val, axis) { - val *= baseScale - if (val > 1000000) - return (val / 1000000).toFixed(axis.tickDecimals) + " MB"; - else if (val > 1000) - return (val / 1000).toFixed(axis.tickDecimals) + " kB"; - else - return val.toFixed(axis.tickDecimals) + " B"; - } -} - -// Configuring the profiling data to be displayed - -/** - * Template for profiling data. - */ -function ProfilingSeries(label, number, evaluationFunction) { - - this.lastTimestamp = 0; - - this.flotData = { - color : number, - label : label, - data : [] - }; - - this.evaluate = evaluationFunction; - - /** Updates a profiling series with the given profiling event. */ - this.updateWith = function(profilingEvent, property) { - if (profilingEvent.timestamp > this.lastTimestamp) { - this.lastTimestamp = profilingEvent.timestamp; - this.flotData.data.push([ profilingEvent.timestamp, this.evaluate(profilingEvent) ]); - } - } - - this.cutOffBefore = function(minTimestamp) { - while (this.flotData.data.length > 0 && this.flotData.data[0][0] < minTimestamp) { - this.flotData.data.shift(); - } - } -} - -function ProfilingGroup(name, options, eventType, plotId, windowSize) { - - this.name = name; - this.options = options; - this.eventType = eventType; - this.plotId = plotId; - this.series = []; - this.windowSize = windowSize; - - this.updateWith = function(event) { - if (event.type == this.eventType) { - var lastTimestamp = -1; - - // Update all contained series with the event. - for ( var i in this.series) { - var ser = this.series[i]; - ser.updateWith(event); - if (ser.lastTimestamp != undefined) { - lastTimestamp = Math.max(lastTimestamp, ser.lastTimestamp); - } - } - - // Cut off too old data. - if (lastTimestamp != -1 && this.windowSize >= 0) { - var minTimestamp = lastTimestamp - this.windowSize; - for ( var i in this.series) { - var ser = this.series[i]; - ser.cutOffBefore(minTimestamp); - } - } - } - - }; - - this.plot = function() { - var flotData = this.series.map(accessProperty("flotData")); - $.plot(this.plotId, flotData, this.options); - }; -} - -/** Container for the different profiling groups. */ -var profilingGroups = []; - -$(function initResourceUsage() { - var windowSize = 30*1000; // 30 sec - - // CPU profiling - var cpuFlotOptions = { - xaxis : { - mode : "time", - timeformat : "%H:%M:%S" - }, - yaxis : { - min : 0, - tickFormatter : function(val, axis) { - return val.toFixed(axis.tickDecimals) + " %"; - } - }, - series : { - lines : { - show : true, - fill : 0.4 - }, - stack : true - } - - }; - var cpuGroup = new ProfilingGroup("CPU", cpuFlotOptions, "InstanceSummaryProfilingEvent", "#cpuChart", windowSize); - // add series in reverse stacking order - cpuGroup.series.push(new ProfilingSeries("Interrupts", "#4682B4", function(e) { - return e.softIrqCpu + e.hardIrqCpu - })); - cpuGroup.series.push(new ProfilingSeries("I/O Wait", "#FFFF00", accessProperty("ioWaitCpu"))); - cpuGroup.series.push(new ProfilingSeries("System CPU", "#8B0000", accessProperty("systemCpu"))); - cpuGroup.series.push(new ProfilingSeries("User CPU", "#006400", accessProperty("userCpu"))); - - profilingGroups.push(cpuGroup); - - // Memory profiling - var mainMemoryFlotOptions = { - xaxis : { - mode : "time", - timeformat : "%H:%M:%S", - }, - yaxis : { - min : 0, - // /proc/meminfo delivers kiB - tickFormatter : makeMemorySizeFormatter(1024) - }, - series : { - lines : { - show : true, - fill : 0.4 - }, - stack : true - } - }; - var memGroup = new ProfilingGroup("Memory", mainMemoryFlotOptions, "InstanceSummaryProfilingEvent", "#memoryChart", windowSize); - memGroup.series.push(new ProfilingSeries("Used memory", "#FF8C00", function(e) { - return e.totalMemory - e.freeMemory; - })); - memGroup.series.push(new ProfilingSeries("Free memory", "#6495ED", accessProperty("freeMemory"))); - profilingGroups.push(memGroup); - - // Network profiling - var networkMemoryFlotOptions = { - xaxis : { - mode : "time", - timeformat : "%H:%M:%S", - }, - yaxis : { - min : 0, - tickFormatter : makeMemorySizeFormatter(1) - }, - series : { - lines : { - show : true, - fill : 0.4 - } - } - }; - - var networkGroup = new ProfilingGroup("Network", networkMemoryFlotOptions, "InstanceSummaryProfilingEvent", "#networkChart", windowSize); - networkGroup.series.push(new ProfilingSeries("Transmitted", "#9370DB", accessProperty("transmittedBytes"))); - networkGroup.series.push(new ProfilingSeries("Received", "#BBBB00", accessProperty("receivedBytes"))); - profilingGroups.push(networkGroup) -}); - -// Logic for displaying the profiling groups - -/** - * Polls profiling events and updates the plot. - */ -function pollResourceUsage() { - $.ajax({ - url : "resourceUsage", - cache : false, - type : "GET", - success : function(serverResponse) { - updateData(serverResponse); - plotData(); - }, - dataType : "json", - }); -}; - -/** Updates all the profiling series from an array of profiling events. */ -function updateData(serverResponse) { - for ( var eventIndex in serverResponse) { - var profilingEvent = serverResponse[eventIndex]; - - for ( var groupIndex in profilingGroups) { - var profilingGroup = profilingGroups[groupIndex]; - profilingGroup.updateWith(profilingEvent); - } - } -}; - -/** Plots all profiling groups. */ -function plotData() { - for ( var groupIndex in profilingGroups) { - var profilingGroup = profilingGroups[groupIndex]; - profilingGroup.plot(); - } -}; - -$(function startPolling() { - pollResourceUsage(); - setInterval(pollResourceUsage, 1000); -}); diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage.html b/flink-runtime/resources/web-docs-infoserver/resource-usage.html index f4d5268f38f5e..561ed20b41ee4 100644 --- a/flink-runtime/resources/web-docs-infoserver/resource-usage.html +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage.html @@ -28,7 +28,7 @@ - + @@ -46,6 +46,97 @@ }); }); + + + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java index aa43d7d825fc6..dda6a79731a4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.PrintWriter; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,8 +36,10 @@ import org.apache.flink.runtime.event.job.RecentJobEvent; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.profiling.types.InstanceProfilingEvent; import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent; import org.apache.flink.runtime.profiling.types.ProfilingEvent; +import org.apache.flink.runtime.profiling.types.SingleInstanceProfilingEvent; import org.apache.flink.util.StringUtils; public class ResourceUsageServlet extends HttpServlet { @@ -46,8 +49,7 @@ public class ResourceUsageServlet extends HttpServlet { /** * The log for this class. */ - private static final Log LOG = LogFactory - .getLog(ResourceUsageServlet.class); + private static final Log LOG = LogFactory.getLog(ResourceUsageServlet.class); private final Map, ProfilingEventSerializer> jsonSerializers = new HashMap, ResourceUsageServlet.ProfilingEventSerializer>(); @@ -56,30 +58,21 @@ public class ResourceUsageServlet extends HttpServlet { public ResourceUsageServlet(JobManager jobManager) { this.jobManager = jobManager; - this.jsonSerializers.put(InstanceSummaryProfilingEvent.class, - new InstanceSummaryProfilingEventSerializer()); + this.jsonSerializers.put(InstanceSummaryProfilingEvent.class, new InstanceSummaryProfilingEventSerializer()); + this.jsonSerializers.put(SingleInstanceProfilingEvent.class, new SingleInstanceProfilingEventSerializer()); } @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) - throws ServletException, IOException { + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { try { JobID jobID = getJobID(req); - if (jobID == null) { - resp.setStatus(HttpServletResponse.SC_OK); - resp.setContentType("application/json"); -// resp.getWriter().write("[{type:\"dummy\",timestamp:123456789,jobid:\"abcdef0123456789\"}]"); -// resp.getWriter().write(this.jobManager.getRecentJobs().toString()); - resp.getWriter().write("[]"); - return; - } + List allJobEvents = jobID == null ? Collections. emptyList() : this.jobManager.getEvents(jobID); resp.setStatus(HttpServletResponse.SC_OK); resp.setContentType("application/json"); resp.getWriter().write("["); - - List allJobEvents = this.jobManager.getEvents(jobID); + String separator = ""; for (AbstractEvent jobEvent : allJobEvents) { if (jobEvent instanceof ProfilingEvent) { @@ -90,6 +83,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) jsonSerializer.write(profilingEvent, resp.getWriter()); separator = ","; } else { + // This is not necessary and only useful to see what events are actually available in the frontend. resp.getWriter().write(separator); new ProfilingEventSerializer().write(profilingEvent, resp.getWriter()); separator = ","; @@ -97,24 +91,8 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) } } - - resp.getWriter().write("]"); -// resp.setStatus(HttpServletResponse.SC_OK); -// resp.setContentType("text/html"); -// PrintWriter writer = resp.getWriter(); -// writer.write("

Profiling events

"); -// writer.write("
    "); -// for (ProfilingEvent profilingEvent : profilingEvents) { -// writer.write("
  1. "); -// writer.write(profilingEvent.getJobID().toString()); -// writer.write(" - "); -// writer.write(new Date(profilingEvent.getTimestamp()).toString()); -// writer.write(" - "); -// writer.write(profilingEvent.toString()); -// writer.write("
  2. "); -// } -// writer.write("
"); + resp.getWriter().write("]"); } catch (Exception e) { resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); @@ -127,15 +105,14 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) } @SuppressWarnings("unchecked") - private ProfilingEventSerializer getSerializer(ProfilingEvent profilingEvent) { + private ProfilingEventSerializer getSerializer(ProfilingEvent profilingEvent) { return (ProfilingEventSerializer) this.jsonSerializers.get(profilingEvent.getClass()); } - + /** Loads the job ID from the request or selects the latest submitted job. */ private JobID getJobID(HttpServletRequest req) throws IOException { String jobIdParameter = req.getParameter("jobid"); - JobID jobID = jobIdParameter == null ? loadLatestJobID() : JobID - .fromHexString(jobIdParameter); + JobID jobID = jobIdParameter == null ? loadLatestJobID() : JobID.fromHexString(jobIdParameter); return jobID; } @@ -148,14 +125,11 @@ private JobID loadLatestJobID() throws IOException { List recentJobEvents = this.jobManager.getRecentJobs(); RecentJobEvent mostRecentJobEvent = null; for (RecentJobEvent jobEvent : recentJobEvents) { - if (mostRecentJobEvent == null - || mostRecentJobEvent.getSubmissionTimestamp() < jobEvent - .getSubmissionTimestamp()) { + if (mostRecentJobEvent == null || mostRecentJobEvent.getSubmissionTimestamp() < jobEvent.getSubmissionTimestamp()) { mostRecentJobEvent = jobEvent; } } - return mostRecentJobEvent == null ? null : mostRecentJobEvent - .getJobID(); + return mostRecentJobEvent == null ? null : mostRecentJobEvent.getJobID(); } private static class ProfilingEventSerializer { @@ -188,8 +162,7 @@ protected void writeFields(T profilingEvent) { writeField("timestamp", profilingEvent.getTimestamp()); } - public synchronized void write(T profilingEvent, PrintWriter writer) - throws IOException { + public synchronized void write(T profilingEvent, PrintWriter writer) throws IOException { this.writer = writer; // cache the writer for convenience -- we are // synchronized here this.writer.write("{"); @@ -201,11 +174,10 @@ public synchronized void write(T profilingEvent, PrintWriter writer) } - private static class InstanceSummaryProfilingEventSerializer extends - ProfilingEventSerializer { + private static class InstanceProfilingEventSerializer extends ProfilingEventSerializer { @Override - protected void writeFields(InstanceSummaryProfilingEvent profilingEvent) { + protected void writeFields(T profilingEvent) { super.writeFields(profilingEvent); writeField("userCpu", profilingEvent.getUserCPU()); writeField("systemCpu", profilingEvent.getSystemCPU()); @@ -220,7 +192,18 @@ protected void writeFields(InstanceSummaryProfilingEvent profilingEvent) { writeField("transmittedBytes", profilingEvent.getTransmittedBytes()); writeField("receivedBytes", profilingEvent.getReceivedBytes()); } + } + private static class InstanceSummaryProfilingEventSerializer extends InstanceProfilingEventSerializer { + } + + private static class SingleInstanceProfilingEventSerializer extends InstanceProfilingEventSerializer { + + @Override + protected void writeFields(SingleInstanceProfilingEvent profilingEvent) { + super.writeFields(profilingEvent); + writeField("instanceName", profilingEvent.getInstanceName()); + } } } From 59847f4b998b0527b2e0747fec17c9897bfe25f1 Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Thu, 28 Aug 2014 18:00:20 +0200 Subject: [PATCH 5/7] [FLINK-964] add support for SingleInstanceProfilingEvents (2 versions) --- .../org/apache/flink/util/StringUtils.java | 18 ++ .../web-docs-infoserver/js/res-usage.js | 21 +- .../resource-usage-detailed.html | 260 ++++++++++++++++++ .../resource-usage-detailed2.html | 257 +++++++++++++++++ .../jobmanager/web/JobmanagerInfoServlet.java | 6 + .../runtime/jobmanager/web/MenuServlet.java | 6 +- 6 files changed, 557 insertions(+), 11 deletions(-) create mode 100644 flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html create mode 100644 flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java index 2699c9e2e7a1f..1a090e7ec39e1 100644 --- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java @@ -28,6 +28,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.Arrays; +import java.util.Collection; import java.util.Random; /** @@ -302,4 +303,21 @@ public static String getRandomString(Random rnd, int minLength, int maxLength, c } return new String(data); } + + /** + * Concatenates all the given objects and places the given separator in between them. + * + * @param objects is a set of objects or null values to concatenate + * @param separator is the separator to place between two objects + * @return the concatenated {@link String} of all objects + */ + public static String join(Collection objects, String separator) { + StringBuilder sb = new StringBuilder(); + String actualSeparator = ""; + for (Object o : objects) { + sb.append(actualSeparator).append(String.valueOf(o)); + actualSeparator = separator; + } + return sb.toString(); + } } diff --git a/flink-runtime/resources/web-docs-infoserver/js/res-usage.js b/flink-runtime/resources/web-docs-infoserver/js/res-usage.js index 64e34c991780d..3898c55531b33 100644 --- a/flink-runtime/resources/web-docs-infoserver/js/res-usage.js +++ b/flink-runtime/resources/web-docs-infoserver/js/res-usage.js @@ -23,7 +23,7 @@ var flinkRU = {}; /** Container for the different profiling groups. */ flinkRU.profilingGroups = []; - + // Helper functions flinkRU.helpers = { @@ -45,6 +45,12 @@ var flinkRU = {}; else return val.toFixed(axis.tickDecimals) + " B"; } + }, + + makeInstanceFilter : function(instance) { + return function(event) { + return event.instanceName == instance; + } } } @@ -53,7 +59,7 @@ var flinkRU = {}; /** * Template for profiling data. */ - flinkRU.ProfilingSeries = function(label, number, evaluationFunction) { + flinkRU.ProfilingSeries = function(label, number, evaluationFunction, additionalFilter) { this.lastTimestamp = 0; @@ -67,7 +73,7 @@ var flinkRU = {}; /** Updates a profiling series with the given profiling event. */ this.updateWith = function(profilingEvent, property) { - if (profilingEvent.timestamp > this.lastTimestamp) { + if (profilingEvent.timestamp > this.lastTimestamp && (additionalFilter == undefined || additionalFilter(profilingEvent))) { this.lastTimestamp = profilingEvent.timestamp; this.flotData.data.push([ profilingEvent.timestamp, this.evaluate(profilingEvent) ]); } @@ -81,7 +87,7 @@ var flinkRU = {}; } /** Template for a set of profiling series to be plotted together. */ - flinkRU.ProfilingGroup = function(name, options, eventType, plotId, windowSize) { + flinkRU.ProfilingGroup = function(name, options, eventType, plotId, windowSize, additionalFilter) { this.name = name; this.options = options; @@ -91,7 +97,7 @@ var flinkRU = {}; this.windowSize = windowSize; this.updateWith = function(event) { - if (event.type == this.eventType) { + if (event.type == this.eventType && (additionalFilter == undefined || additionalFilter(event))) { var lastTimestamp = -1; // Update all contained series with the event. @@ -121,7 +127,6 @@ var flinkRU = {}; }; } - // Logic for displaying the profiling groups /** @@ -136,7 +141,7 @@ var flinkRU = {}; updateData(serverResponse); plotData(); }, - dataType : "json", + dataType : "json" }); } @@ -160,7 +165,7 @@ var flinkRU = {}; } } - flinkRU.startPolling = function (interval) { + flinkRU.startPolling = function(interval) { pollResourceUsage(); setInterval(pollResourceUsage, interval); }; diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html new file mode 100644 index 0000000000000..215edcdb392f6 --- /dev/null +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html @@ -0,0 +1,260 @@ + + + + + + + + +Dashboard - Flink + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + +
+ +
+
+

+ Monitor Overview on resource usage +

+ +
+
+
+ +
+ + +
+ + +
+ + + + diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html new file mode 100644 index 0000000000000..6c0e2fda509c8 --- /dev/null +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html @@ -0,0 +1,257 @@ + + + + + + + + +Dashboard - Flink + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + +
+ +
+
+

+ Monitor Overview on resource usage +

+ +
+ +
+
+
+

+ CPU +

+
+
+
+
+
+
+
+

+ Memory +

+
+
+
+
+
+
+
+

+ Network +

+
+
+
+
+
+
+
+

+ Legend +

+
+
+
+
+
+
+
+ + +
+ + +
+ + + + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java index 91d211083696f..3d4699affc15f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java @@ -94,6 +94,12 @@ else if("groupvertex".equals(req.getParameter("get"))) { else if("taskmanagers".equals(req.getParameter("get"))) { resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getAvailableSlots()+"}"); } + else if("instances".equals(req.getParameter("get"))) { + String enclosingQuotes = jobmanager.getInstances().isEmpty() ? "" : "\""; + resp.getWriter().write( + "{\"instances\": [" + enclosingQuotes + StringUtils.join(jobmanager.getInstances().keySet(), "\",\"") + + enclosingQuotes + "]}"); + } else if("cancel".equals(req.getParameter("get"))) { String jobId = req.getParameter("job"); jobmanager.cancelJob(JobID.fromHexString(jobId)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java index 822f04f487a22..a16b698d40978 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java @@ -49,21 +49,21 @@ public class MenuServlet extends HttpServlet { * Array of possible menu entries on the left */ private static final String[] entries = { - "index", "history", "configuration", "taskmanagers", "resource-usage" + "index", "history", "configuration", "taskmanagers", "resource-usage", "resource-usage-detailed", "resource-usage-detailed2" }; /** * The names of the menu entries shown in the browser */ private static final String[] names = { - "Dashboard", "History", "Configuration", "Task Managers", "Resource Usage" + "Dashboard", "History", "Configuration", "Task Managers", "Resource Usage", "Detailed Resource Usage", "Detailed Resource Usage II" }; /** * The classes of the icons shown next to the names in the browser */ private static final String[] classes = { - "fa fa-dashboard", "fa fa-archive", "fa fa-wrench", "fa fa-building-o", "fa fa-bar-chart-o" + "fa fa-dashboard", "fa fa-archive", "fa fa-wrench", "fa fa-building-o", "fa fa-bar-chart-o", "fa fa-bar-chart-o", "fa fa-bar-chart-o" }; public MenuServlet() { From 9f8457a6197a3b6c2233003777d91513ee021a4a Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Wed, 3 Sep 2014 14:54:12 +0200 Subject: [PATCH 6/7] [FLINK-964] Clean-up prototype * move summary profiling to the dashboard * keep only one detailed resource usage page (the other page still exists, though) * allow to modify the displayed time window in the flink-conf.yaml --- .../src/main/flink-bin/conf/flink-conf.yaml | 6 + .../src/main/flink-bin/conf/flink-conf.yaml~ | 83 ++++ .../resources/web-docs-infoserver/index.html | 354 +++++++++++++----- .../web-docs-infoserver/js/res-usage.js | 24 +- .../resource-usage-detailed.html | 49 ++- .../resource-usage-detailed2.html | 8 +- .../runtime/jobmanager/web/MenuServlet.java | 6 +- .../jobmanager/web/ResourceUsageServlet.java | 68 +++- .../runtime/jobmanager/web/WebInfoServer.java | 1 - 9 files changed, 466 insertions(+), 133 deletions(-) create mode 100644 flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ diff --git a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml index 65a87b97619e7..70b8e16a5ddc5 100644 --- a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml +++ b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml @@ -27,6 +27,8 @@ jobmanager.rpc.port: 6123 jobmanager.heap.mb: 256 +jobmanager.profiling.enable: true + taskmanager.heap.mb: 512 taskmanager.numberOfTaskSlots: -1 @@ -39,6 +41,10 @@ parallelization.degree.default: 1 jobmanager.web.port: 8081 +# Duration of profiling events to display in ms (-1 = unlimited) +# +jobmanager.web.profiling.windowsize: 300000 + webclient.port: 8080 #============================================================================== diff --git a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ new file mode 100644 index 0000000000000..7149e39c80c88 --- /dev/null +++ b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ @@ -0,0 +1,83 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +#============================================================================== +# Common +#============================================================================== + +jobmanager.rpc.address: localhost + +jobmanager.rpc.port: 6123 + +jobmanager.heap.mb: 256 + +jobmanager.profling.enable: true + +taskmanager.heap.mb: 512 + +taskmanager.numberOfTaskSlots: -1 + +parallelization.degree.default: 1 + +#============================================================================== +# Web Frontend +#============================================================================== + +jobmanager.web.port: 8081 + +# Duration of profiling events to display in ms (-1 = unlimited) +# +jobmanager.web.profiling.windowsize: 300000 + +webclient.port: 8080 + +#============================================================================== +# Advanced +#============================================================================== + +# The number of buffers for the network stack. +# +# taskmanager.network.numberOfBuffers: 2048 + +# Directories for temporary files. +# +# Add a delimited list for multiple directories, using the system directory +# delimiter (colon ':' on unix) or a comma, e.g.: +# /data1/tmp:/data2/tmp:/data3/tmp +# +# Note: Each directory entry is read from and written to by a different I/O +# thread. You can include the same directory multiple times in order to create +# multiple I/O threads against that directory. This is for example relevant for +# high-throughput RAIDs. +# +# If not specified, the system-specific Java temporary directory (java.io.tmpdir +# property) is taken. +# +# taskmanager.tmp.dirs: /tmp + +# Path to the Hadoop configuration directory. +# +# This configuration is used when writing into HDFS. Unless specified otherwise, +# HDFS file creation will use HDFS default settings with respect to block-size, +# replication factor, etc. +# +# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml +# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'. +# +# fs.hdfs.hadoopconf: /path/to/hadoop/conf/ diff --git a/flink-runtime/resources/web-docs-infoserver/index.html b/flink-runtime/resources/web-docs-infoserver/index.html index f9d9e563fa810..8351cddec5020 100755 --- a/flink-runtime/resources/web-docs-infoserver/index.html +++ b/flink-runtime/resources/web-docs-infoserver/index.html @@ -22,8 +22,12 @@ + + + + @@ -51,6 +55,98 @@ }); }); + + + @@ -87,113 +183,165 @@
-
+
+ +
+
+

+ Apache Flink Dashboard +

+ +
+
+ -
-
-

Apache Flink Dashboard

- -
-
+
+
+
+
+
+
+ +
+
+

+ +

+

Task Manager

+
+
+
+
+
+
+
+
+
+
+ +
+
+

+ +

+

Processing Slots

+
+
+
+
+
+
+
+
+
+
+ +
+
+

+ +

+

Jobs Finished

+
+
+
+
+
+
+
+
+
+
+ +
+
+

+ +

+

Jobs Canceled

+
+
+
+
+
+
+
+
+
+
+ +
+
+

+ +

+

Jobs Failed

+
+
+
+
+
+
+ +
+
+ -
-
-
-
-
-
- -
-
-

-

Task Manager

-
-
-
-
-
-
-
-
-
-
- -
-
-

-

Processing Slots

-
-
-
-
-
-
-
-
-
-
- -
-
-

-

Jobs Finished

-
-
-
-
-
-
-
-
-
-
- -
-
-

-

Jobs Canceled

-
-
-
-
-
-
-
-
-
-
- -
-
-

-

Jobs Failed

-
-
-
-
-
-
- -
-
+
+
+
+
+

+ Running Jobs +

+
+
+

There are currently no running jobs, but you can + analyze your old jobs in the history. +

+

+
+
+
+
+ -
-
-
-
-

Running Jobs

-
-
-

There are currently no running jobs, but you can analyze your old jobs in the history.

-

-
-
-
-
+
+
+
+
+

CPU

+
+
+
+
+
+
-
+
+
+
+

Memory

+
+
+
+
+
+
+
+
+
+

Network

+
+
+
+
+
+
+
+
diff --git a/flink-runtime/resources/web-docs-infoserver/js/res-usage.js b/flink-runtime/resources/web-docs-infoserver/js/res-usage.js index 3898c55531b33..3ba981ae6019d 100644 --- a/flink-runtime/resources/web-docs-infoserver/js/res-usage.js +++ b/flink-runtime/resources/web-docs-infoserver/js/res-usage.js @@ -24,6 +24,23 @@ var flinkRU = {}; /** Container for the different profiling groups. */ flinkRU.profilingGroups = []; + flinkRU.settings = { + windowSize : -1 + }; + + // Load server settings. + flinkRU.loadSettings = function() { + $.ajax({ + url : "resourceUsage?get=settings", + cache : false, + type : "GET", + success : function(settings) { + flinkRU.settings = settings; + }, + dataType : "json" + }); + } + // Helper functions flinkRU.helpers = { @@ -87,14 +104,13 @@ var flinkRU = {}; } /** Template for a set of profiling series to be plotted together. */ - flinkRU.ProfilingGroup = function(name, options, eventType, plotId, windowSize, additionalFilter) { + flinkRU.ProfilingGroup = function(name, options, eventType, plotId, additionalFilter) { this.name = name; this.options = options; this.eventType = eventType; this.plotId = plotId; this.series = []; - this.windowSize = windowSize; this.updateWith = function(event) { if (event.type == this.eventType && (additionalFilter == undefined || additionalFilter(event))) { @@ -110,8 +126,8 @@ var flinkRU = {}; } // Cut off too old data. - if (lastTimestamp != -1 && this.windowSize >= 0) { - var minTimestamp = lastTimestamp - this.windowSize; + if (lastTimestamp != -1 && flinkRU.settings.windowSize >= 0) { + var minTimestamp = lastTimestamp - flinkRU.settings.windowSize; for ( var i in this.series) { var ser = this.series[i]; ser.cutOffBefore(minTimestamp); diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html index 215edcdb392f6..926d2c07625e6 100644 --- a/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed.html @@ -54,21 +54,34 @@ function instanceNameToPlotId(type, instance) { return "plot-" + type + "-" + instance.replace(/[^a-zA-Z0-9]/g, ""); } - + + function parseInstanceName(instance) { + return /^(\S*) \((.*)\)$/.exec(instance); + } + + function createSectionHeading(caption, subcaption) { + return '
\ + ' + caption + ' ' + subcaption + '\ +
'; + } + function createPlaceholder(caption, subcaption, id) { return '
\ -
\ -

\ - ' - + caption + '' + subcaption + '\ -

\ -
\ -
\ +
\ +
\ +

' + caption + '' + subcaption + '\ +

\ +
\ +
\ +
\
\
\
' } + + function createNodeSection(heading, placeholders) { + return heading + '
' + placeholders.join('') + '
'; + } function configureMonitor(instances) { @@ -144,14 +157,20 @@

\ var cpuPlotId = instanceNameToPlotId('cpu', instance); var memoryPlotId = instanceNameToPlotId('memory', instance); var networkPlotId = instanceNameToPlotId('network', instance); + var instanceNames = parseInstanceName(instance); - placeholders.append(createPlaceholder("CPU", instance, cpuPlotId)); - placeholders.append(createPlaceholder("Memory", instance, memoryPlotId)); - placeholders.append(createPlaceholder("Network", instance, networkPlotId)); + placeholders.append(createNodeSection( + createSectionHeading(instanceNames[1], instanceNames[2]), + [ + createPlaceholder("CPU", instance, cpuPlotId), + createPlaceholder("Memory", instance, memoryPlotId), + createPlaceholder("Network", instance, networkPlotId) + ] + )); var instanceFilter = flinkRU.helpers.makeInstanceFilter(instance); - var cpuGroup = new flinkRU.ProfilingGroup("CPU", cpuFlotOptions, "SingleInstanceProfilingEvent", '#' + cpuPlotId, windowSize, instanceFilter); + var cpuGroup = new flinkRU.ProfilingGroup("CPU", cpuFlotOptions, "SingleInstanceProfilingEvent", '#' + cpuPlotId, instanceFilter); // add series in reverse stacking order cpuGroup.series.push(new flinkRU.ProfilingSeries("Interrupts", "#4682B4", function(e) { return e.softIrqCpu + e.hardIrqCpu @@ -162,7 +181,7 @@

\ flinkRU.profilingGroups.push(cpuGroup); var memGroup = new flinkRU.ProfilingGroup("Memory", mainMemoryFlotOptions, "SingleInstanceProfilingEvent", '#' + memoryPlotId, - windowSize, instanceFilter); + instanceFilter); memGroup.series.push(new flinkRU.ProfilingSeries("Used memory", "#FF8C00", function(e) { return e.totalMemory - e.freeMemory; })); @@ -170,7 +189,7 @@

\ flinkRU.profilingGroups.push(memGroup); var networkGroup = new flinkRU.ProfilingGroup("Network", networkMemoryFlotOptions, "SingleInstanceProfilingEvent", - '#' + networkPlotId, windowSize, instanceFilter); + '#' + networkPlotId, instanceFilter); networkGroup.series.push(new flinkRU.ProfilingSeries("Transmitted", "#9370DB", flinkRU.helpers .accessProperty("transmittedBytes"))); networkGroup.series diff --git a/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html index 6c0e2fda509c8..6dc11178ba844 100644 --- a/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html +++ b/flink-runtime/resources/web-docs-infoserver/resource-usage-detailed2.html @@ -71,7 +71,7 @@ } }; - var cpuGroup = new flinkRU.ProfilingGroup("CPU", cpuFlotOptions, "SingleInstanceProfilingEvent", "#cpuChart", windowSize); + var cpuGroup = new flinkRU.ProfilingGroup("CPU", cpuFlotOptions, "SingleInstanceProfilingEvent", "#cpuChart"); flinkRU.profilingGroups.push(cpuGroup); // Memory profiling @@ -89,8 +89,7 @@ show : false } }; - var memGroup = new flinkRU.ProfilingGroup("Memory", mainMemoryFlotOptions, "SingleInstanceProfilingEvent", "#memoryChart", - windowSize); + var memGroup = new flinkRU.ProfilingGroup("Memory", mainMemoryFlotOptions, "SingleInstanceProfilingEvent", "#memoryChart"); flinkRU.profilingGroups.push(memGroup); // Network profiling @@ -108,8 +107,7 @@ } }; - var networkGroup = new flinkRU.ProfilingGroup("Network", networkMemoryFlotOptions, "SingleInstanceProfilingEvent", "#networkChart", - windowSize); + var networkGroup = new flinkRU.ProfilingGroup("Network", networkMemoryFlotOptions, "SingleInstanceProfilingEvent", "#networkChart"); flinkRU.profilingGroups.push(networkGroup); var instanceNameRegex = /\w+/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java index a16b698d40978..d0ac042254933 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java @@ -49,21 +49,21 @@ public class MenuServlet extends HttpServlet { * Array of possible menu entries on the left */ private static final String[] entries = { - "index", "history", "configuration", "taskmanagers", "resource-usage", "resource-usage-detailed", "resource-usage-detailed2" + "index", "history", "configuration", "taskmanagers", "resource-usage-detailed" }; /** * The names of the menu entries shown in the browser */ private static final String[] names = { - "Dashboard", "History", "Configuration", "Task Managers", "Resource Usage", "Detailed Resource Usage", "Detailed Resource Usage II" + "Dashboard", "History", "Configuration", "Task Managers", "Resource Usage" }; /** * The classes of the icons shown next to the names in the browser */ private static final String[] classes = { - "fa fa-dashboard", "fa fa-archive", "fa fa-wrench", "fa fa-building-o", "fa fa-bar-chart-o", "fa fa-bar-chart-o", "fa fa-bar-chart-o" + "fa fa-dashboard", "fa fa-archive", "fa fa-wrench", "fa fa-building-o", "fa fa-bar-chart-o" }; public MenuServlet() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java index dda6a79731a4e..4fd54192dd520 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/ResourceUsageServlet.java @@ -32,6 +32,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.event.job.AbstractEvent; import org.apache.flink.runtime.event.job.RecentJobEvent; import org.apache.flink.runtime.jobgraph.JobID; @@ -42,10 +44,29 @@ import org.apache.flink.runtime.profiling.types.SingleInstanceProfilingEvent; import org.apache.flink.util.StringUtils; +/** + * This servlet delivers information on profiling events to the client. + * + * Supported requests: + *
    + *
  1. get=setttings: Frontend profiling configuration
  2. + *
  3. get=profilingEvents or no get: Deliver current + * profiling events for a/the current job (use jobid to specificy job + * ID)
  4. + *
+ */ public class ResourceUsageServlet extends HttpServlet { private static final long serialVersionUID = 1L; + public static final String PROFILING_RESULTS_DISPLAY_WINDOW_SIZE_KEY = "jobmanager.web.profiling.windowsize"; + + /** + * For debugging purposes: Deliver also unused profiling events to the + * client? + */ + private static final boolean PRINT_ALL_EVENTS = false; + /** * The log for this class. */ @@ -64,6 +85,33 @@ public ResourceUsageServlet(JobManager jobManager) { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + String getParameter = req.getParameter("get"); // profilingEvents is + // default + if (getParameter == null) { + respondProfilingEvents(req, resp); + } else if (getParameter.equalsIgnoreCase("profilingEvents")) { + respondProfilingEvents(req, resp); + } else if (getParameter.equalsIgnoreCase("settings")) { + respondSettings(req, resp); + } + } + + /** + * Reports the configuration for the profiling frontend. + */ + private void respondSettings(HttpServletRequest req, HttpServletResponse resp) throws IOException { + Configuration conf = GlobalConfiguration.getConfiguration(); + long windowSize = conf.getLong(PROFILING_RESULTS_DISPLAY_WINDOW_SIZE_KEY, -1); + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType("application/json"); + PrintWriter writer = resp.getWriter(); + writer.format("{\"windowSize\":%d}", windowSize); + } + + /** + * Gathers {@link ProfilingEvent} objects from the {@link JobManager} and reports them in JSON format. + */ + private void respondProfilingEvents(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { try { JobID jobID = getJobID(req); @@ -82,8 +130,9 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se resp.getWriter().write(separator); jsonSerializer.write(profilingEvent, resp.getWriter()); separator = ","; - } else { - // This is not necessary and only useful to see what events are actually available in the frontend. + } else if (PRINT_ALL_EVENTS) { + // This is not necessary and only useful to see what + // events are actually available in the frontend. resp.getWriter().write(separator); new ProfilingEventSerializer().write(profilingEvent, resp.getWriter()); separator = ","; @@ -104,6 +153,11 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se } } + /** + * Find a suitable serializer for the given {@link ProfilingEvent} as registered in {@link #jsonSerializers}. + * @param profilingEvent is the event that shall be serialized + * @return a suitable serializer or null if none exists + */ @SuppressWarnings("unchecked") private ProfilingEventSerializer getSerializer(ProfilingEvent profilingEvent) { return (ProfilingEventSerializer) this.jsonSerializers.get(profilingEvent.getClass()); @@ -132,11 +186,18 @@ private JobID loadLatestJobID() throws IOException { return mostRecentJobEvent == null ? null : mostRecentJobEvent.getJobID(); } + /** + * Abstract class with some convenience functionality to serialize + * {@link ProfilingEvent} objects to JSON. + */ private static class ProfilingEventSerializer { private PrintWriter writer; private String separator; + /** + * Write a string parameter to the current {@link #writer}. + */ protected void writeField(String name, String value) { this.writer.write(separator); this.writer.write("\""); @@ -147,6 +208,9 @@ protected void writeField(String name, String value) { this.separator = ","; } + /** + * Write a long parameter to the current {@link #writer}. + */ protected void writeField(String name, long value) { this.writer.write(separator); this.writer.write("\""); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index 8bdbf61616d91..8ab5ed49475e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -128,7 +128,6 @@ public WebInfoServer(Configuration nepheleConfig, int port, JobManager jobmanage servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo"); servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo"); servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu"); - // TODO: Add servlet only if profiling is enabled. servletContext.addServlet(new ServletHolder(new ResourceUsageServlet(jobmanager)), "/resourceUsage"); From 62e6e7b91478d85e63c6fd666b35f951796547e9 Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Thu, 4 Sep 2014 13:59:21 +0200 Subject: [PATCH 7/7] [FLINK-964] Changes according to code review * profiling is disable by default * the plots on the index page and the link to the detailed resource usage page are only shown when profiling is enabled * remove a backup file --- .../src/main/flink-bin/conf/flink-conf.yaml | 2 +- .../src/main/flink-bin/conf/flink-conf.yaml~ | 83 ------------------- .../resources/web-docs-infoserver/index.html | 11 ++- .../web-docs-infoserver/js/res-usage.js | 5 +- .../runtime/jobmanager/web/MenuServlet.java | 46 ++++++++++ .../jobmanager/web/ResourceUsageServlet.java | 8 +- 6 files changed, 62 insertions(+), 93 deletions(-) delete mode 100644 flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ diff --git a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml index 70b8e16a5ddc5..f8f32517874c6 100644 --- a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml +++ b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml @@ -27,7 +27,7 @@ jobmanager.rpc.port: 6123 jobmanager.heap.mb: 256 -jobmanager.profiling.enable: true +jobmanager.profiling.enable: false taskmanager.heap.mb: 512 diff --git a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ b/flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ deleted file mode 100644 index 7149e39c80c88..0000000000000 --- a/flink-dist/src/main/flink-bin/conf/flink-conf.yaml~ +++ /dev/null @@ -1,83 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - - -#============================================================================== -# Common -#============================================================================== - -jobmanager.rpc.address: localhost - -jobmanager.rpc.port: 6123 - -jobmanager.heap.mb: 256 - -jobmanager.profling.enable: true - -taskmanager.heap.mb: 512 - -taskmanager.numberOfTaskSlots: -1 - -parallelization.degree.default: 1 - -#============================================================================== -# Web Frontend -#============================================================================== - -jobmanager.web.port: 8081 - -# Duration of profiling events to display in ms (-1 = unlimited) -# -jobmanager.web.profiling.windowsize: 300000 - -webclient.port: 8080 - -#============================================================================== -# Advanced -#============================================================================== - -# The number of buffers for the network stack. -# -# taskmanager.network.numberOfBuffers: 2048 - -# Directories for temporary files. -# -# Add a delimited list for multiple directories, using the system directory -# delimiter (colon ':' on unix) or a comma, e.g.: -# /data1/tmp:/data2/tmp:/data3/tmp -# -# Note: Each directory entry is read from and written to by a different I/O -# thread. You can include the same directory multiple times in order to create -# multiple I/O threads against that directory. This is for example relevant for -# high-throughput RAIDs. -# -# If not specified, the system-specific Java temporary directory (java.io.tmpdir -# property) is taken. -# -# taskmanager.tmp.dirs: /tmp - -# Path to the Hadoop configuration directory. -# -# This configuration is used when writing into HDFS. Unless specified otherwise, -# HDFS file creation will use HDFS default settings with respect to block-size, -# replication factor, etc. -# -# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml -# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'. -# -# fs.hdfs.hadoopconf: /path/to/hadoop/conf/ diff --git a/flink-runtime/resources/web-docs-infoserver/index.html b/flink-runtime/resources/web-docs-infoserver/index.html index 8351cddec5020..ff2eda1901d1f 100755 --- a/flink-runtime/resources/web-docs-infoserver/index.html +++ b/flink-runtime/resources/web-docs-infoserver/index.html @@ -59,8 +59,13 @@ @@ -308,7 +311,7 @@

-
+