[SPARK-20280][CORE] FileStatusCache Weigher integer overflow
## What changes were proposed in this pull request?

Weigher.weigh must return an Int, but the estimated size in bytes of an Array[FileStatus] can exceed Int.MaxValue. To avoid the overflow, the estimated size is scaled down by a factor of 32, and the maximumWeight of the cache is scaled down by the same factor.
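
For illustration, a minimal standalone sketch of the scaled-weigher idea using Guava's cache API (the same API the patch builds on), with hypothetical `String`/`Array[Long]` keys and values instead of the actual Spark types: dividing both the per-entry weight and `maximumWeight` by the same factor keeps `weigh` within `Int` range for entries up to roughly `Int.MaxValue * 32` bytes (about 64GB).

```scala
import com.google.common.cache.{Cache, CacheBuilder, Weigher}

object ScaledWeigherSketch {
  // Hypothetical numbers, chosen only to illustrate the arithmetic.
  val weightScale = 32                          // divide every weight by this factor
  val maxSizeInBytes = 8L * 1024 * 1024 * 1024  // pretend 8GB cache budget

  // Weigher.weigh must return an Int, so an unscaled byte estimate above
  // Int.MaxValue (~2GB) would overflow. Dividing by weightScale keeps the
  // result in range for entries up to ~64GB (Int.MaxValue * 32 bytes).
  val weigher = new Weigher[String, Array[Long]] {
    override def weigh(key: String, value: Array[Long]): Int = {
      val estimate = value.length.toLong * 8L / weightScale  // rough size in scaled units
      if (estimate > Int.MaxValue) Int.MaxValue else estimate.toInt
    }
  }

  // The budget is divided by the same factor, so weights and capacity stay comparable.
  val cache: Cache[String, Array[Long]] = CacheBuilder.newBuilder()
    .weigher(weigher)
    .maximumWeight(maxSizeInBytes / weightScale)
    .build[String, Array[Long]]()
}
```

The only cost is a loss of weighing granularity (32 bytes per unit), which is smaller than the size of a single FileStatus.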

## How was this patch tested?
New test in FileIndexSuite

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17591 from bogdanrdc/SPARK-20280.
bogdanrdc authored and hvanhovell committed Apr 10, 2017
1 parent a26e3ed commit f6dd8e0
Showing 2 changed files with 50 additions and 13 deletions.
FileStatusCache.scala
@@ -94,27 +94,48 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
   // Opaque object that uniquely identifies a shared cache user
   private type ClientId = Object


   private val warnedAboutEviction = new AtomicBoolean(false)

   // we use a composite cache key in order to distinguish entries inserted by different clients
-  private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder()
-    .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] {
-      override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
-        (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
-      }})
-    .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
-      override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
-        : Unit = {
-        if (removed.getCause == RemovalCause.SIZE &&
-            warnedAboutEviction.compareAndSet(false, true)) {
-          logWarning(
-            "Evicting cached table partition metadata from memory due to size constraints " +
-            "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " +
-            "This may impact query planning performance.")
-        }
-      }})
-    .maximumWeight(maxSizeInBytes)
-    .build[(ClientId, Path), Array[FileStatus]]()
+  private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
+    // [[Weigher]].weigh returns Int so we could only cache objects < 2GB
+    // instead, the weight is divided by this factor (which is smaller
+    // than the size of one [[FileStatus]]).
+    // so it will support objects up to 64GB in size.
+    val weightScale = 32
+    val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
+      override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
+        val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
+        if (estimate > Int.MaxValue) {
+          logWarning(s"Cached table partition metadata size is too big. Approximating to " +
+            s"${Int.MaxValue.toLong * weightScale}.")
+          Int.MaxValue
+        } else {
+          estimate.toInt
+        }
+      }
+    }
+    val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
+      override def onRemoval(
+          removed: RemovalNotification[(ClientId, Path),
+            Array[FileStatus]]): Unit = {
+        if (removed.getCause == RemovalCause.SIZE &&
+            warnedAboutEviction.compareAndSet(false, true)) {
+          logWarning(
+            "Evicting cached table partition metadata from memory due to size constraints " +
+            "(spark.sql.hive.filesourcePartitionFileCacheSize = "
+              + maxSizeInBytes + " bytes). This may impact query planning performance.")
+        }
+      }
+    }
+    CacheBuilder.newBuilder()
+      .weigher(weigher)
+      .removalListener(removalListener)
+      .maximumWeight(maxSizeInBytes / weightScale)
+      .build[(ClientId, Path), Array[FileStatus]]()
+  }


   /**
    * @return a FileStatusCache that does not share any entries with any other client, but does
FileIndexSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}

 class FileIndexSuite extends SharedSQLContext {

@@ -220,6 +221,21 @@ class FileIndexSuite extends SharedSQLContext {
       assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath))
     }
   }
+
+  test("SPARK-20280 - FileStatusCache with a partition with very many files") {
+    /* fake the size, otherwise we need to allocate 2GB of data to trigger this bug */
+    class MyFileStatus extends FileStatus with KnownSizeEstimation {
+      override def estimatedSize: Long = 1000 * 1000 * 1000
+    }
+    /* files * MyFileStatus.estimatedSize should overflow to negative integer
+     * so, make it between 2bn and 4bn
+     */
+    val files = (1 to 3).map { i =>
+      new MyFileStatus()
+    }
+    val fileStatusCache = FileStatusCache.getOrCreate(spark)
+    fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
+  }
 }

 class FakeParentPathFileSystem extends RawLocalFileSystem {
