Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance Partitioned State and use it in WindowOperator #1562

Closed
wants to merge 3 commits into from

Conversation

aljoscha
Copy link
Contributor

The commits in this are self-contained. The first one enhances the partitioned state such that it can be used by the WindowOperator. The second commit adds the required changes in WindowOperator. The third commit adds a State Backend based on RocksDB. This means that window operator can not run on different types of state backends transparently.

@StephanEwen has some things that he wants to change, those will be added here (I think). I'm opening this now so that people can have a look at it.

[FLINK-3201] Enhance Partitioned State Interface with State Types

Add new state types ValueState, ListState and ReducingState, where
ListState and ReducingState derive from interface MergingState.

ValueState behaves exactly the same as OperatorState. MergingState is a
stateful list to which elements can be added and for which the elements
that it contains can be obtained. If using a ListState the list of
elements is actually kept, for a ReducingState a reduce function is used
to combine all added elements into one. To create a ValueState the user
passes a ValueStateIdentifier to
StreamingRuntimeContext.getPartitionedState() while they would pass a
ListStateIdentifier or ReducingStateIdentifier for the other state
types.

This change is necessary to give the system more information about the
nature of the operator state. We want this to be able to do incremental
snapshots. This would not be possible, for example, if the user had a
List as a state. Inside OperatorState this list would be opaque and
Flink could not create good incremental snapshots.

This also refactors the StateBackend. Before, the logic for partitioned
state was spread out over StreamingRuntimeContext,
AbstractStreamOperator and StateBackend. Now it is consolidated in
StateBackend.

This also adds support for partitioned state in two-input operators.

[FLINK-3200] Use Partitioned State in WindowOperator

This changes window operator to use the new partitioned state
abstraction for keeping window contents instead of custom internal
state and the checkpointed interface.

For now, timers are still kept as custom checkpointed state, however.

WindowOperator now expects a StateIdentifier for MergingState, this can
either be for ReducingState or ListState but WindowOperator is agnostic
to the type of State. Also the signature of WindowFunction is changed to
include the type of intermediate input. For example, if a ReducingState
is used the input of the WindowFunction is T (where T is the input
type). If using a ListState the input of the WindowFunction would be of
type Iterable[T].

@gyfora
Copy link
Contributor

gyfora commented Jan 31, 2016

Hey,
First of all, great work I am looking forward to having this is :)

I think it would be good if we added default implementations of the List, Reducing state based on the ValueState and those would be the default implementations returned by the AbstractStateBackend. I think this should go in before merging this.

Now that are approaching the release we should make the interfaces clean and future additions easy and I believe this is necessary for this. I think many backends will not have a very efficient way of implementing List and Reduce states and they will naturally fall back to the default implementation.

What do you think?

Gyula

@gyfora
Copy link
Contributor

gyfora commented Jan 31, 2016

This would also remove a lot of code duplication for the DbStateBackend and would probably also make the RocksDb backend a little cleaner :)

@aljoscha
Copy link
Contributor Author

aljoscha commented Feb 1, 2016

@gyfora you're right, I will add default implementations for ListState and ReducingState and use them in the DbStateBackend. For RocksDB there are custom implementations, for example in ListState I can append a value to the state with one call instead of a get-update-put operation.

Add new state types ValueState, ListState and ReducingState, where
ListState and ReducingState derive from interface MergingState.

ValueState behaves exactly the same as OperatorState. MergingState is a
stateful list to which elements can be added and for which the elements
that it contains can be obtained. If using a ListState the list of
elements is actually kept, for a ReducingState a reduce function is used
to combine all added elements into one. To create a ValueState the user
passes a ValueStateIdentifier to
StreamingRuntimeContext.getPartitionedState() while they would pass a
ListStateIdentifier or ReducingStateIdentifier for the other state
types.

This change is necessary to give the system more information about the
nature of the operator state. We want this to be able to do incremental
snapshots. This would not be possible, for example, if the user had a
List as a state. Inside OperatorState this list would be opaque and
Flink could not create good incremental snapshots.

This also refactors the StateBackend. Before, the logic for partitioned
state was spread out over StreamingRuntimeContext,
AbstractStreamOperator and StateBackend. Now it is consolidated in
StateBackend.

This also adds support for partitioned state in two-input operators.
This changes window operator to use the new partitioned state
abstraction for keeping window contents instead of custom internal
state and the checkpointed interface.

For now, timers are still kept as custom checkpointed state, however.

WindowOperator now expects a StateIdentifier for MergingState, this can
either be for ReducingState or ListState but WindowOperator is agnostic
to the type of State. Also the signature of WindowFunction is changed to
include the type of intermediate input. For example, if a ReducingState
is used the input of the WindowFunction is T (where T is the input
type). If using a ListState the input of the WindowFunction would be of
type Iterable[T].
@aljoscha aljoscha force-pushed the window-on-state branch 2 times, most recently from 5cdd020 to 06d225b Compare February 1, 2016 14:57
@StephanEwen
Copy link
Contributor

I think this is in quite good shape.

@gyfora's comment makes sense, +1 for such a default implementation.

The changes I am making are based on this, but I would like to go ahead with merging this independently. Should we also apply the changes suggested by Gyula after merging this?

@aljoscha
Copy link
Contributor Author

aljoscha commented Feb 1, 2016

@StephanEwen @gyfora I have the changes reflected in this PR already.

@StephanEwen
Copy link
Contributor

Okay, nice, didn't look at the code after the changes again.

+1 to merge this then

@StephanEwen
Copy link
Contributor

Have rebased this onto the current master.

One thing I'd like to change is to move the HDFS copy process utils from flink-core to flink-runtime.
Ideally, we keep flink-core free of any Hadoop-dependent code.

@StephanEwen
Copy link
Contributor

Rebased and extended this pull request in #1571

@aljoscha aljoscha closed this Feb 4, 2016
@aljoscha aljoscha deleted the window-on-state branch February 4, 2016 12:11
@wenlonglwl
Copy link

Hi, @aljoscha , I am confused about the change you make in this issue, that makes RichFunction not supported in the aggregation function of the windowed stream, what is useful for customized initialization of complex functions. Can you explain me the reason? Thank you for your time in forward ~

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 1, 2016

Hi @wenlonglwl, the reason for this were mostly practical concerns. We want to use our partitioned state abstraction for the window state because that makes it easy to change the state backend depending on the use case. So, when using a ReduceFunction this is actually put into a ReducingState that keeps an aggregated value using that ReduceFunction. Having RichFunctions in inside state would require a lot of overhead. First the function would have to be copied for every different key because the function can keep state internally, second, it would require wiring all the additional RichFunction calls (such as open(), close(), the RuntimeContext) through to the state abstraction.

Could you maybe use the WindowedStream.apply(ReduceFunction, WindowFunction) method, this allows you to incrementally aggregate the elements in the window and then get the final result in the WindowFunction once the window is emitted. This WindowFunction can be a RichFunction and can also keep state.

@StephanEwen
Copy link
Contributor

I am wondering whether we cannot allow rich functions in the windows still. The window operator would need to call open(), and close()rather than the state. What would be the downside of that?

@wenlonglwl
Copy link

3q, @aljoscha ~ I agree with @StephanEwen, your purpose is to prevent user from using partitioned state in windowed reduce function, but removing rich functions do much more side-effection~

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 3, 2016

It is certainly possible to find some way of doing it but it is not straightforward. So for now I wanted to keep it simple until we figure out a good way to do it.

@StephanEwen The window operator cannot call the rich methods since it doesn't know that there is one in there. The rich method lifecycle has to somehow be managed by the StateBackend/State. So then you need to figure out when to call the methods. When creating the StateBackend it's not possible since the state is not there yet. So you maybe call it when the state is first created. Do you call close when you clean out the state for all keys, since then the state is technically not there anymore. Then, do you call open again when it is created again? Also, what about the state methods on RuntimeContext. I don't think you can use ordinary RichFunctions there. (Same probably goes for most of the other methods on RuntimeContext.)

I think we need to find another way for these methods to have some kind of lifecycle management but it requires some thought.

@StephanEwen
Copy link
Contributor

Okay, that sounds like a design flaw in hiding the aggregating functions completely from the window operator in the state. I think the operator should be aware of the additional function, if only to open()/close() it.

The fact that a RichReduceFunction cannot be used at all seems like a huge limitation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants