-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22673][SQL] InMemoryRelation should utilize existing stats whenever possible #19864
Changes from all commits
b2fb1d2
0971900
32f7c74
2082c0e
bc70817
4650307
b6a36be
898f1b5
4c34701
8f912a3
064f6d1
385edc0
fbb3729
0170070
6299f3d
eb2dc0a
0fde46e
c2a92d4
4b2fcb6
b2d829b
de2905c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,8 +37,10 @@ object InMemoryRelation { | |
batchSize: Int, | ||
storageLevel: StorageLevel, | ||
child: SparkPlan, | ||
tableName: Option[String]): InMemoryRelation = | ||
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)() | ||
tableName: Option[String], | ||
statsOfPlanToCache: Statistics): InMemoryRelation = | ||
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)( | ||
statsOfPlanToCache = statsOfPlanToCache) | ||
} | ||
|
||
|
||
|
@@ -60,7 +62,8 @@ case class InMemoryRelation( | |
@transient child: SparkPlan, | ||
tableName: Option[String])( | ||
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, | ||
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator) | ||
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, | ||
statsOfPlanToCache: Statistics = null) | ||
extends logical.LeafNode with MultiInstanceRelation { | ||
|
||
override protected def innerChildren: Seq[SparkPlan] = Seq(child) | ||
|
@@ -71,9 +74,8 @@ case class InMemoryRelation( | |
|
||
override def computeStats(): Statistics = { | ||
if (batchStats.value == 0L) { | ||
// Underlying columnar RDD hasn't been materialized, no useful statistics information | ||
// available, return the default statistics. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) | ||
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache | ||
statsOfPlanToCache | ||
} else { | ||
Statistics(sizeInBytes = batchStats.value.longValue) | ||
} | ||
|
@@ -142,7 +144,7 @@ case class InMemoryRelation( | |
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { | ||
InMemoryRelation( | ||
newOutput, useCompression, batchSize, storageLevel, child, tableName)( | ||
_cachedColumnBuffers, batchStats) | ||
_cachedColumnBuffers, batchStats, statsOfPlanToCache) | ||
} | ||
|
||
override def newInstance(): this.type = { | ||
|
@@ -154,11 +156,12 @@ case class InMemoryRelation( | |
child, | ||
tableName)( | ||
_cachedColumnBuffers, | ||
batchStats).asInstanceOf[this.type] | ||
batchStats, | ||
statsOfPlanToCache).asInstanceOf[this.type] | ||
} | ||
|
||
def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers | ||
|
||
override protected def otherCopyArgs: Seq[AnyRef] = | ||
Seq(_cachedColumnBuffers, batchStats) | ||
Seq(_cachedColumnBuffers, batchStats, statsOfPlanToCache) | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -30,6 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext | |||||||||||||||||||||||||||||||||||||||||||||||||
import org.apache.spark.sql.test.SQLTestData._ | ||||||||||||||||||||||||||||||||||||||||||||||||||
import org.apache.spark.sql.types._ | ||||||||||||||||||||||||||||||||||||||||||||||||||
import org.apache.spark.storage.StorageLevel._ | ||||||||||||||||||||||||||||||||||||||||||||||||||
import org.apache.spark.util.Utils | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | ||||||||||||||||||||||||||||||||||||||||||||||||||
import testImplicits._ | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -40,7 +41,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | |||||||||||||||||||||||||||||||||||||||||||||||||
data.createOrReplaceTempView(s"testData$dataType") | ||||||||||||||||||||||||||||||||||||||||||||||||||
val storageLevel = MEMORY_ONLY | ||||||||||||||||||||||||||||||||||||||||||||||||||
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan | ||||||||||||||||||||||||||||||||||||||||||||||||||
val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None, | ||||||||||||||||||||||||||||||||||||||||||||||||||
data.logicalPlan.stats) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel) | ||||||||||||||||||||||||||||||||||||||||||||||||||
inMemoryRelation.cachedColumnBuffers.collect().head match { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -116,7 +118,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | |||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
test("simple columnar query") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan | ||||||||||||||||||||||||||||||||||||||||||||||||||
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, | ||||||||||||||||||||||||||||||||||||||||||||||||||
testData.logicalPlan.stats) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
checkAnswer(scan, testData.collect().toSeq) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -132,8 +135,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | |||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
test("projection") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan | ||||||||||||||||||||||||||||||||||||||||||||||||||
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val logicalPlan = testData.select('value, 'key).logicalPlan | ||||||||||||||||||||||||||||||||||||||||||||||||||
val plan = spark.sessionState.executePlan(logicalPlan).sparkPlan | ||||||||||||||||||||||||||||||||||||||||||||||||||
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, | ||||||||||||||||||||||||||||||||||||||||||||||||||
logicalPlan.stats) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
checkAnswer(scan, testData.collect().map { | ||||||||||||||||||||||||||||||||||||||||||||||||||
case Row(key: Int, value: String) => value -> key | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -149,7 +154,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | |||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan | ||||||||||||||||||||||||||||||||||||||||||||||||||
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, | ||||||||||||||||||||||||||||||||||||||||||||||||||
testData.logicalPlan.stats) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
checkAnswer(scan, testData.collect().toSeq) | ||||||||||||||||||||||||||||||||||||||||||||||||||
checkAnswer(scan, testData.collect().toSeq) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -323,7 +329,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | |||||||||||||||||||||||||||||||||||||||||||||||||
test("SPARK-17549: cached table size should be correctly calculated") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
val data = spark.sparkContext.parallelize(1 to 10, 5).toDF() | ||||||||||||||||||||||||||||||||||||||||||||||||||
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan | ||||||||||||||||||||||||||||||||||||||||||||||||||
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan.stats) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// Materialize the data. | ||||||||||||||||||||||||||||||||||||||||||||||||||
val expectedAnswer = data.collect() | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -448,8 +454,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | |||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
val attribute = AttributeReference("a", IntegerType)() | ||||||||||||||||||||||||||||||||||||||||||||||||||
val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, | ||||||||||||||||||||||||||||||||||||||||||||||||||
LocalTableScanExec(Seq(attribute), Nil), None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val localTableScanExec = LocalTableScanExec(Seq(attribute), Nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, localTableScanExec, None, null) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val tableScanExec = InMemoryTableScanExec(Seq(attribute), | ||||||||||||||||||||||||||||||||||||||||||||||||||
Seq(In(attribute, Nil)), testRelation) | ||||||||||||||||||||||||||||||||||||||||||||||||||
assert(tableScanExec.partitionFilters.isEmpty) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -479,4 +485,43 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { | |||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
withSQLConf("spark.sql.cbo.enabled" -> "true") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
withTempPath { workDir => | ||||||||||||||||||||||||||||||||||||||||||||||||||
withTable("table1") { | ||||||||||||||||||||||||||||||||||||||||||||||||||
val workDirPath = workDir.getAbsolutePath | ||||||||||||||||||||||||||||||||||||||||||||||||||
val data = Seq(100, 200, 300, 400).toDF("count") | ||||||||||||||||||||||||||||||||||||||||||||||||||
data.write.parquet(workDirPath) | ||||||||||||||||||||||||||||||||||||||||||||||||||
val dfFromFile = spark.read.parquet(workDirPath).cache() | ||||||||||||||||||||||||||||||||||||||||||||||||||
val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect { | ||||||||||||||||||||||||||||||||||||||||||||||||||
case plan: InMemoryRelation => plan | ||||||||||||||||||||||||||||||||||||||||||||||||||
}.head | ||||||||||||||||||||||||||||||||||||||||||||||||||
// InMemoryRelation's stats is file size before the underlying RDD is materialized | ||||||||||||||||||||||||||||||||||||||||||||||||||
assert(inMemoryRelation.computeStats().sizeInBytes === 740) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// InMemoryRelation's stats is updated after materializing RDD | ||||||||||||||||||||||||||||||||||||||||||||||||||
dfFromFile.collect() | ||||||||||||||||||||||||||||||||||||||||||||||||||
assert(inMemoryRelation.computeStats().sizeInBytes === 16) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// test of catalog table | ||||||||||||||||||||||||||||||||||||||||||||||||||
val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache() | ||||||||||||||||||||||||||||||||||||||||||||||||||
val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan. | ||||||||||||||||||||||||||||||||||||||||||||||||||
collect { case plan: InMemoryRelation => plan }.head | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats | ||||||||||||||||||||||||||||||||||||||||||||||||||
// is calculated | ||||||||||||||||||||||||||||||||||||||||||||||||||
assert(inMemoryRelation2.computeStats().sizeInBytes === 740) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// InMemoryRelation's stats should be updated after calculating stats of the table | ||||||||||||||||||||||||||||||||||||||||||||||||||
// clear cache to simulate a fresh environment | ||||||||||||||||||||||||||||||||||||||||||||||||||
dfFromTable.unpersist(blocking = true) | ||||||||||||||||||||||||||||||||||||||||||||||||||
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") | ||||||||||||||||||||||||||||||||||||||||||||||||||
val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan. | ||||||||||||||||||||||||||||||||||||||||||||||||||
collect { case plan: InMemoryRelation => plan }.head | ||||||||||||||||||||||||||||||||||||||||||||||||||
assert(inMemoryRelation3.computeStats().sizeInBytes === 48) | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missed this one, why does it have a different stats than the table cache stats There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because 16 is the 48 is calculated by EstimationUtils: Lines 65 to 88 in bdb5e55
(8 + 4 (sum of average attribute length)) * 4
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we set this to
null
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eh...we do not have other options, it's more like a placeholder, since InMemoryRelation is created by CacheManager through apply() in companion object it's no harm here IMHO