Skip to content

Commit

Permalink
set stats in cachedmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Dec 2, 2017
1 parent 2082c0e commit bc70817
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,16 @@ class CacheManager extends Logging {
logWarning("Asked to cache already cached data.")
} else {
val sparkSession = query.sparkSession
cachedData.add(CachedData(
planToCache,
InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize,
storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName)))
val inMemoryRelation = InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize,
storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName)
if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
inMemoryRelation.setStatsFromCachedPlan(planToCache)
}
cachedData.add(CachedData(planToCache, inMemoryRelation))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.LongAccumulator


object InMemoryRelation {

def apply(
useCompression: Boolean,
batchSize: Int,
Expand Down Expand Up @@ -73,22 +73,20 @@ case class InMemoryRelation(

/**
 * Computes the statistics reported for this relation.
 *
 * When `batchStats.value` is 0 the result is `inheritedStats` if present, otherwise a
 * `Statistics` whose size is the session's `defaultSizeInBytes`. When `batchStats.value`
 * is non-zero, that value is reported as `sizeInBytes`.
 */
override def computeStats(): Statistics = {
if (batchStats.value == 0L) {
// NOTE(review): this text looks like a rendered diff whose +/- markers were lost. The `match`
// expression below appears to be the removed implementation; as written its result is
// discarded, because the `inheritedStats.getOrElse(...)` expression after it is the value of
// this branch. Confirm against the real file before relying on it.
children.filter(_.isInstanceOf[LogicalRelation]) match {
// NOTE(review): this pattern only matches a Seq of exactly two elements — confirm intent.
case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled =>
val stats = c.computeStats()
if (stats.rowCount.isDefined) {
stats
} else {
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
}
case _ =>
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
}
// Value of this branch: the inherited stats, or the default-size fallback.
inheritedStats.getOrElse(Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes))
} else {
// Use the size accumulated in `batchStats`.
Statistics(sizeInBytes = batchStats.value.longValue)
}
}

// Statistics taken from the plan being cached; assigned by `setStatsFromCachedPlan`.
// Initialised to `None` rather than `_`: the default initialiser would leave this field
// `null`, and `inheritedStats.getOrElse(...)` in `computeStats` would then throw a
// NullPointerException whenever no stats had been set.
private var inheritedStats: Option[Statistics] = None

/**
 * Stores the statistics of the given plan in `inheritedStats`.
 *
 * @param planToCache the logical plan whose `stats` are recorded; its conf must have
 *                    `cboEnabled` set
 * @throws IllegalArgumentException if `planToCache.conf.cboEnabled` is false
 */
private[execution] def setStatsFromCachedPlan(planToCache: LogicalPlan): Unit = {
  // Refuse to record stats when cost-based optimisation is off for this plan.
  require(
    planToCache.conf.cboEnabled,
    "you cannot use the stats of cached plan in InMemoryRelation without cbo enabled")
  val planStats = planToCache.stats
  inheritedStats = Some(planStats)
}

// If the cached column buffers were not passed in, we calculate them in the constructor.
// As in Spark, the actual work of caching is lazy.
if (_cachedColumnBuffers == null) {
Expand Down

0 comments on commit bc70817

Please sign in to comment.