From a7e24580a2178b27ea77e7327c39ad7e75cac0a3 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Sun, 17 May 2015 20:33:38 +0200 Subject: [PATCH] [streaming] Initial rework of the operator state interfaces --- .../api/common/functions/RichMapFunction.java | 2 +- .../api/common/functions/RuntimeContext.java | 47 ++++ .../util/AbstractRuntimeUDFContext.java | 12 + .../flink/api/common/state/OperatorState.java | 67 ++++++ .../api/common/state/StateCheckpointer.java | 73 ++++++ .../flink/runtime/state/LocalStateHandle.java | 19 +- .../runtime/state/PartitionedStateHandle.java | 47 ++++ .../runtime/state/PartitionedStateStore.java | 47 ++++ .../flink/runtime/state/StateUtils.java | 40 ++-- .../api/checkpoint/CheckpointCommitter.java | 7 +- .../api/datastream/IterativeDataStream.java | 2 + .../api/datastream/StreamProjection.java | 3 +- .../functions/source/FileReadFunction.java | 8 +- .../api/functions/source/SourceFunction.java | 4 +- .../streaming/api/graph/StreamConfig.java | 21 +- .../api/operators/AbstractStreamOperator.java | 6 +- .../operators/AbstractUdfStreamOperator.java | 68 +++--- .../api/operators/StatefulStreamOperator.java | 4 +- .../api/operators/StreamOperator.java | 8 +- .../api/state/BasicCheckpointer.java | 37 +++ .../streaming/api/state/EagerStateStore.java | 86 +++++++ .../streaming/api/state/LazyStateStore.java | 117 ++++++++++ .../state/PartitionedStreamOperatorState.java | 126 +++++++++++ .../api/state/StreamOperatorState.java | 100 +++++++++ .../runtime/io/BlockingQueueBroker.java | 4 +- .../runtime/io/StreamRecordWriter.java | 6 +- .../runtime/tasks/OneInputStreamTask.java | 4 +- .../runtime/tasks/StreamIterationHead.java | 1 - .../streaming/runtime/tasks/StreamTask.java | 22 +- .../tasks/StreamingRuntimeContext.java | 30 ++- .../runtime/tasks/TwoInputStreamTask.java | 1 - .../serialization/DeserializationSchema.java | 4 +- .../api/state/StatefulFunctionTest.java | 211 ++++++++++++++++++ .../runtime/tasks/SourceStreamTaskTest.java | 68 +++--- 
.../flink/streaming/util/MockCoContext.java | 7 +- .../flink/streaming/util/MockContext.java | 7 +- .../StreamCheckpointingITCase.java | 118 ++++------ ...ProcessFailureStreamingRecoveryITCase.java | 79 ++----- 38 files changed, 1244 insertions(+), 269 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java index 2005be01f1f14..7adb25ba5b887 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java @@ -34,7 +34,7 @@ public abstract class 
RichMapFunction extends AbstractRichFunction implements MapFunction { private static final long serialVersionUID = 1L; - + @Override public abstract OUT map(IN value) throws Exception; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index f68d2b0a9e045..a3b8f6570d967 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -29,6 +29,8 @@ import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; /** * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance @@ -160,4 +162,49 @@ public interface RuntimeContext { * @return The distributed cache of the worker executing this instance. */ DistributedCache getDistributedCache(); + + // -------------------------------------------------------------------------------------------- + + /** + * Returns the {@link OperatorState} of this operator instance, which can be + * used to store and update user state in a fault tolerant fashion. The + * state will be initialized by the provided default value, and the + * {@link StateCheckpointer} will be used to draw the state snapshots. + * + *

+ * When storing a {@link Serializable} state the user can omit the + * {@link StateCheckpointer} in which case the full state will be written as + * the snapshot. + *

+ * + * @param defaultState + * Default value for the operator state. This will be returned + * the first time {@link OperatorState#getState()} (for every + * state partition) is called before + * {@link OperatorState#updateState(Object)}. + * @param checkpointer + * The {@link StateCheckpointer} that will be used to draw + * snapshots from the user state. + * @return The {@link OperatorState} for this instance. + */ + OperatorState getOperatorState(S defaultState, StateCheckpointer checkpointer); + + /** + * Returns the {@link OperatorState} of this operator instance, which can be + * used to store and update user state in a fault tolerant fashion. The + * state will be initialized by the provided default value. + * + *

+ * When storing a non-{@link Serializable} state the user needs to specify a + * {@link StateCheckpointer} for drawing snapshots. + *

+ * + * @param defaultState + * Default value for the operator state. This will be returned + * the first time {@link OperatorState#getState()} (for every + * state partition) is called before + * {@link OperatorState#updateState(Object)}. + * @return The {@link OperatorState} for this instance. + */ + OperatorState getOperatorState(S defaultState); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 735fe8ea9c65c..413565b83fe99 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -33,6 +33,8 @@ import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; import org.apache.flink.core.fs.Path; /** @@ -170,4 +172,14 @@ private Accumulator getAccumulator(String name } return (Accumulator) accumulator; } + + @Override + public OperatorState getOperatorState(S defaultState, StateCheckpointer checkpointer) { + throw new UnsupportedOperationException("Operator state is only accessible for streaming operators."); + } + + @Override + public OperatorState getOperatorState(S defaultState) { + throw new UnsupportedOperationException("Operator state is only accessible for streaming operators."); + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java new file mode 100644 index 0000000000000..5b3fa052f3d25 --- /dev/null +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.functions.MapFunction; + +/** + * Base interface for all streaming operator states. It can represent both + * partitioned (when state partitioning is defined in the program) or + * non-partitioned user states. + * + * State can be accessed and manipulated using the {@link #getState()} and + * {@link #updateState(T)} methods. These calls are only valid in the + * transformation call the operator represents, for instance inside + * {@link MapFunction#map()} and invalid in + * {@link #open(org.apache.flink.configuration.Configuration)} or + * {@link #close()}. + * + * @param + * Type of the operator state + */ +public interface OperatorState { + + /** + * Gets the current state for the operator. When the state is not + * partitioned the returned state is the same for all inputs. If state + * partitioning is applied the state returned depends on the current + * operator input, as the operator maintains an independent state for each + * partition. + * + *

+ * {@link #getState()} returns null if there is no state stored + * in the operator. This is the expected behaviour before initializing the + * state with {@link #updateState(T)}. + *

+ * + * @return The operator state corresponding to the current input. + */ + T getState(); + + /** + * Updates the operator state accessible by {@link #getState()} to the given + * value. The next time {@link #getState()} is called (for the same state + * partition) the returned state will represent the updated value. + * + * @param state + * The updated state. + */ + void updateState(T state); + +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java new file mode 100644 index 0000000000000..488e3082affde --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import java.io.Serializable; + +/** + * Basic interface for creating {@link OperatorState} snapshots in stateful + * streaming programs. + * + * The user needs to implement the {@link #snapshotState(S, long, long)} and + * {@link #restoreState(C)} methods that will be called to create and restore + * state snapshots of the given states. + * + *

+ * Note that the {@link OperatorState} is synchronously checkpointed. + * While the state is written, the state cannot be accessed or modified so the + * function need not return a copy of its state, but may return a reference to + * its state. + *

+ * + * @param + * Type of the operator state. + * @param + * Type of the snapshot that will be persisted. + */ +public interface StateCheckpointer { + + /** + * Takes a snapshot of a given operator state. The snapshot returned will be + * persisted in the state backend for this job and restored upon failure. + * This method is called for all state partitions in case of partitioned + * state when creating a checkpoint. + * + * @param state + * The state for which the snapshot needs to be taken + * @param checkpointId + * The ID of the checkpoint. + * @param checkpointTimestamp + * The timestamp of the checkpoint, as derived by + * System.currentTimeMillis() on the JobManager. + * + * @return A snapshot of the operator state. + */ + public C snapshotState(S state, long checkpointId, long checkpointTimestamp); + + /** + * Restores the operator states from a given snapshot. The restored state + * will be loaded back to the function. In case of partitioned state, each + * partition is restored independently. + * + * @param stateSnapshot + * The state snapshot that needs to be restored. + * @return The state corresponding to the snapshot. + */ + public S restoreState(C stateSnapshot); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java index a53d8dac5e889..5ba372dc39d25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java @@ -23,36 +23,33 @@ /** * A StateHandle that includes the operator states directly. 
*/ -public class LocalStateHandle implements StateHandle { +public class LocalStateHandle implements StateHandle { private static final long serialVersionUID = 2093619217898039610L; - private final Serializable state; + private final T state; - public LocalStateHandle(Serializable state) { + public LocalStateHandle(T state) { this.state = state; } @Override - public Serializable getState() { + public T getState() { return state; } @Override public void discardState() throws Exception { } - - public static LocalStateHandleProvider createProvider(){ - return new LocalStateHandleProvider(); - } - private static class LocalStateHandleProvider implements StateHandleProvider { + public static class LocalStateHandleProvider implements + StateHandleProvider { private static final long serialVersionUID = 4665419208932921425L; @Override - public LocalStateHandle createStateHandle(Serializable state) { - return new LocalStateHandle(state); + public LocalStateHandle createStateHandle(R state) { + return new LocalStateHandle(state); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java new file mode 100644 index 0000000000000..4119df1191127 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.Serializable; +import java.util.Map; + +public class PartitionedStateHandle implements + StateHandle>> { + + private static final long serialVersionUID = 7505365403501402100L; + + Map> handles; + + public PartitionedStateHandle(Map> handles) { + this.handles = handles; + } + + @Override + public Map> getState() throws Exception { + return handles; + } + + @Override + public void discardState() throws Exception { + for (StateHandle handle : handles.values()) { + handle.discardState(); + } + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java new file mode 100644 index 0000000000000..8b73edfdd34cb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.Serializable; +import java.util.Map; + +/** + * Interface for storing and accessing partitioned state. The interface is + * designed in a way that allows implementations to access state lazily. + * + * @param + * Type of the state. + * @param + * Type of the state snapshot. + */ +public interface PartitionedStateStore { + + S getStateForKey(Serializable key) throws Exception; + + void setStateForKey(Serializable key, S state); + + Map getPartitionedState() throws Exception; + + Map> snapshotStates(long checkpointId, long checkpointTimestamp) throws Exception; + + void restoreStates(Map> snapshots) throws Exception; + + boolean containsKey(Serializable key); + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java index fbd76bad7d52d..7977e09e7618a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.state; +import java.util.List; + import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; /** @@ -26,20 +28,23 @@ public class StateUtils { /** - * Utility method to define a common generic bound to be used for setting a generic state - * handle on a generic state carrier. + * Utility method to define a common generic bound to be used for setting a + * generic state handle on a generic state carrier. 
* - * This has no impact on runtime, since internally, it performs - * unchecked casts. The purpose is merely to allow the use of generic interfaces without resorting - * to raw types, by giving the compiler a common type bound. + * This has no impact on runtime, since internally, it performs unchecked + * casts. The purpose is merely to allow the use of generic interfaces + * without resorting to raw types, by giving the compiler a common type + * bound. * - * @param op The state carrier operator. - * @param state The state handle. - * @param Type bound for the + * @param op + * The state carrier operator. + * @param state + * The state handle. + * @param + * Type bound for the */ - public static > void setOperatorState(OperatorStateCarrier op, StateHandle state) - throws Exception - { + public static > void setOperatorState(OperatorStateCarrier op, + StateHandle state) throws Exception { @SuppressWarnings("unchecked") OperatorStateCarrier typedOp = (OperatorStateCarrier) op; @SuppressWarnings("unchecked") @@ -47,10 +52,15 @@ public static > void setOperatorState(OperatorStateCarr typedOp.setInitialState(typedHandle); } - - + + public static List rePartitionHandles( + List handles, int numPartitions) { + return null; + } + // ------------------------------------------------------------------------ - + /** Do not instantiate */ - private StateUtils() {} + private StateUtils() { + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java index a95b540ae6edf..82ef6f38f1d7c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java @@ -18,6 +18,10 @@ package org.apache.flink.streaming.api.checkpoint; +import java.io.Serializable; + +import org.apache.flink.runtime.state.StateHandle; + /** * This interface must be implemented by functions/operations that want to receive * a commit notification once a checkpoint has been completely acknowledged by all @@ -32,6 +36,7 @@ public interface CheckpointCommitter { * fail any more. * * @param checkpointId The ID of the checkpoint that has been completed. + * @param checkPointedState Handle to the state that was checkpointed with this checkpoint id. */ - void commitCheckpoint(long checkpointId); + void commitCheckpoint(long checkpointId, StateHandle checkPointedState); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index 6a48b6abc142b..2d70d493017dc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; + /** * The iterative data stream represents the start of an iteration in a * {@link DataStream}. 
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java index 16e9deb725f6d..149d7a886a638 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.api.datastream; -import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; @@ -48,6 +47,8 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.operators.StreamProject; +import com.google.common.base.Preconditions; + public class StreamProjection { private DataStream dataStream; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java index 945f95363e750..4f859e892c1b8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java @@ -17,6 +17,10 @@ package org.apache.flink.streaming.api.functions.source; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URI; + import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple3; import 
org.apache.flink.core.fs.FSDataInputStream; @@ -24,10 +28,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.URI; - public class FileReadFunction implements FlatMapFunction, String> { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index 17ca34debcc52..921a33bceae76 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.api.functions.source; -import org.apache.flink.api.common.functions.Function; - import java.io.Serializable; +import org.apache.flink.api.common.functions.Function; + /** * Base interface for all stream data sources in Flink. 
The contract of a stream source * is the following: When the source should start emitting elements the {@link #run} method diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 329b4dd304a52..0784582abba43 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; @@ -59,6 +60,7 @@ public class StreamConfig implements Serializable { private static final String OUT_STREAM_EDGES = "outStreamEdges"; private static final String IN_STREAM_EDGES = "inStreamEdges"; private static final String STATEHANDLE_PROVIDER = "stateHandleProvider"; + private static final String STATE_PARTITIONER = "statePartitioner"; // DEFAULT VALUES private static final long DEFAULT_TIMEOUT = 100; @@ -381,7 +383,6 @@ public Map getTransitiveChainedTaskConfigs(ClassLoader cl } public void setStateHandleProvider(StateHandleProvider provider) { - try { InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER); } catch (IOException e) { @@ -398,6 +399,24 @@ public StateHandleProvider getStateHandleProvider(ClassLoader cl) { throw new StreamTaskException("Could not instantiate statehandle provider.", e); } } + + public void setStatePartitioner(KeySelector partitioner) { + try { + InstantiationUtil.writeObjectToConfig(partitioner, this.config, STATE_PARTITIONER); + } catch 
(IOException e) { + throw new StreamTaskException("Could not serialize state partitioner.", e); + } + } + + @SuppressWarnings("unchecked") + public KeySelector getStatePartitioner(ClassLoader cl) { + try { + return (KeySelector) InstantiationUtil + .readObjectFromConfig(this.config, STATE_PARTITIONER, cl); + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate state partitioner.", e); + } + } public void setChainStart() { config.setBoolean(IS_CHAINED_VERTEX, true); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a3655877adfba..40f61d980c30b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; /** * Base class for operators that do not contain a user-defined function. 
@@ -31,7 +31,7 @@ public abstract class AbstractStreamOperator implements StreamOperator private static final long serialVersionUID = 1L; - protected transient RuntimeContext runtimeContext; + protected transient StreamingRuntimeContext runtimeContext; protected transient ExecutionConfig executionConfig; @@ -43,7 +43,7 @@ public abstract class AbstractStreamOperator implements StreamOperator protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; @Override - public void setup(Output output, RuntimeContext runtimeContext) { + public void setup(Output output, StreamingRuntimeContext runtimeContext) { this.output = output; this.executionConfig = runtimeContext.getExecutionConfig(); this.runtimeContext = runtimeContext; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 90b2b2ffceb2b..cbcbcee351cdf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -18,20 +18,25 @@ package org.apache.flink.streaming.api.operators; +import java.io.Serializable; +import java.util.Map; + import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; - -import java.io.Serializable; +import org.apache.flink.streaming.api.state.StreamOperatorState; 
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; /** - * This is used as the base class for operators that have a user-defined function. + * This is used as the base class for operators that have a user-defined + * function. * - * @param The output type of the operator - * @param The type of the user function + * @param + * The output type of the operator + * @param + * The type of the user function */ public abstract class AbstractUdfStreamOperator extends AbstractStreamOperator implements StatefulStreamOperator { @@ -44,7 +49,7 @@ public AbstractUdfStreamOperator(F userFunction) { } @Override - public final void setup(Output output, RuntimeContext runtimeContext) { + public void setup(Output output, StreamingRuntimeContext runtimeContext) { super.setup(output, runtimeContext); FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext); } @@ -57,35 +62,37 @@ public void open(Configuration parameters) throws Exception { } @Override - public void close() throws Exception{ + public void close() throws Exception { super.close(); FunctionUtils.closeFunction(userFunction); } + @SuppressWarnings("unchecked") public void restoreInitialState(Serializable state) throws Exception { - if (userFunction instanceof Checkpointed) { - setStateOnFunction(state, userFunction); - } - else { - throw new IllegalStateException("Trying to restore state of a non-checkpointed function"); - } + + Map> snapshots = (Map>) state; + + StreamOperatorState operatorState = (StreamOperatorState) runtimeContext + .getOperatorState(); + + operatorState.restoreState(snapshots); + } - public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception { - if (userFunction instanceof Checkpointed) { - return ((Checkpointed) userFunction).snapshotState(checkpointId, timestamp); - } - else { - return null; - } + public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) + throws Exception { + + 
StreamOperatorState operatorState = (StreamOperatorState) runtimeContext.getOperatorState(); + + return (Serializable) operatorState.snapshotState(checkpointId, timestamp); } - public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception { + public void confirmCheckpointCompleted(long checkpointId, long timestamp, + StateHandle checkpointedState) throws Exception { if (userFunction instanceof CheckpointCommitter) { try { - ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId); - } - catch (Exception e) { + ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId, checkpointedState); + } catch (Exception e) { throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e); } } @@ -94,13 +101,4 @@ public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws public F getUserFunction() { return userFunction; } - - private static void setStateOnFunction(Serializable state, Function function) { - @SuppressWarnings("unchecked") - T typedState = (T) state; - @SuppressWarnings("unchecked") - Checkpointed typedFunction = (Checkpointed) function; - - typedFunction.restoreState(typedState); - } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java index e171af8255d31..343f87d14d444 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java @@ -19,6 +19,8 @@ import java.io.Serializable; +import org.apache.flink.runtime.state.StateHandle; + /** * Interface for Stream operators that can have state. 
This interface is used for checkpointing * and restoring that state. @@ -31,5 +33,5 @@ public interface StatefulStreamOperator extends StreamOperator { Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception; - void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception; + void confirmCheckpointCompleted(long checkpointId, long timestamp, StateHandle checkpointedState) throws Exception; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 05b15be949c8e..aebff5c7df23b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -17,11 +17,11 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; - import java.io.Serializable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + /** * Basic interface for stream operators. Implementers would implement one of * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or @@ -37,7 +37,7 @@ public interface StreamOperator extends Serializable { /** * Initializes the {@link StreamOperator} for input and output handling. */ - public void setup(Output output, RuntimeContext runtimeContext); + public void setup(Output output, StreamingRuntimeContext runtimeContext); /** * This method is called before any elements are processed. 
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java new file mode 100644 index 0000000000000..14d1504dabf18 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.state; + +import java.io.Serializable; + +import org.apache.flink.api.common.state.StateCheckpointer; + +public class BasicCheckpointer implements StateCheckpointer { + + @Override + public Serializable snapshotState(Serializable state, long checkpointId, long checkpointTimestamp) { + return state; + } + + @Override + public Serializable restoreState(Serializable stateSnapshot) { + return stateSnapshot; + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java new file mode 100644 index 0000000000000..4ac01a51103a8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.state; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.flink.api.common.state.StateCheckpointer; +import org.apache.flink.runtime.state.PartitionedStateStore; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; + +public class EagerStateStore implements PartitionedStateStore { + + protected StateCheckpointer checkpointer; + protected StateHandleProvider provider; + + Map fetchedState; + + public EagerStateStore(StateCheckpointer checkpointer, StateHandleProvider provider) { + this.checkpointer = checkpointer; + this.provider = provider; + + fetchedState = new HashMap(); + } + + @Override + public S getStateForKey(Serializable key) throws Exception { + return fetchedState.get(key); + } + + @Override + public void setStateForKey(Serializable key, S state) { + fetchedState.put(key, state); + } + + @Override + public Map getPartitionedState() throws Exception { + return fetchedState; + } + + @Override + public Map> snapshotStates(long checkpointId, + long checkpointTimestamp) { + + Map> handles = new HashMap>(); + + for (Entry stateEntry : fetchedState.entrySet()) { + handles.put(stateEntry.getKey(), provider.createStateHandle(checkpointer.snapshotState( + stateEntry.getValue(), checkpointId, checkpointTimestamp))); + } + return handles; + } + + @Override + public void restoreStates(Map> snapshots) throws Exception { + for (Entry> snapshotEntry : snapshots.entrySet()) { + fetchedState.put(snapshotEntry.getKey(), + checkpointer.restoreState(snapshotEntry.getValue().getState())); + } + } + + @Override + public boolean containsKey(Serializable key) { + return fetchedState.containsKey(key); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java new file mode 100644 index 0000000000000..9872a0cb3df98 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.state; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.flink.api.common.state.StateCheckpointer; +import org.apache.flink.runtime.state.PartitionedStateStore; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; + +/** + * Implementation of the {@link PartitionedStateStore} interface for lazy + * retrieval and snapshotting of the partitioned operator states. Lazy state + * access considerably speeds up recovery and makes resource access smoother by + * avoiding request congestion in the persistent storage layer. + * + *

+ * The logic implemented here can also be used later to push unused state to the + * persistent layer. It also avoids re-snapshotting states that were never fetched, + * since their existing state handles are reused in the next snapshot. + *

+ * + * @param + * Type of the operator states. + * @param + * Type of the state checkpoints. + */ +public class LazyStateStore implements PartitionedStateStore { + + protected StateCheckpointer checkpointer; + protected StateHandleProvider provider; + + Map> unfetchedState; + Map fetchedState; + + public LazyStateStore(StateCheckpointer checkpointer, StateHandleProvider provider) { + this.checkpointer = checkpointer; + this.provider = provider; + + unfetchedState = new HashMap>(); + fetchedState = new HashMap(); + } + + @Override + public S getStateForKey(Serializable key) throws Exception { + S state = fetchedState.get(key); + if (state != null) { + return state; + } else { + StateHandle handle = unfetchedState.get(key); + if (handle != null) { + state = checkpointer.restoreState(handle.getState()); + fetchedState.put(key, state); + unfetchedState.remove(key); + return state; + } else { + return null; + } + } + } + + @Override + public void setStateForKey(Serializable key, S state) { + fetchedState.put(key, state); + unfetchedState.remove(key); + } + + @Override + public Map getPartitionedState() throws Exception { + for (Entry> handleEntry : unfetchedState.entrySet()) { + fetchedState.put(handleEntry.getKey(), + checkpointer.restoreState(handleEntry.getValue().getState())); + } + unfetchedState.clear(); + return fetchedState; + } + + @Override + public Map> snapshotStates(long checkpointId, + long checkpointTimestamp) { + for (Entry stateEntry : fetchedState.entrySet()) { + unfetchedState.put(stateEntry.getKey(), provider.createStateHandle(checkpointer + .snapshotState(stateEntry.getValue(), checkpointId, checkpointTimestamp))); + } + return unfetchedState; + } + + @Override + public void restoreStates(Map> snapshots) { + unfetchedState.putAll(snapshots); + } + + @Override + public boolean containsKey(Serializable key) { + return fetchedState.containsKey(key) || unfetchedState.containsKey(key); + } + +} diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java new file mode 100644 index 0000000000000..26b2a881cfacc --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.state; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.PartitionedStateStore; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; + +/** + * Implementation of the {@link OperatorState} interface for partitioned user + * states. 
It provides methods for checkpointing and restoring partitioned + * operator states upon failure. + * + * @param + * Input type of the underlying {@link OneInputStreamOperator} + * @param + * Type of the underlying {@link OperatorState}. + * @param + * Type of the state snapshot. + */ +public class PartitionedStreamOperatorState extends + StreamOperatorState { + + // KeySelector for getting the state partition key for each input + private KeySelector keySelector; + + private PartitionedStateStore stateStore; + + private S defaultState; + + // The currently processed input, used to extract the appropriate key + private IN currentInput; + + public PartitionedStreamOperatorState(StateCheckpointer checkpointer, + StateHandleProvider provider, KeySelector keySelector) { + super(checkpointer, provider); + this.keySelector = keySelector; + this.stateStore = new EagerStateStore(checkpointer, provider); + } + + @SuppressWarnings("unchecked") + public PartitionedStreamOperatorState(StateHandleProvider provider, + KeySelector keySelector) { + this((StateCheckpointer) new BasicCheckpointer(), provider, keySelector); + } + + @Override + public S getState() { + if (currentInput == null) { + return null; + } else { + try { + Serializable key = keySelector.getKey(currentInput); + if(stateStore.containsKey(key)){ + return stateStore.getStateForKey(key); + }else{ + return defaultState; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void updateState(S state) { + if (currentInput == null) { + throw new RuntimeException("Need a valid input for updating a state."); + } else { + try { + stateStore.setStateForKey(keySelector.getKey(currentInput), state); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void setDefaultState(S defaultState){ + this.defaultState = defaultState; + } + + public void setCurrentInput(IN input) { + currentInput = input; + } + + @Override + public Map> 
snapshotState(long checkpointId, + long checkpointTimestamp) throws Exception{ + return stateStore.snapshotStates(checkpointId, checkpointTimestamp); + } + + @Override + public void restoreState(Map> snapshots) throws Exception { + stateStore.restoreStates(snapshots); + } + + @Override + public Map getPartitionedState() throws Exception { + return stateStore.getPartitionedState(); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java new file mode 100644 index 0000000000000..90a3726875ada --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.state; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.shaded.com.google.common.collect.ImmutableMap; + +/** + * Implementation of the {@link OperatorState} interface for non-partitioned + * user states. It provides methods for checkpointing and restoring operator + * states upon failure using the provided {@link StateCheckpointer} and + * {@link StateHandleProvider}. + * + * @param + * Type of the underlying {@link OperatorState}. + * @param + * Type of the state snapshot. + */ +public class StreamOperatorState implements OperatorState { + + protected static final Serializable DEFAULTKEY = -1; + + private S state; + private StateCheckpointer checkpointer; + private StateHandleProvider provider; + + public StreamOperatorState(StateCheckpointer checkpointer, StateHandleProvider provider) { + this.checkpointer = checkpointer; + this.provider = provider; + } + + @SuppressWarnings("unchecked") + public StreamOperatorState(StateHandleProvider provider) { + this((StateCheckpointer) new BasicCheckpointer(), provider); + } + + @Override + public S getState() { + return state; + } + + @Override + public void updateState(S state) { + this.state = state; + } + + public void setDefaultState(S defaultState){ + updateState(defaultState); + } + + public StateCheckpointer getCheckpointer() { + return checkpointer; + } + + public void setCheckpointer(StateCheckpointer checkpointer) { + this.checkpointer = checkpointer; + } + + protected StateHandleProvider getStateHandleProvider() { + return provider; + } + + public Map> snapshotState(long checkpointId, + long checkpointTimestamp) throws Exception { + return ImmutableMap.of(DEFAULTKEY, 
provider.createStateHandle(checkpointer.snapshotState( + getState(), checkpointId, checkpointTimestamp))); + + } + + public void restoreState(Map> snapshots) throws Exception { + updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState())); + } + + public Map getPartitionedState() throws Exception { + return ImmutableMap.of(DEFAULTKEY, state); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java index 390555810cc25..247fe25abaa1d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java @@ -17,9 +17,9 @@ package org.apache.flink.streaming.runtime.io; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BlockingQueue; -import org.apache.flink.runtime.iterative.concurrent.Broker; +import org.apache.flink.runtime.iterative.concurrent.Broker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @SuppressWarnings("rawtypes") diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java index 941ddd2345d02..c212346c264f7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java @@ -17,14 +17,14 @@ package org.apache.flink.streaming.runtime.io; +import 
java.io.IOException; + import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ChannelSelector; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; -import java.io.IOException; - public class StreamRecordWriter extends RecordWriter { private long timeout; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 2360aa81476cb..6750b52ce3ccb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.runtime.tasks; +import java.io.IOException; + import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -29,8 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class OneInputStreamTask extends StreamTask> { private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 
e69f533cb2d71..4952cdfedcd7a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.collector.StreamOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index db95dccf1dc9d..e7f4d9c66527f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.NotNullPredicate; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.event.task.TaskEvent; @@ -41,6 +42,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StatefulStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState; +import org.apache.flink.streaming.api.state.StreamOperatorState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,10 +105,19 @@ public String getName() { return 
getEnvironment().getTaskName(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) { Environment env = getEnvironment(); - return new StreamingRuntimeContext(conf.getStreamOperator(userClassLoader).getClass() - .getSimpleName(), env, getUserCodeClassLoader(), getExecutionConfig()); + String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName(); + + KeySelector statePartitioner = conf.getStatePartitioner(userClassLoader); + + StreamOperatorState state = statePartitioner == null ? new StreamOperatorState( + getStateHandleProvider()) : new PartitionedStreamOperatorState( + getStateHandleProvider(), statePartitioner); + + return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(), + getExecutionConfig(), state); } private StateHandleProvider getStateHandleProvider() { @@ -129,7 +141,7 @@ private StateHandleProvider getStateHandleProvider() { switch (backend) { case JOBMANAGER: LOG.info("State backend for state checkpoints is set to jobmanager."); - return LocalStateHandle.createProvider(); + return new LocalStateHandle.LocalStateHandleProvider(); case FILESYSTEM: String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null); if (checkpointDir != null) { @@ -294,13 +306,13 @@ public void confirmCheckpoint(long checkpointId, long timestamp) throws Exceptio // we do nothing here so far. 
this should call commit on the source function, for example synchronized (checkpointLock) { if (streamOperator instanceof StatefulStreamOperator) { - ((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp); + ((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp, null); } if (hasChainedOperators) { for (OneInputStreamOperator chainedOperator : outputHandler.getChainedOperators()) { if (chainedOperator instanceof StatefulStreamOperator) { - ((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp); + ((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp, null); } } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java index 6112e03c0e5b2..5fd158c3941fc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java @@ -18,13 +18,18 @@ package org.apache.flink.streaming.runtime.tasks; +import java.io.Serializable; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.operators.util.TaskConfig; +import 
org.apache.flink.streaming.api.state.StreamOperatorState; /** * Implementation of the {@link RuntimeContext}, created by runtime stream UDF @@ -33,13 +38,16 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { private final Environment env; + @SuppressWarnings("rawtypes") + private StreamOperatorState state; public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig) { + ExecutionConfig executionConfig, StreamOperatorState state) { super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, executionConfig, env.getDistributedCacheEntries()); this.env = env; + this.state = state; } /** @@ -60,5 +68,25 @@ public InputSplitProvider getInputSplitProvider() { public Configuration getTaskStubParameters() { return new TaskConfig(env.getTaskConfiguration()).getStubParameters(); } + + @SuppressWarnings("unchecked") + @Override + public OperatorState getOperatorState(S defaultState, + StateCheckpointer checkpointer) { + state.setCheckpointer(checkpointer); + return (OperatorState) state; + } + + @SuppressWarnings("unchecked") + @Override + public OperatorState getOperatorState(S defaultState) { + state.setDefaultState(defaultState); + return (OperatorState) state; + } + + @SuppressWarnings("unchecked") + public OperatorState getOperatorState() { + return (OperatorState) state; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 67119f7c13b23..2052877c30041 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ 
-30,7 +30,6 @@ import org.apache.flink.streaming.runtime.io.InputGateFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java index 87c97571c2475..333bcdd2817bb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.util.serialization; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; - import java.io.Serializable; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + public interface DeserializationSchema extends Serializable, ResultTypeQueryable { /** diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java new file mode 100644 index 0000000000000..f4c1a89257ca9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.state; + +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.shaded.com.google.common.collect.ImmutableMap; +import 
org.apache.flink.util.InstantiationUtil; +import org.junit.Test; + +public class StatefulFunctionTest { + + @SuppressWarnings("unchecked") + @Test + public void simpleStateTest() throws Exception { + + StatefulMapper mapper = new StatefulMapper(); + MockContext context = new MockContext(false, mapper); + mapper.setRuntimeContext(context); + mapper.open(null); + + assertEquals(Arrays.asList("1", "2", "3", "4", "5"), + applyOnSequence(mapper, 1, 5, context.state)); + assertEquals((Integer) 5, context.state.getState()); + + byte[] serializedState = InstantiationUtil.serializeObject(context.state + .snapshotState(1, 1)); + + StatefulMapper restoredMapper = new StatefulMapper(); + MockContext restoredContext = new MockContext(false, restoredMapper); + restoredMapper.setRuntimeContext(context); + restoredMapper.open(null); + + assertEquals(null, restoredContext.state.getState()); + + Map> deserializedState = (Map>) InstantiationUtil + .deserializeObject(serializedState, Thread.currentThread().getContextClassLoader()); + + restoredContext.state.restoreState(deserializedState); + + assertEquals((Integer) 5, restoredContext.state.getState()); + + } + + @SuppressWarnings("unchecked") + @Test + public void partitionedStateTest() throws Exception { + StatefulMapper mapper = new StatefulMapper(); + MockContext context = new MockContext(true, mapper); + mapper.setRuntimeContext(context); + mapper.open(null); + + assertEquals(Arrays.asList("1", "2", "3", "4", "5"), + applyOnSequence(mapper, 1, 5, context.state)); + assertEquals(ImmutableMap.of(0, 2, 1, 3), context.state.getPartitionedState()); + + byte[] serializedState = InstantiationUtil.serializeObject(context.state + .snapshotState(1, 1)); + + StatefulMapper restoredMapper = new StatefulMapper(); + MockContext restoredContext = new MockContext(true, restoredMapper); + restoredMapper.setRuntimeContext(context); + restoredMapper.open(null); + + assertEquals(null, restoredContext.state.getState()); + + Map> deserializedState = 
(Map>) InstantiationUtil + .deserializeObject(serializedState, Thread.currentThread().getContextClassLoader()); + + restoredContext.state.restoreState(deserializedState); + + assertEquals(ImmutableMap.of(0, 2, 1, 3), restoredContext.state.getPartitionedState()); + + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private List applyOnSequence(MapFunction mapper, int from, int to, + StreamOperatorState state) throws Exception { + List output = new ArrayList(); + for (int i = from; i <= to; i++) { + if (state instanceof PartitionedStreamOperatorState) { + ((PartitionedStreamOperatorState) state).setCurrentInput(i); + } + output.add(mapper.map(i)); + } + return output; + } + + public static class ModKey implements KeySelector { + + private static final long serialVersionUID = 4193026742083046736L; + + int base; + + public ModKey(int base) { + this.base = base; + } + + @Override + public Integer getKey(Integer value) throws Exception { + return value % base; + } + + } + + public static class StatefulMapper extends RichMapFunction { + + private static final long serialVersionUID = -9007873655253339356L; + OperatorState opState; + + @Override + public String map(Integer value) throws Exception { + opState.updateState(opState.getState() + 1); + return value.toString(); + } + + @Override + public void open(Configuration conf) { + opState = getRuntimeContext().getOperatorState(0); + } + } + + public static class MockContext implements RuntimeContext { + + StreamOperatorState state; + + public MockContext(boolean isPartitionedState, StatefulMapper mapper) { + if (isPartitionedState) { + this.state = new PartitionedStreamOperatorState( + new LocalStateHandleProvider(), new ModKey(2)); + } else { + this.state = new StreamOperatorState( + new LocalStateHandleProvider()); + } + } + + public String getTaskName() {return null;} + public int getNumberOfParallelSubtasks() {return 0;} + public int getIndexOfThisSubtask() {return 0;} + public ExecutionConfig getExecutionConfig() 
{return null;} + public ClassLoader getUserCodeClassLoader() {return null;} + public void addAccumulator(String name, Accumulator accumulator) {} + public Accumulator getAccumulator(String name) {return null;} + public HashMap> getAllAccumulators() {return null;} + public IntCounter getIntCounter(String name) {return null;} + public LongCounter getLongCounter(String name) {return null;} + public DoubleCounter getDoubleCounter(String name) {return null;} + public Histogram getHistogram(String name) {return null;} + public List getBroadcastVariable(String name) {return null;} + public C getBroadcastVariableWithInitializer(String name, + BroadcastVariableInitializer initializer) {return null;} + public DistributedCache getDistributedCache() {return null;} + + @SuppressWarnings("unchecked") + @Override + public OperatorState getOperatorState(S defaultState, + StateCheckpointer checkpointer) { + state.setDefaultState((Integer) defaultState); + return (OperatorState) state; + } + + @SuppressWarnings("unchecked") + @Override + public OperatorState getOperatorState(S defaultState) { + state.setDefaultState((Integer) defaultState); + return (OperatorState) state; + } + + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index c745e6c5ef630..0a26ebb54ec32 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -18,17 +18,29 @@ package org.apache.flink.streaming.runtime.tasks; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper; import org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; @@ -42,18 +54,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicLong; - @RunWith(PowerMockRunner.class) @PrepareForTest({Task.class, ResultPartitionWriter.class}) public class SourceStreamTaskTest extends 
StreamTaskTestBase { @@ -144,7 +144,7 @@ public void testDataSourceTask() throws Exception { Assert.assertEquals(NUM_ELEMENTS, outList.size()); } - private static class MockSource implements SourceFunction>, Checkpointed { + private static class MockSource extends RichSourceFunction> implements StateCheckpointer { private static final long serialVersionUID = 1; @@ -156,6 +156,7 @@ private static class MockSource implements SourceFunction> private volatile long lastCheckpointId = -1; private Semaphore semaphore; + private OperatorState state; private volatile boolean isRunning = true; @@ -164,7 +165,7 @@ public MockSource(int maxElements, int checkpointDelay, int readDelay) { this.checkpointDelay = checkpointDelay; this.readDelay = readDelay; this.count = 0; - semaphore = new Semaphore(1); + this.semaphore = new Semaphore(1); } @Override @@ -189,32 +190,33 @@ public void run(SourceContext> ctx) { public void cancel() { isRunning = false; } + + @Override + public void open(Configuration conf){ + state = getRuntimeContext().getOperatorState(1, this); + } + @Override - public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public Integer snapshotState(Integer state, long checkpointId, long checkpointTimestamp) { if (!semaphore.tryAcquire()) { Assert.fail("Concurrent invocation of snapshotState."); - } - int startCount = count; - lastCheckpointId = checkpointId; - - long sum = 0; - for (int i = 0; i < checkpointDelay; i++) { - sum += new Random().nextLong(); - } - - if (startCount != count) { + } else { + int startCount = count; + + if (startCount != count) { + semaphore.release(); + // This means that next() was invoked while the snapshot was ongoing + Assert.fail("Count is different at start end end of snapshot."); + } semaphore.release(); - // This means that next() was invoked while the snapshot was ongoing - Assert.fail("Count is different at start end end of snapshot."); } - semaphore.release(); - return sum; + return 
0; } @Override - public void restoreState(Serializable state) { - + public Integer restoreState(Integer stateSnapshot) { + return stateSnapshot; } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java index 77139949b4b78..ea0cb9455a9d7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java @@ -20,12 +20,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -157,8 +155,9 @@ public CoReaderIterator, StreamRecord> getIterator() { public static List createAndExecute(TwoInputStreamOperator operator, List input1, List input2) { MockCoContext mockContext = new MockCoContext(input1, input2); - RuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig()); + StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask", + new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, + new ExecutionConfig(), null); operator.setup(mockContext.collector, runtimeContext); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 8b5607fa825fd..adec338e98ffc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -20,12 +20,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -105,8 +103,9 @@ public MutableObjectIterator> getIterator() { public static List createAndExecute(OneInputStreamOperator operator, List inputs) { MockContext mockContext = new MockContext(inputs); - RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig()); + StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", + new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, + new ExecutionConfig(), null); operator.setup(mockContext.output, runtimeContext); try { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index f0eef9d5e0a05..8c17eec778d35 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -18,6 +18,10 @@ package 
org.apache.flink.test.checkpointing; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -25,24 +29,19 @@ import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.test.util.ForkableFlinkMiniCluster; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - /** * A simple test that runs a streaming topology with checkpointing enabled. 
* @@ -182,14 +181,14 @@ public void invoke(PrefixCount value) { // -------------------------------------------------------------------------------------------- private static class StringGeneratingSourceFunction extends RichSourceFunction - implements Checkpointed, ParallelSourceFunction { + implements ParallelSourceFunction { private final long numElements; private Random rnd; private StringBuilder stringBuilder; - private long index; + private OperatorState index; private int step; private volatile boolean isRunning; @@ -197,7 +196,7 @@ private static class StringGeneratingSourceFunction extends RichSourceFunction ctx) throws Exception { final Object lockingObject = ctx.getCheckpointLock(); - while (isRunning && index < numElements) { - char first = (char) ((index % 40) + 40); + while (isRunning && index.getState() < numElements) { + char first = (char) ((index.getState() % 40) + 40); stringBuilder.setLength(0); stringBuilder.append(first); @@ -230,7 +229,7 @@ public void run(SourceContext ctx) throws Exception { String result = randomString(stringBuilder, rnd); synchronized (lockingObject) { - index += step; + index.updateState(index.getState() + step); ctx.collect(result); } } @@ -241,16 +240,6 @@ public void cancel() { isRunning = false; } - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return this.index; - } - - @Override - public void restoreState(Long state) { - this.index = state; - } - private static String randomString(StringBuilder bld, Random rnd) { final int len = rnd.nextInt(10) + 5; @@ -263,35 +252,27 @@ private static String randomString(StringBuilder bld, Random rnd) { } } - private static class StatefulCounterFunction extends RichMapFunction - implements Checkpointed { - + private static class StatefulCounterFunction extends RichMapFunction { + private OperatorState count; + static final long[] counts = new long[PARALLELISM]; @Override public PrefixCount map(PrefixCount value) throws Exception { - count++; 
+ count.updateState(count.getState() + 1); return value; } - static final long[] counts = new long[PARALLELISM]; - - private long count = 0; - - @Override - public void close() { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count; - } - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public void open(Configuration conf) { + count = getRuntimeContext().getOperatorState(0L); } @Override - public void restoreState(Long state) { - count = state; + public void close() { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState(); } + } private static class OnceFailingReducer extends RichReduceFunction { @@ -353,60 +334,47 @@ public String toString() { } } - private static class StringRichFilterFunction extends RichFilterFunction implements Checkpointed { + private static class StringRichFilterFunction extends RichFilterFunction { + OperatorState count; + static final long[] counts = new long[PARALLELISM]; + @Override public boolean filter(String value) { - count++; + count.updateState(count.getState() + 1); return value.length() < 100; } - - static final long[] counts = new long[PARALLELISM]; - - private long count = 0; - - @Override - public void close() { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count; - } - + @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public void open(Configuration conf) { + this.count = getRuntimeContext().getOperatorState(0L); } @Override - public void restoreState(Long state) { - count = state; + public void close() { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState(); } } - private static class StringPrefixCountRichMapFunction extends RichMapFunction implements Checkpointed { - + private static class StringPrefixCountRichMapFunction extends RichMapFunction { + OperatorState count; + static final long[] counts = new long[PARALLELISM]; + @Override public PrefixCount 
map(String value) { - count++; + count.updateState(count.getState() + 1); return new PrefixCount(value.substring(0, 1), value, 1L); } - - static final long[] counts = new long[PARALLELISM]; - - private long count = 0; - - @Override - public void close() { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count; - } - + @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public void open(Configuration conf) { + this.count = getRuntimeContext().getOperatorState(0L); } @Override - public void restoreState(Long state) { - count = state; + public void close() { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState(); } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java index f153eb7bda6b1..60032053a24c6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -30,10 +29,10 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FileStateHandle; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -84,7 +83,7 @@ public Long map(Long value) throws Exception { } }).startNewChain() // populate the coordinate directory so we can proceed to TaskManager failure - .map(new StatefulMapper(coordinateDir)); + .map(new Mapper(coordinateDir)); //write result to temporary file result.addSink(new CheckpointedSink(DATA_COUNT)); @@ -105,18 +104,16 @@ public Long map(Long value) throws Exception { } } - public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction - implements Checkpointed { - private static final long serialVersionUID = 1L; + public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction { private static final long SLEEP_TIME = 50; private final File coordinateDir; private final long end; - private long collected; - private volatile boolean isRunning = true; + + private OperatorState collected; public SleepyDurableGenerateSequence(File coordinateDir, long end) { this.coordinateDir = coordinateDir; @@ -136,7 +133,7 @@ public void run(SourceContext sourceCtx) throws Exception { final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); boolean checkForProceedFile = true; - while (isRunning && collected < toCollect) { + while (isRunning && collected.getState() < toCollect) { // check if the proceed file exists (then we go full speed) // if not, we always recheck and sleep if (checkForProceedFile) { @@ -149,34 +146,28 @@ public void run(SourceContext sourceCtx) throws Exception { } synchronized (checkpointLock) { - sourceCtx.collect(collected * stepSize + congruence); - collected++; + sourceCtx.collect(collected.getState() * stepSize + congruence); + collected.updateState(collected.getState() + 1); } } } - - @Override - public void cancel() { - isRunning = false; - } - + @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return collected; + public void open(Configuration conf) { + 
collected = getRuntimeContext().getOperatorState(0L); } @Override - public void restoreState(Long state) { - collected = state; + public void cancel() { + isRunning = false; } } - public static class StatefulMapper extends RichMapFunction implements - Checkpointed { + + public static class Mapper extends RichMapFunction { private boolean markerCreated = false; private File coordinateDir; - private boolean restored = false; - public StatefulMapper(File coordinateDir) { + public Mapper(File coordinateDir) { this.coordinateDir = coordinateDir; } @@ -189,31 +180,14 @@ public Long map(Long value) throws Exception { } return value; } - - @Override - public void close() { - if (!restored) { - fail(); - } - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return 1; - } - - @Override - public void restoreState(Integer state) { - restored = true; - } } - private static class CheckpointedSink extends RichSinkFunction implements Checkpointed { + private static class CheckpointedSink extends RichSinkFunction { private long stepSize; private long congruence; private long toCollect; - private long collected = 0L; + private OperatorState collected; private long end; public CheckpointedSink(long end) { @@ -225,30 +199,21 @@ public void open(Configuration parameters) throws IOException { stepSize = getRuntimeContext().getNumberOfParallelSubtasks(); congruence = getRuntimeContext().getIndexOfThisSubtask(); toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize); + collected = getRuntimeContext().getOperatorState(0L); } @Override public void invoke(Long value) throws Exception { - long expected = collected * stepSize + congruence; + long expected = collected.getState() * stepSize + congruence; Assert.assertTrue("Value did not match expected value. 
" + expected + " != " + value, value.equals(expected)); - collected++; + collected.updateState(collected.getState() + 1); - if (collected > toCollect) { + if (collected.getState() > toCollect) { Assert.fail("Collected <= toCollect: " + collected + " > " + toCollect); } } - - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return collected; - } - - @Override - public void restoreState(Long state) { - collected = state; - } } }