
[FLINK-17376][API/DataStream] Deprecated methods and related code updated #12189

Closed · wants to merge 5 commits
456 changes: 4 additions & 452 deletions docs/dev/migration.md

Large diffs are not rendered by default.

456 changes: 4 additions & 452 deletions docs/dev/migration.zh.md

Large diffs are not rendered by default.

10 changes: 1 addition & 9 deletions docs/dev/stream/state/state.md
@@ -115,11 +115,6 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe
of elements that are added to the state. The interface is the same as for `ListState` but elements
added using `add(IN)` are aggregated using a specified `AggregateFunction`.

-* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
-added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
-of elements that are added to the state. The interface is similar to `ListState` but elements
-added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.

* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
@@ -129,8 +124,6 @@ You can also use `isEmpty()` to check whether this map contains any key-value ma
All types of state also have a method `clear()` that clears the state for the currently
active key, i.e. the key of the input element.

-<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. Please use `AggregatingState` and `AggregatingStateDescriptor` instead.

It is important to keep in mind that these state objects are only used for interfacing
with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
The second thing to keep in mind is that the value you get from the state
@@ -142,7 +135,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na
that you can reference them), the type of the values that the state holds, and possibly
a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
-a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.
+a `ReducingStateDescriptor`, or a `MapStateDescriptor`.

State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
Please see [here]({% link dev/user_defined_functions.md %}#rich-functions) for
@@ -153,7 +146,6 @@ is available in a `RichFunction` has these methods for accessing state:
* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
* `ListState<T> getListState(ListStateDescriptor<T>)`
* `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
-* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`

This is an example `FlatMapFunction` that shows how all of the parts fit together:
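(The example itself is collapsed in this diff view. For orientation, a minimal sketch of such a function, modeled on the running count-average example the Flink docs use elsewhere; names and details are illustrative, not the literal file contents.)

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // handle to keyed state: (count, running sum) for the current key
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                "average", // the state name, used to reference the state
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // value() is scoped to the key of the current input element
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(0L, 0L);
        }
        currentSum.f0 += 1;        // count
        currentSum.f1 += input.f1; // sum
        sum.update(currentSum);

        // every second element, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(Tuple2.of(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }
}
```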
9 changes: 1 addition & 8 deletions docs/dev/stream/state/state.zh.md
@@ -107,23 +107,17 @@ the keyed state interface provides access to different types of state, all scoped to the

* `AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values added to the state. In contrast to `ReducingState`, the aggregate type may differ from the type of elements added to the state.
The interface is similar to `ListState`, but elements added with `add(IN)` are aggregated using the specified `AggregateFunction`.

-* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values added to the state. In contrast to `ReducingState`, the aggregate type may differ from the type of elements added to the state.
-The interface is similar to `ListState`, but elements added with `add(T)` are folded into an aggregate using the specified `FoldFunction`.

* `MapState<UK, UV>`: This maintains a list of mappings. You can put key-value pairs into the state and retrieve an iterable over all mappings currently stored. Mappings are added with `put(UK, UV)` or `putAll(Map<UK, UV>)`.
The value for a given key is retrieved with `get(UK)`. Iterable views of the mappings, keys, and values are retrieved with `entries()`, `keys()`, and `values()` respectively. You can also use `isEmpty()` to check whether the state contains any key-value pairs.

All types of state also have a `clear()` method that clears the state for the currently active key, i.e. the key of the current input element.

-<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated since Flink 1.4 and will be removed in the future.
-Please use `AggregatingState` and `AggregatingStateDescriptor` instead.

Keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored in memory; it may reside on disk or elsewhere.
The second thing to keep in mind is that the value you get from the state depends on the key of the current input element, so calling the same interface with different keys may yield different values.

You must create a `StateDescriptor` before you can get the corresponding state handle. It holds the state name (as we will see later, you can create several states, and they must have unique names so that you can reference them),
the type of the values that the state holds, and possibly a user-specified function such as a `ReduceFunction`. Depending on the state type, you create a `ValueStateDescriptor`, a `ListStateDescriptor`,
-a `ReducingStateDescriptor`, a `FoldingStateDescriptor`, or a `MapStateDescriptor`.
+a `ReducingStateDescriptor`, or a `MapStateDescriptor`.

State is accessed through the `RuntimeContext`, so it is only possible in *rich functions*. Please see [here]({% link dev/user_defined_functions.zh.md %}#rich-functions) for more information,
but we will see an example shortly. The `RuntimeContext` available in a `RichFunction` provides the following methods:

@@ -132,7 +126,6 @@ the keyed state interface provides access to different types of state, all

* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
* `ListState<T> getListState(ListStateDescriptor<T>)`
* `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
-* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`

Here is an example `FlatMapFunction` that shows how all of the parts fit together:
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -29,6 +30,7 @@
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -387,8 +389,12 @@ public void initializeState(FunctionInitializationContext context) throws Except
this.refTruncate = reflectTruncate(fs);
}

+// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+// directly on flink-runtime. We are doing it here because we need to maintain backwards
+// compatibility with old state and because we will have to rework/remove this code soon.
OperatorStateStore stateStore = context.getOperatorStateStore();
-restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+this.restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));

int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
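The change above replaces the deprecated `getSerializableListState(String)` shortcut with an explicit descriptor. A condensed sketch of the pattern (a hypothetical helper, not part of the PR): the state name stays `"bucket-states"`, and `JavaSerializer` reproduces the Java-serialization format of existing checkpoints, so old state still restores.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.JavaSerializer;

final class BucketStateRestoreSketch {
    // Old call: store.getSerializableListState("bucket-states");
    // New call: same state name, explicit descriptor, and a JavaSerializer so
    // existing java-serialized checkpoint entries keep deserializing.
    static <T extends java.io.Serializable> ListState<T> restore(OperatorStateStore store) throws Exception {
        return store.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
    }
}
```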
@@ -19,10 +19,12 @@
package org.apache.flink.streaming.connectors.fs.bucketing;

import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -204,7 +206,11 @@ static class ValidatingBucketingSink<T> extends BucketingSink<T> {
public void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();

-ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+// directly on flink-runtime. We are doing it here because we need to maintain backwards
+// compatibility with old state and because we will have to rework/remove this code soon.
+ListState<State<T>> restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));

if (context.isRestored()) {

@@ -36,7 +36,6 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -193,12 +192,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

-/**
- * Flag indicating whether the consumer is restored from older state written with Flink 1.1 or 1.2.
- * When the current run is restored from older state, partition discovery is disabled.
- */
-private boolean restoredFromOldState;

/** Discovery loop, executed in a separate thread. */
private transient volatile Thread discoveryLoopThread;

@@ -566,17 +559,11 @@ public void open(Configuration configuration) throws Exception {
}

for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
-if (!restoredFromOldState) {
-	// seed the partition discoverer with the union state while filtering out
-	// restored partitions that should not be subscribed by this subtask
-	if (KafkaTopicPartitionAssigner.assign(
-		restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
-			== getRuntimeContext().getIndexOfThisSubtask()){
-		subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
-	}
-} else {
-	// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
-	// in this case, just use the restored state as the subscribed partitions
+// seed the partition discoverer with the union state while filtering out
+// restored partitions that should not be subscribed by this subtask
+if (KafkaTopicPartitionAssigner.assign(
+	restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
+		== getRuntimeContext().getIndexOfThisSubtask()){
+	subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
+}
}
@@ -907,27 +894,12 @@ public final void initializeState(FunctionInitializationContext context) throws

OperatorStateStore stateStore = context.getOperatorStateStore();

-ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
-	stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
createStateSerializer(getRuntimeContext().getExecutionConfig())));

-if (context.isRestored() && !restoredFromOldState) {
+if (context.isRestored()) {
restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

-// migrate from 1.2 state, if there is any
-for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
-	restoredFromOldState = true;
-	unionOffsetStates.add(kafkaOffset);
-}
-oldRoundRobinListState.clear();

-if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
-	throw new IllegalArgumentException(
-		"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
-}

// populate actual holder for restored state
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
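With the pre-1.3 round-robin path gone, restore flows only through the union state. A simplified sketch of the resulting logic (a free-standing fragment using names from the diff; not the literal method body):

```java
// Each subtask sees the full union of (partition, offset) entries on restore
// and keeps only the partitions that the assigner maps to its own index.
TreeMap<KafkaTopicPartition, Long> restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
    restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}
for (Map.Entry<KafkaTopicPartition, Long> entry : restoredState.entrySet()) {
    int owningSubtask = KafkaTopicPartitionAssigner.assign(
            entry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks());
    if (owningSubtask == getRuntimeContext().getIndexOfThisSubtask()) {
        subscribedPartitionsToStartOffsets.put(entry.getKey(), entry.getValue());
    }
}
```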
@@ -38,7 +38,6 @@
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.SerializedValue;

-import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -57,8 +56,6 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
@@ -99,7 +96,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(
-MigrationVersion.v1_3,
MigrationVersion.v1_4,
MigrationVersion.v1_5,
MigrationVersion.v1_6,
@@ -325,40 +321,6 @@ public void testRestore() throws Exception {
consumerOperator.cancel();
}

-/**
- * Test restoring from savepoints before version Flink 1.3 should fail if discovery is enabled.
- */
-@Test
-public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
-	assumeTrue(testMigrateVersion == MigrationVersion.v1_3);

-	final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());

-	final DummyFlinkKafkaConsumer<String> consumerFunction =
-		new DummyFlinkKafkaConsumer<>(TOPICS, partitions, 1000L); // discovery enabled

-	StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
-		new StreamSource<>(consumerFunction);

-	final AbstractStreamOperatorTestHarness<String> testHarness =
-		new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

-	testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

-	testHarness.setup();

-	// restore state from binary snapshot file; should fail since discovery is enabled
-	try {
-		testHarness.initializeState(
-			OperatorSnapshotUtil.getResourceFilename(
-				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));

-		fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
-	} catch (Exception e) {
-		Assert.assertTrue(e instanceof IllegalArgumentException);
-	}
-}

// ------------------------------------------------------------------------

private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
@@ -1388,19 +1388,6 @@ public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor
return (ListState<S>) mockRestoredUnionListState;
}

-@Override
-public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
-	// return empty state for the legacy 1.2 Kafka consumer state
-	return new TestingListState<>();
-}

-// ------------------------------------------------------------------------

-@Override
-public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-	throw new UnsupportedOperationException();
-}

@Override
public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
throw new UnsupportedOperationException();
Binary file not shown.
Binary file not shown.
@@ -19,6 +19,7 @@

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -97,7 +98,7 @@ FunctionInitializationContext getMockContext() throws Exception {
OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
-Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
+Mockito.when(mockStore.getListState(any(ListStateDescriptor.class))).thenReturn(null);
return mockContext;
}

@@ -30,8 +30,6 @@
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
@@ -414,50 +412,6 @@ public interface RuntimeContext {
@PublicEvolving
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

-/**
- * Gets a handle to the system's key/value folding state. This state is similar to the state
- * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
- * aggregates values with different types.
- *
- * <p>This state is only accessible if the function is executed on a KeyedStream.
- *
- * <pre>{@code
- * DataStream<MyType> stream = ...;
- * KeyedStream<MyType> keyedStream = stream.keyBy("id");
- *
- * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
- *
- *     private FoldingState<MyType, Long> state;
- *
- *     public void open(Configuration cfg) {
- *         state = getRuntimeContext().getFoldingState(
- *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
- *     }
- *
- *     public Tuple2<MyType, Long> map(MyType value) {
- *         state.add(value);
- *         return new Tuple2<>(value, state.get());
- *     }
- * });
- *
- * }</pre>
- *
- * @param stateProperties The descriptor defining the properties of the stats.
- *
- * @param <T> Type of the values folded in the other state
- * @param <ACC> Type of the value in the state
- *
- * @return The partitioned state object.
- *
- * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
- *         function (function is not part of a KeyedStream).
- *
- * @deprecated will be removed in a future version in favor of {@link AggregatingState}
- */
-@PublicEvolving
-@Deprecated
-<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);

/**
* Gets a handle to the system's key/value map state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
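Since the removed javadoc example disappears with this diff, here is a hedged sketch of the same running-count pattern expressed with the documented replacement, `AggregatingState`. `MyType` is the same placeholder the removed example used; the class name and aggregate function are illustrative, not Flink source.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class RunningCountPerKey extends RichMapFunction<MyType, Tuple2<MyType, Long>> {

    private transient AggregatingState<MyType, Long> count;

    @Override
    public void open(Configuration cfg) {
        AggregateFunction<MyType, Long, Long> countFn = new AggregateFunction<MyType, Long, Long>() {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(MyType value, Long accumulator) { return accumulator + 1; }
            @Override public Long getResult(Long accumulator) { return accumulator; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        };
        // The descriptor takes the accumulator's type information, here a plain Long.
        count = getRuntimeContext().getAggregatingState(
                new AggregatingStateDescriptor<>("count", countFn, Types.LONG));
    }

    @Override
    public Tuple2<MyType, Long> map(MyType value) throws Exception {
        count.add(value); // aggregated under the current key
        return Tuple2.of(value, count.get());
    }
}
```

As with the removed `FoldingState` example, this state is only accessible when the function runs on a KeyedStream.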