Skip to content

Commit

Permalink
[FLINK-8721][flip6] Handle archiving failures for accumulators
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Mar 21, 2018
1 parent c61014a commit 6556443
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,34 @@ public class AccumulatorHelper {
public static void mergeInto(Map<String, Accumulator<?, ?>> target, Map<String, Accumulator<?, ?>> toMerge) {
for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
Accumulator<?, ?> ownAccumulator = target.get(otherEntry.getKey());
if (ownAccumulator == null) {
if (ownAccumulator instanceof FailedAccumulator) {
continue;
}
else if (otherEntry.getValue() instanceof FailedAccumulator) {
target.put(otherEntry.getKey(), otherEntry.getValue());
}
else if (ownAccumulator == null) {
// Create initial counter (copy!)
target.put(otherEntry.getKey(), otherEntry.getValue().clone());
Accumulator<?, ?> accumulator;
try {
accumulator = otherEntry.getValue().clone();
}
catch (RuntimeException ex) {
accumulator = new FailedAccumulator<>(ex);
}
target.put(otherEntry.getKey(), accumulator);
}
else {
else if (!(ownAccumulator instanceof FailedAccumulator)) {
// Both should have the same type
AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
ownAccumulator.getClass(), otherEntry.getValue().getClass());
// Merge target counter with other counter
mergeSingle(ownAccumulator, otherEntry.getValue());
try {
mergeSingle(ownAccumulator, otherEntry.getValue());
}
catch (RuntimeException ex) {
target.put(otherEntry.getKey(), new FailedAccumulator<>(ex));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@

package org.apache.flink.api.common.accumulators;

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

/**
* {@link Accumulator} implementation which indicates a serialization problem with the original
* {@link Accumulator} implementation which indicates a generic problem with the original
* accumulator. Accessing any of the {@link Accumulator} methods will result in throwing the
* serialization exception.
* exception cause.
*
* @param <V> type of the value
* @param <R> type of the accumulator result
*/
public class FailedAccumulatorSerialization<V, R extends Serializable> implements Accumulator<V, R> {
public class FailedAccumulator<V, R extends Serializable> implements Accumulator<V, R> {
private static final long serialVersionUID = 6965908827065879760L;

private final Throwable throwable;

public FailedAccumulatorSerialization(Throwable throwable) {
public FailedAccumulator(Throwable throwable) {
this.throwable = Preconditions.checkNotNull(throwable);
}

Expand All @@ -46,28 +46,32 @@ public Throwable getThrowable() {

@Override
public void add(V value) {
ExceptionUtils.rethrow(throwable);
rethrow();
}

@Override
public R getLocalValue() {
ExceptionUtils.rethrow(throwable);
rethrow();
return null;
}

@Override
public void resetLocal() {
ExceptionUtils.rethrow(throwable);
rethrow();
}

@Override
public void merge(Accumulator<V, R> other) {
ExceptionUtils.rethrow(throwable);
rethrow();
}

@Override
public Accumulator<V, R> clone() {
ExceptionUtils.rethrow(throwable);
rethrow();
return null;
}

/**
 * Throws a {@link FlinkRuntimeException} whose cause is the throwable stored in this accumulator.
 * Never returns normally.
 */
private void rethrow() {
	final FlinkRuntimeException wrapped = new FlinkRuntimeException(throwable);
	throw wrapped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link FailedAccumulatorSerialization}.
* Tests for the {@link FailedAccumulator}.
*/
public class FailedAccumulatorSerializationTest extends TestLogger {
public class FailedAccumulatorTest extends TestLogger {

private static final IOException TEST_EXCEPTION = new IOException("Test exception");

Expand All @@ -44,7 +44,7 @@ public class FailedAccumulatorSerializationTest extends TestLogger {
*/
@Test
public void testMethodCallThrowsException() {
final FailedAccumulatorSerialization<Integer, Integer> accumulator = new FailedAccumulatorSerialization<>(TEST_EXCEPTION);
final FailedAccumulator<Integer, Integer> accumulator = new FailedAccumulator<>(TEST_EXCEPTION);

try {
accumulator.getLocalValue();
Expand Down Expand Up @@ -76,11 +76,11 @@ public void testMethodCallThrowsException() {
*/
@Test
public void testSerialization() throws Exception {
final FailedAccumulatorSerialization<?, ?> accumulator = new FailedAccumulatorSerialization<>(TEST_EXCEPTION);
final FailedAccumulator<?, ?> accumulator = new FailedAccumulator<>(TEST_EXCEPTION);

final byte[] serializedAccumulator = InstantiationUtil.serializeObject(accumulator);

final FailedAccumulatorSerialization<?, ?> deserializedAccumulator = InstantiationUtil.deserializeObject(serializedAccumulator, ClassLoader.getSystemClassLoader());
final FailedAccumulator<?, ?> deserializedAccumulator = InstantiationUtil.deserializeObject(serializedAccumulator, ClassLoader.getSystemClassLoader());

assertThat(deserializedAccumulator.getThrowable(), is(instanceOf(TEST_EXCEPTION.getClass())));
assertThat(deserializedAccumulator.getThrowable().getMessage(), is(equalTo(TEST_EXCEPTION.getMessage())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package org.apache.flink.runtime.accumulators;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Container class that transports the result of an accumulator as a set of strings.
*/
public class StringifiedAccumulatorResult implements java.io.Serializable{
private static final Logger LOG = LoggerFactory.getLogger(StringifiedAccumulatorResult.class);

private static final long serialVersionUID = -4642311296836822611L;

Expand Down Expand Up @@ -67,22 +72,29 @@ public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<Str

int i = 0;
for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
StringifiedAccumulatorResult result;
Accumulator<?, ?> accumulator = entry.getValue();
if (accumulator != null) {
Object localValue = accumulator.getLocalValue();
if (localValue != null) {
result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString());
} else {
result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null");
}
} else {
result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
}

results[i++] = result;
results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue());
}
return results;
}
}

/**
 * Builds the stringified representation of a single accumulator.
 *
 * <p>A {@code null} accumulator is reported with the literal type and value {@code "null"}.
 * If reading the local value throws a {@link RuntimeException}, the failure is logged and the
 * output of {@code ExceptionUtils.stringifyException} is used as the value instead. A
 * {@code null} local value is reported as the literal {@code "null"}.
 *
 * @param name name under which the accumulator is registered
 * @param accumulator accumulator to stringify, may be {@code null}
 * @return the stringified name, type and value of the accumulator
 */
private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator<?, ?> accumulator) {
	// Guard clause: nothing to inspect for a missing accumulator.
	if (accumulator == null) {
		return new StringifiedAccumulatorResult(name, "null", "null");
	}

	Object value;
	try {
		value = accumulator.getLocalValue();
	} catch (RuntimeException failure) {
		// Keep going: report the failure text in place of the value.
		LOG.error("Failed to stringify accumulator", failure);
		value = ExceptionUtils.stringifyException(failure);
	}

	final String typeName = accumulator.getClass().getSimpleName();
	final String valueText = (value == null) ? "null" : value.toString();
	return new StringifiedAccumulatorResult(name, typeName, valueText);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.FailedAccumulatorSerialization;
import org.apache.flink.api.common.accumulators.FailedAccumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
Expand Down Expand Up @@ -103,6 +103,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -769,27 +770,26 @@ public Executor getFutureExecutor() {
*/
@Override
public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
return aggregateUserAccumulators()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
}

Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();

Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {

private static SerializedValue<Object> serializeAccumulator(String name, Accumulator<?, ?> accumulator) {
try {
if (accumulator instanceof FailedAccumulator) {
return new SerializedValue<>(accumulator);
}
return new SerializedValue<>(accumulator.getLocalValue());
} catch (IOException ioe) {
LOG.error("Could not serialize accumulator " + name + '.', ioe);
try {
final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
result.put(entry.getKey(), serializedValue);
} catch (IOException ioe) {
LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe);

try {
result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
} catch (IOException e) {
throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
}
return new SerializedValue<>(new FailedAccumulator(ioe));
} catch (IOException e) {
throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
}
}

return result;
}

/**
Expand Down

0 comments on commit 6556443

Please sign in to comment.