[FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy #7009

Closed

Conversation

@Myasuka (Member) commented Nov 2, 2018

What is the purpose of the change

Currently, RestartPipelinedRegionStrategy does not perform any state restore. This is a big problem because all restarted regions come back with empty state. This PR adds support for restoring state when using RestartPipelinedRegionStrategy.

Brief change log

  • Implement a new restoreLatestCheckpointedState API for region-based failover in CheckpointCoordinator (see the sketch after this list).
  • Reload the checkpointed state when FailoverRegion calls its restart method.
  • Let StateAssignmentOperation assign state for the given executionVertices only.
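
A minimal sketch of the new restore entry point's rough shape, assuming the parameter names executionVertices and errorIfNoCheckpoint that appear in the review diffs further down; the actual signature in the commits may differ:

import java.util.List;

import org.apache.flink.runtime.executiongraph.ExecutionVertex;

// Sketch only: a region-scoped variant of restoreLatestCheckpointedState that restores the
// latest checkpointed state for the given execution vertices (e.g. the vertices of one
// failover region) instead of for the whole job. Names are taken from the review diffs
// below; the actual signature in the commits may differ.
interface RegionScopedRestore {

    boolean restoreLatestCheckpointedState(
        List<ExecutionVertex> executionVertices,
        boolean errorIfNoCheckpoint) throws Exception;
}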

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests for FailoverRegion to ensure the failover region calls the new restoreLatestCheckpointedState API of the CheckpointCoordinator.
  • Added unit tests to CheckpointCoordinatorTest to ensure the CheckpointCoordinator can restore state with RestartPipelinedRegionStrategy.
  • Added unit tests to CheckpointStateRestoreTest to ensure RestartPipelinedRegionStrategy handles restoring state from a checkpoint to the task executions correctly.
  • Added a new integration test, RegionFailoverITCase, to verify that state is restored properly when the job consists of multiple regions.
  • Refactored StreamFaultToleranceTestBase so that all subclass ITCases can fail over with state using RestartPipelinedRegionStrategy.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): don't know
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@StefanRRichter (Contributor) left a comment

I am not completely done with the review, but so far I already have some change requests. Mostly the problem is code duplication; we should try to come up with a single implementation that handles both cases.

@@ -201,31 +261,33 @@ private void assignTaskStateToExecutionJobVertices(

for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {

Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex]
.getCurrentExecutionAttempt();
if (subTaskIndices.contains(subTaskIndex)) {
Contributor:

Instead of `for i in (0 .. newParallelism) -> contains(i)`, why not supply an `Iterable<Integer> subtaskIDs` instead and then use `for (int subtask : subtaskIDs)`? The old code path would just pass in an iterable from 0 to the new parallelism.

* that restores <i>non-partitioned</i> state from this
* checkpoint.
*/
public boolean restoreLatestCheckpointedState(
Contributor:

Again, this is almost a complete duplication of the original method. We should unify both methods to keep this maintainable.

}

return true;
}

private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates, Set<Integer> subTaskIndices) {
Contributor:

I doubt that Set<Integer> is the best representation of the subtask indexes. At least at the interface level, an Iterable<Integer> could do the job if we rewrite the loop as I suggested. Furthermore, we can back this with a more memory-friendly implementation, for example a boolean[] or a BitSet.

@Myasuka (Member, author) commented Nov 30, 2018

@StefanRRichter Thanks for your comments, I will refactor this PR.
BTW, I found that region failover without letting the checkpoint coordinator restart its checkpointScheduler would not guarantee the EXACTLY_ONCE mechanism. I'll include this part of the modification in the next commits.

@StefanRRichter (Contributor):

Ok, sounds good, looking forward to the new changes!

@Myasuka (Member, author) commented Dec 21, 2018

@StefanRRichter Would you please take a look at the new commit? I really appreciate any help you can provide.

@StefanRRichter (Contributor) left a comment

Thanks for the update! I have a few suggestions for improvements in the detailed comments. More importantly, I was wondering whether we even need this approach in all its complexity, or whether we can get away with a slightly modified call to the existing code (details in the inline comments). This is an important point to figure out before we proceed. Please also try to avoid unrelated changes, in particular to indentation, because they make the diff bigger and harder to review.

@@ -77,8 +77,8 @@

private static String outPath;

@BeforeClass
public static void createHDFS() throws IOException {
@Before
Contributor:

All the changes to the file sink tests look unrelated. Do they fix some problem? Is there any reason why we should combine them with this PR instead of putting them in a separate PR? If you agree, I would suggest reverting such changes.

Member Author:

This is because the parent class StreamFaultToleranceTestBase introduced a parameterized test over two different failover strategies, with which I want to verify whether streaming programs can still be fault tolerant under RestartPipelinedRegionStrategy. However, files left on HDFS by RollingSinkFaultToleranceITCase after the test has run once would cause it to fail in the subsequent run with the other strategy. That's why I had to modify this test.
I think that after this PR, FLINK-10713 should also add RestartIndividualStrategy to StreamFaultToleranceTestBase.

boolean errorIfNoCheckpoint) throws Exception {

Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(executionVertices.size());
Map<JobVertexID, BitSet> executionJobVertexIndices = new HashMap<>(executionVertices.size());
Contributor:

I think it could make more sense to combine the maps tasks and executionJobVertexIndices, for example into a Map<JobVertexID, ExecutionJobVertexWithSelectedSubtasks>, where ExecutionJobVertexWithSelectedSubtasks is a POJO combining an ExecutionJobVertex with its corresponding BitSet / Iterable<Integer>. This saves lookups and the memory for unnecessary data structures.
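
A minimal sketch of the POJO suggested here, assuming the name ExecutionJobVertexWithSelectedSubtasks from the comment and a BitSet as the backing structure; it is illustrative only, not code from the PR:

import java.util.BitSet;

import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;

// Illustrative sketch of the suggested POJO: bundles an ExecutionJobVertex with the subtasks
// selected for state assignment, so state assignment needs a single map lookup instead of two.
final class ExecutionJobVertexWithSelectedSubtasks {

    private final ExecutionJobVertex jobVertex;
    private final BitSet selectedSubtasks; // bit i set => subtask i gets state assigned

    ExecutionJobVertexWithSelectedSubtasks(ExecutionJobVertex jobVertex, BitSet selectedSubtasks) {
        this.jobVertex = jobVertex;
        this.selectedSubtasks = selectedSubtasks;
    }

    ExecutionJobVertex getJobVertex() {
        return jobVertex;
    }

    BitSet getSelectedSubtasks() {
        return selectedSubtasks;
    }
}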

@@ -54,6 +55,7 @@
private static final Logger LOG = LoggerFactory.getLogger(StateAssignmentOperation.class);

private final Map<JobVertexID, ExecutionJobVertex> tasks;
private final Map<JobVertexID, BitSet> taskIndices;
Contributor:

In my previous suggestion, what I really meant was using something like the interface Iterable<Integer>, not the BitSet implementation directly. Then we could have two implementations of the iterable: one that delegates to a BitSet internally, and one for the general case that just takes a number (the parallelism) and generates the sequence 0..parallelism. This keeps the general code path more memory friendly and does not tie us to BitSet. Instead of Iterable, Collection could be considered if you find that knowing the size opens up optimizations.
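
A self-contained sketch of the two backing implementations described here, in plain Java without Flink types; all names are made up for illustration:

import java.util.BitSet;
import java.util.stream.IntStream;

// Sketch of the two Iterable<Integer> implementations described above: one backed by a BitSet
// for region failover, and one that lazily produces the full range 0..parallelism-1 for the
// general restore path.
public class SubtaskIndexIterables {

    // Region failover: iterate only the subtask indices whose bit is set.
    static Iterable<Integer> fromBitSet(BitSet selectedSubtasks) {
        return () -> selectedSubtasks.stream().iterator();
    }

    // Full restore: iterate 0..parallelism-1 without materializing a collection.
    static Iterable<Integer> fullRange(int parallelism) {
        return () -> IntStream.range(0, parallelism).iterator();
    }

    public static void main(String[] args) {
        BitSet failedSubtasks = new BitSet();
        failedSubtasks.set(1);
        failedSubtasks.set(3);

        for (int subtask : fromBitSet(failedSubtasks)) {
            System.out.println("assign state to subtask " + subtask); // prints 1, 3
        }
        for (int subtask : fullRange(4)) {
            System.out.println("assign state to subtask " + subtask); // prints 0..3
        }
    }
}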

Map<OperatorInstanceID, List<KeyedStateHandle>> subManagedKeyedState,
Map<OperatorInstanceID, List<KeyedStateHandle>> subRawKeyedState,
int newParallelism) {
ExecutionJobVertex executionJobVertex,
Contributor:

There are a lot of unrelated formatting changes to method parameters. Can you please revert all the indentation changes, because they make the diff bigger and therefore reviews harder?

executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
connectedExecutionVertexes, false, false);
connectedExecutionVertexes, false);
Contributor:

I have a general high-level question about this whole approach that I forgot to ask in my previous review: do we even need to introduce this additional restore method? It seems to me that all it does is optimize the state assignment a little for regional failover, at slightly increased complexity for the full-recovery case.

Instead, couldn't we simply use the existing restoreLatestCheckpointedState method, without providing any indexes, and just call it from here with executionGraph.allVertices()? State assignment is a simple metadata operation and should run very fast in general. As this will only modify the restoreState variable, only the vertices that are actually restarted for the failed region see the effect; the remaining vertices are not restarted and do not care about the change. We might need to change setInitialState for this, by removing the precondition that checks for CREATED or by only assigning to instances in that state.
If this is just an optimization for this special case, dropping it could reduce the amount of changes and potential bugs by a lot. What do you think? Did you find any other reason why this whole index handling is required?
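
A minimal sketch of this alternative, assuming the pre-existing three-argument restoreLatestCheckpointedState signature and ExecutionGraph#getAllVertices(); the flag values and helper name are illustrative:

import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;

// Sketch of the suggested alternative, assuming the pre-existing
// restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex>, boolean, boolean)
// signature; the boolean flags are illustrative choices, not the PR's actual call.
final class RegionRestoreSketch {

    static void restoreForFailedRegion(ExecutionGraph executionGraph) throws Exception {
        CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
        if (coordinator != null) {
            // Restore against all vertices: only the vertices that are actually restarted
            // pick up the new assignment; the ones that keep running ignore it.
            coordinator.restoreLatestCheckpointedState(
                executionGraph.getAllVertices(),
                false /* errorIfNoCheckpoint */,
                true /* allowNonRestoredState */);
        }
    }

    private RegionRestoreSketch() {}
}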

Member Author:

From my point of view, we should not depend on an execution's current status to determine whether to assign state.
With the given indices, we can figure out the logical plan containing exactly the executions that need state assigned, whereas trying to assign state to all executions and using the execution's status to decide whether to assign the initial state could indeed cause potential bugs.
Moreover, the current implementation already uses one base restoreLatestCheckpointedState method to handle both situations, to reduce the chance of bugs. On the other hand, I think FLINK-10713 could also reuse the new restoreLatestCheckpointedState(List<ExecutionVertex>, boolean) method.
I'm not sure whether I have expressed my thoughts clearly enough to convince you, but please leave any thoughts or concerns if further discussion is still needed.

Contributor:

For all cases, as I see it, the indexes work as an optimization that runs the repartitioning only for some tasks. I think it would be OK to drop the precondition check, because the assignment is only done once at the very beginning, so the check seems a bit overcautious at this point. If you think the optimization is helpful, we can introduce it with the small changes I proposed. However, I doubt that we will see real performance benefits from the reassignment, while the code is getting more complex, which makes bugs more likely. Maybe you could just try out whether there is an observable performance difference between using the indexes and simply reassigning to all tasks?

@@ -985,6 +986,67 @@ int getNumScheduledTasks() {
* restore from.
* @param allowNonRestoredState Allow checkpoint state that cannot be mapped
* to any job vertex in tasks.
*/
public boolean restoreLatestCheckpointedState(
Contributor:

If you want, it seems to me like we can remove the return value from this method and change it to void.

@StefanRRichter (Contributor) commented Jan 9, 2019

I have one more concern that might lead to bugs in a certain corner case: what will happen with your change if the task is using operator state, union state in particular? In applyRepartitioner(),

if (OperatorStateHandle.Mode.UNION.equals(metaInfo.getDistributionMode())) {

you can see that all operator state is repartitioned if there is one union state. This can already be a problem with a union state alone, but even more so if there is a union state and some partitionable state: the partitioning of the partitionable state for the tasks that are restarted could differ from the partitioning used in the original run, so some partitions could be dropped or assigned twice. I think that means we need to change the method to only redistribute the union states, and I wonder whether distributing the union state only for the failed task even makes sense. I think we need a test case for this scenario (an operator with one union and one partitionable operator state), and I think it might fail as described above when we check how the operator state was reassigned after a partial recovery. What do you think?

@Myasuka (Member, author) commented Jan 13, 2019

@StefanRRichter It seems that union operator state somehow conflicts with partial recovery. However, since a region is the minimal pipelined connected subgraph, why could it have union state across different regions? Otherwise, we might need to introduce some limitations for this scenario.

Would you please kindly clarify this corner case in more detail?

@StefanRRichter (Contributor) commented Jan 14, 2019

@Myasuka Yes, in the current implementation union state is a problem, and unfortunately it is also used in some popular operators. For example, the KafkaConsumer "abuses" union state to have a rescaling protocol that can support partition discovery. In a nutshell, during restore all parallel instances see the Kafka offsets of all partitions, and every instance cherry-picks Kafka partitions through a protocol that all instances follow. So every partition goes to exactly one operator and there is no need for communication between instances.

The contract of union state is that on recovery, each operator instance sees the union of the states from all instances. The question for partial recovery is now: will recovering instances see (i) all states, (ii) all states from other recovering instances, or (iii) only their old state? I think most likely we should go for option (i), but this also means that all states would have to go into the operation and cannot be excluded via the index.

Then there is another problem in the current implementation that can lead to bugs with your code in the following way: if there is at least one union state, all other operator states go through round-robin reassignment as well. So we round-robin reassign some state, but only the restarting operators load the reassigned version of it; the instances that keep running will run with the old assignment. This can lead to some partitions being assigned twice or not being assigned at all.

One way to solve this problem would be to separate union states from the other operator states and only round-robin assign operator state if the parallelism did not change (which it never does for recoveries, only for restarts).

@Myasuka (Member, author) commented Jan 14, 2019

@StefanRRichter Thanks for your explanation. I still have two questions:

  1. Even if we assign the repartitioned operator state to all operator instances, the current taskRestore within Execution can only be shipped to the TaskManagers for executions located in the failed region, and the instances that keep running would not know that the operator state has changed. The possible bug you mentioned ("this can lead to some partitions being assigned twice or not being assigned at all") is therefore more on the execution-graph side; how do we define the 'disunity' among tasks?

  2. Your last suggestion is a bit confusing to me; please correct me if I am wrong: did you actually mean to only round-robin assign operator state if the parallelism did change? The parallelism can only change when the job is restarted; it does not change during job failover.

@StefanRRichter (Contributor):

@Myasuka

  1. Yes, that is correct. We need to double check, for operators that use union state, whether their protocol also works for partial restores - for example, whether under the same parallelism they will always pick the same state from the union they checkpointed and will not produce conflicts with the operators that are still running.

  2. Again correct, it was a typo. We should do no redistribution for those states if the parallelism did not change. That already avoids the problem for all cases except 1 (see the sketch after this list).
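
A purely illustrative sketch of the rule just clarified, using made-up types rather than Flink's operator state classes: with unchanged parallelism only union state is re-broadcast, while partitionable state keeps its previous per-subtask assignment:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; the types here are made up and are not Flink's
// OperatorStateHandle machinery. With unchanged parallelism (the recovery case),
// partitionable state keeps its old per-subtask assignment and only UNION state is
// re-broadcast to every subtask.
final class OperatorStateReassignmentSketch {

    enum DistributionMode { SPLIT_DISTRIBUTE, UNION }

    /** One operator-state partition, as written by some subtask into the checkpoint. */
    static final class StatePartition {
        final int writerSubtask;
        final DistributionMode mode;

        StatePartition(int writerSubtask, DistributionMode mode) {
            this.writerSubtask = writerSubtask;
            this.mode = mode;
        }
    }

    static void assign(
            List<StatePartition> checkpointed,
            int oldParallelism,
            int newParallelism,
            Map<Integer, List<StatePartition>> newAssignment) {

        boolean parallelismChanged = oldParallelism != newParallelism;

        for (StatePartition partition : checkpointed) {
            if (partition.mode == DistributionMode.UNION) {
                // Union contract: every restored subtask sees the union of all partitions.
                for (int subtask = 0; subtask < newParallelism; subtask++) {
                    newAssignment.computeIfAbsent(subtask, k -> new ArrayList<>()).add(partition);
                }
            } else if (!parallelismChanged) {
                // Recovery: each subtask gets back exactly what it checkpointed, no round robin.
                newAssignment.computeIfAbsent(partition.writerSubtask, k -> new ArrayList<>()).add(partition);
            }
            // else: a restart with changed parallelism; round-robin/range repartitioning
            // of SPLIT_DISTRIBUTE state would go here.
        }
    }

    private OperatorStateReassignmentSketch() {}
}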

@Myasuka (Member, author) commented Jan 18, 2019

@StefanRRichter I have gone through all the production code using union state, as follows:

For ArtificalOperatorStateMapper, SimpleEndlessSourceWithBloatedState, and StateCreatingFlatMap, they use union state only for end-to-end test verification.

For FlinkKafkaProducer and FlinkKafkaProducer011, the union nextTransactionalIdHintState is the same for all subtasks.

For the KafkaConsumer, since Kafka does not support decreasing the partition count, the Kafka partitions are sticky to their subtasks as long as the parallelism has not changed.

I think the operators above would not hit the conflict case.

For SequenceGeneratorSource, it initializes its state with the max event time. However, monotonousEventTime only increases by the fixed step eventTimeClockProgressPerEvent, which should be the same for all subtasks.

For StreamingFileSink, it also takes the max counter, but by the definition of StreamingFileSink, when restoring from a checkpoint the restored files in pending state are moved to finished state, while in-progress files are rolled back. In other words, from my point of view, partial recovery is not suitable for StreamingFileSink.

From my point of view, all operators in production code with union state, except StreamingFileSink, should work fine for partial restore. However, since the getUnionListState API is public to users, we cannot control users' behavior. In a nutshell, if we support restoring state when using RestartPipelinedRegionStrategy, we should add a limitation for union state.

I plan to add another parameter, possibly a RecoveryMode, to the restoreLatestCheckpointedState method. For RestartAllStrategy, and for RestartPipelinedRegionStrategy when the whole job is a single region, it would be RecoveryMode.ALL; for the other failover strategies it would be RecoveryMode.PARTIAL. With this, when assigning state, an unsupported exception can be thrown if we find RecoveryMode.PARTIAL and union state exists. What do you think?
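
A minimal sketch of this RecoveryMode proposal; the enum and helper are hypothetical and only OperatorStateHandle.Mode is an existing Flink type:

import org.apache.flink.runtime.state.OperatorStateHandle;

// Sketch of the proposal above; RecoveryMode and the helper are illustrative, not the PR's
// actual code. The idea: fail fast when union state is encountered while only part of the
// job is being restored.
final class RecoveryModeSketch {

    enum RecoveryMode {
        ALL,     // RestartAllStrategy, or region failover when the whole job is a single region
        PARTIAL  // region-based failover that restores only a subset of the tasks
    }

    static void checkUnionStateSupported(RecoveryMode recoveryMode, OperatorStateHandle.Mode distributionMode) {
        if (recoveryMode == RecoveryMode.PARTIAL && distributionMode == OperatorStateHandle.Mode.UNION) {
            throw new UnsupportedOperationException(
                "Union operator state cannot be restored by a partial (region-based) recovery.");
        }
    }

    private RecoveryModeSketch() {}
}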

@StefanRRichter (Contributor):

I think I agree with the assessment of the existing operators.

About adding a RecoveryMode: would that mean that we prevent all jobs that use union state from working with partial recovery? If we just consider a few popular operators like the KafkaConsumer, that would already prevent a lot of jobs from using the other recovery modes.

I can see that this comes from the concern about existing code that uses union state. However, strictly speaking it would not be a regression, because those recovery modes previously did not support state recovery at all. We also cannot prevent users from making wrong implementations, so I feel a good thing to do is to document what to take care of with union state when using such recovery modes.

@Myasuka (Member, author) commented Feb 23, 2019

A new PR #7813 has been created to replace this one, since this code is outdated.

@tillrohrmann (Contributor):

Closing PR because it has been subsumed by #7813.
