Conversation

@gagafunctor
Contributor

Changes proposed:

  • Adding a method to compute the treeAggregate depth required to avoid exceeding the driver max result size (first commit)
  • Using it in the computation of the Gramian of RowMatrix (second commit)

Tests:

  • Unit-test-wise, one unit test checking the behavior of the depth computation method
  • Tested at scale on a Hadoop cluster by running PCA on a large dataset (depth 3 was needed to succeed)

Debatable choice:

I'm not sure the RDD API is the right place for the depth computation method. Its advantage is that it gives access to the driver max result size and to the RDD's number of partitions, which can serve as default arguments for the method. Semantically, though, such a method might belong in something like org.apache.spark.util.Utils.
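
For context, a minimal sketch of the call path this heuristic targets (the data and sizes are made up; computeGramianMatrix and computePrincipalComponents are the existing RowMatrix entry points, while the depth computation is what this PR adds):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// sc is an existing SparkContext; many partitions of wide rows.
val rows = sc.parallelize(
  Seq.fill(1000)(Vectors.dense(Array.fill(100)(scala.util.Random.nextDouble()))),
  numSlices = 200)

val mat = new RowMatrix(rows)

// Both calls below aggregate the Gramian on the driver via treeAggregate. Each partial
// result is on the order of numCols * numCols * 8 bytes, so with a fixed depth the final
// collect of ~sqrt(numPartitions) partial results can exceed spark.driver.maxResultSize;
// the proposed heuristic picks a depth for which that final collect fits.
val gramian = mat.computeGramianMatrix()
val pcs = mat.computePrincipalComponents(10)
```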

@gagafunctor
Contributor Author

I think I messed up with scalafmt. Will fix that tomorrow and clean the PR.

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 07ca0ae to 72dab50 on March 6, 2019 10:10
@srowen
Member

srowen commented Mar 6, 2019

@gagafunctor that's fine. Feel free to force-push changes or just close and open a new PR against a new branch if it's easier.

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 72dab50 to cf2e92a on March 6, 2019 13:58
Contributor Author

I'm not sure if this is the right way to do it here. I was hesitating between:

  • Doing it this way, i.e. using a standard way of estimating the object size, at the cost of allocating the dense vector
  • Not allocating the dense vector, at the cost of estimating its size with an ad-hoc computation

Member

I wouldn't allocate a potentially large vector just for this. Just go with the ad-hoc estimate of nt * 8 / 1000000
Nit: grammian -> gramian

Contributor Author

Fine, I'll change that (and correct the typo in gramian)

Member

This shouldn't be in RDD; it can just live in RowMatrix for now. It can be private[spark] for testing.

Contributor Author

Fine, I'll move that (and the test) to the RowMatrix class.

Member

This value needs to be the result of conf.get(MAX_RESULT_SIZE)

Contributor Author

Indeed, that's a more concise way of getting it. Thanks for pointing me to the config package object, I didn't know about that one!

Contributor Author

@gagafunctor gagafunctor Mar 8, 2019

So, playing around in unit tests with different ways to use this config field made me realize something...
I tried the following: conf.set("spark.driver.maxResultSize", "10g"). Then I get:

  • conf.get(MAX_RESULT_SIZE) returns 10737418240 -> it's in bytes
  • Utils.memoryStringToMb(conf.get("spark.driver.maxResultSize")) returns 10240 -> it's in megabytes

So the units are as expected, but the results are a bit different from each other...

-> I'm not sure what the best way is to get maxResultSize in MB from code here:

  • Utils.memoryStringToMb(conf.get("spark.driver.maxResultSize"))
    OR
  • conf.get(MAX_RESULT_SIZE) / (1000 * 1000)

??
(Knowing that these might yield slightly different results, but I don't know if it's a big deal)
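
For reference, both paths look like this in code (this assumes Spark-internal code, since SparkConf.get(ConfigEntry) and org.apache.spark.util.Utils are private[spark]):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.MAX_RESULT_SIZE
import org.apache.spark.util.Utils

val conf = new SparkConf().set("spark.driver.maxResultSize", "10g")

conf.get(MAX_RESULT_SIZE)                                        // 10737418240L, i.e. bytes
Utils.memoryStringToMb(conf.get("spark.driver.maxResultSize"))   // 10240, i.e. mebibytes

// Dividing the byte count by 1000 * 1000 gives 10737, not 10240,
// hence the small discrepancy mentioned above.
```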

Member

This may be moot as I don't think you need to check this value, but what's the issue? memoryStringToMb does return megabytes. Actually, we have an unfortunate conflation of megabyte and mebibyte here, but it's not material for our purposes.

Contributor Author

I figured that there's no issue at all there. In fact, 10737418240 / (1024 * 1024) == 10240

Member

I wouldn't check this condition here

Contributor Author

I thought it could avoid users losing time by trying to do an aggregation that is doomed to fail...
But I can remove it if you feel it's better to let the user get a "total size of results exceeded driver.maxResultSize" error.

Member

Just use log; the base won't matter.

Member

I might be doing the math wrong here, but we want partitions^(1/depth) * vectorSize <= maxResultSize, right? I get that this needs depth >= log(partitions) / (log(maxResultSize) - log(vectorSize)) ?

I suppose we want some safety checks for 0 partitions or 0 aggregate size in MB.
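
Spelling out the algebra behind that bound (with P partitions, aggregated object size s, and driver limit M in the same unit): the final level of a depth-d treeAggregate leaves roughly P^(1/d) partial results of size s to collect on the driver, so we need

$$P^{1/d} \cdot s \le M \;\iff\; \frac{\log P}{d} \le \log M - \log s \;\iff\; d \ge \frac{\log P}{\log M - \log s},$$

which is only meaningful when s < M, i.e. when the denominator is positive.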

Contributor Author

You're right, the formula you posted is actually what I tested at scale. Not sure why I didn't just copy-paste it; I just botched the math when writing the PR ^^

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from cf2e92a to 1f63cdc on March 7, 2019 10:57
Member

I think this needs to be (nt * 8) / 1000000 to be correct, but nt * 8 could overflow, so it needs to be (nt * 8L) / 1000000, for example. You can cast back to an Int then.

Contributor Author

I'm not sure why; is it because (nt / 1M) * 8 would be hurt by double-precision issues after nt / 1M?

Anyway, I'll apply your suggestion.

Member

These are integers though. (999999 / 1000000) * 8 == 0, but (999999 * 8) / 1000000 == 7, which is more like correct. It has to be done using longs though to avoid overflow: ((nt * 8L) / 1000000).toInt
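
A quick REPL-style illustration of both points (plain Scala arithmetic, not the PR code; nt here is a made-up element count):

```scala
// Integer division truncates, so the order of operations matters:
(999999 / 1000000) * 8       // == 0: divides first, truncates to 0 MB
(999999 * 8) / 1000000       // == 7: multiplies first, ~8 MB worth of doubles

// The multiplication itself must be done as a Long to avoid Int overflow:
val nt = 500000000           // e.g. 500M doubles in the aggregated object
nt * 8                       // overflows Int and wraps to a negative number
((nt * 8L) / 1000000).toInt  // == 4000 (MB), computed safely as a Long
```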

Member

Nit: use === to trigger the scalatest macro that will print a better error if this check fails.

Contributor Author

Nice, another trick I'll apply in my code from now on, thanks.

Member

Yeah, it's a fair point, failing earlier. I think it is OK and simpler not to plumb through an extra arg just to check it here too. It will fail fairly quickly. I don't feel strongly about it, but I don't know if we need to bother checking it here at all.

Also, can you get rid of numPartitions? You can create an empty or dummy RDD in the test with as many partitions as you like. It should always be using rows.getNumPartitions anyway, I think.

Contributor Author

Okay to get rid of the numPartitions argument.
But with the same logic in mind, I guess I should get rid of the maxDriverResultSizeInMb argument as well, since we can access it from the SparkConf.
Will do that in the next update, coming soon.

Member

Yeah, that's why I was saying just don't bother with the check at all.

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 1f63cdc to 8349870 on March 8, 2019 16:35
Contributor Author

@gagafunctor gagafunctor Mar 8, 2019

So, about this line: I figured out that changing this will affect the SparkConf for tests that will be executed after this one as well (I tried to set maxResultSize to 10 bytes here and it broke several later tests).
Do you know what the desired way is to make this change the SparkConf only within the scope of this test?

Member

Do you have to change this from 2g for it to select a depth of 3? Does a slightly larger number of partitions help? I thought you might have to reduce it, if anything. There are ways to isolate the conf change to just this test, but if it's avoidable, that's better.

Contributor Author

@gagafunctor gagafunctor Mar 11, 2019

I checked the maxResultSize parameter value in the test and it's the default one (1g, i.e. 1024 MB) -> I'll change the numPartitions and object size to:

  • numPartitions -> 100
  • objectSize -> 100 to get a desired depth of 2 (because sqrt(100) * 100 <= 1024)
  • objectSize -> 110 to get a desired depth of 3 (because sqrt(100) * 110 > 1024 and math.pow(100, 0.33) * 110 < 1024)

So the (semantically) same test doesn't require a change to the shared-context Spark conf.
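
For reference, a sketch of how that test could read (getTreeAggregateIdealDepth, its signature, and the shared-context sc are assumptions here, not necessarily the final API):

```scala
test("tree aggregate depth heuristic stays under the default 1g maxResultSize") {
  // Only the partition count matters, so an empty RDD with 100 slices is enough.
  val dummyRows = sc.parallelize(Seq.empty[org.apache.spark.mllib.linalg.Vector], numSlices = 100)
  val mat = new RowMatrix(dummyRows, 0L, 0)

  // sqrt(100) * 100 MB = 1000 MB <= 1024 MB -> depth 2 is enough
  assert(mat.getTreeAggregateIdealDepth(100L * 1024 * 1024) === 2)
  // sqrt(100) * 110 MB = 1100 MB >  1024 MB -> depth 3 is needed
  assert(mat.getTreeAggregateIdealDepth(110L * 1024 * 1024) === 3)
}
```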

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 8349870 to b95f847 on March 8, 2019 16:39
@gagafunctor gagafunctor changed the title from [SPARK-26881][core] Heuristic for tree aggregate depth to [SPARK-26881][mllib] Heuristic for tree aggregate depth on Mar 8, 2019
@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from b95f847 to f2cd5cc on March 11, 2019 10:28
@SparkQA

SparkQA commented Mar 11, 2019

Test build #4606 has finished for PR 23983 at commit f2cd5cc.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

The style checker is picky:

[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala:27:0: There should at least one a single empty line separating groups 3rdParty and spark.
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala:29:0: org.apache.spark.internal.Logging is in wrong order relative to org.apache.spark.internal.config.MAX_RESULT_SIZE.
[error] Total time: 18 s, completed Mar 11, 2019 6:05:24 AM

I think you need to restore the blank line, and move this import below the following one.

Contributor Author

I'll try that.
However, I don't understand the rationale behind this import ordering error, as "org.apache.spark.internal.Logging" seems to come earlier in alphabetical order than "org.apache.spark.internal.config.MAX_RESULT_SIZE".

Member

It's sorted by package, then within the package. org.apache.spark.internal is considered to sort before subpackages like org.apache.spark.internal.config
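
Concretely, the ordering the style checker expects here would be something like (the surrounding import is illustrative):

```scala
// Within the org.apache.spark group, the parent package sorts before its subpackages:
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.MAX_RESULT_SIZE
import org.apache.spark.mllib.linalg.Vectors
```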

Contributor Author

Okay, makes sense, thanks for the clarification!

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from f2cd5cc to 8013072 on March 13, 2019 16:34
Member

Now this always logs a warning? Looks like this should be removed

Contributor Author

Indeed, done.

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 8013072 to be7416b on March 18, 2019 10:01
Member

Nit: reducting -> reducing

Contributor Author

Fixed

Member

I think the style checker wants this blank line

Contributor Author

Fixed

Member

The use of 1024 * 1024 here is inconsistent with the 1000 * 1000 above. No big deal, but maybe easier to just take a number of bytes as a long in both cases.

Contributor Author

@gagafunctor gagafunctor Mar 18, 2019

Indeed, I'll switch the 1000 * 1000 to 1024 * 1024, as that is the ratio between the result of conf.get(MAX_RESULT_SIZE) and the max result size in megabytes obtained from the method Utils.memoryStringToMb.

Member

Sorry to pick on this, but what about dealing in bytes here, not MB? I think we might have a problem if the aggregatedObjectSize is so small that it rounds down to 0 MB and then below you take the log of 0.

I apologize for only thinking about this now, but I think we have a problem when the object size is nearly equal to the max. The desired depth could be really big, like 1000 or more.

Indeed, the denominator can be 0 or negative. I suspect we want to not fail in this case but just use a max depth in those cases too.

How about capping the depth between 1 and, say, 10 to be safe? As a heuristic, I don't think depths larger than that are reasonable anyway. Use 10 if the denominator is <= 0.
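
A rough sketch of that capping idea (the method name, parameters, and placement are illustrative, not necessarily the PR's final code):

```scala
// Pick a treeAggregate depth so that the final collect stays under the driver limit,
// clamped to [1, 10]; fall back to the max depth when the denominator is not positive
// (i.e. the aggregated object is at least as large as the allowed result size).
private[spark] def getTreeAggregateIdealDepth(
    numPartitions: Int,
    aggregatedObjectSizeInBytes: Long,
    maxDriverResultSizeInBytes: Long): Int = {
  val maxDepth = 10
  val denominator =
    math.log(maxDriverResultSizeInBytes) - math.log(aggregatedObjectSizeInBytes)
  if (denominator <= 0.0) {
    maxDepth
  } else {
    // depth >= log(numPartitions) / (log(maxResultSize) - log(objectSize))
    val desiredDepth = math.ceil(math.log(numPartitions) / denominator)
    // Clamp before converting to Int, so a huge desiredDepth cannot overflow.
    math.min(maxDepth.toDouble, math.max(1.0, desiredDepth)).toInt
  }
}
```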

Contributor Author

I added max-depth capping and changed the unit for memory sizes (MB -> bytes).
The thing is, it made me change the API (Int -> Long), as a size in bytes might be too big for an Int (cf. Gramian size = nt * 8, with nt potentially already being a big Int).

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch 4 times, most recently from ada30a5 to f0453cc on March 20, 2019 13:57
Member

Checking here is too late; the depth may be infinite or negative above. I think you'd have to check aggregatedObjectSizeInBytes == 0 and aggregatedObjectSizeInBytes > maxDriverResultSizeInBytes above.

Contributor Author

Fine, I'll put back the require(aggregatedObjectSizeInBytes > maxDriverResultSizeInBytes), and add one for aggregatedObjectSizeInBytes > 0

Member

I'd make the minimum 1 -- not sure why it was 2 before.

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from f0453cc to 4b28f2c on March 20, 2019 15:23
Member

I don't think this can happen, but it's an OK check.

Member

I was thinking this can use the max depth rather than fail, but, don't feel strongly about it.

Contributor Author

I don't have a strong opinion about that either. But I slightly prefer failing here rather than letting a long job (depth 10 involves a lot of shuffle) that is doomed to fail run to completion.

Member

One tiny wrinkle here: if this is very large the int could overflow. I'd just cast at the end when it's used as an arg to max

Contributor Author

Done

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 4b28f2c to 7c4b666 on March 21, 2019 13:46
@SparkQA

SparkQA commented Mar 21, 2019

Test build #4652 has finished for PR 23983 at commit 7c4b666.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 7c4b666 to 76c2780 on March 21, 2019 16:10
@gagafunctor
Contributor Author

Test build #4652 has finished for PR 23983 at commit 7c4b666.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Just pushed a version fixing scala style issues.

@SparkQA

SparkQA commented Mar 22, 2019

Test build #4653 has finished for PR 23983 at commit 76c2780.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

I think we still have one last problem here, as before - if the object size is just a little bit less than max, then the denominator will be tiny and the desired depth will be very large, possibly overflowing an int. I'd just move the .toInt outside the min/max calls and it should be fine.

Member

@gagafunctor I think this is almost done. One more issue to deal with to make it tight.

Contributor Author

@srowen Sorry for the lag, I was on vacation, away from my dev machine.
I should be able to push an update by Tuesday, but feel free to do it before then if you want :)

@srowen
Member

srowen commented Mar 31, 2019

@gagafunctor I can take this over to finish if you're unavailable

@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 76c2780 to 643bfaf on April 1, 2019 16:44
@gagafunctor gagafunctor force-pushed the Heuristic_for_treeAggregate_depth branch from 643bfaf to 4f61d86 on April 1, 2019 17:05
@gagafunctor
Contributor Author

@srowen I think I addressed your last change request; let me know if any other change is needed from your point of view.

@SparkQA

SparkQA commented Apr 8, 2019

Test build #4697 has finished for PR 23983 at commit 4f61d86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Apr 9, 2019

Merged to master

@srowen srowen closed this in dfa2328 Apr 9, 2019