
[FLINK-2138] Added custom partitioning to DataStream #872

Closed
wants to merge 3 commits

Conversation

gaborhermann
Contributor

Custom partitioning has been added to DataStream to make it more consistent with the batch API.
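
A minimal usage sketch of what this adds (hedged: names mirror the batch API's partitionCustom, which this PR ports to DataStream; the exact streaming signature is debated below):

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object PartitionCustomSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements("flink", "stream", "batch")

    // User-defined routing: every key maps to exactly one channel index.
    val byFirstChar = new Partitioner[Char] {
      override def partition(key: Char, numPartitions: Int): Int =
        key.toInt % numPartitions
    }

    // Route each record by its first character.
    words.partitionCustom(byFirstChar, _.charAt(0)).print()

    env.execute("partition-custom-sketch")
  }
}
```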

@gyfora
Contributor

gyfora commented Jun 26, 2015

Wouldn't it make sense to implement custom partitioning in a way that allows returning an array of indexes, like in the ChannelSelector interface? Returning only one index limits the partitioning considerably.

Maybe users could implement a ChannelSelector and we would wrap that.
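
To make the two shapes being compared concrete, a hedged sketch (ModPartitioner is illustrative; the MultiSelector trait is hypothetical and only mirrors the ChannelSelector signature):

```scala
import org.apache.flink.api.common.functions.Partitioner

// Single-index contract proposed in this PR: one target channel per record.
class ModPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    key % numPartitions
}

// Hypothetical trait mirroring the runtime ChannelSelector's shape:
// a record may be routed to several channels at once.
trait MultiSelector[T] {
  def selectChannels(record: T, numChannels: Int): Array[Int]
}
```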

@gaborhermann
Contributor Author

I guess it is easier for users to understand, and partitioning to multiple channels at a time is rarely needed. Is there a use case where it is needed?

In my opinion it should be consistent with the batch API. Let's start a discussion about this if we would like to change custom partitioning.

@gyfora
Contributor

gyfora commented Jun 27, 2015

I think I could find several use cases if I wanted to :) For example, I would often like to broadcast some model information to many downstream operators at once (not exactly broadcast, maybe only to a couple of them).

Also, even this does not give full flexibility. Imagine a scenario where I have a self-loop and I want to send something to all the others (except myself); to do this I would also need to know my own channel index...

@StephanEwen
Contributor

I actually like this approach. We had the same discussion for the batch API and settled on it, because:

  • You can always chain a FlatMapFunction with a partitionCustom() request to solve all of the situations above (see the sketch after this list).
  • This interface allows an easy Java 8 lambda implementation and works well with the type extraction.
  • It seems to cover the majority of cases more elegantly, as there is no need for array wrapping in the user code.
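
A hedged sketch of the first bullet's trick (all names are illustrative; assumes the Scala partitionCustom variant discussed in this thread): replicate each record in a flatMap, then route every copy by an explicit target index.

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object MulticastSketch {
  // Carry the intended channel on the record itself.
  case class Tagged(target: Int, payload: String)

  val byTarget = new Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int =
      key % numPartitions
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements("model-update-1", "model-update-2")
    val targets = Seq(0, 1, 2) // the "couple of them" from the use case above

    stream
      .flatMap(r => targets.map(t => Tagged(t, r))) // one copy per target
      .partitionCustom(byTarget, _.target)          // route each copy
      .print()

    env.execute("multicast-sketch")
  }
}
```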

@gaborhermann
Contributor Author

By the way, in the Scala DataSet API the user has to specify the Java Partitioner[K] class. Wouldn't it be more convenient to wrap a function like (K, Int) => Int into a Partitioner[K], similarly to the KeySelector?
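
What such a wrapper might look like, as a sketch (asPartitioner is a hypothetical helper, not existing API):

```scala
import org.apache.flink.api.common.functions.Partitioner

object PartitionerOps {
  // Hypothetical helper: lift a plain (K, Int) => Int function
  // into the Java Partitioner[K] interface.
  def asPartitioner[K](f: (K, Int) => Int): Partitioner[K] =
    new Partitioner[K] {
      override def partition(key: K, numPartitions: Int): Int =
        f(key, numPartitions)
    }
}

// Usage, e.g.: dataSet.partitionCustom(PartitionerOps.asPartitioner((k: Int, n: Int) => k % n), 0)
```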

@StephanEwen
Contributor

In the batch API, equality of the partitioners is used to determine compatibility of the partitioning. This may at some point become interesting for the streaming API as well.

In any case, let's pick one of the two variants (function or partitioner implementation). Overloading the methods too much with equally powerful variants inevitably confuses users.

@gaborhermann
Contributor Author

I'd prefer the function implementation (like (K, Int) => Int), but it should stay consistent with the batch API. I don't see why the wrapping would affect the compatibility checking of the partitioning.

Is it okay if I change it to the function implementation in both the Scala batch and Scala streaming APIs? If not, let's just stick with the partitioner implementation in both.

@StephanEwen
Contributor

I am confused now, what is it going to be?

  1. Overloading, so that there is both a Scala function variant and a Partitioner variant, at the cost of redundant APIs.
  2. Only partitioner (sync with batch API)
  3. Only Scala function (break with batch API)

I am not a big fan of (1), as such redundant options are a confusing blow-up of the APIs.

@gaborhermann
Contributor Author

Sorry for not making myself clear.

I would actually go for
4. Only the Scala function (both in the streaming and batch API)

I don't understand how changing from the partitioner implementation to the function implementation in the batch API would mess up determining the compatibility of the partitioning. By compatibility I mean that the key type must be the same as the input type of the partitioner.

I suppose there was another reason (that I do not understand) for choosing the partitioner implementation for the Scala batch API, so if (4) is not an option, I would go for (2) (only partitioner, sync with batch API).

@StephanEwen
Contributor

The partitioner function in Scala was simply added as a mirror of the Java API.

The batch API is stable; that means at most we can add a Scala function and deprecate the partitioner.

@gaborhermann
Contributor Author

Okay, then I will

  • deprecate the partitioner implementation in the batch API
  • add the function implementation to the batch API
  • add the function implementation to the streaming API and remove the partitioner implementation (so streaming will only have the function implementation). As this PR is not merged yet, we do not break the streaming API.

Is it okay?
I guess it's worth it. This way Scala users will be able to write more concise code, and they will not get confused by overloaded methods, because the ones taking a partitioner will be deprecated.
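
As a sketch, the API surface this plan implies (hypothetical signatures, not merged code; note the plan is deferred in the next comment):

```scala
import org.apache.flink.api.common.functions.Partitioner

// Hypothetical Scala API surface for the plan above.
trait DataSetLike[T] {
  @deprecated("use the function variant", "FLINK-2138 follow-up")
  def partitionCustom[K](partitioner: Partitioner[K], field: Int): DataSetLike[T]

  // New, more concise variant: a plain Scala function.
  def partitionCustom[K](fun: (K, Int) => Int, field: Int): DataSetLike[T]
}
```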

@StephanEwen
Contributor

How about we leave the batch API as it is for now and address that as a separate issue? There are quite a few subtleties in how the optimizer assesses equality of partitioning (based on partitioners) that would have to be changed (and should retain backwards compatibility).

@gaborhermann
Contributor Author

Okay then. Those are effects of the change that I did not know about. Let's stick with (2); we might reconsider this later on.

@rmetzger
Contributor

rmetzger commented Jul 1, 2015

-1
Documentation is missing.

http://flink.apache.org/coding-guidelines.html:

Documentation Updates. Many changes in the system will also affect the documentation (both JavaDocs and the user documentation in the docs/ directory.). Pull requests and patches are required to update the documentation accordingly, otherwise the change can not be accepted to the source code.

Could you also add an IT case that ensures that the data is actually partitioned properly? The test you've added only ensures that the partitioning properties are set correctly on the DataStream.
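
For concreteness, a rough sketch of what such an IT case could check (assumed names and structure, not the test that was eventually added): after partitionCustom, each record should arrive at the subtask index the partitioner computed for its key.

```scala
import org.apache.flink.api.common.functions.{Partitioner, RichMapFunction}
import org.apache.flink.streaming.api.scala._

object PartitioningITSketch {
  class ModPartitioner extends Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int =
      key % numPartitions
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
      .partitionCustom(new ModPartitioner, (x: Int) => x)
      .map(new RichMapFunction[Int, (Int, Int)] {
        override def map(x: Int): (Int, Int) =
          // pair each element with the subtask it actually arrived at
          (x, getRuntimeContext.getIndexOfThisSubtask)
      })
      // a real IT case would collect these pairs and assert
      // pair._1 % 4 == pair._2 for every record
      .print()

    env.execute("partitioning-it-sketch")
  }
}
```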

@gaborhermann
Contributor Author

Sorry.

  • updated the docs (custom partitioning was also missing from the Scala batch API docs)
  • added IT cases (also for the other stream partitioning methods, as those were missing too)

@gyfora
Contributor

gyfora commented Jul 10, 2015

Looks good to merge. If there are no objections, I will merge it tomorrow. 👍

@asfgit asfgit closed this in 3f3aeb7 Jul 13, 2015
mxm pushed a commit to mxm/flink that referenced this pull request Jul 14, 2015
shghatge pushed a commit to shghatge/flink that referenced this pull request Aug 8, 2015
nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
nltran pushed a commit to nltran/flink that referenced this pull request Jan 8, 2016