[SPARK-17549][sql] Coalesce cached relation stats in driver.
Currently there's a scalability problem with cached relations: stats for
every column of every partition are captured in the driver. For large
tables that leads to excessive driver memory usage.

This change modifies the accumulator used to capture stats in the
driver to summarize the data as it arrives, instead of collecting
everything and then summarizing it.
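In accumulator terms, the shift is from buffering every per-partition
stats row and folding them at the end to folding into a fixed-size
running total on every add. A toy, Spark-free sketch of that shape
(illustrative only; the real implementation is the ColStatsAccumulator
in the diff below):

  // Collect-then-summarize: driver memory grows with the partition count.
  class CollectingAcc {
    private val rows = scala.collection.mutable.ArrayBuffer.empty[Array[Long]]
    def add(row: Array[Long]): Unit = rows += row
    def totalPerColumn: Array[Long] =
      rows.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  }

  // Summarize-as-it-arrives: driver memory stays at one Long per column.
  class SummarizingAcc(numColumns: Int) {
    private val totals = new Array[Long](numColumns)
    def add(row: Array[Long]): Unit =
      for (i <- totals.indices) totals(i) += row(i)
    def totalPerColumn: Array[Long] = totals
  }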

Previously, for each column, the driver needed:

  (64 + 2 * sizeof(type)) * number of partitions

With the change, the driver requires a fixed 8 bytes per column.
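As a rough illustration of the difference, here is a small, self-contained
Scala sketch; the table shape and the 8-byte type size are hypothetical
numbers chosen for the example, not values taken from the change:

  object DriverStatsFootprint {
    def main(args: Array[String]): Unit = {
      val numPartitions = 10000L   // hypothetical large cached table
      val numColumns    = 100L
      val sizeOfType    = 8L       // e.g. an 8-byte long column

      // Before: (64 + 2 * sizeof(type)) bytes kept per column, per partition.
      val before = numColumns * numPartitions * (64L + 2L * sizeOfType)

      // After: a single Long (8 bytes) per column, regardless of partitions.
      val after = numColumns * 8L

      println(s"driver stats before: $before bytes, after: $after bytes")
    }
  }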

On top of that, the change fixes a second problem with how statistics
are calculated for cached relations that share stats with another one
(e.g. a cached projection of a cached relation); previously, the result
would be wrong, since the accumulator data was summarized based on the
child's output while the data actually reflected the parent's output.
Now the calculation is based on how the child's output maps to the
parent's output, yielding the correct size.
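The idea behind that mapping can be shown with a small, Spark-free Scala
sketch (column names stand in for attributes; the helper below is
illustrative only, not the code added in this commit):

  object SizeForOutputSketch {
    // sizes(i) holds the accumulated byte size of originalOutput(i).
    def sizeForOutput(originalOutput: Seq[String],
                      sizes: Array[Long],
                      requestedOutput: Seq[String]): Long = {
      originalOutput.zipWithIndex.map { case (col, idx) =>
        // A column projected twice counts twice; a dropped column counts zero.
        sizes(idx) * requestedOutput.count(_ == col)
      }.sum
    }

    def main(args: Array[String]): Unit = {
      val parent = Seq("col1", "col2")
      val sizes  = Array(40L, 80L)
      println(sizeForOutput(parent, sizes, Seq("col2")))          // 80
      println(sizeForOutput(parent, sizes, Seq("col2", "col2")))  // 160
    }
  }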
Marcelo Vanzin committed Sep 21, 2016
1 parent 7e418e9 commit 5b3a65a
Showing 2 changed files with 105 additions and 16 deletions.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.CollectionAccumulator
import org.apache.spark.util.AccumulatorV2


object InMemoryRelation {
@@ -44,6 +44,70 @@ object InMemoryRelation {
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
}

/**
* Accumulator for storing column stats. Summarizes the data in the driver to curb the amount of
* memory being used. Only "sizeInBytes" for each column is kept.
*/
class ColStatsAccumulator(originalOutput: Seq[Attribute])
extends AccumulatorV2[Seq[ColumnStats], Array[Long]] {

private var stats: Array[Long] = null

override def isZero: Boolean = stats == null

override def copy(): AccumulatorV2[Seq[ColumnStats], Array[Long]] = {
val newAcc = new ColStatsAccumulator(originalOutput)
newAcc.stats = stats
newAcc
}

override def reset(): Unit = {
stats = null
}

override def add(update: Seq[ColumnStats]): Unit = {
if (update != null) {
require(isZero || stats.length == update.size, "Input stats doesn't match expected size.")

val newStats = new Array[Long](update.size)

update.toIndexedSeq.zipWithIndex.foreach { case (colStats, idx) =>
val current = if (!isZero) stats(idx) else 0L
newStats(idx) = current + colStats.sizeInBytes
}

stats = newStats
}
}

override def merge(other: AccumulatorV2[Seq[ColumnStats], Array[Long]]): Unit = {
if (other.value != null) {
require(isZero || stats.length == other.value.length,
"Merging accumulators of different size.")

val newStats = new Array[Long](other.value.length)
for (i <- 0 until other.value.size) {
val current = if (!isZero) stats(i) else 0L
newStats(i) = current + other.value(i)
}
stats = newStats
}
}

override def value: Array[Long] = stats

/**
* Calculate the size of the relation for a given output. Adds up all the known column sizes
* that match the desired output.
*/
def sizeForOutput(output: Seq[Attribute]): Long = {
originalOutput.toIndexedSeq.zipWithIndex.map { case (a, idx) =>
val count = output.count(a.semanticEquals)
stats(idx) * count
}.fold(0L)(_ + _)
}

}

/**
* CachedBatch is a cached batch of rows.
@@ -63,8 +127,7 @@ case class InMemoryRelation(
@transient child: SparkPlan,
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val batchStats: CollectionAccumulator[InternalRow] =
child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
_batchStats: ColStatsAccumulator = null)
extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -73,22 +136,23 @@

@transient val partitionStatistics = new PartitionStatistics(output)

val batchStats = if (_batchStats != null) {
_batchStats
} else {
val _newStats = new ColStatsAccumulator(output)
child.sqlContext.sparkContext.register(_newStats)
_newStats
}

override lazy val statistics: Statistics = {
if (batchStats.value.isEmpty) {
if (batchStats.isZero) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
} else {
// Underlying columnar RDD has been materialized, required information has also been
// collected via the `batchStats` accumulator.
val sizeOfRow: Expression =
BindReferences.bindReference(
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
partitionStatistics.schema)

val sizeInBytes =
batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
Statistics(sizeInBytes = sizeInBytes)
Statistics(sizeInBytes = batchStats.sizeForOutput(output))
}
}

@@ -139,13 +203,13 @@ case class InMemoryRelation(
rowCount += 1
}

val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
.flatMap(_.values))

val stats = columnBuilders.map(_.columnStats)
batchStats.add(stats)

val statsRow = InternalRow.fromSeq(stats.map(_.collectedStatistics).flatMap(_.values))
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
}, statsRow)
}

def hasNext: Boolean = rowIterator.hasNext
@@ -232,4 +232,29 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
val columnTypes2 = List.fill(length2)(IntegerType)
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
}

test("SPARK-17549: cached table size should be correctly calculated") {
val data = spark.sparkContext.parallelize(1 to 10, 5).map { i => (i, i.toLong) }
.toDF("col1", "col2")
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)

// Materialize the data.
val expectedAnswer = data.collect()
checkAnswer(cached, expectedAnswer)

// Check that the right size was calculated.
val expectedColSizes = expectedAnswer.size * (INT.defaultSize + LONG.defaultSize)
assert(cached.statistics.sizeInBytes === expectedColSizes)

// Create a projection of the cached data and make sure the statistics are correct.
val projected = cached.withOutput(Seq(plan.output.last))
assert(projected.statistics.sizeInBytes === expectedAnswer.size * LONG.defaultSize)

// Create a silly projection that repeats columns of the first cached relation, and
// check that the size is calculated correctly.
val projected2 = cached.withOutput(Seq(plan.output.last, plan.output.last))
assert(projected2.statistics.sizeInBytes === 2 * expectedAnswer.size * LONG.defaultSize)
}

}
