Skip to content

Conversation

@shixiaogang
Copy link

@shixiaogang shixiaogang commented Nov 8, 2016

Changes in the definition of State and StateDescriptor:

  • Add get() in the State interface.
  • Remove type serializers of state values from StateDescriptors.
  • Add SimpleStateDescriptor to simplify the construction of ValueStateDescriptor, ReducingStateDescriptor and FoldingStateDescriptor.
  • Changes the definition of KeyedStateBackend and AbstractKeyedStateBackend accordingly.
  • Modify the implementation of ListStateDescriptor accordingly.

Changes in HeapStateBackend:

  • Let AbstractHeapState not implement the State interface. The clear() method now is removed from AbstractHeapState.
  • Add HeapSimpleState to simplify the implementation of HeapValueState, HeapReducingState and HeapFoldingState.
  • Change the implementation of HeapValueState, HeapReducingState and HeapFoldingState accordingly.

Changes in RocksDBStateBackend:

  • Let AbstractRocksDBState not implement the State interface, removing the clear() method. Now, AbstractRocksDBState does not depend on the types of State and StateDescriptor any more.
  • Add RocksDBSimpleState to simplify the implementation of RocksDBValueState, RocksDBReducingState and RocksDBFoldingState.
  • Change the implementation of RocksDBValueState, RocksDBReducingState and RocksDBFoldingState accordingly.

Others:

  • Update the usage of States in the implementation of window operators.
  • Update the usage of States in unit tests.

=== UPDATE ===

  • Remove clear() method from State
  • Add a new interface called UpdatableState
  • Reformat the code

@shixiaogang shixiaogang changed the title [FLINK-5023 & FLINK-5024] Add SimpleStateDescriptor to clarify the concepts [FLINK-5023][FLINK-5024] Add SimpleStateDescriptor to clarify the concepts Nov 8, 2016
RocksDBKeyedStateBackend<K> backend) {

RocksDBKeyedStateBackend<K> backend
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please stick with the original code style here

if (valueBytes == null) {
return stateDesc.getDefaultValue();
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is used only in single threaded settings, we should try to reuse the DataInputViewStreamWrapper and ByteArrayInputStream.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may help reduce the number of object allocation, but i think the brought benefits are fewer that brought by the re-usage of ByteArrayOutputStream. Because the byte array inside ByteArrayInputStream is provided by other objects.

What do you think?

out.defaultWriteObject();
}

private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method is not necessary, because it only triggers default serialization anyways.

Copy link
Contributor

@aljoscha aljoscha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I like these changes a lot! Very good work.

I had some inline comments, mostly about missing Javadoc. One thing I'd like to see changed is the additional V generic parameter in a lot of the state accessor methods, I don't think we need that one and I would like it if we don't add additional generic parameters if we don't have to.

private transient TypeInformation<T> typeInfo;

/** The default value returned by the state when no other value is bound to a key */
protected transient T defaultValue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might think about renaming this because for FoldingState it's not a default value but the initial accumulation value. (Just a suggestion, not strictly necessary)

* partitioned the returned value is the same for all inputs in a given
* operator instance. If state partitioning is applied, the value returned
* depends on the current operator input, as the operator maintains an
* independent state for each partition.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect, we don't maintain state by partition but by key. (This is not your fault, it was always like this on ValueState and you copied it from there, I just discovered it now when reading your code.)

*
* @throws IOException Thrown if the system cannot access the state.
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a @deprecated description in the Javadoc, pointing to use get().

import java.util.Map;

/**
* Heap-backed partitioned states whose values are not composited and is snapshotted into files.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the is snapshotted into files portion is not always true, and also not necessary here.

*/
@SuppressWarnings({"rawtypes", "unchecked"})
<N, S extends State> S getPartitionedState(
<N, V, S extends State<V>> S getPartitionedState(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need the additional V parameter. It's never used and State<?> works just as while while State<V> just pretends to add more type safety. Because both ways work I would prefer to go with the solution that doesn't add additional generic parameters to methods.

Same holds for mergePartitionedStates().

*/
@Internal
abstract class AbstractQueryableStateOperator<S extends State, IN>
abstract class AbstractQueryableStateOperator<V, S extends State<V>, IN>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here the same thing I mentioned earlier holds, we don't really need the additional V parameter.

* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
protected <V, S extends State<V>> S getPartitionedState(StateDescriptor<S> stateDescriptor) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the same thing I mentioned on KeyedStateBackend holds, we don't need the additional V parameter.

* function (function is not part os a KeyedStream).
*/
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
<V, S extends State<V>> S getPartitionedState(StateDescriptor<S> stateDescriptor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing as elsewhere: we don't need V.

@shixiaogang
Copy link
Author

@aljoscha Thanks for your review. I have updated the PR according to your suggestion.

@aljoscha
Copy link
Contributor

@shixiaogang Thanks for the swift update! The changes look very good now though I found one last thing that could be problematic. This is changing the behaviour of FoldingState when it comes to the default value. I'll quickly open a PR that verifies the current behaviour.

@shixiaogang
Copy link
Author

Oh... I added another field to make the code more clear, but I did not notice the serialization problem. Thanks very much for your reminder.

Your solution does work though the concept of "defaultValue" in folding states is a little confusing. Another solution to let FoldingStateDescriptor implement its own serialization method. And I prefer this one.

What do you think? @aljoscha

@aljoscha
Copy link
Contributor

@shixiaogang you can also do that, but then the machinery in SimpleStateDescriptor is only used by ValueStateDescriptor because this is really the only descriptor that has a default value. (In my original implementation it was like this and only ValueStateDescriptor had the serialisation logic and a default value.)

@shixiaogang
Copy link
Author

I moved default value from SimpleStateDescriptor to ValueStateDescriptor. Now only ValueStateDescriptors have default values. The serialization methods may contain some duplicated code, but i think it's acceptable.

I also modify the implementation of HeapReducingStates. The first value will be copied before being put into the heap.

@aljoscha What do you think of these changes?

@aljoscha
Copy link
Contributor

Thanks for updating, @shixiaogang. I'll look at this today!

For the change to reducing state it would be better to have this as a separate issue/commit/PR. @fhueske you already opened an issue for this, right? Could you please point us to that?

@shixiaogang
Copy link
Author

I rebased the branch to resolve the conflicts with the master branch.

@shixiaogang
Copy link
Author

Despite the changes in the state descriptors, the Flink jobs can restore from old versioned snapshots now.

@StephanEwen
Copy link
Contributor

Hi @shixiaogang !

I went through this pull request and below are a few thoughts. The pull request changes many things together. Some can work, and for others I would suggest to do it differently.
Let me know what you think about this:

Changing the StateDescriptor

This is probably a good cleanup. It does currently break the backwards compatibility, because Flink 1.1 wrote the state descriptors into the checkpoints. Flink 1.2 and 1.3 do not do that any more, but currently rely on the unchanged StateDescriptor classes for resuming Flink-1.1-savepoints.

We added a way to define "migration" classes, meaning we can store the old StateDescriptor classes in a migration package where they are dynamically loaded only as proxies when resuming a Flink-1.1 savepoint. That way we can change the classes and maintain backwards compatibility.

Removing clear() fromState

I think it would be nice to keep clear() on the base State interface. Can you explain why you want to remove it? In my opinion, every state needs to be able to clear, so it makes sense to have this on the case interface.

If this is in preparation for the MapState, then the MapState can simply override the clear() method with different logic.

  • I think that the MapState needs to support clear as well, where it deletes the sub-map for the current key.
  • On the HeapStateBackend, this is quite easy, when we assume that each (key/namespace) has a map as the value (and the complete map can be dropped)
  • For the RocksDBStateBackend, it is a bit more expensive, and would correspond to a range-iteration-and-deletes.

If an application decides that the MapState clear() is to expensive, it can decide to not call it. But we should still support it for cases where it is necessary.

Changing the State interface

I would like to not change State to State<T> in the Flink master now, because it cases warnings in all parts of the code (that suddenly use raw types) and for some user programs.

While this would be done for Flink 2.0, it would make merging simpler if we don't change it now.

@StephanEwen
Copy link
Contributor

Update: Just saw that you already did the migration support.

Will merge the StateDescriptor refactoring. Let's discuss the other two types of changes separately.

@StephanEwen
Copy link
Contributor

I posted a rebased version of the state descriptor refactoring here: #3243

@shixiaogang
Copy link
Author

@StephanEwen Thanks a lot for your comments.

Removing clear() from State

This change is suggested by @aljoscha who wants to let broadcast states share the same interface (see the discussion in FLINK-5023) . As mentioned, the broadcast states are read-only in some cases. Hence it's suggested not to provide the clear() method in the base State interface.

Changing the State interface

The State interface is typed because I want to provide the get() method for all states so that we can retrieve the data in the state (under the current key for keyed states). The functionality is already provided by all states except ValueState who provides the same functionality with the value() method. Providing the method for all states can help reduce some duplicated code in the implementation. It also makes sense for read-only states mentioned above.

@aljoscha
Copy link
Contributor

aljoscha commented Feb 4, 2017

In fact, my original suggestion was to add a new interface ReadableState:

interface ReadableState<T> {
  T get();
}

and leave all the existing interfaces as they are. That way we would have the least amount of changes in existing code and still have a common interface for state that can be read. (Note that ReadableState does not extends State on purpose, because of State.clear().)

@shixiaogang
Copy link
Author

@aljoscha That way, it's very confusing that a ReadableState is not a State. Hence I made State read-only and introduced the UpdatableState interface who extends State with the method clear().

These changes (mainly the introduction of the get() method) are intended to remove the duplicated code. As they have little relationship with the implementation of map states. I think it's okay not to change these interfaces now.

But I prefer to rethink the state hierarchy in the near future because there exists too much duplicated code now.

@shixiaogang
Copy link
Author

Close the pull request because the state descriptor now is refactored with the introduction of composited serializers (See FLINK-5790).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants