Skip to content

Commit

Permalink
[FLINK-4720] Implement archived ExecutionGraph
Browse files Browse the repository at this point in the history
This closes #2577.
  • Loading branch information
zentol committed Oct 14, 2016
1 parent f6d8668 commit 21e8e2d
Show file tree
Hide file tree
Showing 55 changed files with 1,944 additions and 311 deletions.
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common;

import java.io.Serializable;

public interface Archiveable<T extends Serializable> {
T archive();
}
Expand Up @@ -15,9 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph.archive;

import org.apache.flink.api.common.ExecutionConfig;
package org.apache.flink.api.common;

import java.io.Serializable;
import java.util.Collections;
Expand All @@ -28,15 +26,15 @@
* It can be used to display job information on the web interface
* without having to keep the classloader around after job completion.
*/
public class ExecutionConfigSummary implements Serializable {
public class ArchivedExecutionConfig implements Serializable {

private final String executionMode;
private final String restartStrategyDescription;
private final int parallelism;
private final boolean objectReuseEnabled;
private final Map<String, String> globalJobParameters;

public ExecutionConfigSummary(ExecutionConfig ec) {
public ArchivedExecutionConfig(ExecutionConfig ec) {
executionMode = ec.getExecutionMode().name();
if (ec.getRestartStrategy() != null) {
restartStrategyDescription = ec.getRestartStrategy().getDescription();
Expand Down
Expand Up @@ -58,7 +58,7 @@
* </ul>
*/
@Public
public class ExecutionConfig implements Serializable {
public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -770,6 +770,11 @@ public int hashCode() {
public boolean canEqual(Object obj) {
return obj instanceof ExecutionConfig;
}

@Override
public ArchivedExecutionConfig archive() {
return new ArchivedExecutionConfig(this);
}


// ------------------------------ Utilities ----------------------------------
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
Expand Down Expand Up @@ -47,7 +48,7 @@ public class ExecutionGraphHolder {

private final FiniteDuration timeout;

private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();

public ExecutionGraphHolder() {
this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
Expand All @@ -63,8 +64,8 @@ public ExecutionGraphHolder(FiniteDuration timeout) {
* @param jid jobID of the execution graph to be retrieved
* @return the retrieved execution graph or null if it is not retrievable
*/
public ExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
ExecutionGraph cached = cache.get(jid);
public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
return cached;
}
Expand All @@ -78,7 +79,7 @@ public ExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
return null;
}
else if (result instanceof JobManagerMessages.JobFound) {
ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
cache.put(jid, eg);
return eg;
}
Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.NotFoundException;
Expand Down Expand Up @@ -53,13 +53,13 @@ public String handleRequest(Map<String, String> pathParams, Map<String, String>
throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
}

ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
if (eg == null) {
throw new NotFoundException("Could not find job with id " + jid);
}

return handleRequest(eg, pathParams);
}

public abstract String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception;
public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
}
Expand Up @@ -18,8 +18,8 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

Expand All @@ -36,7 +36,7 @@ public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder
}

@Override
public final String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
final String vidString = params.get("vertexid");
if (vidString == null) {
throw new IllegalArgumentException("vertexId parameter missing");
Expand All @@ -50,13 +50,13 @@ public final String handleRequest(ExecutionGraph graph, Map<String, String> para
throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
}

final ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
if (jobVertex == null) {
throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
}

return handleRequest(jobVertex, params);
}

public abstract String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
}
Expand Up @@ -18,8 +18,8 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.util.Map;
Expand All @@ -37,7 +37,7 @@ public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphH
}

@Override
public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
final String attemptNumberString = params.get("attempt");
if (attemptNumberString == null) {
throw new RuntimeException("Attempt number parameter missing");
Expand All @@ -51,18 +51,18 @@ public String handleRequest(ExecutionVertex vertex, Map<String, String> params)
throw new RuntimeException("Invalid attempt number parameter");
}

final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
if (attempt == currentAttempt.getAttemptNumber()) {
return handleRequest(currentAttempt, params);
}
else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
Execution exec = vertex.getPriorExecutionAttempt(attempt);
AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
return handleRequest(exec, params);
}
else {
throw new RuntimeException("Attempt does not exist: " + attempt);
}
}

public abstract String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception;
public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception;
}
Expand Up @@ -18,8 +18,8 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.util.Map;
Expand All @@ -36,7 +36,7 @@ public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder)
}

@Override
public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
final String subtaskNumberString = params.get("subtasknum");
if (subtaskNumberString == null) {
throw new RuntimeException("Subtask number parameter missing");
Expand All @@ -54,9 +54,9 @@ public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, Stri
throw new RuntimeException("subtask does not exist: " + subtask);
}

final ExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
return handleRequest(vertex, params);
}

public abstract String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception;
public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception;
}
Expand Up @@ -20,7 +20,7 @@

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
Expand All @@ -36,7 +36,7 @@ public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
}

@Override
public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();

StringWriter writer = new StringWriter();
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import scala.Option;

Expand All @@ -39,7 +39,7 @@ public JobCheckpointsHandler(ExecutionGraphHolder executionGraphHolder) {
}

@Override
public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

Expand Down
Expand Up @@ -22,8 +22,8 @@
import java.util.Map;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

/**
Expand All @@ -36,7 +36,7 @@ public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
}

@Override
public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {

StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
Expand All @@ -45,7 +45,7 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
gen.writeStringField("jid", graph.getJobID().toString());
gen.writeStringField("name", graph.getJobName());

final ExecutionConfigSummary summary = graph.getExecutionConfigSummary();
final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig();

if (summary != null) {
gen.writeObjectFieldStart("execution-config");
Expand All @@ -59,7 +59,7 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
Map<String, String> ucVals = summary.getGlobalJobParameters();
if (ucVals != null) {
gen.writeObjectFieldStart("user-config");

for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
gen.writeStringField(ucVal.getKey(), ucVal.getValue());
}
Expand Down
Expand Up @@ -24,9 +24,10 @@
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

Expand All @@ -50,7 +51,7 @@ public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
}

@Override
public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
final StringWriter writer = new StringWriter();
final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

Expand Down Expand Up @@ -84,13 +85,13 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
int[] jobVerticesPerState = new int[ExecutionState.values().length];
gen.writeArrayFieldStart("vertices");

for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) {
int[] tasksPerState = new int[ExecutionState.values().length];
long startTime = Long.MAX_VALUE;
long endTime = 0;
boolean allFinished = true;

for (ExecutionVertex vertex : ejv.getTaskVertices()) {
for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
final ExecutionState state = vertex.getExecutionState();
tasksPerState[state.ordinal()]++;

Expand Down Expand Up @@ -133,7 +134,7 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th

gen.writeStartObject();
gen.writeStringField("id", ejv.getJobVertexId().toString());
gen.writeStringField("name", ejv.getJobVertex().getName());
gen.writeStringField("name", ejv.getName());
gen.writeNumberField("parallelism", ejv.getParallelism());
gen.writeStringField("status", jobVertexState.name());

Expand Down

0 comments on commit 21e8e2d

Please sign in to comment.