Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Minor] Fixes on top of #1679 #1736

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).sum
val remainingMem = storageStatusList.map(_.memRemaining).sum
(maxMem - remainingMem) / 1024 / 1024
val memUsed = storageStatusList.map(_.memUsed).sum
memUsed / 1024 / 1024
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: / (1024 * 1024) instead ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do the double slash in other places as well. Changing those too would touch upon things I didn't touch in #1679, so I'd rather keep the changes in this PR at the minimum while keeping the code consistent with the other places in this file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad code is bad code :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, it is just a nit - so please dont let this block a commit !

}
})

Expand Down
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def memRemaining: Long = maxMem - memUsed

/** Return the memory used by this block manager. */
def memUsed: Long =
_nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum

/** Return the disk space used by this block manager. */
def diskUsed: Long =
_nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum

/** Return the off-heap space used by this block manager. */
def offHeapUsed: Long =
_nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum

/** Return the memory used by the given RDD in this block manager in O(1) time. */
def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
Expand Down Expand Up @@ -246,7 +243,7 @@ private[spark] object StorageUtils {
val rddId = rddInfo.id
// Assume all blocks belonging to the same RDD have the same storage level
val storageLevel = statuses
.map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
.flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE)
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
Expand Down