Skip to content

Commit

Permalink
[FLINK-8721][flip6] Handle archiving failures for accumulators
Browse files Browse the repository at this point in the history
During archiving, wrap errors thrown by users' Accumulators in an OptionalFailure
instead of failing the job because of them.
  • Loading branch information
pnowojski committed Mar 22, 2018
1 parent c7744d7 commit c691d94
Show file tree
Hide file tree
Showing 34 changed files with 462 additions and 361 deletions.
Expand Up @@ -65,6 +65,7 @@
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

Expand Down Expand Up @@ -792,7 +793,7 @@ public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Excepti
* @param jobID The job identifier of a job.
* @return A Map containing the accumulator's name and its value.
*/
public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws Exception {
// Convenience overload: delegate to the two-argument variant, using the
// system class loader to deserialize the accumulator results.
return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
}

Expand All @@ -803,7 +804,7 @@ public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
* @param loader The class loader for deserializing the accumulator results.
* @return A Map containing the accumulator's name and its value.
*/
public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
ActorGateway jobManagerGateway = getJobManagerGateway();

Future<Object> response;
Expand All @@ -816,7 +817,7 @@ public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) thro
Object result = Await.result(response, timeout);

if (result instanceof AccumulatorResultsFound) {
Map<String, SerializedValue<Object>> serializedAccumulators =
Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators =
((AccumulatorResultsFound) result).result();

return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -132,16 +133,16 @@ public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Excepti
}

@Override
public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws Exception {
// Convenience overload: delegate to the two-argument variant, passing the
// system class loader as the loader argument.
return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
}

@Override
public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
AccessExecutionGraph executionGraph = guardWithSingleRetry(() -> miniCluster.getExecutionGraph(jobID), scheduledExecutor).get();
Map<String, SerializedValue<Object>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized();
Map<String, Object> result = new HashMap<>(accumulatorsSerialized.size());
for (Map.Entry<String, SerializedValue<Object>> acc : accumulatorsSerialized.entrySet()) {
Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized();
Map<String, OptionalFailure<Object>> result = new HashMap<>(accumulatorsSerialized.size());
for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> acc : accumulatorsSerialized.entrySet()) {
result.put(acc.getKey(), acc.getValue().deserializeValue(loader));
}
return result;
Expand Down
Expand Up @@ -89,6 +89,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.CheckedSupplier;

Expand Down Expand Up @@ -409,7 +410,7 @@ private CompletableFuture<String> triggerSavepoint(
}

@Override
public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
public Map<String, OptionalFailure<Object>> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
accMsgParams.jobPathParameter.resolve(jobID);
Expand All @@ -420,7 +421,7 @@ public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader
accMsgParams
);

Map<String, Object> result = Collections.emptyMap();
Map<String, OptionalFailure<Object>> result = Collections.emptyMap();

try {
result = responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
Expand Down
Expand Up @@ -84,6 +84,7 @@
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -362,7 +363,7 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
JobExecutionResultResponseBody.created(new JobResult.Builder()
.jobId(jobId)
.netRuntime(Long.MAX_VALUE)
.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(1.0)))
.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
.build()),
JobExecutionResultResponseBody.created(new JobResult.Builder()
.jobId(jobId)
Expand Down Expand Up @@ -558,7 +559,7 @@ public void testGetAccumulators() throws Exception {
JobID id = new JobID();

{
Map<String, Object> accumulators = restClusterClient.getAccumulators(id);
Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
assertNotNull(accumulators);
assertEquals(1, accumulators.size());

Expand Down Expand Up @@ -594,9 +595,9 @@ protected CompletableFuture<JobAccumulatorsInfo> handleRequest(
userTaskAccumulators.add(new JobAccumulatorsInfo.UserTaskAccumulator("testName", "testType", "testValue"));

if (includeSerializedValue) {
Map<String, SerializedValue<Object>> serializedUserTaskAccumulators = new HashMap<>(1);
Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserTaskAccumulators = new HashMap<>(1);
try {
serializedUserTaskAccumulators.put("testKey", new SerializedValue<>("testValue"));
serializedUserTaskAccumulators.put("testKey", new SerializedValue<>(OptionalFailure.of("testValue")));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Expand Up @@ -20,10 +20,12 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.OptionalFailure;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* The result of a job execution. Gives access to the execution time of the job,
Expand All @@ -34,7 +36,7 @@ public class JobExecutionResult extends JobSubmissionResult {

private final long netRuntime;

private final Map<String, Object> accumulatorResults;
private final Map<String, OptionalFailure<Object>> accumulatorResults;

/**
* Creates a new JobExecutionResult.
Expand All @@ -43,7 +45,7 @@ public class JobExecutionResult extends JobSubmissionResult {
* @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
* @param accumulators A map of all accumulators produced by the job.
*/
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, OptionalFailure<Object>> accumulators) {
super(jobID);
this.netRuntime = netRuntime;

Expand Down Expand Up @@ -85,7 +87,7 @@ public long getNetRuntime(TimeUnit desiredUnit) {
*/
@SuppressWarnings("unchecked")
public <T> T getAccumulatorResult(String accumulatorName) {
	// Look the entry up first: an unknown accumulator name has no entry, and
	// dereferencing the missing wrapper would throw a NullPointerException.
	// Returning null keeps the behavior callers had before results were
	// wrapped in OptionalFailure.
	final OptionalFailure<Object> result = this.accumulatorResults.get(accumulatorName);
	if (result == null) {
		return null;
	}

	// NOTE(review): get() is expected to surface a failure recorded for this
	// accumulator as an exception -- confirm against OptionalFailure.
	return (T) result.get();
}

/**
Expand All @@ -95,7 +97,9 @@ public <T> T getAccumulatorResult(String accumulatorName) {
* @return A map containing all accumulators produced by the job.
*/
public Map<String, Object> getAllAccumulatorResults() {
	// Build the map with a plain loop instead of Collectors.toMap: toMap
	// throws a NullPointerException when a mapped value is null, so a single
	// accumulator with a null result would make this whole call fail.
	final Map<String, Object> results = new java.util.HashMap<>(accumulatorResults.size());
	for (Map.Entry<String, OptionalFailure<Object>> entry : accumulatorResults.entrySet()) {
		final OptionalFailure<Object> wrapped = entry.getValue();
		// A missing wrapper is reported as a null result rather than dereferenced.
		// NOTE(review): get() is expected to throw for an accumulator whose
		// evaluation failed -- confirm against OptionalFailure.
		results.put(entry.getKey(), wrapped == null ? null : wrapped.get());
	}
	return results;
}

/**
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.accumulators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;

import java.io.IOException;
Expand All @@ -44,35 +45,44 @@ public class AccumulatorHelper {
* The collection of accumulators that will be merged into the
* other
*/
public static void mergeInto(Map<String, Accumulator<?, ?>> target, Map<String, Accumulator<?, ?>> toMerge) {
public static void mergeInto(Map<String, OptionalFailure<Accumulator<?, ?>>> target, Map<String, Accumulator<?, ?>> toMerge) {
// Walk every accumulator in toMerge and fold it into the entry of the same name in target.
for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
Accumulator<?, ?> ownAccumulator = target.get(otherEntry.getKey());
OptionalFailure<Accumulator<?, ?>> ownAccumulator = target.get(otherEntry.getKey());
if (ownAccumulator == null) {
// Nothing is registered under this name yet: store a clone so that target does
// not share the instance held by toMerge. clone() runs inside createFrom;
// presumably an exception it throws is recorded as a failure -- TODO confirm.
target.put(otherEntry.getKey(), otherEntry.getValue().clone());
target.put(
otherEntry.getKey(),
OptionalFailure.createFrom(() -> otherEntry.getValue().clone()));
}
else if (ownAccumulator.isFailure()) {
// The target entry is already a failure: leave it untouched and skip this accumulator.
continue;
}
else {
// Both accumulators registered under this name should have the same type
AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
ownAccumulator.getClass(), otherEntry.getValue().getClass());
ownAccumulator.get().getClass(), otherEntry.getValue().getClass());
// Merge the other accumulator into the target's accumulator and store the
// outcome back under the same name (wrapped again via createFrom).
mergeSingle(ownAccumulator, otherEntry.getValue());
target.put(
otherEntry.getKey(),
OptionalFailure.createFrom(() -> mergeSingle(ownAccumulator.get(), otherEntry.getValue().clone())));
}
}
}

/**
* Workaround method for type safety.
*
* <p>Casts both wildcard-typed accumulators to the same {@code Accumulator<V, R>} so that
* {@code merge} can be invoked, merges {@code toMerge} into {@code target}, and hands the
* merged target back to the caller.
*
* <p>The casts are unchecked; callers are expected to have verified beforehand that both
* accumulators are of the same type.
*/
private static <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
Accumulator<?, ?> toMerge) {
private static <V, R extends Serializable> Accumulator<V, R> mergeSingle(Accumulator<?, ?> target,
Accumulator<?, ?> toMerge) {
@SuppressWarnings("unchecked")
Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;

@SuppressWarnings("unchecked")
Accumulator<V, R> typedToMerge = (Accumulator<V, R>) toMerge;

// merge() mutates typedTarget, which is the same object as target.
typedTarget.merge(typedToMerge);

return typedTarget;
}

/**
Expand Down Expand Up @@ -106,10 +116,10 @@ public static void compareAccumulatorTypes(
* Transform the Map with accumulators into a Map containing only the
* results.
*/
public static Map<String, Object> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
Map<String, Object> resultMap = new HashMap<String, Object>();
public static Map<String, OptionalFailure<Object>> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
Map<String, OptionalFailure<Object>> resultMap = new HashMap<>();
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
resultMap.put(entry.getKey(), entry.getValue().getLocalValue());
// getLocalValue() is user code and is evaluated inside createFrom; presumably an
// exception it throws is stored as a failure for this entry -- TODO confirm.
resultMap.put(entry.getKey(), OptionalFailure.createFrom(() -> entry.getValue().getLocalValue()));
}
return resultMap;
}
Expand Down Expand Up @@ -152,19 +162,19 @@ public static String getResultsFormatted(Map<String, Object> map) {
* @throws IOException
* @throws ClassNotFoundException
*/
public static Map<String, Object> deserializeAccumulators(
Map<String, SerializedValue<Object>> serializedAccumulators, ClassLoader loader)
throws IOException, ClassNotFoundException {
public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
ClassLoader loader) throws IOException, ClassNotFoundException {

if (serializedAccumulators == null || serializedAccumulators.isEmpty()) {
return Collections.emptyMap();
}

Map<String, Object> accumulators = new HashMap<>(serializedAccumulators.size());
Map<String, OptionalFailure<Object>> accumulators = new HashMap<>(serializedAccumulators.size());

for (Map.Entry<String, SerializedValue<Object>> entry : serializedAccumulators.entrySet()) {
for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry : serializedAccumulators.entrySet()) {

Object value = null;
OptionalFailure<Object> value = null;
if (entry.getValue() != null) {
value = entry.getValue().deserializeValue(loader);
}
Expand Down

This file was deleted.

Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.types.Value;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Visitor;

import java.util.ArrayList;
Expand Down Expand Up @@ -115,7 +116,7 @@ public JobExecutionResult execute(Plan program) throws Exception {
}

long endTime = System.currentTimeMillis();
Map<String, Object> accumulatorResults = AccumulatorHelper.toResultMap(accumulators);
Map<String, OptionalFailure<Object>> accumulatorResults = AccumulatorHelper.toResultMap(accumulators);
return new JobExecutionResult(null, endTime - startTime, accumulatorResults);
}

Expand Down

0 comments on commit c691d94

Please sign in to comment.