[SPARK-20659][Core] Removing sc.getExecutorStorageStatus and making StorageStatus private #20546
Conversation
If this change goes into the 2.3 branch then MimaExcludes.scala should be changed accordingly.

This won't go into 2.3. Also, please don't copy & paste the bug title in your PR. Explain what you're doing instead. The current title does not explain what the change does.

Test build #87216 has finished for PR 20546 at commit
This is in the right direction, but I think it's worth trying to rewrite getRDDStorageInfo using the data from the status store instead. That might allow more code to go away.
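For illustration, a hedged sketch of reading the same data from the status store (sc.statusStore is private[spark], so this only compiles from Spark's own code; rddList and the v1.RDDStorageInfo field names are assumptions based on the status store's REST API types):

    // Sketch: cached-RDD storage info taken from the app status store
    // instead of StorageStatus.
    val cachedRdds = sc.statusStore.rddList(cachedOnly = true)
    cachedRdds.foreach { info =>
      println(s"RDD ${info.id} '${info.name}': " +
        s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
        s"memory=${info.memoryUsed}B, disk=${info.diskUsed}B")
    }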
    getRDDStorageInfo(_ => true)
  }

  private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
There's a single call to this method outside of tests, in RDD.toDebugString. That to me makes it another candidate to go away and be replaced with information from the AppStatusStore. Then maybe you can remove more code from StorageStatus.
Have you taken a look at that?
I have found something, and I am not sure whether it is a bug, or where to look regarding its correction:
Using rddStorageInfo.numCachedPartitions gives back a different value than the rddInfo.numCachedPartitions computed/updated by the old storage utils.
This is why I changed the assert in org.apache.spark.repl.SingletonReplSuite at "replicating blocks of object with class defined in repl".
Would it be a good idea to open a new Jira issue if the bug is in the existing rddStorageInfo.numCachedPartitions calculation?
If those values differ then it's probably a bug in the new code. Or maybe a bug in the old code, although that's less likely. It would be good to investigate why they differ.
The old code considered the replication factor. I have created a separate Jira issue: https://issues.apache.org/jira/browse/SPARK-23394.
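To illustrate the difference with hypothetical numbers (this is an interpretation of the replication issue described above, not code from the patch):

    // Sketch: why the two counts diverge for a replicated storage level.
    val numPartitions = 10
    val replication = 2 // e.g. StorageLevel.MEMORY_ONLY_2
    // Old path: summing rddBlocksById(...).size over all executors counts
    // every replica of every cached block.
    val oldStyleCount = numPartitions * replication // 20
    // New path: numCachedPartitions counts each distinct cached partition
    // once, regardless of replication.
    val newStyleCount = numPartitions // 10
    assert(oldStyleCount == replication * newStyleCount)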
  @DeveloperApi
  @deprecated("This class may be removed or made private in a future release.", "2.2.0")
  class StorageStatus(
  private [spark] class StorageStatus(
nit: no space after private
   *
   * We store RDD blocks and non-RDD blocks separately to allow quick retrievals of RDD blocks.
   * These collections should only be mutated through the add/update/removeBlock methods.
   * These collections should only be mutated through the addBlock method.
I think this is pretty out of date now. I don't see any calls to addBlock outside of this class.
    val data = sc.parallelize(1 to 1000, 10)
    val cachedData = data.persist(storageLevel)
    assert(cachedData.count === 1000)
    assert(sc.getExecutorStorageStatus.map(_.rddBlocksById(cachedData.id).size).sum ===
You could replace these with code based on sc.statusStore.
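A hedged sketch of what that replacement could look like (the expected count of 10 matches the test's partition count above; rddList and numCachedPartitions are assumptions based on the AppStatusStore / v1.RDDStorageInfo API):

    // Sketch: the same assertion expressed against the app status store.
    // Note: numCachedPartitions counts distinct partitions, not replicas,
    // so for replicated storage levels it differs from the old block sum.
    val rddInfo = sc.statusStore.rddList().find(_.id == cachedData.id)
    assert(rddInfo.map(_.numCachedPartitions).getOrElse(0) === 10)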
Using sc.statusStore here would also hit the bug I mentioned above (the rddStorageInfo.numCachedPartitions difference). In many cases the testCaching method is called several times, which is why I left this untouched.
   * we submit a request to kill them. This must be called before each kill request.
   */
  private def syncExecutors(sc: SparkContext): Unit = {
    val driverExecutors = sc.getExecutorStorageStatus
You could replace this with code based on sc.statusStore.
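A hedged sketch of that replacement (executorList(activeOnly) is assumed from the AppStatusStore API; whether the driver's own entry needs filtering depends on what the test expects):

    // Sketch: executor ids known to the driver, read from the app status
    // store instead of the block manager's StorageStatus list.
    val driverExecutors = sc.statusStore.executorList(activeOnly = true)
      .map(_.id)
      .filter(_ != "driver") // drop the driver's own entry if present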
I have tried to use "sc.statusStore.executorList(true)" instead of sc.env.blockManager.master.getStorageStatus but the test failed.
Failed how? The list kept by the block manager and by the status store should be the same, so if they differ, there's a problem somewhere.
As only registered executors can be killed, this part synchronises the executors known to the master and to the driver (executors missing from the driver were registered with some mock data). The reason behind this was performance:
I tried changing the code to wait for the executors instead, and it got very slow (one test took up to 50 seconds).
Test build #87232 has finished for PR 20546 at commit

Test build #87265 has finished for PR 20546 at commit

Test build #87266 has finished for PR 20546 at commit

Test build #87336 has finished for PR 20546 at commit

Test build #87345 has finished for PR 20546 at commit

Test build #87361 has finished for PR 20546 at commit

retest this please

Test build #87378 has finished for PR 20546 at commit

LGTM, merging to master. I filed SPARK-23411 to deprecate the other API I missed before.
  int port();
  long cacheSize();
  int numRunningTasks();
  long usedOnHeapStorageMemory();
Maybe I'm missing something here, but do we already have a real use case for the added memory metrics here?
This information was exposed by the public method being removed by this PR, so it makes sense to add these so that people still have a way to get that data.
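For illustration, a hedged sketch of how an application could read those values through the public status tracker once this change is in (the accessor names come from this PR's diff; sc.statusTracker.getExecutorInfos is the existing public entry point):

    // Sketch: the new executor memory metrics via SparkStatusTracker,
    // replacing the removed sc.getExecutorStorageStatus.
    sc.statusTracker.getExecutorInfos.foreach { exec =>
      println(s"${exec.host()}:${exec.port()} on-heap storage used/total = " +
        s"${exec.usedOnHeapStorageMemory()}/${exec.totalOnHeapStorageMemory()}")
    }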
If you are referring to StorageStatus.onHeapMemUsed / offHeapMemUsed / onHeapMemRemaining / offHeapMemRemaining, then it makes great sense. But I still don't quite get a few things:
- Why do we use a different name format, when we could have kept the same names as in StorageStatus?
- Is this information all we need to add to SparkExecutorInfo? After this change, how do we expose disk usage information?
- That follows the names in the public MemoryMetrics class from the REST API.
- We could add that, just as we could add a whole lot of other things. At some point we should look at exposing the REST API types directly through SparkStatusTracker instead of having these mirror types.
I'm not disagreeing with you on making such changes, but I'm also worried that users could have to change their code a lot because of the changes we made. If you don't mind, may I submit a follow-up PR to minimize the gap between SparkExecutorInfo and StorageStatus?
Sure. But users put themselves at that kind of risk by using @DeveloperApi methods, especially ones that have been deprecated.
Agreed, the changes should be a minor issue. Thanks for the explanations!
What changes were proposed in this pull request?
In this PR, StorageStatus is made private and simplified a bit; moreover, the SparkContext.getExecutorStorageStatus method is removed. The reason for keeping StorageStatus at all is that it is still used by SparkContext.getRDDStorageInfo.
As a replacement for SparkContext.getExecutorStorageStatus, executor infos are extended with additional memory metrics: usedOnHeapStorageMemory, usedOffHeapStorageMemory, totalOnHeapStorageMemory and totalOffHeapStorageMemory.
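A hedged migration sketch for user code (the "before" line uses the removed API; memUsed is assumed to cover both on- and off-heap storage memory, which is how the new accessors are combined below):

    // Before (removed by this PR):
    //   val used = sc.getExecutorStorageStatus.map(_.memUsed).sum
    // After, via the public status tracker and the new metrics:
    val used = sc.statusTracker.getExecutorInfos
      .map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory())
      .sum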
How was this patch tested?
By running existing unit tests.