New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-8721][flip6] Handle archiving failures for accumulators #5737
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix @pnowojski. I had a comment about the actual accumulator value types delivered to the client via JobExecutionResult#accumulatorResults
. I think we have to introduce something like an AccumulatorValue
wrapper which can wrap the actual accumulator value or an exception which is thrown when being accessed in JobExecutionResult#getAccumulatorResult
and in JobExecutionResult#getAllAccumulatorResults
where we have to unpack all of them.
} | ||
return results; | ||
} | ||
} | ||
|
||
private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator<?, ?> accumulator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Nullable
missing for accumulator
localValue = accumulator.getLocalValue(); | ||
} | ||
catch (RuntimeException exception) { | ||
LOG.error("Failed to stringify accumulator", exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add name
to log statement.
} | ||
|
||
@Test | ||
public void testFaultyAccumulator() throws Exception { | ||
|
||
TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could also write StreamExecutionEnvironment.getExecutionEnvironment()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an equivalent to .output(new DiscardingOutputFormat<>());
in the StreamExecutionEnvironment
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a no-op sink?
} catch (IOException e) { | ||
throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e); | ||
} | ||
return new SerializedValue<>(new FailedAccumulator(ioe)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, the problem I see here is that in the success case, we store the accumulator value and in the failure case we store an Accumulator
instance. Thus, the user will expect the accumulator value and casting it accordingly. Thus he will never call the Accumulator
methods which will throw the exceptions (see JobExecutionResult
for how the user interacts with the accumulator values). In that sense the previous solution with storing a FailedAccumulatorSerialization
was also flawed.
What we actually would have to store in the SerializedValue
is something like an Either<Throwable, V>
. On the client side when accessing the accumulatorsValueMap
it should check whether it is left or right and in the left case throw the exception.
Alternatively, we say that an accumulator failure always results in a job failure. This means that in JobMaster#jobStatusChanged
we generate a failed ArchivedExecutionGraph
in case of an accumulator failure.
c691d94
to
1a2bc75
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @pnowojski. I had some questions concerning the OptionalFailure
class. I think we should not catch RuntimeException
in the createFrom
. Actually I'm wondering whether we need createFrom
at all. I also think that we should throw a checked exception when accessing the OptionalFailure
value. Otherwise the user might not be aware that it could contain an exception.
private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class); | ||
|
||
@Nullable | ||
private T value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type is not serializable. I think you should mark it transient
and then override readObject
and writeObject
similar to how ArrayList
does it.
try { | ||
return OptionalFailure.of(valueSupplier.get()); | ||
} | ||
catch (RuntimeException ex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure whether we should capture the RuntimeException
here. To me a supplier
should not throw RuntimeExceptions
and if so, then it should not produce a OptionalFailure
but instead fail with a RuntimeException
.
/** | ||
* @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}. | ||
*/ | ||
public T get() throws FlinkRuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think get
should throw a checked exception and not an unchecked exception. Otherwise users won't be aware of it. We could provide a method getUnchecked
where we throw an unchecked exception.
} | ||
|
||
@Override | ||
public boolean equals(Object object) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why deviating from the super class' parameter name obj
?
if (!(object instanceof OptionalFailure)) { | ||
return false; | ||
} | ||
OptionalFailure other = (OptionalFailure) object; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's cast to OptionalFailure<?>
return OptionalFailure.of(valueSupplier.get()); | ||
} | ||
catch (RuntimeException ex) { | ||
LOG.error("Failed to archive accumulators", ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The message indicates that OptionalFailure
was implemented for the accumulators in mind, but I think it should be more generic. I guess that AccumulatorHelper#67
is also the reason why we catch the RuntimeException
to make the merge supplier as smooth as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ops, it was pulled in here by an accident.
Map<String, SerializedValue<Object>> accumulators) { | ||
public SerializedJobExecutionResult(JobID jobID, | ||
long netRuntime, | ||
Map<String, SerializedValue<OptionalFailure<Object>>> accumulators) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something is with the indentation off here.
@JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<Object>> serializedUserAccumulators) { | ||
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators, | ||
@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators, | ||
@JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation is wrong
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertNull; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.junit.Assert.fail; | ||
|
||
/** | ||
* Tests for the SerializedJobExecutionResult | ||
*/ | ||
public class SerializedJobExecutionResultTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extends TestLogger
missing
} | ||
|
||
@Test | ||
public void testFaultyAccumulator() throws Exception { | ||
|
||
TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a no-op sink?
During archivization, wrap errors thrown by users' Accumulators into a OptionalFailure and do not fail the job because of that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really nice work @pnowojski. Changes look good to me. Merging this PR.
During archivization, wrap errors thrown by users' Accumulators into a OptionalFailure and do not fail the job because of that. This closes apache#5737.
During archivization, wrap errors thrown by users' Accumulators into a OptionalFailure and do not fail the job because of that. This closes apache#5737.
During archivization, wrap errors thrown by users' Accumulators into a OptionalFailure and do not fail the job because of that. This closes #5737.
Thanks! |
During archivization, wrap errors thrown by users' Accumulators into a OptionalFailure and do not fail the job because of that. This closes apache#5737.
During archivization, wrap errors thrown by users' Accumulators into a FailedAccumulator and do not fail the job because of that.
Verifying this change
This change is covered by existing AccumulatorErrorITCase
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation