From 20a6043643c073a128c8fa70bf2a8602b1c1537e Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Mon, 5 Jun 2017 23:09:38 +0800 Subject: [PATCH 1/7] init commit --- .../PruneFileSourcePartitions.scala | 7 +++-- .../PruneFileSourcePartitionsSuite.scala | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 905b8683e10bd..6dbb46ec37585 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -59,8 +60,10 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession) - val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) - + val withStats = logicalRelation.catalogTable.map(_.copy( + stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) + val prunedLogicalRelation = logicalRelation.copy( + relation = prunedFsRelation, catalogTable = withStats) // Keep partition-pruning predicates so that they are visible in physical planning val filterExpression = filters.reduceLeft(And) val filter = Filter(filterExpression, prunedLogicalRelation) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index f818e29555468..d906eda2708b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StructType @@ -66,4 +67,33 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } } } + + test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { + withTempView("tempTbl", "partTbl") { + spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl") + sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet") + for (part <- Seq(1, 2, 3)) { + sql( + s""" + |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part') + |select id from tempTbl + """.stripMargin) + } + + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { + val df = sql("SELECT * FROM partTbl where part = 1") + val query = 
df.queryExecution.analyzed.analyze + val sizes1 = query.collect { + case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes + } + assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes1(0) > 5000, s"expected > 5000 for test table 'src', got: ${sizes1(0)}") + val sizes2 = Optimize.execute(query).collect { + case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes + } + assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes2(0) < 5000, s"expected < 5000 for test table 'src', got: ${sizes2(0)}") + } + } + } } From c53a0c7a304e6a12548a047fd08786a174ed1479 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Tue, 6 Jun 2017 08:53:36 +0800 Subject: [PATCH 2/7] address comments. --- .../PruneFileSourcePartitionsSuite.scala | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index d906eda2708b5..3e31231e6beb7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -69,30 +69,32 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { - withTempView("tempTbl", "partTbl") { - spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl") - sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet") - for (part <- Seq(1, 2, 3)) { - sql( - s""" - |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part') - |select id from tempTbl + withTempView("tempTbl") { + withTable("partTbl") { + spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl") + sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet") + for (part <- Seq(1, 2, 3)) { + sql( + s""" + |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part') + |select id from tempTbl """.stripMargin) - } - - withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { - val df = sql("SELECT * FROM partTbl where part = 1") - val query = df.queryExecution.analyzed.analyze - val sizes1 = query.collect { - case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes } - assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizes1(0) > 5000, s"expected > 5000 for test table 'src', got: ${sizes1(0)}") - val sizes2 = Optimize.execute(query).collect { - case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes + + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { + val df = sql("SELECT * FROM partTbl where part = 1") + val query = df.queryExecution.analyzed.analyze + val sizes1 = query.collect { + case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes + } + assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes1(0) > 5000, s"expected > 5000 for test table 'src', got: ${sizes1(0)}") + val sizes2 = Optimize.execute(query).collect { + case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes + } + assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes2(0) < 5000, s"expected < 5000 for test table 'src', got: ${sizes2(0)}") } - assert(sizes2.size === 1, 
s"Size wrong for:\n ${df.queryExecution}") - assert(sizes2(0) < 5000, s"expected < 5000 for test table 'src', got: ${sizes2(0)}") } } } From 120662e4e2834cf3c191357a29da56b81b7b9c05 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Sun, 11 Jun 2017 16:46:07 +0800 Subject: [PATCH 3/7] address comments. --- .../datasources/PruneFileSourcePartitions.scala | 1 + .../execution/PruneFileSourcePartitionsSuite.scala | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 6dbb46ec37585..f5df1848a38c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -60,6 +60,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession) + // Change table stats based on the sizeInBytes of pruned files val withStats = logicalRelation.catalogTable.map(_.copy( stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) val prunedLogicalRelation = logicalRelation.copy( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 3e31231e6beb7..4f2ecf228d26a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -81,6 +82,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te """.stripMargin) } + val tableName = "partTbl" + sql(s"analyze table partTbl compute STATISTICS") + + val tableStats = + spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats + assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost") + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { val df = sql("SELECT * FROM partTbl where part = 1") val query = df.queryExecution.analyzed.analyze @@ -88,12 +96,12 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes } assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizes1(0) > 5000, s"expected > 5000 for test table 'src', got: ${sizes1(0)}") + assert(sizes1(0) == tableStats.get.sizeInBytes) val sizes2 = Optimize.execute(query).collect { case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes } assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizes2(0) < 5000, s"expected < 5000 for test table 'src', got: ${sizes2(0)}") + assert(sizes2(0) < tableStats.get.sizeInBytes) } } } From 
f7c3dfc47fa196ab428fbd7ddc49cff92d2258c0 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Mon, 12 Jun 2017 20:13:51 +0800 Subject: [PATCH 4/7] address comments. --- .../hive/execution/PruneFileSourcePartitionsSuite.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 4f2ecf228d26a..4f758dee626a5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -72,9 +72,9 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { withTempView("tempTbl") { withTable("partTbl") { - spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl") + spark.range(10).selectExpr("id").createOrReplaceTempView("tempTbl") sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet") - for (part <- Seq(1, 2, 3)) { + for (part <- Seq(1, 2)) { sql( s""" |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part') @@ -83,8 +83,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } val tableName = "partTbl" - sql(s"analyze table partTbl compute STATISTICS") - + sql(s"ANALYZE TABLE partTbl COMPUTE STATISTICS") val tableStats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost") @@ -98,7 +97,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizes1(0) == tableStats.get.sizeInBytes) val sizes2 = Optimize.execute(query).collect { - case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes } assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizes2(0) < tableStats.get.sizeInBytes) From d1513a8174bb5095b3866d8de6a77b99d5bf10ae Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Tue, 13 Jun 2017 09:11:10 +0800 Subject: [PATCH 5/7] address comments for UT. 
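Tighten the assertions: collect the pruned LogicalRelation itself and check
that computeStats() agrees with the statistics recorded on its catalogTable,
and that both are smaller than the statistics of the unpruned table.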
--- .../execution/PruneFileSourcePartitionsSuite.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 4f758dee626a5..3cdf534fe1c5d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -92,15 +92,19 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te val df = sql("SELECT * FROM partTbl where part = 1") val query = df.queryExecution.analyzed.analyze val sizes1 = query.collect { - case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes } assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizes1(0) == tableStats.get.sizeInBytes) - val sizes2 = Optimize.execute(query).collect { - case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes + val relations = Optimize.execute(query).collect { + case relation: LogicalRelation => relation } - assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizes2(0) < tableStats.get.sizeInBytes) + assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}") + val size2 = relations(0).computeStats(conf).sizeInBytes + val size3 = relations(0).catalogTable.get.stats.get.sizeInBytes + assert(size2 == size3) + assert(size2 < tableStats.get.sizeInBytes) + assert(size3 < tableStats.get.sizeInBytes) } } } From 6e28460e9e741d627f95bd461bd6b97fa7e4abf6 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Tue, 13 Jun 2017 20:07:56 +0800 Subject: [PATCH 6/7] address comments for UT. 
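Simplify the test setup: build the partitioned table directly with
partitionBy()/saveAsTable() instead of a temp view plus INSERT OVERWRITE,
read the analyzed and optimized plans off df.queryExecution rather than
running the Optimize executor by hand, and drop the
ENABLE_FALL_BACK_TO_HDFS_FOR_STATS conf that the test no longer needs.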
--- .../PruneFileSourcePartitionsSuite.scala | 51 +++++++------------ 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 3cdf534fe1c5d..abd1330acf1ff 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -70,43 +70,26 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { - withTempView("tempTbl") { - withTable("partTbl") { - spark.range(10).selectExpr("id").createOrReplaceTempView("tempTbl") - sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet") - for (part <- Seq(1, 2)) { - sql( - s""" - |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part') - |select id from tempTbl - """.stripMargin) - } + withTable("tbl") { + spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl") + sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS") + val tableStats = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl")).stats + assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost") - val tableName = "partTbl" - sql(s"ANALYZE TABLE partTbl COMPUTE STATISTICS") - val tableStats = - spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats - assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost") + val df = sql("SELECT * FROM tbl WHERE p = 1") + val sizes1 = df.queryExecution.analyzed.collect { + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes + } + assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes1(0) == tableStats.get.sizeInBytes) - withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { - val df = sql("SELECT * FROM partTbl where part = 1") - val query = df.queryExecution.analyzed.analyze - val sizes1 = query.collect { - case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes - } - assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizes1(0) == tableStats.get.sizeInBytes) - val relations = Optimize.execute(query).collect { - case relation: LogicalRelation => relation - } - assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}") - val size2 = relations(0).computeStats(conf).sizeInBytes - val size3 = relations(0).catalogTable.get.stats.get.sizeInBytes - assert(size2 == size3) - assert(size2 < tableStats.get.sizeInBytes) - assert(size3 < tableStats.get.sizeInBytes) - } + val relations = df.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => relation } + assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}") + val size2 = relations(0).computeStats(conf).sizeInBytes + assert(size2 == relations(0).catalogTable.get.stats.get.sizeInBytes) + assert(size2 < tableStats.get.sizeInBytes) } } } From 16a3f7e08645d06838d0301a5c00c9c16bd3a127 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Tue, 13 Jun 2017 20:10:01 +0800 Subject: [PATCH 7/7] remove unused import. 
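SQLConf is unreferenced now that the previous commit removed the
withSQLConf block.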
--- .../sql/hive/execution/PruneFileSourcePartitionsSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index abd1330acf1ff..d91f25a4da013 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StructType
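
The net effect of the series, as a minimal spark-shell sketch (a sketch only:
the table name and data are illustrative, and it assumes a build that
includes PATCH 1/7):

    // A small partitioned table with table-level statistics.
    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").saveAsTable("tbl")
    spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS")

    val df = spark.sql("SELECT * FROM tbl WHERE p = 1")

    // The analyzed plan still reports the statistics of the whole table ...
    val before = df.queryExecution.analyzed.collect {
      case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
        r.catalogTable.get.stats.get.sizeInBytes
    }.head

    // ... but after PruneFileSourcePartitions rewrites the relation,
    // sizeInBytes covers only the partitions that survived pruning.
    val after = df.queryExecution.optimizedPlan.collect {
      case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
        r.catalogTable.get.stats.get.sizeInBytes
    }.head

    assert(after < before)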