
Commit 99a351e
[FLINK-2687] [monitoring API] Extend vertex requests with subtask data and accumulators
StephanEwen committed Sep 17, 2015
1 parent ba31f83 commit 99a351e
Showing 17 changed files with 431 additions and 88 deletions.
@@ -58,7 +58,7 @@ public static void mergeInto(Map<String, Accumulator<?, ?>> target, Map<String,
 	/**
 	 * Workaround method for type safety
 	 */
-	private static final <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
+	private static <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
 			Accumulator<?, ?> toMerge) {
 		@SuppressWarnings("unchecked")
 		Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
@@ -104,16 +104,16 @@ public static void compareAccumulatorTypes(Object name,
 	public static Map<String, Object> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
 		Map<String, Object> resultMap = new HashMap<String, Object>();
 		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
-			resultMap.put(entry.getKey(), (Object) entry.getValue().getLocalValue());
+			resultMap.put(entry.getKey(), entry.getValue().getLocalValue());
 		}
 		return resultMap;
 	}

 	public static String getResultsFormated(Map<String, Object> map) {
 		StringBuilder builder = new StringBuilder();
 		for (Map.Entry<String, Object> entry : map.entrySet()) {
-			builder.append("- " + entry.getKey() + " (" + entry.getValue().getClass().getName()
-					+ ")" + ": " + entry.getValue().toString() + "\n");
+			builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName());
+			builder.append(")").append(": ").append(entry.getValue().toString()).append("\n");
 		}
 		return builder.toString();
 	}
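
The chained-append form above produces the same output as the old string-concatenation version: one "- name (type): value" line per entry. A minimal, self-contained sketch of that formatting rule, with a hypothetical map entry:

import java.util.HashMap;
import java.util.Map;

public class FormatSketch {

	public static void main(String[] args) {
		// Hypothetical accumulator results, shaped like the output of toResultMap(...).
		Map<String, Object> map = new HashMap<String, Object>();
		map.put("num-lines", 42L);

		StringBuilder builder = new StringBuilder();
		for (Map.Entry<String, Object> entry : map.entrySet()) {
			builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName());
			builder.append(")").append(": ").append(entry.getValue().toString()).append("\n");
		}
		System.out.print(builder);
		// prints: - num-lines (java.lang.Long): 42
	}
}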
@@ -127,8 +127,7 @@ public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accu
 		}
 	}

-	public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?,
-			?>> accumulators) {
+	public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
 		Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();

 		for(Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
@@ -35,12 +35,15 @@
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
@@ -139,16 +142,22 @@ else if (flinkRoot != null) {
.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs))) .GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs)))


.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))

// .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt", handler(null))
// .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/:attempt/accumulators", handler(null))


.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs))) .GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))
.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs))) .GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs))) .GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))


// the handler for the legacy requests // the handler for the legacy requests
.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT)) .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))


// this handler serves all the static contents // this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(webRootDir)); .GET("/:*", new StaticFileServerHandler(webRootDir));
} }


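With these routes in place, the new vertex-level resources are plain HTTP GET endpoints. A minimal client sketch, assuming a web monitor listening on localhost:8081; the host, port, job ID, and vertex ID below are hypothetical placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class VertexAccumulatorsClient {

	public static void main(String[] args) throws Exception {
		// Hypothetical IDs -- substitute values reported by /jobs and /jobs/:jobid.
		String jobId = "7684be6004e4e955c2a558a9bc463f65";
		String vertexId = "cbc357ccb763df2852fee8c4fc7d55f2";

		URL url = new URL("http://localhost:8081/jobs/" + jobId
				+ "/vertices/" + vertexId + "/accumulators");

		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.setRequestMethod("GET");

		// Print the JSON document produced by JobVertexAccumulatorsHandler.
		BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
		try {
			String line;
			while ((line = reader.readLine()) != null) {
				System.out.println(line);
			}
		}
		finally {
			reader.close();
		}
	}
}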
@@ -61,6 +61,7 @@ public String handleRequest(Map<String, String> params) throws Exception {


 		MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);

+		final long now = System.currentTimeMillis();

 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
@@ -70,20 +71,20 @@ public String handleRequest(Map<String, String> params) throws Exception {
 		if (includeRunningJobs && includeFinishedJobs) {
 			gen.writeArrayFieldStart("running");
 			for (JobDetails detail : result.getRunningJobs()) {
-				generateSingleJobDetails(detail, gen);
+				generateSingleJobDetails(detail, gen, now);
 			}
 			gen.writeEndArray();

 			gen.writeArrayFieldStart("finished");
 			for (JobDetails detail : result.getFinishedJobs()) {
-				generateSingleJobDetails(detail, gen);
+				generateSingleJobDetails(detail, gen, now);
 			}
 			gen.writeEndArray();
 		}
 		else {
 			gen.writeArrayFieldStart("jobs");
 			for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-				generateSingleJobDetails(detail, gen);
+				generateSingleJobDetails(detail, gen, now);
 			}
 			gen.writeEndArray();
 		}
@@ -97,7 +98,7 @@ public String handleRequest(Map<String, String> params) throws Exception {
 		}
 	}

-	private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen) throws Exception {
+	private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen, long now) throws Exception {
 		gen.writeStartObject();

 		gen.writeStringField("jid", details.getJobId().toString());
@@ -106,7 +107,7 @@ private static void generateSingleJobDetails(JobDetails details, JsonGenerator g


gen.writeNumberField("start-time", details.getStartTime()); gen.writeNumberField("start-time", details.getStartTime());
gen.writeNumberField("end-time", details.getEndTime()); gen.writeNumberField("end-time", details.getEndTime());
gen.writeNumberField("duration", details.getEndTime() <= 0 ? -1L : details.getEndTime() - details.getStartTime()); gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
gen.writeNumberField("last-modification", details.getLastUpdateTime()); gen.writeNumberField("last-modification", details.getLastUpdateTime());


gen.writeObjectFieldStart("tasks"); gen.writeObjectFieldStart("tasks");
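The duration change above alters what is reported for running jobs: previously a job without an end time was reported with duration -1; now the duration is measured against a single now timestamp taken once per request. A small self-contained sketch of the rule, with hypothetical timestamps:

public class DurationRule {

	public static void main(String[] args) {
		// Hypothetical timestamps: a job that started at t=1000 and has not finished.
		long startTime = 1000L;
		long endTime = -1L;    // no end time recorded yet
		long now = 5000L;      // captured once in handleRequest and passed down

		long duration = (endTime <= 0 ? now : endTime) - startTime;
		System.out.println(duration); // prints 4000 -- elapsed time so far, not -1
	}
}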
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;

 import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.util.EnvironmentInformation;

 import java.io.StringWriter;
 import java.util.Map;
@@ -46,6 +47,13 @@ public DashboardConfigHandler(long refreshInterval) {
gen.writeNumberField("refresh-interval", refreshInterval); gen.writeNumberField("refresh-interval", refreshInterval);
gen.writeNumberField("timezone-offset", timeZoneOffset); gen.writeNumberField("timezone-offset", timeZoneOffset);
gen.writeStringField("timezone-name", timeZoneName); gen.writeStringField("timezone-name", timeZoneName);
gen.writeStringField("flink-version", EnvironmentInformation.getVersion());

EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
if (revision != null) {
gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
}

gen.writeEndObject(); gen.writeEndObject();


gen.close(); gen.close();
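Based on the fields written above, the dashboard configuration response now also identifies the Flink build. A sketch of the resulting JSON shape; every concrete value here is hypothetical, and "flink-revision" only appears when revision information is available:

{
  "refresh-interval": 3000,
  "timezone-offset": -18000000,
  "timezone-name": "Eastern Standard Time",
  "flink-version": "0.10-SNAPSHOT",
  "flink-revision": "99a351e @ 17.09.2015 @ 12:00:00 UTC"
}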
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
import java.util.Map;

/**
* Request handler that returns the aggregated user accumulators of a job.
*/
public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {

public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}

@Override
public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();

StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);

gen.writeStartObject();

gen.writeArrayFieldStart("job-accumulators");
// empty for now
gen.writeEndArray();

gen.writeArrayFieldStart("user-task-accumulators");
for (StringifiedAccumulatorResult acc : allAccumulators) {
gen.writeStartObject();
gen.writeStringField("name", acc.getName());
gen.writeStringField("type", acc.getType());
gen.writeStringField("value", acc.getValue());
gen.writeEndObject();
}
gen.writeEndArray();
gen.writeEndObject();

gen.close();
return writer.toString();
}
}
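
For orientation, a sketch of the response this handler produces: the "job-accumulators" array is written empty for now (per the comment in the code), and the entry under "user-task-accumulators" is a hypothetical example:

{
  "job-accumulators": [],
  "user-task-accumulators": [
    { "name": "genre-counter", "type": "LongCounter", "value": "42" }
  ]
}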
@@ -34,7 +34,14 @@
 import java.util.Map;

 /**
- * Request handler that returns the JSON program plan of a job graph.
+ * Request handler that returns details about a job, including:
+ * <ul>
+ *   <li>Dataflow plan</li>
+ *   <li>id, name, and current status</li>
+ *   <li>start time, end time, duration</li>
+ *   <li>number of job vertices in each state (pending, running, finished, failed)</li>
+ *   <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li>
+ * </ul>
 */
 public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {


@@ -63,6 +70,7 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
gen.writeNumberField("start-time", jobStartTime); gen.writeNumberField("start-time", jobStartTime);
gen.writeNumberField("end-time", jobEndTime); gen.writeNumberField("end-time", jobEndTime);
gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime); gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime);
gen.writeNumberField("now", now);


// timestamps // timestamps
gen.writeObjectFieldStart("timestamps"); gen.writeObjectFieldStart("timestamps");
@@ -71,9 +79,8 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
 		}
 		gen.writeEndObject();

-		final int[] tasksPerStatusTotal = new int[ExecutionState.values().length];
-
 		// job vertices
+		int[] jobVerticesPerState = new int[ExecutionState.values().length];
 		gen.writeArrayFieldStart("vertices");

 		for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
@@ -85,7 +92,6 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
 			for (ExecutionVertex vertex : ejv.getTaskVertices()) {
 				final ExecutionState state = vertex.getExecutionState();
 				tasksPerState[state.ordinal()]++;
-				tasksPerStatusTotal[state.ordinal()]++;

 				// take the earliest start time
 				long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
@@ -112,7 +118,11 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
 				endTime = -1L;
 				duration = -1L;
 			}

+			ExecutionState jobVertexState =
+					ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
+			jobVerticesPerState[jobVertexState.ordinal()]++;
+
 			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = ejv.getAggregatedMetricAccumulators();

 			LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
@@ -124,7 +134,8 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
gen.writeStringField("id", ejv.getJobVertexId().toString()); gen.writeStringField("id", ejv.getJobVertexId().toString());
gen.writeStringField("name", ejv.getJobVertex().getName()); gen.writeStringField("name", ejv.getJobVertex().getName());
gen.writeNumberField("parallelism", ejv.getParallelism()); gen.writeNumberField("parallelism", ejv.getParallelism());

gen.writeStringField("status", jobVertexState.name());

gen.writeNumberField("start-time", startTime); gen.writeNumberField("start-time", startTime);
gen.writeNumberField("end-time", endTime); gen.writeNumberField("end-time", endTime);
gen.writeNumberField("duration", duration); gen.writeNumberField("duration", duration);
@@ -141,11 +152,20 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L); gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L); gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
gen.writeEndObject(); gen.writeEndObject();

gen.writeEndObject(); gen.writeEndObject();
} }
gen.writeEndArray(); gen.writeEndArray();


gen.writeObjectFieldStart("status-counts");
for (ExecutionState state : ExecutionState.values()) {
gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]);
}
gen.writeEndObject();

gen.writeFieldName("plan");
gen.writeRawValue(graph.getJsonPlan());

gen.writeEndObject(); gen.writeEndObject();


gen.close(); gen.close();
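The new "status" and "status-counts" fields roll per-subtask states up to one state per job vertex via ExecutionJobVertex.getAggregateJobVertexState, then count job vertices per state. A hedged sketch of the resulting JSON fragment; the counts are hypothetical, and the exact set of ExecutionState constants depends on the Flink version:

"vertices": [
  { "id": "...", "name": "...", "parallelism": 4, "status": "RUNNING", ... }
],
"status-counts": {
  "CREATED": 0,
  "SCHEDULED": 0,
  "DEPLOYING": 0,
  "RUNNING": 2,
  "FINISHED": 1,
  "CANCELING": 0,
  "CANCELED": 0,
  "FAILED": 0
},
"plan": { ... }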
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import com.fasterxml.jackson.core.JsonGenerator;

import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
import java.util.Map;


public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {

public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}

@Override
public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
String vidString = params.get("vertexid");
if (vidString == null) {
throw new IllegalArgumentException("vertexId parameter missing");
}

JobVertexID vid;
try {
vid = JobVertexID.fromHexString(vidString);
}
catch (Exception e) {
throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
}

ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
if (jobVertex == null) {
throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
}

StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();

StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);

gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());

gen.writeArrayFieldStart("user-accumulators");
for (StringifiedAccumulatorResult acc : accs) {
gen.writeStartObject();
gen.writeStringField("name", acc.getName());
gen.writeStringField("type", acc.getType());
gen.writeStringField("value", acc.getValue());
gen.writeEndObject();
}
gen.writeEndArray();

gen.writeEndObject();

gen.close();
return writer.toString();
}
}
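
A sketch of the response shape for this handler, mirroring the JSON written above (the accumulator entry is hypothetical). A missing or malformed vertexid parameter, or an ID that matches no vertex, surfaces as an IllegalArgumentException from handleRequest:

{
  "id": "cbc357ccb763df2852fee8c4fc7d55f2",
  "user-accumulators": [
    { "name": "genre-counter", "type": "LongCounter", "value": "42" }
  ]
}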
