-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-34978][State] Introduce Asynchronous State APIs #24595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
65 changes: 65 additions & 0 deletions
65
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AppendingState.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.api.common.state.v2; | ||
|
|
||
| import org.apache.flink.annotation.Experimental; | ||
|
|
||
| /** | ||
| * Base interface for partitioned state that supports adding elements and inspecting the current | ||
| * state. Elements can either be kept in a buffer (list-like) or aggregated into one value. | ||
| * | ||
| * <p>The state is accessed and modified by user functions, and checkpointed consistently by the | ||
| * system as part of the distributed snapshots. | ||
| * | ||
| * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is | ||
| * automatically supplied by the system, so the function always sees the value mapped to the key of | ||
| * the current element. That way, the system can handle stream and state partitioning consistently | ||
| * together. | ||
| * | ||
| * @param <IN> Type of the value that can be added to the state. | ||
| * @param <OUT> Type of the value that can be retrieved from the state. | ||
| */ | ||
| @Experimental | ||
| public interface AppendingState<IN, OUT> extends State { | ||
|
|
||
| /** | ||
| * Returns the current value for the state. When the state is not partitioned the returned value | ||
| * is the same for all inputs in a given operator instance. If state partitioning is applied, | ||
| * the value returned depends on the current operator input, as the operator maintains an | ||
| * independent state for each partition. | ||
| * | ||
| * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method should return {@code | ||
| * null} wrapped by a StateFuture. | ||
| * | ||
| * @return The operator state value corresponding to the current input or {@code null} wrapped | ||
| * by a {@link StateFuture} if the state is empty. | ||
| */ | ||
| StateFuture<OUT> asyncGet(); | ||
|
|
||
| /** | ||
| * Updates the operator state accessible by {@link #asyncGet()} by adding the given value to the | ||
| * list of values. The next time {@link #asyncGet()} is called (for the same state partition) | ||
| * the returned state will represent the updated list. | ||
| * | ||
| * <p>null value is not allowed to be passed in. | ||
| * | ||
| * @param value The new value for the state. | ||
| */ | ||
| StateFuture<Void> asyncAdd(IN value); | ||
| } |
71 changes: 71 additions & 0 deletions
71
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/ListState.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.api.common.state.v2; | ||
|
|
||
| import org.apache.flink.annotation.Experimental; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| /** | ||
| * {@link State} interface for partitioned list state in Operations. The state is accessed and | ||
| * modified by user functions, and checkpointed consistently by the system as part of the | ||
| * distributed snapshots. | ||
| * | ||
| * <p>The state can be a keyed list state or an operator list state. | ||
| * | ||
| * <p>When it is a keyed list state, it is accessed by functions applied on a {@code KeyedStream}. | ||
| * The key is automatically supplied by the system, so the function always sees the value mapped to | ||
| * the key of the current element. That way, the system can handle stream and state partitioning | ||
| * consistently together. | ||
| * | ||
| * <p>When it is an operator list state, the list is a collection of state items that are | ||
| * independent from each other and eligible for redistribution across operator instances in case of | ||
| * changed operator parallelism. | ||
| * | ||
| * @param <T> Type of values that this list state keeps. | ||
| */ | ||
| @Experimental | ||
| public interface ListState<T> extends MergingState<T, StateIterator<T>> { | ||
|
|
||
| /** | ||
| * Updates the operator state accessible by {@link #asyncGet()} by updating existing values to | ||
| * the given list of values. The next time {@link #asyncGet()} is called (for the same state | ||
| * partition) the returned state will represent the updated list. | ||
| * | ||
| * <p>If an empty list is passed in, the state value will be null. | ||
| * | ||
| * <p>Null value passed in or any null value in list is not allowed. | ||
| * | ||
| * @param values The new values for the state. | ||
| */ | ||
| StateFuture<Void> asyncUpdate(List<T> values); | ||
|
|
||
| /** | ||
| * Updates the operator state accessible by {@link #asyncGet()} by adding the given values to | ||
| * existing list of values. The next time {@link #asyncGet()} is called (for the same state | ||
| * partition) the returned state will represent the updated list. | ||
| * | ||
| * <p>If an empty list is passed in, the state value remains unchanged. | ||
| * | ||
| * <p>Null value passed in or any null value in list is not allowed. | ||
| * | ||
| * @param values The new values to be added to the state. | ||
| */ | ||
| StateFuture<Void> asyncAddAll(List<T> values); | ||
| } |
149 changes: 149 additions & 0 deletions
149
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.api.common.state.v2; | ||
|
|
||
| import org.apache.flink.annotation.Experimental; | ||
|
|
||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * {@link State} interface for partitioned key-value state. The key-value pair can be added, updated | ||
| * and retrieved. | ||
| * | ||
| * <p>The state is accessed and modified by user functions, and checkpointed consistently by the | ||
| * system as part of the distributed snapshots. | ||
| * | ||
| * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is | ||
| * automatically supplied by the system, so the function always sees the value mapped to the key of | ||
| * the current element. That way, the system can handle stream and state partitioning consistently | ||
| * together. | ||
| * | ||
| * @param <UK> Type of the keys in the state. | ||
| * @param <UV> Type of the values in the state. | ||
| */ | ||
| @Experimental | ||
| public interface MapState<UK, UV> extends State { | ||
|
|
||
| /** | ||
| * Returns the current value associated with the given key asynchronously. When the state is not | ||
| * partitioned the returned value is the same for all inputs in a given operator instance. If | ||
| * state partitioning is applied, the value returned depends on the current operator input, as | ||
| * the operator maintains an independent state for each partition. | ||
| * | ||
| * @return The {@link StateFuture} that will return value corresponding to the current input. | ||
| * When no corresponding value for this key, the future will return {@code null}. | ||
| */ | ||
| StateFuture<UV> asyncGet(UK key); | ||
|
|
||
| /** | ||
| * Update the current value associated with the given key asynchronously. When the state is not | ||
| * partitioned the value is updated for all inputs in a given operator instance. If state | ||
| * partitioning is applied, the updated value depends on the current operator input, as the | ||
| * operator maintains an independent state for each partition. When a {@code null} value is | ||
| * provided, the state for the given key will be removed. | ||
| * | ||
| * @param key The key that will be updated. | ||
| * @param value The new value for the key. | ||
| * @return The {@link StateFuture} that will trigger the callback when update finishes. | ||
| */ | ||
| StateFuture<Void> asyncPut(UK key, UV value); | ||
|
|
||
| /** | ||
| * Update all of the mappings from the given map into the state asynchronously. When the state | ||
| * is not partitioned the value is updated for all inputs in a given operator instance. If state | ||
| * partitioning is applied, the updated mapping depends on the current operator input, as the | ||
| * operator maintains an independent state for each partition. When a {@code null} value is | ||
| * provided within the map, the state for the corresponding key will be removed. | ||
| * | ||
| * <p>If an empty map is passed in, the state value remains unchanged. | ||
| * | ||
| * <p>Null map pointer is not allowed. | ||
| * | ||
| * @param map The mappings to be stored in this state. | ||
| * @return The {@link StateFuture} that will trigger the callback when update finishes. | ||
| */ | ||
| StateFuture<Void> asyncPutAll(Map<UK, UV> map); | ||
masteryhx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Delete the mapping of the given key from the state asynchronously. When the state is not | ||
| * partitioned the deleted value is the same for all inputs in a given operator instance. If | ||
| * state partitioning is applied, the value deleted depends on the current operator input, as | ||
| * the operator maintains an independent state for each partition. | ||
| * | ||
| * @param key The key of the mapping. | ||
| * @return The {@link StateFuture} that will trigger the callback when update finishes. | ||
| */ | ||
| StateFuture<Void> asyncRemove(UK key); | ||
|
|
||
| /** | ||
| * Returns whether there exists the given mapping asynchronously. When the state is not | ||
| * partitioned the returned value is the same for all inputs in a given operator instance. If | ||
| * state partitioning is applied, the value returned depends on the current operator input, as | ||
| * the operator maintains an independent state for each partition. | ||
| * | ||
| * @param key The key of the mapping. | ||
| * @return The {@link StateFuture} that will return true if there exists a mapping whose key | ||
| * equals to the given key. | ||
| */ | ||
| StateFuture<Boolean> asyncContains(UK key); | ||
|
|
||
| /** | ||
| * Returns the current iterator for all the mappings of this state asynchronously. When the | ||
| * state is not partitioned the returned iterator is the same for all inputs in a given operator | ||
| * instance. If state partitioning is applied, the iterator returned depends on the current | ||
| * operator input, as the operator maintains an independent state for each partition. | ||
| * | ||
| * @return The {@link StateFuture} that will return mapping iterator corresponding to the | ||
| * current input. | ||
| */ | ||
| StateFuture<StateIterator<Map.Entry<UK, UV>>> asyncEntries(); | ||
|
|
||
| /** | ||
| * Returns the current iterator for all the keys of this state asynchronously. When the state is | ||
| * not partitioned the returned iterator is the same for all inputs in a given operator | ||
| * instance. If state partitioning is applied, the iterator returned depends on the current | ||
| * operator input, as the operator maintains an independent state for each partition. | ||
| * | ||
| * @return The {@link StateFuture} that will return key iterator corresponding to the current | ||
| * input. | ||
| */ | ||
| StateFuture<StateIterator<UK>> asyncKeys(); | ||
|
|
||
| /** | ||
| * Returns the current iterator for all the values of this state asynchronously. When the state | ||
| * is not partitioned the returned iterator is the same for all inputs in a given operator | ||
| * instance. If state partitioning is applied, the iterator returned depends on the current | ||
| * operator input, as the operator maintains an independent state for each partition. | ||
| * | ||
| * @return The {@link StateFuture} that will return value iterator corresponding to the current | ||
| * input. | ||
| */ | ||
| StateFuture<StateIterator<UV>> asyncValues(); | ||
|
|
||
| /** | ||
| * Returns whether this state contains no key-value mappings asynchronously. When the state is | ||
| * not partitioned the returned value is the same for all inputs in a given operator instance. | ||
| * If state partitioning is applied, the value returned depends on the current operator input, | ||
| * as the operator maintains an independent state for each partition. | ||
| * | ||
| * @return The {@link StateFuture} that will return true if there is no key-value mapping, | ||
| * otherwise false. | ||
| */ | ||
| StateFuture<Boolean> asyncIsEmpty(); | ||
| } | ||
32 changes: 32 additions & 0 deletions
32
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MergingState.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.api.common.state.v2; | ||
|
|
||
| import org.apache.flink.annotation.Experimental; | ||
|
|
||
| /** | ||
| * Extension of {@link AppendingState} that allows merging of state. That is, two instances of | ||
| * {@link MergingState} can be combined into a single instance that contains all the information of | ||
| * the two merged states. | ||
| * | ||
| * @param <IN> Type of the value that can be added to the state. | ||
| * @param <OUT> Type of the value that can be retrieved from the state. | ||
| */ | ||
| @Experimental | ||
| public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {} |
36 changes: 36 additions & 0 deletions
36
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/State.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.api.common.state.v2; | ||
|
|
||
| import org.apache.flink.annotation.Experimental; | ||
|
|
||
| /** | ||
| * Interface that different types of partitioned state must implement. | ||
masteryhx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * | ||
| * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is | ||
| * automatically supplied by the system, so the function always sees the value mapped to the key of | ||
| * the current element. That way, the system can handle stream and state partitioning consistently | ||
| * together. | ||
| */ | ||
| @Experimental | ||
| public interface State { | ||
|
|
||
| /** Removes the value mapped under the current key asynchronously. */ | ||
| StateFuture<Void> asyncClear(); | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.