
[SPARK-22673][SQL] InMemoryRelation should utilize existing stats whenever possible #19864

Closed
wants to merge 21 commits

Conversation

CodingCat
Contributor

What changes were proposed in this pull request?

The current implementation of InMemoryRelation always uses the most expensive execution plan when writing the cache.
With CBO enabled, we can actually have a more accurate estimate of the underlying table size...
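Roughly, the idea (a sketch based on the discussion below, not the exact final diff) is to capture the optimized plan's statistics in CacheManager and hand them to InMemoryRelation, so the planner has something better than defaultSizeInBytes before the cache is materialized:

// Sketch of the CacheManager side; parameter order and names are approximate.
val inMemoryRelation = InMemoryRelation(
  sparkSession.sessionState.conf.useCompression,
  sparkSession.sessionState.conf.columnBatchSize,
  storageLevel,
  sparkSession.sessionState.executePlan(planToCache).executedPlan,
  tableName,
  planToCache.stats)  // statsOfPlanToCache, used until the cached buffers exist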

How was this patch tested?

existing test

@CodingCat
Contributor Author

@SparkQA

SparkQA commented Dec 1, 2017

Test build #84382 has finished for PR 19864 at commit 32f7c74.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 2, 2017

Test build #84384 has finished for PR 19864 at commit 2082c0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// available, return the default statistics.
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
children.filter(_.isInstanceOf[LogicalRelation]) match {
case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled =>
Member

InMemoryRelation is a logical.LeafNode. I think it has no children?

Contributor Author

Thanks, @viirya, yes, you're right! I misread the generated plan; working on it.

@CodingCat CodingCat changed the title [SPARK-22673][SQL] InMemoryRelation should utilize on-disk table stats whenever possible [WIP][SPARK-22673][SQL] InMemoryRelation should utilize on-disk table stats whenever possible Dec 2, 2017
@CodingCat CodingCat changed the title [WIP][SPARK-22673][SQL] InMemoryRelation should utilize on-disk table stats whenever possible [SPARK-22673][SQL] InMemoryRelation should utilize on-disk table stats whenever possible Dec 2, 2017
tableName)
if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
  inMemoryRelation.setStatsFromCachedPlan(planToCache)
}
Contributor Author

I have to make InMemoryRelation stateful to avoid breaking APIs.....

Contributor

I think a more ideal change is to put the original plan stats into the constructor of InMemoryRelation, instead of making it mutable.

Contributor Author

It looks like I have no way to access InMemoryRelation from outside of the spark package, even though it is not a package-private class... how is that achieved?

If this is the case, I can modify the constructor

Thanks @cloud-fan

Contributor

InMemoryRelation is not part of the public API and should be treated as unstable/internal. You can use it at your own risk. Changing the constructor is fine.

Member

+1

@CodingCat CodingCat changed the title [SPARK-22673][SQL] InMemoryRelation should utilize on-disk table stats whenever possible [SPARK-22673][SQL] InMemoryRelation should utilize existing stats whenever possible Dec 2, 2017
@SparkQA

SparkQA commented Dec 2, 2017

Test build #84390 has finished for PR 19864 at commit bc70817.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 2, 2017

Test build #84392 has finished for PR 19864 at commit 4650307.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Dec 3, 2017 via email

@CodingCat
Contributor Author

@viirya yes, we can get more accurate stats later; however, the initial stats are also important, as they let the user pay less for the first run, which is the one that writes the cache.

The current implementation always chooses the most expensive plan in the first run, e.g. it always resorts to a sort-merge join instead of a broadcast join even when a broadcast join would be possible; effectively, CBO is disabled for any operator downstream of the InMemoryRelation. Additionally, it makes the execution plan inconsistent even for the same query over the same dataset. Of course, all of these issues only happen in the first run.

IMHO, we have a chance to make it better, so why not?
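To make the join example concrete, here is a simplified sketch of the eligibility check Spark's JoinSelection applies (paraphrased, not the exact source; the defaultSizeInBytes fallback is Long.MaxValue, so an unmaterialized cache reporting the default can never pass it):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf

// A join side can be broadcast only if its estimated size is under
// spark.sql.autoBroadcastJoinThreshold; with the old default stats this check
// always fails for a freshly cached relation, forcing a sort-merge join.
def canBroadcast(plan: LogicalPlan, conf: SQLConf): Boolean =
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold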

@CodingCat
Contributor Author

@viirya any thoughts?

storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName)
if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
Member

Do we need to limit this to those conditions? I think we can pass the stats into the created InMemoryRelation even when the two conditions don't hold.

Contributor Author

The reason I put it here is that when CBO is not enabled, the stats of the underlying plan might be much smaller than the actual size in memory, leading to a potential risk of OOM errors.

The underlying cause is that without CBO enabled, the size of the plan is calculated from BaseRelation's sizeInBytes, but with CBO we can have a more accurate estimation:

override def computeStats(): Statistics = {
  catalogTable
    .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
    .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
}

def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
  if (cboEnabled && rowCount.isDefined) {
    val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
    // Estimate size as number of rows * row size.
    val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
    Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
  } else {
    // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
    // estimation strategy and only propagate sizeInBytes in statistics.
    Statistics(sizeInBytes = sizeInBytes)
  }
}

Member

@viirya viirya Dec 6, 2017

When CBO is disabled, don't we just set sizeInBytes to defaultSizeInBytes? Is it different from the current statistics of the first run?

Contributor Author

No, if CBO is disabled, the relation's sizeInBytes is the file size:

override def sizeInBytes: Long = location.sizeInBytes

Member

LogicalRelation uses the statistics from the relation only when there is no given catalogTable. In this case, it doesn't consider if CBO is enabled or not.

Only catalogTable considers CBO when computing its statistics in toPlanStats. It doesn't refer to relation's statistics, IIUC.

Member

If a catalog table doesn't have statistics in its metadata, we will fill it with defaultSizeInBytes.

val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
  try {
    val hadoopConf = session.sessionState.newHadoopConf()
    val tablePath = new Path(table.location)
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    fs.getContentSummary(tablePath).getLength
  } catch {
    case e: IOException =>
      logWarning("Failed to get table size from hdfs.", e)
      session.sessionState.conf.defaultSizeInBytes
  }
} else {
  session.sessionState.conf.defaultSizeInBytes
}

Contributor Author

@viirya you're right! Thanks for clearing up the confusion.

However, to avoid using the relation's stats, which can be much smaller than the in-memory size and lead to a potential OOM error, we should still keep this condition here (we can remove cboEnabled, though), right?

Member

The statistics from the relation are based on file size; will that easily cause an OOM issue? I think in cases other than a cached query, we still use the relation's statistics. If this is an issue, doesn't it also affect the other cases?

Contributor Author

@CodingCat CodingCat Dec 6, 2017

That's true, I believe it does... there is a similar discussion in #19743.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.LongAccumulator


object InMemoryRelation {

Member

Unnecessary change.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
Member

Unused import.

@SparkQA

SparkQA commented Dec 5, 2017

Test build #84494 has finished for PR 19864 at commit b6a36be.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CodingCat
Contributor Author

Thanks @viirya, @cloud-fan, and @hvanhovell; I just addressed the comments and answered the question.

@SparkQA

SparkQA commented Dec 5, 2017

Test build #84496 has finished for PR 19864 at commit 898f1b5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 5, 2017

Test build #84499 has finished for PR 19864 at commit 4c34701.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
  withSQLConf("spark.sql.cbo.enabled" -> "true") {
    val workDir = s"${Utils.createTempDir()}/table1"
Contributor

nit: use withTempDir

Contributor Author

done

assert(inMemoryRelation.computeStats().sizeInBytes === 16)

// test of catalog table
val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
Contributor

nit: wrap with withTable, which will clean up the table automatically at the end.

Contributor Author

done


// InMemoryRelation's stats should be updated after calculating stats of the table
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
Contributor

What happened here? Does InMemoryRelation.statsOfPlanToCache get updated automatically?

Contributor Author

It was a mistake here: AnalyzeTableCommand actually forces the table to be evaluated with count(), and that populates the longAccumulator's value.

fixed in the latest commit
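For context, a paraphrased sketch of what computeStats looks like after this change (not the verbatim diff): the accumulator wins once the cache has been materialized, which is why the count() triggered by ANALYZE TABLE changed the reported size.

override def computeStats(): Statistics = {
  if (batchStats.value == 0L) {
    // Cached buffers not materialized yet: fall back to the stats of the plan to cache.
    statsOfPlanToCache
  } else {
    // Once materialized, batchStats holds the exact in-memory size in bytes.
    Statistics(sizeInBytes = batchStats.value.longValue)
  }
}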

@SparkQA

SparkQA commented Dec 18, 2017

Test build #85030 has finished for PR 19864 at commit b2d829b.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CodingCat
Contributor Author

retest this please

@HyukjinKwon
Member

HyukjinKwon commented Dec 18, 2017

Let's hold off on retesting. It seems to be failing globally.

@SparkQA

SparkQA commented Dec 18, 2017

Test build #85035 has finished for PR 19864 at commit b2d829b.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

withSQLConf("spark.sql.cbo.enabled" -> "true") {
  withTempDir { workDir =>
    withTable("table1") {
      val workDirPath = workDir.getAbsolutePath + "/table1"
Contributor

seems you can use withTempPath, which just gives you a path string without creating it.

Contributor

then we don't need to append /table1 to the path
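A sketch of the suggested shape, assuming the suite mixes in SQLTestUtils (which provides withSQLConf, withTempPath and withTable):

withSQLConf("spark.sql.cbo.enabled" -> "true") {
  // withTempPath yields a java.io.File whose path does not exist yet and cleans it up,
  // so no "/table1" suffix is needed.
  withTempPath { workDir =>
    withTable("table1") {
      val workDirPath = workDir.getAbsolutePath
      // write the test data to workDirPath, then create and cache "table1" as above
    }
  }
}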

@cloud-fan
Contributor

LGTM except one comment

@SparkQA

SparkQA commented Dec 18, 2017

Test build #85051 has finished for PR 19864 at commit b2d829b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 18, 2017

Test build #85067 has finished for PR 19864 at commit de2905c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
  collect { case plan: InMemoryRelation => plan }.head
assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
Contributor

Missed this one: why does it have different stats than the table cache stats of 16?

Contributor Author

@CodingCat CodingCat Dec 19, 2017

Because 16 is the exact in-memory size, which is obtained by reading the accumulator's value after evaluating the RDD.

48 is calculated by EstimationUtils:

def getOutputSize(
    attributes: Seq[Attribute],
    outputRowCount: BigInt,
    attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = {
  // We assign a generic overhead for a Row object; the actual overhead is different for
  // different Row formats.
  val sizePerRow = 8 + attributes.map { attr =>
    if (attrStats.contains(attr)) {
      attr.dataType match {
        case StringType =>
          // UTF8String: base + offset + numBytes
          attrStats(attr).avgLen + 8 + 4
        case _ =>
          attrStats(attr).avgLen
      }
    } else {
      attr.dataType.defaultSize
    }
  }.sum
  // Output size can't be zero, or sizeInBytes of BinaryNode will also be zero
  // (simple computation of statistics returns product of children).
  if (outputRowCount > 0) outputRowCount * sizePerRow else 1
}
Here sizePerRow = 8 (generic row overhead) + 4 (sum of the average attribute lengths) = 12 bytes, and 12 bytes * 4 rows = 48.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in ab7346f Dec 19, 2017
@CodingCat
Contributor Author

thanks

@CodingCat CodingCat deleted the SPARK-22673 branch December 19, 2017 17:07
ghost pushed a commit to dbtsai/spark that referenced this pull request Jan 13, 2018
…tion's size

## What changes were proposed in this pull request?

as per discussion in apache#19864 (comment)

The current HadoopFsRelation size estimate is purely based on the underlying file size, which is not accurate and makes the execution vulnerable to errors like OOM.

Users can enable CBO with the functionality added in apache#19864 to avoid this issue.

This JIRA proposes adding a configurable factor to the sizeInBytes method of the HadoopFsRelation class so that users can mitigate this problem without CBO.

## How was this patch tested?

Existing tests

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <nanzhu@uber.com>

Closes apache#20072 from CodingCat/SPARK-22790.
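For reference, the factor described above ended up looking roughly like the following in HadoopFsRelation (a paraphrase from memory of SPARK-22790; the config name and wiring are assumptions and should be checked against the merged diff):

// Scale the raw on-disk size by a user-configurable compression factor
// (assumed here to be spark.sql.sources.fileCompressionFactor) so the planner
// sees a size closer to the decompressed, in-memory footprint.
override def sizeInBytes: Long =
  (location.sizeInBytes * sqlContext.conf.fileCompressionFactor).toLong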
asfgit pushed a commit that referenced this pull request Jan 13, 2018
…tion's size

Closes #20072 from CodingCat/SPARK-22790.

(cherry picked from commit ba891ec)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Statistics = null)
Member

Why do we set this to null?

Contributor Author

Eh... we do not have other options; it's more like a placeholder. Since InMemoryRelation is created by CacheManager through apply() in the companion object, there's no harm here IMHO.
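For readers following along, a paraphrased sketch of the companion object after this PR (argument names and order are approximate), which is why the null default is effectively never observed:

object InMemoryRelation {
  // CacheManager always constructs the relation through this apply(), passing the
  // stats of the plan being cached, so statsOfPlanToCache is not left as null in practice.
  def apply(
      useCompression: Boolean,
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
      tableName: Option[String],
      statsOfPlanToCache: Statistics): InMemoryRelation =
    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
      statsOfPlanToCache = statsOfPlanToCache)
}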

ghost pushed a commit to dbtsai/spark that referenced this pull request Jan 27, 2018
## What changes were proposed in this pull request?

This is a regression introduced by apache#19864

When we look up the cache, we should not carry the hint info, as this cache entry might have been added by a plan with hint info, while the input plan for this lookup may not have hint info, or may have different hint info.

## How was this patch tested?

a new test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#20394 from cloud-fan/cache.
asfgit pushed a commit that referenced this pull request Jan 27, 2018
Closes #20394 from cloud-fan/cache.

(cherry picked from commit 5b5447c)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>