Skip to content

Commit

Permalink
[FLINK-5107] Introduced limit for prior execution attempt history
Browse files Browse the repository at this point in the history
This closes #2837.
  • Loading branch information
StefanRRichter authored and tillrohrmann committed Nov 22, 2016
1 parent ba8ed26 commit f5af7f1
Show file tree
Hide file tree
Showing 8 changed files with 422 additions and 23 deletions.
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor.handlers; package org.apache.flink.runtime.webmonitor.handlers;


import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonGenerator;

import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
Expand All @@ -39,6 +38,12 @@ public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder execution


@Override @Override
public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {

// return empty string for pruned (== null) execution attempts
if (null == execAttempt) {
return "";
}

final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();


StringWriter writer = new StringWriter(); StringWriter writer = new StringWriter();
Expand Down
Expand Up @@ -19,17 +19,16 @@


import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;


import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;


public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable { public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {


private static final long serialVersionUID = -6708241535015028576L; private static final long serialVersionUID = -6708241535015028576L;
private final int subTaskIndex; private final int subTaskIndex;


private final List<ArchivedExecution> priorExecutions; private final EvictingBoundedList<ArchivedExecution> priorExecutions;


/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */ /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
private final String taskNameWithSubtask; private final String taskNameWithSubtask;
Expand All @@ -38,9 +37,10 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa


public ArchivedExecutionVertex(ExecutionVertex vertex) { public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex(); this.subTaskIndex = vertex.getParallelSubtaskIndex();
this.priorExecutions = new ArrayList<>(); EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
for (Execution priorExecution : vertex.getPriorExecutions()) { priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
priorExecutions.add(priorExecution.archive()); for (Execution priorExecution : copyOfPriorExecutionsList) {
priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
} }
this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex(); this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
this.currentExecution = vertex.getCurrentExecutionAttempt().archive(); this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.core.io.InputSplitSource;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
Expand Down Expand Up @@ -163,9 +165,16 @@ public ExecutionJobVertex(
result.getResultType()); result.getResultType());
} }


Configuration jobConfiguration = graph.getJobConfiguration();
int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();

// create all task vertices // create all task vertices
for (int i = 0; i < numTaskVertices; i++) { for (int i = 0; i < numTaskVertices; i++) {
ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp); ExecutionVertex vertex = new ExecutionVertex(
this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);

this.taskVertices[i] = vertex; this.taskVertices[i] = vertex;
} }


Expand Down
Expand Up @@ -36,11 +36,13 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue; import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -53,7 +55,6 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;


import static org.apache.flink.runtime.execution.ExecutionState.CANCELED; import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED; import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
Expand All @@ -79,7 +80,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi


private final int subTaskIndex; private final int subTaskIndex;


private final List<Execution> priorExecutions; private final EvictingBoundedList<Execution> priorExecutions;


private final Time timeout; private final Time timeout;


Expand All @@ -99,15 +100,31 @@ public ExecutionVertex(
int subTaskIndex, int subTaskIndex,
IntermediateResult[] producedDataSets, IntermediateResult[] producedDataSets,
Time timeout) { Time timeout) {
this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis()); this(
jobVertex,
subTaskIndex,
producedDataSets,
timeout,
System.currentTimeMillis(),
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
} }


public ExecutionVertex( public ExecutionVertex(
ExecutionJobVertex jobVertex, ExecutionJobVertex jobVertex,
int subTaskIndex, int subTaskIndex,
IntermediateResult[] producedDataSets, IntermediateResult[] producedDataSets,
Time timeout, Time timeout,
long createTimestamp) { int maxPriorExecutionHistoryLength) {
this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
}

public ExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout,
long createTimestamp,
int maxPriorExecutionHistoryLength) {


this.jobVertex = jobVertex; this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex; this.subTaskIndex = subTaskIndex;
Expand All @@ -125,7 +142,7 @@ public ExecutionVertex(


this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][]; this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];


this.priorExecutions = new CopyOnWriteArrayList<Execution>(); this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);


this.currentExecution = new Execution( this.currentExecution = new Execution(
getExecutionGraph().getFutureExecutor(), getExecutionGraph().getFutureExecutor(),
Expand Down Expand Up @@ -235,16 +252,19 @@ public TaskManagerLocation getCurrentAssignedResourceLocation() {


@Override @Override
public Execution getPriorExecutionAttempt(int attemptNumber) { public Execution getPriorExecutionAttempt(int attemptNumber) {
if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) { synchronized (priorExecutions) {
return priorExecutions.get(attemptNumber); if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
} return priorExecutions.get(attemptNumber);
else { } else {
throw new IllegalArgumentException("attempt does not exist"); throw new IllegalArgumentException("attempt does not exist");
}
} }
} }


List<Execution> getPriorExecutions() { EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
return priorExecutions; synchronized (priorExecutions) {
return new EvictingBoundedList<>(priorExecutions);
}
} }


public ExecutionGraph getExecutionGraph() { public ExecutionGraph getExecutionGraph() {
Expand Down
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

@PublicEvolving
public class JobManagerOptions {

/**
* The maximum number of prior execution attempts kept in history.
*/
public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
key("job-manager.max-attempts-history-size").defaultValue(16);

private JobManagerOptions() {
throw new IllegalAccessError();
}
}
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.util;

import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
* the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
* order). If dropped elements are accessed, a default element is returned instead.
* <p>
* TODO this class could eventually implement the whole actual List interface.
*
* @param <T> type of the list elements
*/
public class EvictingBoundedList<T> implements Iterable<T>, Serializable {

private static final long serialVersionUID = -1863961980953613146L;

private final T defaultElement;
private final Object[] elements;
private int idx;
private int count;
private long modCount;

public EvictingBoundedList(int sizeLimit) {
this(sizeLimit, null);
}

public EvictingBoundedList(EvictingBoundedList<T> other) {
Preconditions.checkNotNull(other);
this.defaultElement = other.defaultElement;
this.elements = other.elements.clone();
this.idx = other.idx;
this.count = other.count;
this.modCount = 0L;
}

public EvictingBoundedList(int sizeLimit, T defaultElement) {
this.elements = new Object[sizeLimit];
this.defaultElement = defaultElement;
this.idx = 0;
this.count = 0;
this.modCount = 0L;
}

public int size() {
return count;
}

public boolean isEmpty() {
return 0 == count;
}

public boolean add(T t) {
elements[idx] = t;
idx = (idx + 1) % elements.length;
++count;
++modCount;
return true;
}

public void clear() {
if (!isEmpty()) {
for (int i = 0; i < elements.length; ++i) {
elements[i] = null;
}
count = 0;
idx = 0;
++modCount;
}
}

public T get(int index) {
Preconditions.checkArgument(index >= 0 && index < count);
return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
}

public int getSizeLimit() {
return elements.length;
}

public T set(int index, T element) {
Preconditions.checkArgument(index >= 0 && index < count);
++modCount;
if (isDroppedIndex(index)) {
return getDefaultElement();
} else {
int idx = index % elements.length;
T old = accessInternal(idx);
elements[idx] = element;
return old;
}
}

public T getDefaultElement() {
return defaultElement;
}

private boolean isDroppedIndex(int idx) {
return idx < count - elements.length;
}

@SuppressWarnings("unchecked")
private T accessInternal(int arrayIndex) {
return (T) elements[arrayIndex];
}

@Override
public Iterator<T> iterator() {
return new Iterator<T>() {

int pos = 0;
final long oldModCount = modCount;

@Override
public boolean hasNext() {
return pos < count;
}

@Override
public T next() {
if (oldModCount != modCount) {
throw new ConcurrentModificationException();
}
if (pos < count) {
return get(pos++);
} else {
throw new NoSuchElementException("Iterator exhausted.");
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read-only iterator");
}
};
}
}

0 comments on commit f5af7f1

Please sign in to comment.