
[SPARK-22673][SQL] InMemoryRelation should utilize existing stats whenever possible #19864

Closed · wants to merge 21 commits
@@ -94,14 +94,16 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize,
+        storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName)
+      if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
Member: Do we need to limit this to those conditions? I think we can pass the stats into the created InMemoryRelation even if the two conditions don't match.

Contributor Author: The reason I put it here is that when CBO is not enabled, the stats in the underlying plan might be much smaller than the actual size in memory, with the potential risk of an OOM error.

The underlying cause is that without CBO enabled, the size of the plan is calculated from BaseRelation's sizeInBytes, but with CBO we can have a more accurate estimation:

override def computeStats(): Statistics = {
  catalogTable
    .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
    .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
}

def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
  if (cboEnabled && rowCount.isDefined) {
    val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
    // Estimate size as number of rows * row size.
    val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
    Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
  } else {
    // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
    // estimation strategy and only propagate sizeInBytes in statistics.
    Statistics(sizeInBytes = sizeInBytes)
  }
}
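The two estimation strategies above can be illustrated with a small, self-contained sketch (`PlanStats` and `estimateSize` are made-up names standing in for Spark's `Statistics` and `toPlanStats`, not its actual API): the CBO path multiplies the row count by an estimated row width, while the size-only path just propagates the raw file size.

```scala
// Illustrative sketch only: PlanStats and estimateSize are hypothetical
// stand-ins for Spark's Statistics and CatalogStatistics.toPlanStats.
case class PlanStats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

def estimateSize(
    fileSizeInBytes: BigInt,
    rowCount: Option[BigInt],
    avgRowSizeInBytes: BigInt,
    cboEnabled: Boolean): PlanStats = {
  if (cboEnabled && rowCount.isDefined) {
    // CBO path: rows * estimated row width, typically much closer to the
    // in-memory footprint than the (often compressed) on-disk file size.
    PlanStats(rowCount.get * avgRowSizeInBytes, rowCount)
  } else {
    // Size-only path: propagate just the file size.
    PlanStats(fileSizeInBytes)
  }
}
```

With 10 rows at an estimated 32 bytes each, the CBO path reports 320 bytes regardless of how small the compressed file is, which is the gap the discussion above is about.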

Member (@viirya, Dec 6, 2017): When CBO is disabled, don't we just set sizeInBytes to defaultSizeInBytes? Is that different from the current statistics of the first run?

Contributor Author: No, if CBO is disabled, the relation's sizeInBytes is the file size:

override def sizeInBytes: Long = location.sizeInBytes

Member: LogicalRelation uses the statistics from the relation only when there is no given catalogTable; in that case, it doesn't consider whether CBO is enabled or not.

Only catalogTable considers CBO when computing its statistics in toPlanStats. It doesn't refer to the relation's statistics, IIUC.

Member: If a catalog table doesn't have statistics in its metadata, we fill it with defaultSizeInBytes:

val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
  try {
    val hadoopConf = session.sessionState.newHadoopConf()
    val tablePath = new Path(table.location)
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    fs.getContentSummary(tablePath).getLength
  } catch {
    case e: IOException =>
      logWarning("Failed to get table size from hdfs.", e)
      session.sessionState.conf.defaultSizeInBytes
  }
} else {
  session.sessionState.conf.defaultSizeInBytes
}

Contributor Author: @viirya you're right! Thanks for clearing up the confusion.

However, to prevent using the relation's stats, which can be much smaller than the in-memory size and lead to a potential OOM error, we should still keep this condition here (though we can remove cboEnabled), right?

Member: The statistics from the relation are based on file size; will that easily cause an OOM issue? In cases other than cached queries, we still use the relation's statistics. If this is an issue, doesn't it also affect those other cases?

Contributor Author (@CodingCat, Dec 6, 2017): That's true, I believe it does affect them. There is a similar discussion in #19743.

+        inMemoryRelation.setStatsFromCachedPlan(planToCache)
+      }
Contributor Author: I had to make InMemoryRelation stateful to avoid breaking APIs.

Contributor: I think a more ideal change would be to pass the original plan's stats into the constructor of InMemoryRelation, instead of making it mutable.
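A minimal sketch of what that constructor-based alternative could look like (the class, field, and parameter names here are simplified stand-ins loosely mirroring the diff, not the actual patch):

```scala
// Illustrative sketch: pass the cached plan's stats through the constructor
// rather than a mutable setter. Statistics and InMemoryRelationSketch are
// hypothetical simplifications, not Spark's real classes.
case class Statistics(sizeInBytes: BigInt)

class InMemoryRelationSketch(
    defaultSizeInBytes: BigInt,
    statsOfPlanToCache: Option[Statistics]) { // fixed at construction time

  // Size accumulated while materializing the column batches; 0 until then.
  var batchStatsValue: Long = 0L

  def computeStats(): Statistics =
    if (batchStatsValue == 0L) {
      // Not materialized yet: prefer the cached plan's stats over the default.
      statsOfPlanToCache.getOrElse(Statistics(defaultSizeInBytes))
    } else {
      // Materialized: the accumulated batch size is the most accurate figure.
      Statistics(BigInt(batchStatsValue))
    }
}
```

This keeps the relation immutable: the caller decides at construction whether to hand over the plan's stats, and computeStats only ever reads them.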

Contributor Author: It looks like I have no way to access InMemoryRelation from outside of the spark package, though it is not a package-private class... how is that achieved?

If that's the case, I can modify the constructor.

Thanks @cloud-fan

Contributor: InMemoryRelation is not part of the public API and should be treated as unstable/internal; use it at your own risk. Changing the constructor is fine.

Member: +1

+      cachedData.add(CachedData(planToCache, inMemoryRelation))
     }
   }

@@ -25,13 +25,15 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
Member: Unused import.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.LongAccumulator


object InMemoryRelation {

Member: Unnecessary change.

def apply(
useCompression: Boolean,
batchSize: Int,
@@ -71,14 +73,20 @@ case class InMemoryRelation(

   override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
Contributor: Suggested comment wording: // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.

-      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      inheritedStats.getOrElse(Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes))
} else {
Statistics(sizeInBytes = batchStats.value.longValue)
}
}

+  private var inheritedStats: Option[Statistics] = _
+
+  private[execution] def setStatsFromCachedPlan(planToCache: LogicalPlan): Unit = {
+    require(planToCache.conf.cboEnabled, "you cannot use the stats of cached plan in" +
+      " InMemoryRelation without cbo enabled")
+    inheritedStats = Some(planToCache.stats)
+  }

// If the cached column buffers were not passed in, we calculate them in the constructor.
// As in Spark, the actual work of caching is lazy.
if (_cachedColumnBuffers == null) {