Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3659] Allow ConnectedStreams to Be Keyed on Only One Side #1831

Closed
wants to merge 1 commit into from

Conversation

aljoscha
Copy link
Contributor

No description provided.

@gyfora
Copy link
Contributor

gyfora commented Mar 23, 2016

Thanks Aljoscha, this seems to work 👍

@gyfora
Copy link
Contributor

gyfora commented Mar 24, 2016

I think you can go ahead merging this if no-one has any objections :)

@StephanEwen
Copy link
Contributor

Can someone elaborate on the semantics?

I am against merging something that changes semantics and has zero description.

@StephanEwen
Copy link
Contributor

For example, how does keyed state work for the input side that is not key partitioned? How is the key found? How is partitioning guaranteed?

@gyfora
Copy link
Contributor

gyfora commented Mar 24, 2016

@StephanEwen

This PR does not change the behaviour of any existing Flink applications.

It now allows though that users only specify key of one input of the comapfunctions for instance:
input1.keyBy(...).connect(input2.broadcast())

This was previously impossible which probably blocks many existing use-cases (actually blocking one of my own applications that I try to build on Flink) where one input does not have associated state.

After this change the key-value state defined by the keyed input stream works as expected.

The only not so fortunate behaviour is that users can still call state.value() for inputs from the non-keyed stream and the behaviour is not clearly defined. If there was already input from the other side it returns the state for the last key, otherwise it will probably throw a nullpointer exception.

I think this is acceptable behaviour for the time being because well written programs will work as expected. We can think about how we want to handle the other non-keyed input but that will probably include changing many things in the KvBackends so they can do this properly. This problem already exists in flink as state access outside of the processing method is not well defined.

@aljoscha
Copy link
Contributor Author

To elaborate on this. State right now works well if you stick to the (admittedly somewhat hidden) rules. That is, you should only access state if there is a key available.

If there is no key available the behavior changes in unexpected ways based on what state backend is used and the capabilities of the key serializer. For example, let's look at access to ValueState in open(). For mem/fs state: ValueState.value() works, it will return the default value. ValueState.update() will throw a NPE. For RocksDB state: Neither method works if the key serializer cannot handle null values. If it can, then both methods will change state for the null key.

For these reasons I would like to change the semantics of state such that the user always has to call getState (or a similar method) and that the returned accessor object is documented to only be valid for the duration of the processing method. Right now, the user can wreak all kinds of havoc by down-casting the returned State object. Right now we have a very simple system that works if the user keeps to the rules and also makes things go fast. If we want to make it more restrictive we will lose some performance, of course.

@tillrohrmann
Copy link
Contributor

I like @aljoscha's idea to separate more explicitly the user state access and it's implementation. Having an accessor would also allow us to get rid of the swapping of the actual state objects which are wrapped by the State objects. Then the implementation of a StateBackend wouldn't have to be spread out over the KvState classes anymore. This again would make it easier to integrate the notion of virtual state partitions/shards into StateBackends. So in general, I think it would simplify our current StateBackend implementations noticeably.

@StephanEwen
Copy link
Contributor

Thanks for describing this.

This has quite some big implications, as far as I can see it. The state in the connected stream is now a "broadcast state" not partitioned, so allowing to do that on the key/value state probably breaks some ongoing efforts, like scaling, etc.

How about a more clean separation of these things:

  1. The connected streams (fully partitioned or not)
  2. Broadcast inputs, which are similar to the "side inputs" in cloud dataflow or the broadcast variables in DataSet.

That gives us

  • clean semantics, behavior that users can work with
  • We do not overcomplicate the key/value abstraction
  • and we can also make sure we checkpoint the broadcast state once only (rather than on each parallel subtask).

@aljoscha
Copy link
Contributor Author

Yes, I'll try and come up with Ideas in that direction then. 👍

@aljoscha
Copy link
Contributor Author

Closing this for now...

@aljoscha aljoscha closed this Mar 31, 2016
@aljoscha aljoscha deleted the connected-streams-relax branch December 16, 2016 13:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants