[FLINK-5892] Restore state on operator level
zentol committed Apr 28, 2017
1 parent 8045fab commit f7980a7
Showing 58 changed files with 2,971 additions and 1,117 deletions.
4 changes: 3 additions & 1 deletion docs/ops/upgrading.md
@@ -73,6 +73,8 @@ val mappedEvents: DataStream[(Int, Long)] = events


**Note:** Since the operator IDs stored in a savepoint and IDs of operators in the application to start must be equal, it is highly recommended to assign unique IDs to all operators of an application that might be upgraded in the future. This advice applies to all operators, i.e., operators with and without explicitly declared operator state, because some operators have internal state that is not visible to the user. Upgrading an application without assigned operator IDs is significantly more difficult and may only be possible via a low-level workaround using the `setUidHash()` method.


+**Important:** As of 1.3.0, this also applies to operators that are part of a chain.
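To make the ID advice concrete, here is a minimal sketch of assigning explicit IDs with `uid()`. The job, source, and ID strings are hypothetical; `uid()` is the DataStream API method the surrounding docs refer to, and since 1.3.0 it is worth calling on chained operators as well:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidAssignmentExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.socketTextStream("localhost", 9999) // hypothetical source
			.uid("source-1")                    // stable ID stored in the savepoint
			.map(String::trim)
			.uid("trim-map")                    // IDs also for operators inside a chain
			.keyBy(s -> s)
			.reduce((a, b) -> a + b)
			.uid("reduce-1")
			.print();

		env.execute("uid-assignment-example");
	}
}
```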

By default all state stored in a savepoint must be matched to the operators of a starting application. However, users can explicitly agree to skip (and thereby discard) state that cannot be matched to an operator when starting an application from a savepoint. Stateful operators for which no state is found in the savepoint are initialized with their default state.
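The "explicitly agree" part surfaces as the allow-non-restored-state flag. A hedged sketch of the runtime API that carries it when a job is submitted from a savepoint; the path is hypothetical, and the `flink run` CLI exposes the same behavior:

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class RestoreSettingsExample {
	// Attaches restore settings to a job graph; the savepoint path is hypothetical,
	// and `true` corresponds to explicitly agreeing to skip unmatched state.
	static void configureRestore(JobGraph jobGraph) {
		jobGraph.setSavepointRestoreSettings(
			SavepointRestoreSettings.forPath("hdfs:///savepoints/savepoint-abcdef", true));
	}
}
```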


### Stateful Operators and User Functions
@@ -105,7 +107,7 @@ When upgrading an application by changing its topology, a few things need to be
* **Adding a stateful operator:** The state of the operator will be initialized with the default state unless it takes over the state of another operator.
* **Removing a stateful operator:** The state of the removed operator is lost unless another operator takes it over. When starting the upgraded application, you have to explicitly agree to discard the state.
* **Changing of input and output types of operators:** When adding a new operator before or behind an operator with internal state, you have to ensure that the input or output type of the stateful operator is not modified to preserve the data type of the internal operator state (see above for details).
-* **Changing operator chaining:** Operators can be chained together for improved performance. However, chaining can limit the ability of an application to be upgraded if a chain contains a stateful operator that is not the first operator of the chain. In such a case, it is not possible to break the chain such that the stateful operator is moved out of the chain. It is also not possible to append or inject an existing stateful operator into a chain. The chaining behavior can be changed by modifying the parallelism of a chained operator or by adding or removing explicit operator chaining instructions.
+* **Changing operator chaining:** Operators can be chained together for improved performance. When restoring from a savepoint taken with 1.3.0 or later, it is possible to modify chains while preserving state consistency. It is possible to break a chain so that a stateful operator is moved out of it, to append or inject a new or existing stateful operator into a chain, or to modify the operator order within a chain. However, when upgrading a savepoint to 1.3.0, it is paramount that the topology did not change with regard to chaining. All operators that are part of a chain should be assigned an ID as described in the [Matching Operator State](#matching-operator-state) section above.
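Since chain modifications interact with ID assignment, a minimal sketch of pinning both explicitly follows. `uid()`, `startNewChain()`, and `disableChaining()` are existing DataStream API methods; the pipeline itself is hypothetical:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingControlExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.socketTextStream("localhost", 9999) // hypothetical source
			.filter(s -> !s.isEmpty())
			.uid("filter-1")
			.map(String::toUpperCase)
			.uid("map-1")
			.startNewChain()                    // start a new chain at map-1
			.map(s -> s + "!")
			.uid("map-2")
			.disableChaining()                  // keep map-2 out of any chain
			.print();

		env.execute("chaining-control-example");
	}
}
```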


## Upgrading the Flink Framework Version


5 changes: 0 additions & 5 deletions docs/setup/savepoints.md
@@ -185,8 +185,3 @@ If you did not assign IDs, the auto generated IDs of the stateful operators will
If the savepoint was triggered with Flink >= 1.2.0 and using no deprecated state API like `Checkpointed`, you can simply restore the program from a savepoint and specify a new parallelism.


If you are resuming from a savepoint triggered with Flink < 1.2.0 or using now-deprecated APIs, you first have to migrate your job and savepoint to Flink 1.2.0 before being able to change the parallelism. See the [upgrading jobs and Flink versions guide]({{ site.baseurl }}/ops/upgrading.html).

-## Current limitations
-
-- **Chaining**: Chained operators are identified by the ID of the first task. It's not possible to manually assign an ID to an intermediate chained task, e.g. in the chain `[ a -> b -> c ]` only **a** can have its ID assigned manually, but not **b** or **c**. To work around this, you can [manually define the task chains](index.html#task-chaining-and-resource-groups). If you rely on the automatic ID assignment, a change in the chaining behaviour will also change the IDs.
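With the chaining limitation above removed, rescaling a restored job reduces to resubmitting the same topology (same `uid()`s) with a new parallelism. A minimal hypothetical sketch; the savepoint path itself is supplied at submission time, not in the program:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescaleOnRestore {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8); // new parallelism, effective once resubmitted from the savepoint

		env.fromElements(1, 2, 3)
			.map(i -> i * 2)
			.uid("double-map") // same uid as in the original job (hypothetical)
			.print();

		env.execute("rescaled-job");
	}
}
```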

@@ -20,6 +20,7 @@


import org.apache.flink.migration.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.util.Preconditions;


@@ -71,6 +72,11 @@ public Collection<MasterState> getMasterStates() {
throw new UnsupportedOperationException();
}


+@Override
+public Collection<OperatorState> getOperatorStates() {
+return null;
+}

@Override
public void dispose() throws Exception {
//NOP
@@ -36,6 +36,7 @@
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -892,7 +893,7 @@ public void run() {
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Checkpoint state: ");
-for (TaskState state : completedCheckpoint.getTaskStates().values()) {
+for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
builder.append(state);
builder.append(", ");
}
@@ -1017,11 +1018,11 @@ public boolean restoreLatestCheckpointedState(
LOG.info("Restoring from latest valid checkpoint: {}.", latest); LOG.info("Restoring from latest valid checkpoint: {}.", latest);


// re-assign the task states

-final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates();
+final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

StateAssignmentOperation stateAssignmentOperation =
-new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
+new StateAssignmentOperation(tasks, operatorStates, allowNonRestoredState);

stateAssignmentOperation.assignStates();


// call master hooks for restore
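The CheckpointCoordinator hunk above swaps task-level state (`JobVertexID` -> `TaskState`) for operator-level state (`OperatorID` -> `OperatorState`) on restore. A small illustrative helper, using only accessors this commit introduces on `OperatorState`; it is not Flink's actual assignment logic:

```java
import java.util.Map;

import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;

public class OperatorStateInspection {
	// Illustrative only: reports how many subtask states were collected per operator.
	static void logOperatorStates(Map<OperatorID, OperatorState> operatorStates) {
		for (Map.Entry<OperatorID, OperatorState> entry : operatorStates.entrySet()) {
			System.out.printf("operator %s: %d of %d subtask states, %d bytes%n",
				entry.getKey(),
				entry.getValue().getNumberCollectedStates(),
				entry.getValue().getParallelism(),
				entry.getValue().getStateSize());
		}
	}
}
```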
@@ -18,10 +18,9 @@


package org.apache.flink.runtime.checkpoint;


-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -94,8 +93,8 @@ public class CompletedCheckpoint implements Serializable {
/** The duration of the checkpoint (completion timestamp - trigger timestamp). */
private final long duration;


-/** States of the different task groups belonging to this checkpoint */
-private final HashMap<JobVertexID, TaskState> taskStates;
+/** States of the different operator groups belonging to this checkpoint */
+private final Map<OperatorID, OperatorState> operatorStates;


/** Properties for this checkpoint. */
private final CheckpointProperties props;
@@ -117,38 +116,12 @@ public class CompletedCheckpoint implements Serializable {


// ------------------------------------------------------------------------


-@VisibleForTesting
-CompletedCheckpoint(
-JobID job,
-long checkpointID,
-long timestamp,
-long completionTimestamp,
-Map<JobVertexID, TaskState> taskStates) {
-
-this(job, checkpointID, timestamp, completionTimestamp, taskStates,
-Collections.<MasterState>emptyList(),
-CheckpointProperties.forStandardCheckpoint());
-}
-
-public CompletedCheckpoint(
-JobID job,
-long checkpointID,
-long timestamp,
-long completionTimestamp,
-Map<JobVertexID, TaskState> taskStates,
-@Nullable Collection<MasterState> masterHookStates,
-CheckpointProperties props) {
-
-this(job, checkpointID, timestamp, completionTimestamp, taskStates,
-masterHookStates, props, null, null);
-}

public CompletedCheckpoint(
JobID job,
long checkpointID,
long timestamp,
long completionTimestamp,
-Map<JobVertexID, TaskState> taskStates,
+Map<OperatorID, OperatorState> operatorStates,
@Nullable Collection<MasterState> masterHookStates,
CheckpointProperties props,
@Nullable StreamStateHandle externalizedMetadata,
@@ -171,7 +144,7 @@ public CompletedCheckpoint(


// we create copies here, to make sure we have no shared mutable
// data structure with the "outside world"
-this.taskStates = new HashMap<>(checkNotNull(taskStates));
+this.operatorStates = new HashMap<>(checkNotNull(operatorStates));
this.masterHookStates = masterHookStates == null || masterHookStates.isEmpty() ?
Collections.<MasterState>emptyList() :
new ArrayList<>(masterHookStates);
@@ -239,19 +212,15 @@ public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry)
public long getStateSize() {
long result = 0L;


-for (TaskState taskState : taskStates.values()) {
-result += taskState.getStateSize();
+for (OperatorState operatorState : operatorStates.values()) {
+result += operatorState.getStateSize();
}


return result;
}


-public Map<JobVertexID, TaskState> getTaskStates() {
-return Collections.unmodifiableMap(taskStates);
-}
-
-public TaskState getTaskState(JobVertexID jobVertexID) {
-return taskStates.get(jobVertexID);
+public Map<OperatorID, OperatorState> getOperatorStates() {
+return operatorStates;
}


public Collection<MasterState> getMasterHookStates() {
@@ -288,7 +257,7 @@ void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback)
* @param sharedStateRegistry The registry where shared states are registered
*/
public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
-sharedStateRegistry.registerAll(taskStates.values());
+sharedStateRegistry.registerAll(operatorStates.values());
}


// --------------------------------------------------------------------------------------------
@@ -338,7 +307,7 @@ protected void doDiscardExternalizedMetaData() {
protected void doDiscardPrivateState() {
// discard private state objects
try {
-StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
} catch (Exception e) {
storedException = ExceptionUtils.firstOrSuppressed(e, storedException);
}
@@ -353,7 +322,7 @@ protected void doReportStoredExceptions() throws Exception {
}


protected void clearTaskStatesAndNotifyDiscardCompleted() {
-taskStates.clear();
+operatorStates.clear();
// to be null-pointer safe, copy reference to stack
CompletedCheckpointStats.DiscardCallback discardCallback =
CompletedCheckpoint.this.discardCallback;
@@ -392,7 +361,7 @@ public StoredDiscardStrategy(SharedStateRegistry sharedStateRegistry) {


@Override
protected void doDiscardSharedState() {
-sharedStateRegistry.unregisterAll(taskStates.values());
+sharedStateRegistry.unregisterAll(operatorStates.values());
}
}
}
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CompositeStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Simple container class which contains the raw/managed/legacy operator state and
 * key-group state handles for the subtasks of an operator.
 */
public class OperatorState implements CompositeStateHandle {

	private static final long serialVersionUID = -4845578005863201810L;

	/** The ID of the operator. */
	private final OperatorID operatorID;

	/** Handles to non-partitioned states, subtask index -> subtask state. */
	private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;

	/** Parallelism of the operator when it was checkpointed. */
	private final int parallelism;

	/** Maximum parallelism of the operator when the job was first created. */
	private final int maxParallelism;

	public OperatorState(OperatorID operatorID, int parallelism, int maxParallelism) {
		Preconditions.checkArgument(
			parallelism <= maxParallelism,
			"Parallelism " + parallelism + " must be smaller than or equal to max parallelism " + maxParallelism + ".");

		this.operatorID = operatorID;

		this.operatorSubtaskStates = new HashMap<>(parallelism);

		this.parallelism = parallelism;
		this.maxParallelism = maxParallelism;
	}

	public OperatorID getOperatorID() {
		return operatorID;
	}

	public void putState(int subtaskIndex, OperatorSubtaskState subtaskState) {
		Preconditions.checkNotNull(subtaskState);

		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
			throw new IndexOutOfBoundsException("The given subtask index " + subtaskIndex +
				" is out of range [0, " + parallelism + ").");
		} else {
			operatorSubtaskStates.put(subtaskIndex, subtaskState);
		}
	}

	public OperatorSubtaskState getState(int subtaskIndex) {
		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
			throw new IndexOutOfBoundsException("The given subtask index " + subtaskIndex +
				" is out of range [0, " + parallelism + ").");
		} else {
			return operatorSubtaskStates.get(subtaskIndex);
		}
	}

	public Collection<OperatorSubtaskState> getStates() {
		return operatorSubtaskStates.values();
	}

	public int getNumberCollectedStates() {
		return operatorSubtaskStates.size();
	}

	public int getParallelism() {
		return parallelism;
	}

	public int getMaxParallelism() {
		return maxParallelism;
	}

	public boolean hasNonPartitionedState() {
		for (OperatorSubtaskState sts : operatorSubtaskStates.values()) {
			if (sts != null && sts.getLegacyOperatorState() != null) {
				return true;
			}
		}
		return false;
	}

	@Override
	public void discardState() throws Exception {
		for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
			operatorSubtaskState.discardState();
		}
	}

	@Override
	public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
		for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
			operatorSubtaskState.registerSharedStates(sharedStateRegistry);
		}
	}

	@Override
	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
		for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
			operatorSubtaskState.unregisterSharedStates(sharedStateRegistry);
		}
	}

	@Override
	public long getStateSize() {
		long result = 0L;

		for (int i = 0; i < parallelism; i++) {
			OperatorSubtaskState operatorSubtaskState = operatorSubtaskStates.get(i);
			if (operatorSubtaskState != null) {
				result += operatorSubtaskState.getStateSize();
			}
		}

		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof OperatorState) {
			OperatorState other = (OperatorState) obj;

			return operatorID.equals(other.operatorID)
				&& parallelism == other.parallelism
				&& operatorSubtaskStates.equals(other.operatorSubtaskStates);
		} else {
			return false;
		}
	}

	@Override
	public int hashCode() {
		return parallelism + 31 * Objects.hash(operatorID, operatorSubtaskStates);
	}

	public Map<Integer, OperatorSubtaskState> getSubtaskStates() {
		return Collections.unmodifiableMap(operatorSubtaskStates);
	}

	@Override
	public String toString() {
		// KvStates are always null in 1.1. Don't print this as it might
		// confuse users that don't care about how we store it internally.
		return "OperatorState(" +
			"operatorID: " + operatorID +
			", parallelism: " + parallelism +
			", maxParallelism: " + maxParallelism +
			", sub task states: " + operatorSubtaskStates.size() +
			", total size (bytes): " + getStateSize() +
			')';
	}
}
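For orientation, a hedged sketch of how a coordinator-side caller might populate the new `OperatorState` container, using only the constructor and `putState()` shown above. The no-argument `OperatorID` constructor generating a random ID is assumed to exist, and the parallelism values are illustrative:

```java
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;

public class OperatorStateUsage {
	// Illustrative only: collect per-subtask state for an operator that ran with
	// parallelism 2 and max parallelism 128, as the checkpoint coordinator would.
	static OperatorState collect(OperatorSubtaskState subtask0, OperatorSubtaskState subtask1) {
		OperatorState operatorState = new OperatorState(new OperatorID(), 2, 128);
		operatorState.putState(0, subtask0);
		operatorState.putState(1, subtask1);
		return operatorState;
	}
}
```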