
[FLINK-7] [Runtime] Enable Range Partitioner. #1255

Closed
ChengXiangLi wants to merge 10 commits

Conversation

ChengXiangLi

Updated at 11/05
This PR handles range partitioning with an automatically sampled data distribution. Currently, the PR rewrites the PlanNode DAG in the Optimizer to support the range partitioner; the rewrite logic is shown in the diagram below:
(diagram: rewritten PlanNode DAG for range partitioning)
Note that the PartitionNode and the PartitionIDRemoverNode are switched in our final implementation.

@fhueske
Contributor

fhueske commented Oct 14, 2015

Hi @ChengXiangLi, thanks a lot for the PR!
Range partitioning has been on our feature list for a very long time ;-)
I'll look into this in the next few days.

@ChengXiangLi
Author

Thanks for the review in advance, @fhueske. One thing I want to mention: the range partitioner implementation in this PR follows the existing partitioner design pattern, which in my personal view is not well designed from an object-oriented programming standpoint. The parameters of the different partitioners are all passed through Flink together, and the logic of all partitioners is mixed in OutputEmitter, so I'm actually not very satisfied with the implementation in this PR. I think we need to refactor the partitioner as a follow-up, but that should not block this PR. The range partitioner is a little different from the other partitioners; with it enabled, we would have a better overview of how to design the partitioner abstraction. The refactoring should come with a design doc and be fully discussed in the community. What do you think about this?

@fhueske
Contributor

fhueske commented Oct 16, 2015

Hi @ChengXiangLi, I had a look at your PR and I think we need to change the implementation a bit.
Right now, it executes an additional job for each range partition operator to obtain a sample. The additional job executes the full program and samples at the end. Imagine a complex program that includes for instance several joins and wants to write out the result in a total order, i.e., range partitions and sorts the result before it writes to the final sink. With the current implementation, this would mean that the expensive job is executed twice.

It would be better to inject the sampling into the actual job. This can be done, for example, as follows.
A program such as:

DataSet x = ...
x.rangePartition(0).reduce(...)

could be translated into:

DataSet<X> x = ...
DataSet<Distr> dist = x.mapPartition("sample").reduce("collect samples and build distribution");
DataSet<Tuple2<Integer,X>> xWithPIDs = x
  .map("assign PartitionIDs").withBroadcastSet(dist, "distribution");

This would inject the sampling into the original program. The sampling is done as before, but the data distribution is broadcasted to a map operator that uses the distribution to assign partition IDs to records and converts the DataSet<X> into a DataSet<Tuple2<Integer, X>>, similarly to a KeySelector. Once the partition IDs are assigned, a RangePartitionOperator could partition the tuples on the first field (f0) with a simple Int-DataDistribution (0, 1, 2, 3, 4, ..., n). Finally, the DataSet needs to be unwrapped, i.e., converted from DataSet<Tuple2<Integer, X>> back to DataSet<X>.
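
Roughly, such an "assign PartitionIDs" mapper could look like the sketch below. The class name, the sorted-boundary representation, and the KeySelector usage are illustrative assumptions, not actual code from this PR.

import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Hypothetical sketch of the "assign PartitionIDs" map operator described above.
public class AssignPartitionId<X, K extends Comparable<K>> extends RichMapFunction<X, Tuple2<Integer, X>> {

	private final KeySelector<X, K> keySelector;
	private List<K> boundaries; // sorted range boundaries built from the sample

	public AssignPartitionId(KeySelector<X, K> keySelector) {
		this.keySelector = keySelector;
	}

	@Override
	public void open(Configuration parameters) {
		// The sampled data distribution arrives as a broadcast set with a single element.
		List<List<K>> broadcast = getRuntimeContext().getBroadcastVariable("distribution");
		this.boundaries = broadcast.get(0);
	}

	@Override
	public Tuple2<Integer, X> map(X value) throws Exception {
		// Binary-search the boundaries; the insertion point is the target partition ID.
		int pos = Collections.binarySearch(boundaries, keySelector.getKey(value));
		int partitionId = pos >= 0 ? pos : -(pos + 1);
		return new Tuple2<>(partitionId, value);
	}
}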

I agree it is not super nice, but this implementation would cache the intermediate result instead of recomputing it. In addition, it barely touches the internals.

It is also possible to integrate the partitioning more tightly into the runtime by providing the data distribution directly to the Partitioner. However, that would mean we need to implement a partitioning operator for the runtime (instead of using the regular operator and a NOOP driver).

Btw., I have some code lying around (for a not-yet-completed feature) to extract keys from a record given the key specification. Let me know if that would help for your implementation.

Regarding the implementation of the Partitioner and OutputEmitter, I am very open to suggestions on how to improve the design. As you said, I would bring this discussion to the dev mailing list or open a JIRA and start a discussion there.

What do you think? Thanks, Fabian

@ChengXiangLi
Author

Thanks, @fhueske. It would definitely be great if we could execute the pre-sample logic only once. I will update the code later.

@fhueske
Contributor

fhueske commented Oct 19, 2015

I think the right way to implement the range partitioner feature is to inject the sampler during the JobGraph generation phase, after the OptimizedPlan has been created. This would allow us to handle range partitioning transparently during optimization and make it a completely runtime-related feature.

@ChengXiangLi
Author

I implemented the range partitioner with a KeySelector based on the broadcast data distribution; it's not fully finished yet, @fhueske. For the range partitioner with a field index or field name, it will require the tool you mentioned before for extracting keys from a record.

public void mapPartition(Iterable<Tuple2<K, IN>> values, Collector<Tuple2<Integer, IN>> out) throws Exception {

	// Read the sampled data distribution that was broadcast to this operator.
	List<Object> broadcastVariable = getRuntimeContext().getBroadcastVariable("DataDistribution");
	if (broadcastVariable == null || broadcastVariable.size() != 1) {
Contributor

You can move the broadcast variable initialization into the open method.
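
For reference, a minimal sketch of that suggestion for a rich function (class and field names are purely illustrative):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical fragment: with a (Rich)MapFunction, the broadcast variable would be
// read once per task in open() rather than on every invocation.
public abstract class BroadcastInOpenExample<IN, OUT> extends RichMapFunction<IN, OUT> {

	protected List<Object> distribution;

	@Override
	public void open(Configuration parameters) {
		// Initialize once per task; the record-processing method can reuse the cached list.
		this.distribution = getRuntimeContext().getBroadcastVariable("DataDistribution");
	}
}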

Contributor

Nevermind, I thought you were using a MapFunction, but it's a MapPartitionFunction, so this is only done once.

@fhueske
Contributor

fhueske commented Oct 26, 2015

Hi, I left a few comments inside.
I don't think we need the special user code to extract keys. The comparators provide this functionality and can be generated within the JobGraphGenerator.
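
To illustrate the idea, a rough sketch of comparator-based key extraction is below; how the comparator is generated (e.g., within the JobGraphGenerator) is omitted, and the wrapper class itself is hypothetical.

import org.apache.flink.api.common.typeutils.TypeComparator;

// Hypothetical sketch: TypeComparator#extractKeys pulls the key fields out of a record,
// so no special user code for key extraction is needed.
public class KeyExtractionSketch<T> {

	private final TypeComparator<T> comparator;
	private final Object[] keyHolder;

	public KeyExtractionSketch(TypeComparator<T> comparator) {
		this.comparator = comparator;
		// One slot per flat (atomic) key field of the comparator.
		this.keyHolder = new Object[comparator.getFlatComparators().length];
	}

	public Object[] extractKeys(T record) {
		// Writes the record's key fields into keyHolder, starting at index 0.
		comparator.extractKeys(record, keyHolder, 0);
		return keyHolder;
	}
}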

Let me know what you think.

@ChengXiangLi force-pushed the rangepartitioner branch 2 times, most recently from a39dc9a to fe279eb on October 29, 2015 at 06:16
@ChengXiangLi
Author

I finally added the RangePartitionRewriter in Optimizer.compile(), as it rewrites the OptimizedPlan and utilizes the JavaApiPostPass afterwards.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.functions;
Contributor

The range partition functions (AssignRangeIndex, RangeBoundaryBuilder, and PartitionIDRemoveWrapper) should be moved from flink-java to flink-runtime because they are part of the runtime rather than of the user-facing API.

Author

Should they be part of flink-optimizer? The rewrite happens during the optimization phase; we have not touched the runtime yet.

Contributor

My intention was to move the classes to flink-runtime because this is where most of the execution code is located, including the execution vertices. flink-optimizer does not contain execution code AFAIK and only translates the API code into an executable representation.

Author

OK, makes sense to me.

@fhueske
Contributor

fhueske commented Dec 7, 2015

Hi @ChengXiangLi, I tried the PR locally on my machine and it was working quite well.
I found a few minor issues:

  • You don't need to set keys and sort order for FORWARD shipping strategies in RangePartitionRewriter, line 118
  • You set the DataExchangeMode twice, in lines 163 and 166 of RangePartitionRewriter
  • We need to set costs and properties for the PlanNodes that we create. Otherwise, ExecutionEnvironment.getExecutionPlan() fails with an NPE. Costs can be 0, IMO, but we should set the correct physical properties because these are visualized and might otherwise confuse users.
  • The injected plan nodes should have good names that indicate that they belong to the range partitioner. I suggest:
    • sipPlanNode: "RangePartition: LocalSample"
    • sicPlanNode: "RangePartition: GlobalSample"
    • rbPlanNode: "RangePartition: Histogram"
    • ariPlanNode: "RangePartition: Partition"
    • prPlanNode: "RangePartition: Partition" (IMO it's fine to have the same name here; too much detail, such as "assign id" and "unwrap", might just confuse)

What do you think?

Btw., the Record-specific code paths (incl. the RecordOutputEmitter) have been removed.

@fhueske
Contributor

fhueske commented Dec 8, 2015

Thanks for the fast update!
Your last commit seems to include changes that have already been merged to the master branch. Can you revert it and rebase the second-to-last commit on the current master?

I also got feedback from @StephanEwen about the use of the IntDistribution. He shares your opinion that it is not really needed and just incurs additional overhead due to the second binary search. He proposes to use a PARTITION_CUSTOM strategy instead of a PARTITION_RANGE strategy, with a Partitioner that simply returns the partition ID. You can basically replace
partChannel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, keys, sortDirection, null, DataExchangeMode.PIPELINED);
by
partChannel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, keys, new IdPartitioner(), DataExchangeMode.PIPELINED);
and provide an implementation of IdPartitioner. This way, we still have support for generic DataDistribution range partitioning (although it is not being used ATM) and a more efficient automatic range partitioning.
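
A minimal sketch of such an IdPartitioner (an illustration of the idea, not necessarily the code that was eventually merged):

import org.apache.flink.api.common.functions.Partitioner;

// The "key" is already the partition ID assigned by the upstream range-index mapper,
// so the custom partitioner simply returns it.
public class IdPartitioner implements Partitioner<Integer> {

	private static final long serialVersionUID = 1L;

	@Override
	public int partition(Integer key, int numPartitions) {
		// The assigned ID is expected to lie in [0, numPartitions).
		return key;
	}
}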

One last very small remark: could you please also set the parallelism for the OptimizerNodes (sipNode, sicNode, etc.)? Otherwise, the JSON output will have some "parallelism": "default" values.

I think we're almost there. :-)
Thanks a lot, Fabian

@ChengXiangLi force-pushed the rangepartitioner branch 4 times, most recently from be08030 to a9c1af7 on December 9, 2015 at 03:48
@ChengXiangLi
Author

Thanks, @fhueske. I've updated the PR to use PARTITION_CUSTOM. The failed test should be unrelated.

@fhueske
Contributor

fhueske commented Dec 11, 2015

Thanks for the update. Looks good. Will try it once more on a cluster.

@uce, @StephanEwen, we inject a DataExchangeMode.BATCH for range partitioning (RangePartitionRewriter, line 168). IIRC, there are some implications wrt. iterations. Will that work?

@uce
Contributor

uce commented Dec 17, 2015

Sorry for not responding earlier. I missed the notification somehow.

Batch exchanges (the blocking intermediate results) do not work within iterations. Therefore, the optimizer currently never sets the exchange mode to batch within iterations.

I think we need to suppress range partitioning within iterations for the time being.

@fhueske
Contributor

fhueske commented Dec 17, 2015

Thanks @uce! I extended the RangePartitionRewriter to cover that case and will include the changes when I merge the PR.

I also found that no combiners are inserted if the input of a reduce or combinable groupreduce is explicitly partitioned (FLINK-3179). This is not directly related to this PR (because it also affects partitionByHash and partitionCustom), but should be fixed soon.

I am trying the PR right now on a cluster setup. Thanks @ChengXiangLi for your patience with this one!

@fhueske
Contributor

fhueske commented Dec 17, 2015

I executed the Flink WordCount example on 4 nodes with 8 parallel tasks and roughly 17GB of input data, once with hash partitioning and once with range partitioning. In both runs no combiner was used.
First of all, both programs compute the same result and the result of the range-partitioned WordCount is ordered, so the implementation is correct.

The hash-partitioned WC took 8:00 mins and the range-partitioned one took 13:17 mins.
The breakdown of the range-partitioned WC looks as follows:

  1. Source+FlatMap: 3:01 mins
  2. LocalSample: 3:01 mins
  3. GlobalSample: 0:15 mins
  4. Histogram: 24 ms
  5. PreparePartition: 8:49 mins
  6. Partition: 8:48 mins
  7. GroupReduce: 10:14 mins
  8. Sink: 1:09 mins

The breakdown of the hash partitioned WC is:

  1. Source + FlatMap: 6:26 mins
  2. Partition: 6:25 mins
  3. GroupReduce: 7:58 mins
  4. Sink: 1:21 mins

So, the overhead of the range-partitioned WC comes from the additional IO of reading the flatMapped words plus the additional 4-byte integer. Also, the sorting of the group reduce does not happen concurrently with the source IO. Reducing (w/o sort) and sink take about the same amount of time.

I also checked the distribution of input and output records / bytes for the GroupReduce.

        min records-in   min bytes-in   max records-in   max bytes-in
Hash    197M             1.82GB         346M             2.90GB
Range   177M             1.41GB         352M             2.90GB

        min records-out  min bytes-out  max records-out  max bytes-out
Hash    2.5M             26.5MB         2.5M             26.5MB
Range   2.3K             28.2KB         14M              154MB

We see that the range partitioner does not perform better (in fact a bit worse) than the hash partitioner (the differences for output records are expected). Maybe increasing the sample size helps? The overhead of reading the intermediate data set from disk is so high that a more fine-grained histogram can be justified, IMO. How about increasing the sample size from parallelism * 20 to parallelism * 1000?

Other thoughts?

@ChengXiangLi
Author

Hi @fhueske, for the partitioning part, I think it's normal that RangePartition is slower than HashPartition; as you mentioned, RangePartition introduces more overhead. The main difference between HashPartition and RangePartition is that HashPartition is only key-wise (elements with the same key are shuffled to the same target), while RangePartition is both key-wise and partition-wise (the partitions themselves are ordered). So, for a global order, we can sort in parallel after RangePartition; that is what we gain from RangePartition (see the sketch below).
On the other hand, it still makes sense to improve RangePartition performance, although I don't think increasing the sample size would help here. Based on my previous calculations and tests, parallelism * 20 is enough to generate well-proportioned partitions. Did you find data skew in any partition after RangePartition?
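
For illustration, a globally sorted output along these lines could look roughly like this; the partitionByRange/sortPartition method names follow the later DataSet API and are an assumption here, not necessarily this PR's final API, and the input data is made up.

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GlobalSortSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Some (word, count) records; the concrete source is just for illustration.
		DataSet<Tuple2<String, Integer>> counts = env.fromElements(
			new Tuple2<String, Integer>("flink", 3),
			new Tuple2<String, Integer>("apache", 5),
			new Tuple2<String, Integer>("range", 1));

		// Range-partition on the key, then sort each partition locally: because the
		// partitions themselves are ordered, the concatenated output is globally sorted.
		counts
			.partitionByRange(0)
			.sortPartition(0, Order.ASCENDING)
			.print();
	}
}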

@fhueske
Contributor

fhueske commented Dec 18, 2015

Range partitioning serves two purposes:

  1. producing fully sorted results.
  2. evenly balancing the load in case of skewed key distributions.

Producing sorted results is working fine. However, producing balanced partitions does not seem to work so well. Looking at the numbers I posted, the partitions produced by the range partitioner are less balanced than the hash-partitioned ones (records-in / bytes-in). The difference is not huge, but range partitioning should still be able to do better than hash partitioning.

I proposed increasing the sample size because this should improve the accuracy of the histogram without having a (measurable) impact on the performance. If we spend so much time generating a histogram, the histogram should be accurate enough to result in balanced partitions.

Can you explain how you calculated the sample size of parallelism * 20?

@ChengXiangLi
Author

Sorry, @fhueske, I misunderstood your test data: the keys must be skewed towards some values, while in my previous tests the keys were not skewed. It's complicated to calculate how many samples should be taken from a dataset to meet an a priori specified accuracy guarantee; one such algorithm, which I used before, is described at http://research.microsoft.com/pubs/159275/MSR-TR-2012-18.pdf, but it does not fully fit the case where the keys are skewed.
Would you continue testing how many samples are required to make the partitions roughly balanced? Raising the sample number should not add much overhead; I fully support it.

@fhueske
Contributor

fhueske commented Dec 21, 2015

Thanks @ChengXiangLi.
I will increase the sample size to 1000 elements per partition and finally merge this PR :-)

Thanks for your patience!
