Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7213] Introduce state management by OperatorID in TaskManager #4353

Closed
wants to merge 5 commits into from

Conversation

StefanRRichter
Copy link
Contributor

@StefanRRichter StefanRRichter commented Jul 17, 2017

Flink-5892 introduced the job manager / checkpoint coordinator part of managing state on the operator level instead of the task level by introducing explicit operator_id -> state mappings.

However, this explicit mapping was not introduced in the task manager side, so the explicit mapping is still converted into a mapping that suits the implicit operator chain order.

This PR introduces this part and offers explicit state management by operator_id in the task manager.

Furthermore, this PR also introduces TaskStateSnapshot as unifying abstraction to replace TaskStateHandles and SubtaskStatewhich were always very similar, except that one offered collections of state handles (to support scaling in on restore) while the other only contained single objects (because each state is snapshotted into one state handle).

@StefanRRichter
Copy link
Contributor Author

CC @StephanEwen @zentol

* Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In
* scale-down, one operator subtask can become responsible for the state of multiple previous subtasks. The collections
* can then store all the state handles that are relevant to build up the new subtask state.
* <p>There is no collection for legacy state because it is nor rescalable.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: nor -> not

* This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
* operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all
* parallel tasks that physically execute parallelized, physical instances of the operator.
* <p>The full state of the logical operator is represented by {@link OperatorState} which consists of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add en empty line before the <p> tag so we have to make less changes when activating checkstyle.

@@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
throw new RuntimeException(e);
}

boolean hasKeyedManagedKeyedState = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> hasManagedKeyedState?

@@ -164,6 +269,7 @@ public long getStateSize() {

// --------------------------------------------------------------------------------------------


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this empty line

* <p>Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle
* produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot.
* The purpose of having the state handles in collections is that this class is also reused in restoring state.
* Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we don't need this in the current master, where this class is also used for restoring state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the master, we used two different classes for this purpose: OperatorSubtaskState to report from task to master, and TaskStateHandles to restore from master to task. Their difference is that in the first all fields are singletons, and the second all are collections. Otherwise, their purpose is identical, so I collapsed them into one class.

Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the TaskStateHandles class can be removed. Looks like it only used by an unused OperatorStateHandles constructor.

@@ -89,6 +89,10 @@ private GroupByStateNameResults groupByStateName(

for (OperatorStateHandle psh : previousParallelSubtaskStates) {

if(psh == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing spacer after if

* he checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
* {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
* state of a job at the time of the checkpoint.
* <p>This class should be called TaskState once the old class with this name that we keep for backwards
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add empty line before paragraph

* register their state under their operator id. Each operator instance is a physical execution responsible for
* processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
* execution of logical operators, e.g. distributing a map function.
* <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by t
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add empty line before paragraph

super.putState(subtaskIndex, spy(subtaskState));
}
}
// private static final class SpyInjectingOperatorState extends OperatorState {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove this (which is really great...)

Collections.singletonList(serializedKeyGroupStates),
Collections.<KeyedStateHandle>emptyList()));

//SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats up with this line?


Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
Collection<OperatorStateHandle> operatorStateHandlesBackend = null;

boolean restoring = null != stateHandles;
boolean restoring = (null != stateHandles);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you add the braces?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like to do this when generating a boolean out of a != or == comparison because I find this easier to read in the presence of more than one = character.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to keep the parenthesis

I think we should let contributors use such styles at their discretion


if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {

// we signal a stateless task by reporting null, so that there are no attempts to assign empty state
// to stateless tasks on restore. This enables simple job modifications that only concern
// stateless without the need to assign them uids to match their (always empty) states.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stateless tasks

@@ -75,31 +103,84 @@
*/
private final long stateSize;

@VisibleForTesting
public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this constructor call the other one?

* <p>The full state of the logical operator is represented by {@link OperatorState} which consists of
* {@link OperatorSubtaskState}s.
*
* <p>Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because there is

Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose modifying the TaskStateSnapshpot to contain a Map<OperatorID, Set<OperatorSubtaskState>> instead of integrating the collections into the OperatorSubtaskStates.

IMO this gives us a cleaner separation of concerns; the fact that a single operator subtask uses multiple subtask states is a deployment detail, which the OperatorSubtaskState shouldn't be concerned about.

@StefanRRichter
Copy link
Contributor Author

I think that idea is problematic because in the rescaling case, all the collections can have different sizes. For example there can be 5 managed keyed state handles and 7 managed operator state handles and zero state handles for the raw state. Then how would you split that up between the OperatorSubtaskStates in your set? Like this, OperatorSubtaskState contains the complete state for an operator subtask which I think is a good thing. Also maybe at some point there might be a reason to report more than one handle already on snapshotting.

@StefanRRichter
Copy link
Contributor Author

StefanRRichter commented Jul 30, 2017

BTW, one alternative I was once considering for the scale down case is merging multiple state handles (that are backed by different physical files) in one logical state handle, using something based on MultiStreamStateHandle. That would require minor changes in how the backends currently iterate the handles and some calculation of virtual offsets near the StateAssignmentOperation, mapping the old physical file offsets to the new logical offsets in the stream that gives a consecutive logical view over the files. Then, the whole code would never again deal with this detail. Wonder if this could be worth the effort? Still kind of like the plan...
Edit: It might need also another thought how to do something like that for the incremental cp case.

@StephanEwen
Copy link
Contributor

I had a very rough look at it, and the conceptual rework looks very good.

This would need a detailed pass over the code changes, though, since it touches very sensitive code...

@StephanEwen
Copy link
Contributor

Concerning the suggestion about the MultiStreamStateHandle - I am not sure that this can always work. Different physical files may have headers, so it may be important to recognize them as different chunks of state in the general case.

Copy link
Contributor

@StephanEwen StephanEwen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work on this change!

+1 from my side, with some suggestions for edits. None of the issues are blocking merging the PR, they are minor suggestions for improvements.

I did skip the review of the parts of the code that deal with legacy state. Given that there is an imminent removal for that code, I thought that would make sense...

There is also some parts that will be improved through the related pull request to refactor the StreamConfig to hold a config per operator, rather than putting all operator configurations into one task config.

General Comments:

  • For the sake of robustness and future safety, it would be great to turn some of the state holder classes into immutable types. I was thinking of OperatorSubtaskState and TaskStateSnapshot for example.

  • There seem to be various cases of mocking in the CheckpointCoordinatorTest that can probably work without any mocking...

  • I think equals() and hashCode() are not really well defined on the state handle objects, because equal paths / array contents do not necessarily mean that the object is the same). Transitively, they are not well defined on classes like TaskStateSnapshot. If the only purpose is testing (compact way of asserting equality), implementing a Matcher that holds that type of equals logic would solve the problem as well.

private static final long serialVersionUID = 1L;

/** Mapping from an operator id to the state of one subtask of this operator */
private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A LinkedHashMap has a slightly more predictable iteration performance (list traversal) compared to a HashMap (search through sparse table array). There are a lot of value iterations done in this class, but we also should have pretty full hash tables (since we never delete), so not sure how much difference it makes...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think if we consider default load factors and for large sizes, I would pick a min >30% hit rate linear array scan over 100% hit rate random access iteration. For all expected sizes (in cache) in this class, it should not matter. LHM also consumes a bit more memory. I would tend to keep it this way.

* <p>This class should be called TaskState once the old class with this name that we keep for backwards
* compatibility goes away.
*/
public class TaskStateSnapshot implements CompositeStateHandle {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make this immutable? It looks like this should not be modified any more after fully constructing it. This would also make it clear that methods iterating over the state, or returning sets / iterables can never fail with concurrent modifications.

For example the size method is considered a "best effort" method for info purposes only, and should not fail with an exception (it currently could fail with a ConcurrentModificationException).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is totally intended to be immutable. So beyond what it is currently enforcing, do you suggest using immutable collections inside?

hasManagedKeyedState |= state.getManagedKeyedState() != null;
}
}

// should be one k/v state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"should be at least one k/v state"?

/**
* Empty state.
*/
public OperatorSubtaskState() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor optimization: One could make this constructor private and have a field OperatorSubtaskState.EMPTY as a placeholder for the empty states. I'd leave this to you whether you think it worth doing...

newRawOperatorStates,
subTaskIndex,
operatorIndex,
subManagedOperatorState,
subRawOperatorState);

// KeyedState
if (operatorIndex == operatorIDs.size() - 1) {
subKeyedState = reAssignSubKeyedStates(operatorState,
if (isHeadOperator(operatorIndex, operatorIDs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this check here? From the JobManager and CheckpointCoordinator side, nothing should prevent non-head operators to have keyed state. It is just a limitation in the current API.

This check seems to "enforce" an API limitation in a more general runtime that does not actually have a need for that restriction.

@@ -850,18 +843,20 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
OperatorID opID2 = OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
OperatorID opID3 = OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());

Map<OperatorID, OperatorState> operatorStates1 = pending1.getOperatorStates();
TaskStateSnapshot taskOperatorSubtaskStates1_1 = spy(new TaskStateSnapshot());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is spying necessary here? There seem to be no verify() calls on this type...

@@ -878,14 +873,17 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
}
long checkpointId2 = pending2.getCheckpointId();

Map<OperatorID, OperatorState> operatorStates2 = pending2.getOperatorStates();
TaskStateSnapshot taskOperatorSubtaskStates2_1 = spy(new TaskStateSnapshot());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, spying necessary?

// check that the vertices received the trigger checkpoint message
{
verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
}

OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not create a proper TaskStateSnapshot with one entry, rather than mocking?

@StefanRRichter
Copy link
Contributor Author

Thanks for the review @StephanEwen ! As I have already some branches that build upon this and touch similar places in the code, I would suggest to merge this as is and introduce the polishing changes you suggested afterwards with another commit.

@StefanRRichter
Copy link
Contributor Author

Merged in b71154a

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants