
[SPARK-18946][ML] sliceAggregate which is a new aggregate operator for high-dimensional data #17000

Closed
wants to merge 5 commits

Conversation


ZunwenYou commented Feb 20, 2017

In many machine learning cases, the driver has to aggregate high-dimensional vectors/arrays from the executors.
treeAggregate is a good solution for aggregating vectors to the driver, and you can increase the depth of the tree when the data is large.
However, treeAggregate can still fail when the partition count of the RDD and the dimension of the vector grow.

We propose a new RDD operator, named sliceAggregate, which splits the vector into n slices and assigns each slice a key (from 0 to n-1). The RDD[(key, slice)] is then transformed into an RDD[slice] using the reduceByKey operator.
Finally, the driver collects and composes the n slices to obtain the result.
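
A minimal sketch of the idea, written for a plain Array[Double] statistic (the signature and internals of this patch may differ; the sketch assumes combOp is element-wise, so it can be applied slice by slice):

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

// Illustrative sketch only -- not the exact signature or implementation of this patch.
def sliceAggregateSketch[T: ClassTag](
    data: RDD[T],
    dim: Int,
    numSlices: Int,
    zero: Array[Double])(
    seqOp: (Array[Double], T) => Array[Double],
    combOp: (Array[Double], Array[Double]) => Array[Double]): Array[Double] = {

  val sliceLen = (dim + numSlices - 1) / numSlices

  // 1. Fold each partition into one full-length array (seqOp), then cut that
  //    array into numSlices chunks keyed by slice index 0 .. numSlices-1.
  val slices: RDD[(Int, Array[Double])] = data.mapPartitions { iter =>
    val partial = iter.foldLeft(zero.clone())(seqOp)
    (0 until numSlices).iterator.map { i =>
      val from = i * sliceLen
      val until = math.min(from + sliceLen, dim)
      (i, partial.slice(from, until))
    }
  }

  // 2. Reduce each slice key independently, so no single task ever shuffles
  //    or combines the full high-dimensional array.
  val reduced = slices.reduceByKey(combOp).collect()

  // 3. Recompose the slices on the driver to obtain the final result.
  val result = new Array[Double](dim)
  for ((i, part) <- reduced) {
    System.arraycopy(part, 0, result, i * sliceLen, part.length)
  }
  result
}
```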


I ran an experiment that computes summary statistics of the features.
The number of samples is 1000 and the feature dimension ranges from 10K to 20M; the comparison of time cost between treeAggregate and sliceAggregate is shown below. When the feature dimension reached 20 million, treeAggregate failed.

Time cost (ms) of sliceAggregate vs. treeAggregate:

| Feature dimension | sliceAggregate | treeAggregate |
|-------------------|----------------|---------------|
| 10K               | 617            | 607           |
| 100K              | 1470           | 967           |
| 1M                | 4019           | 4731          |
| 2.5M              | 7679           | 13348         |
| 5M                | 14722          | 22858         |
| 7.5M              | 20821          | 36472         |
| 10M               | 28526          | 50184         |
| 20M               | 47014          | failed        |


The code related to this experiment is here.
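
For reference, the treeAggregate baseline here is essentially the standard MLlib pattern for computing column statistics (a sketch, not the exact benchmark code linked above):

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD

// Sketch of the baseline: mirrors the pattern MLlib uses internally to compute
// column statistics; depth = 5 matches the settings reported later in this thread.
def summarizeWithTreeAggregate(
    features: RDD[Vector],
    depth: Int = 5): MultivariateOnlineSummarizer = {
  features.treeAggregate(new MultivariateOnlineSummarizer)(
    seqOp = (summary, sample) => summary.add(sample),
    combOp = (s1, s2) => s1.merge(s2),
    depth = depth)
}
```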

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-18946

@MLnick
Contributor

MLnick commented Feb 20, 2017

Just to be clear - this is essentially just splitting an array up into smaller chunks so that the overall communication is more efficient? It would be good to look at why Spark is not doing a good job with one big array. Is the bottleneck really the executor communication (the shuffle part)? Or is it collecting the big array back at the end of the tree aggregation (i.e. this patch sort of allows more concurrency in the collect operation)?

cc @dbtsai @sethah @yanboliang who were looking at linear model scalability recently.

@hhbyyh
Contributor

hhbyyh commented Feb 20, 2017

Hi @ZunwenYou, thanks for sharing the implementation with us.
Do you know why treeAggregate failed when the feature dimension reached 20 million?
I think this can potentially help with the 2G disk shuffle spill limit (to be verified), but we should evaluate the extra memory consumption due to the slice and copy.

@ZunwenYou
Author

Hi, @MLnick
You are right, sliceAggregate splits an array into smaller chunks before the shuffle.
It has three advantages.
First, less data is shuffled than with treeAggregate over the whole transformation.
Second, as you describe, it allows more concurrency, not only during the driver's collect but also while running seqOp and combOp.
Third, as I have observed, when a record is larger than about 1 GB (an array of 100 million dimensions), the shuffle among executors becomes less efficient while the rest of the executors sit idle. I am not clear on the reason for this.

@ZunwenYou
Author

Hi, @hhbyyh

In our experiment, the class MultivariateOnlineSummarizer contains 8 arrays, so if the dimension reaches 20 million, a single MultivariateOnlineSummarizer takes 1280 MB of memory (8 bytes * 20M * 8 arrays).

The experiment configuration is as follows:
spark.driver.maxResultSize 6g
spark.kryoserializer.buffer.max 2047m
driver-memory 20g
num-executors 100
executor-cores 2
executor-memory 15g

RDD and aggregate parameters:
RDD partition number 300
treeAggregate depth 5
With this configuration, treeAggregate runs in four stages, with 300, 75, 18, and 4 tasks respectively.
In the last stage of treeAggregate, tasks are killed because the executors throw java.lang.OutOfMemoryError: Requested array size exceeds VM limit.
Even with treeAggregate depth=7 and executor-memory=30g, the last stage still failed.
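
For context, a rough back-of-the-envelope from these numbers (my own arithmetic, not code from the patch):

```scala
// Back-of-the-envelope arithmetic for the configuration above.
val dim = 20000000L                // 20M features
val bytesPerDouble = 8L
val arraysPerSummarizer = 8L       // MultivariateOnlineSummarizer keeps 8 arrays, as noted above
val bytesPerSummarizer = dim * bytesPerDouble * arraysPerSummarizer
println(bytesPerSummarizer)        // 1280000000 bytes, i.e. the ~1280 MB figure above

// The last treeAggregate stage has only 4 tasks, each merging and
// (de)serializing several such ~1.28 GB summarizers; a serialized buffer
// approaching 2 GB hits the JVM's maximum array length, which is consistent
// with the "Requested array size exceeds VM limit" error.
println(Int.MaxValue)              // 2147483647 -- upper bound on a JVM array length
```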

@MLnick
Contributor

MLnick commented Feb 21, 2017

Is the speedup coming mostly from the MultivariateOnlineSummarizer stage?

See https://issues.apache.org/jira/browse/SPARK-19634, which is about porting this operation to a DataFrame UDAF and computing only the required metrics (instead of forcing computation of all of them, as is done currently). I wonder how that will compare?

@ZunwenYou
Author

Hi, @MLnick
First, sliceAggregate is a general aggregate for array-like data. Besides the MultivariateOnlineSummarizer case, it can be used in many large-scale machine learning cases. I chose MultivariateOnlineSummarizer for our experiment just because it really is a bottleneck of LogisticRegression in the ml package.

SPARK-19634 is a good improvement for MultivariateOnlineSummarizer, but I do not think it makes sense to compare these two improvements. In my opinion, it is more reasonable to compare sliceAggregate with treeAggregate.

@MLnick
Contributor

MLnick commented Feb 23, 2017

@ZunwenYou yes, I understand that sliceAggregate is different from SPARK-19634 and more comparable to treeAggregate. But I'm not sure whether, if we plan to port the vector summary to a DataFrame-based UDAF, we can incorporate the benefit of sliceAggregate.

So my point would probably be to see how much benefit accrues from (a) using the UDAF mechanism and (b) not computing unnecessary things. Then we can compare that to the benefit here and decide.

@MLnick
Contributor

MLnick commented Feb 23, 2017

I'm not totally certain there will be a huge benefit from porting the vector summary to the UDAF framework, but there are API-level benefits to doing so. Perhaps there is a way to incorporate the sliceAggregate idea into the summarizer or into Catalyst operations that work with arrays...

@MLnick
Contributor

MLnick commented Feb 24, 2017

cc @yanboliang - it seems actually similar in effect to the VL-BFGS work with RDD-based coefficients?

@ZunwenYou
Author

ping @yanboliang, please have a look at this improvement.

@WeichenXu123
Contributor

WeichenXu123 commented Nov 6, 2017

@MLnick It looks like VF-LBFGS covers a different scenario. In VF algorithms the vectors are too large to fit in driver memory, so we slice each vector across different machines (stored as an RDD[Vector], using the partition ID as the slice key, with one RDD storing one vector).
Also, in VF-LBFGS only very few large vectors (usually 4-10) need to be aggregated together, so what this PR does looks different from VF-LBFGS.

As for the training dataset in VF-LBFGS, each instance's features form a high-dimensional but very sparse vector; the feature data is transformed into BlockMatrix format for the subsequent computation and aggregation with the coefficients RDD (a dense high-dimensional vector stored as an RDD[Vector]). So for feature aggregation in VF algorithms we also do not use the approach in this PR.
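
For readers unfamiliar with that layout, a hypothetical sketch of "one logical vector stored as an RDD[Vector], keyed by partition ID" (illustrative only, not the VF-LBFGS code):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Hypothetical illustration of the storage layout described above: one logical
// high-dimensional vector is kept as an RDD with one dense slice per partition,
// and the partition ID plays the role of the slice key.
def distributeVector(sc: SparkContext, values: Array[Double], numSlices: Int): RDD[Vector] = {
  val sliceLen = (values.length + numSlices - 1) / numSlices
  val slices = values.grouped(sliceLen).map(arr => Vectors.dense(arr)).toSeq
  sc.parallelize(slices, numSlices)
}
```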

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 11, 2020
@github-actions github-actions bot closed this Feb 12, 2020