[FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state #8615
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
Thanks for the great work @sjwiesman!
The comments I have are mostly minor, with only 2 more important comments about classloaders and whether or not the work of reading savepoint metadata can be done earlier and done only once.
One thing to probably think about (maybe can be done as a follow-up PR):
Is the naming of Savepoint and ExistingSavepoint still sensible here, now that we renamed this series of work as "State Processor API"?
Otherwise, I have tried this manually and it works for both types of backends; I haven't discovered any problems so far. +1 to merge this once the comments are addressed.
...libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
ExistingSavepoint(ExecutionEnvironment env, String path, StateBackend stateBackend) {
    this.env = env;
    this.existingSavepoint = path;
    this.stateBackend = stateBackend;
nit: missing Preconditions.checkNotNull checks
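As an illustration of the kind of check being requested, here is a minimal sketch of a constructor that rejects null arguments up front. It is not the PR's code: SavepointHandleSketch is a hypothetical stand-in for ExistingSavepoint, plain Java types replace the Flink ones, and java.util.Objects.requireNonNull is used in place of Flink's Preconditions.checkNotNull so that the snippet compiles without Flink on the classpath.

```java
import java.util.Objects;

// Hypothetical stand-in for ExistingSavepoint; plain Java types replace the Flink ones.
final class SavepointHandleSketch {
    private final String path;
    private final Object stateBackend;

    SavepointHandleSketch(String path, Object stateBackend) {
        // Objects.requireNonNull plays the role of Preconditions.checkNotNull here:
        // it fails fast in the constructor instead of later, when the field is first used.
        this.path = Objects.requireNonNull(path, "The savepoint path must not be null");
        this.stateBackend = Objects.requireNonNull(stateBackend, "The state backend must not be null");
    }

    String getPath() {
        return path;
    }

    Object getStateBackend() {
        return stateBackend;
    }
}
```

Failing in the constructor keeps the error close to the call site that passed the bad argument, which is usually easier to debug than a NullPointerException raised later on a task manager.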
String uid,
String name,
TypeInformation<T> typeInfo,
TypeSerializer<T> serializer) {
As a follow-up, we can probably think about a variant where the user simply passes in a TypeSerializer and no TypeInformation. In this case, can we just wrap the given serializer into a "dummy" type info?
Not entirely sure what methods of the TypeInformation will be used in the batch processing API.
 */
public BroadcastStateInputFormat(String savepointPath, String uid, MapStateDescriptor<K, V> descriptor) {
    super(savepointPath, uid, true);
    this.descriptor = descriptor;
nit: missing Preconditions.checkNotNull() check.
Please check all other constructors for this problem; I think the issue occurs in multiple constructors.
 *
 * @param <OT> The type of the input.
 */
abstract class OperatorStateInputFormat<OT> extends SavepointInputFormat<OT, OperatorStateInputSplit> {
@Internal annotation
...-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
 * @throws IOException If the savepoint path is invalid or the uid does not exist
 */
OperatorState getOperatorState() throws IOException {
    final Savepoint savepoint = SavepointLoader.loadSavepoint(savepointPath);
I've been thinking about whether this operation can be done earlier, when loading an ExistingSavepoint, and maintained by the ExistingSavepoint class. Not entirely sure though, as that would require access to the DFS on the client side; not sure how feasible that is in practice.
Not too much of a deal, as this shouldn't be a heavy workload.
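The "load once and let one owner maintain it" idea from this comment can be sketched generically as a small memoizing holder. This is only an illustration under stated assumptions, not how the PR implements it: LoadOnceSketch is a hypothetical name, and the Supplier stands in for whatever actually reads the savepoint metadata.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical helper: runs the (possibly expensive) loader at most once and caches the result.
final class LoadOnceSketch<T> {
    private final Supplier<T> loader;
    private T value;
    private boolean loaded;

    LoadOnceSketch(Supplier<T> loader) {
        this.loader = Objects.requireNonNull(loader, "loader must not be null");
    }

    // Synchronized so that concurrent callers still trigger only a single load.
    synchronized T get() {
        if (!loaded) {
            value = loader.get();
            loaded = true;
        }
        return value;
    }
}
```

An owning object could hold one such instance and hand the cached result to every reader it creates, so the metadata file is opened once rather than once per input format.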
.resolveCheckpointPointer(savepointPath);

try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
    return Checkpoints.loadCheckpointMetadata(stream, NullClassLoader.INSTANCE);
In the future, once we add type information of state into the savepoint metadata file, I don't think this NullClassLoader is correct anymore. We would potentially need the actual user classloader, since the metadata would contain user classes (e.g. user implemented TypeSerializerSnapshots).
There was one place where I couldn't get access to the user classloader, but it wasn't being used, so I decided to go this route. However, if we do the reading on the client, the current thread's class loader should be the user class loader, which makes it easier.
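For reference, picking up the current thread's class loader in plain Java looks roughly like the sketch below. ContextClassLoaderSketch and both of its methods are hypothetical names used only for illustration; whether the context class loader really is the user class loader on the client is the assumption made in the comment above, not something this snippet verifies.

```java
// Hypothetical helper, not part of the PR.
final class ContextClassLoaderSketch {
    private ContextClassLoaderSketch() {
    }

    /** Returns the thread's context class loader, or the system class loader if none is set. */
    static ClassLoader currentClassLoader() {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        return contextLoader != null ? contextLoader : ClassLoader.getSystemClassLoader();
    }

    /** Resolves a class by name through the given loader without initializing it. */
    static Class<?> resolve(String className, ClassLoader loader) {
        try {
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Class not visible to the given loader: " + className, e);
        }
    }
}
```

Passing the loader explicitly, rather than looking it up deep inside the metadata reader, keeps it visible which loader is expected to see user-defined classes.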
...k-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NullClassLoader.java
…-partitioned operator state
…ing non-partitioned operator state
Thanks for the review @tzulitai! I've addressed all the comments, including moving the savepoint loader to run once on the client.
…n OnDiskSavepointMetadata This closes apache#8615.
What is the purpose of the change
This is the initial PR for FLIP-43 adding the functionality to read non-partitioned operator state from a state snapshot.
Brief changelog
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation
(Full documentation coming in the keyed state reader PR)