From 647b3b8b07822200c973f071654d1173d635bf5d Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Wed, 25 Jul 2018 17:36:17 -0700 Subject: [PATCH 01/13] WIP version --- .../apache/spark/sql/internal/SQLConf.scala | 35 ++++++++++++++ .../PruneFileSourcePartitions.scala | 18 +++++++- .../sql/hive/client/HiveClientImpl.scala | 28 ++++++++--- .../spark/sql/hive/StatisticsSuite.scala | 46 +++++++++++++++++++ .../PruneFileSourcePartitionsSuite.scala | 32 +++++++++++++ 5 files changed, 151 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e8529550b8fca..59c6cf844c1da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -262,6 +262,37 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = + buildConf("spark.sql.statistics.fallBackToHdfs") + .doc("If the table statistics are not available from table metadata enable fall back to hdfs." + + " This is useful in determining if a table is small enough to use auto broadcast joins.") + .booleanConf + .createWithDefault(false) + + val SIZE_DESER_FACTOR = buildConf("spark.sql.statistics.deserialization.factor") + .doc("In the absence of uncompressed/raw data size, total file size will be used for " + + "statistics annotation. But the file may be compressed, encoded and serialized which may " + + "be lesser in size than the actual uncompressed/raw data size. This factor will be " + + "multiplied to file size to estimate the raw data size. ") + .doubleConf + .createWithDefault(1.0) + + val IGNORE_RAWDATASIZE = buildConf("spark.sql.statistics.ignoreRawDataSize") + .doc("Currently, the rawDataSize property of Hive tables is incorrect due to HIVE-20079. " + + "When this setting is true, Spark will not use the rawDataSize property when calculating " + + "the size of a table") + .booleanConf + .createWithDefault(false) + + val DEFAULT_SIZE_IN_BYTES = buildConf("spark.sql.defaultSizeInBytes") + .internal() + .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " + + "which is larger than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. " + + "That is to say by default the optimizer will not choose to broadcast a table unless it " + + "knows for sure its size is small enough.") + .longConf + .createWithDefault(Long.MaxValue) + val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions") .doc("The default number of partitions to use when shuffling data for joins or aggregations.") .intConf @@ -1989,6 +2020,10 @@ class SQLConf extends Serializable with Logging { def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG) + def sizeDeserializationFactor: Double = getConf(SQLConf.SIZE_DESER_FACTOR) + + def ignoreRawDataSize: Boolean = getConf(SQLConf.IGNORE_RAWDATASIZE) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 16b2367bfdd5c..c25458e014cef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.execution.datasources -import org.apache.spark.sql.catalyst.catalog.CatalogStatistics +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { @@ -65,7 +66,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { fsRelation.copy(location = prunedFileIndex)(sparkSession) // Change table stats based on the sizeInBytes of pruned files val withStats = logicalRelation.catalogTable.map(_.copy( - stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) + stats = Some(CatalogStatistics(sizeInBytes = + BigInt(calcPartSize(logicalRelation.catalogTable, prunedFileIndex.sizeInBytes)))))) val prunedLogicalRelation = logicalRelation.copy( relation = prunedFsRelation, catalogTable = withStats) // Keep partition-pruning predicates so that they are visible in physical planning @@ -76,4 +78,16 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { op } } + + private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = { + val conf: SQLConf = SQLConf.get + val factor = conf.sizeDeserializationFactor + if (catalogTable.isDefined && factor != 1.0 && + // TODO: The serde check should be in a utility function, since it is also checked elsewhere + catalogTable.get.storage.serde.exists(s => s.contains("Parquet") || s.contains("Orc"))) { + (sizeInBytes.toLong * factor).toLong + } else { + sizeInBytes + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5e9b324a168e0..d0628f6718b59 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -52,6 +52,7 @@ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} import org.apache.spark.sql.hive.client.HiveClientImpl._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -465,7 +466,7 @@ private[hive] class HiveClientImpl( // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added // in the function toHiveTable. 
properties = filteredProperties, - stats = readHiveStats(properties), + stats = readHiveStats(properties, Option(h.getSerializationLib)), comment = comment, // In older versions of Spark(before 2.2.0), we expand the view original text and // store that into `viewExpandedText`, that should be used in view resolution. @@ -1034,14 +1035,17 @@ private[hive] object HiveClientImpl { createTime = apiPartition.getCreateTime.toLong * 1000, lastAccessTime = apiPartition.getLastAccessTime.toLong * 1000, parameters = properties, - stats = readHiveStats(properties)) + stats = readHiveStats(properties, + Option(apiPartition.getSd.getSerdeInfo.getSerializationLib))) } /** * Reads statistics from Hive. * Note that this statistics could be overridden by Spark's statistics if that's available. */ - private def readHiveStats(properties: Map[String, String]): Option[CatalogStatistics] = { + private def readHiveStats( + properties: Map[String, String], + serde: Option[String] = None): Option[CatalogStatistics] = { val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_)) val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_)) val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)) @@ -1056,9 +1060,21 @@ private[hive] object HiveClientImpl { // return None. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. - if (totalSize.isDefined && totalSize.get > 0L) { - Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0))) - } else if (rawDataSize.isDefined && rawDataSize.get > 0) { + val sqlConf = SQLConf.get + val factor = sqlConf.sizeDeserializationFactor + val adjustedSize = if (totalSize.isDefined && serde.isDefined && factor != 1.0D) { + // TODO: The serde check should be in a utility function, since it is also checked elsewhere + if (serde.get.contains("Parquet") || serde.get.contains("Orc")) { + Some(BigInt((totalSize.get.toLong * factor).toLong)) + } else { + totalSize + } + } else { + totalSize + } + if (adjustedSize.isDefined && adjustedSize.get > 0L) { + Some(CatalogStatistics(sizeInBytes = adjustedSize.get, rowCount = rowCount.filter(_ > 0))) + } else if (rawDataSize.isDefined && rawDataSize.get > 0 && !sqlConf.ignoreRawDataSize) { Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0))) } else { // TODO: still fill the rowCount even if sizeInBytes is empty. Might break anything? 
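At this stage of the series the adjustment is keyed off the session-wide `spark.sql.statistics.deserialization.factor` and a Parquet/ORC serde check; later commits replace that with a per-table `deserFactor` property. A minimal sketch of the size calculation the hunks above introduce (the helper name and sample numbers are illustrative, not part of the patch):

```scala
// Sketch of the statistics adjustment in this commit: scale the on-disk
// totalSize by the configured factor, but only for columnar serdes whose
// files are typically much smaller than their in-memory representation.
def adjustedSizeInBytes(totalSize: Long, serde: Option[String], factor: Double): Long = {
  val isColumnar = serde.exists(s => s.contains("Parquet") || s.contains("Orc"))
  if (isColumnar && factor != 1.0) (totalSize * factor).toLong else totalSize
}

// e.g. a 100 MB Parquet table with spark.sql.statistics.deserialization.factor = 10
// is planned as if it were roughly 1 GB:
// adjustedSizeInBytes(100L * 1024 * 1024, Some("...ParquetHiveSerDe"), 10.0) == 1048576000L
```

The same scaling is applied in both places this commit touches: `readHiveStats`, when statistics come from the Hive metastore, and `PruneFileSourcePartitions`, when statistics are re-derived from the pruned file listing.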
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index db2024e8b5d16..c49087cee872f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1381,4 +1381,50 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(catalogStats.rowCount.isEmpty) } } + + test("Test deserialization factor") { + val formats = Seq("parquet", "orc", "textfile") + val factor = 10 + formats.foreach { format => + val tableName = s"${format}SizeTest" + val tableNameAdj = s"${tableName}Adj" + withTable(tableName, tableNameAdj) { + sql(s"CREATE TABLE $tableName STORED AS $format AS SELECT * FROM SRC") + val relationStats = spark.table(tableName).queryExecution.optimizedPlan.stats + val sizeInBytes = relationStats.sizeInBytes + + // test deserialization factor on non-partitioned table + withSQLConf(("spark.sql.statistics.deserialization.factor", factor.toString)) { + sql(s"CREATE TABLE $tableNameAdj STORED AS $format AS SELECT * FROM SRC") + val relationStats = spark.table(tableNameAdj).queryExecution.optimizedPlan.stats + val expectedSizeInBytes = if (format != "textfile") { + sizeInBytes * factor + } else { + sizeInBytes + } + assert(relationStats.sizeInBytes == expectedSizeInBytes) + } + } + } + } + + test("test ignoreRawDataSize") { + val tableName = "rawData" + withTable(tableName) { + withSQLConf("spark.sql.statistics.ignoreRawDataSize" -> "true") { + sql(s"CREATE TABLE $tableName (c1 bigint)" + + "TBLPROPERTIES ('numRows'='0', 'rawDataSize'='200', 'totalSize'='0')") + + val catalogTable = getCatalogTable(tableName) + + val properties = catalogTable.ignoredProperties + assert(properties("totalSize").toLong == 0) + assert(properties("rawDataSize").toLong > 0) + assert(properties("numRows").toLong == 0) + + val relationStats = spark.table(tableName).queryExecution.optimizedPlan.stats + assert(relationStats.sizeInBytes == spark.sessionState.conf.defaultSizeInBytes) + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 94384185d190a..cd1fa5100099a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -91,4 +91,36 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te assert(size2 < tableStats.get.sizeInBytes) } } + + test("Test deserialization factor against partition") { + val factor = 10 + val formats = Seq("parquet", "orc", "csv") + formats.foreach { format => + withTable("tbl") { + spark.range(10).selectExpr("id", "id % 3 as p").write.format(format) + .partitionBy("p").saveAsTable("tbl") + sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS") + + val df1 = sql("SELECT * FROM tbl WHERE p = 1") + + val sizes1 = df1.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes + } + + withSQLConf("spark.sql.statistics.deserialization.factor" -> factor.toString) { + val df2 = sql("SELECT * FROM tbl WHERE p = 1") + + val sizes2 = df2.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes + } 
+ + if (format == "csv") { + assert(sizes2(0) == sizes1(0)) + } else { + assert(sizes2(0) == (sizes1(0) * factor)) + } + } + } + } + } } From d628b013bb2495484be1c77c6c9f96daa92318b9 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Wed, 12 Sep 2018 13:57:12 -0700 Subject: [PATCH 02/13] Associate deser factor with a table --- .../PruneFileSourcePartitions.scala | 11 ++--- .../sql/hive/client/HiveClientImpl.scala | 13 ++---- .../spark/sql/hive/StatisticsSuite.scala | 34 ++++++-------- .../PruneFileSourcePartitionsSuite.scala | 46 ++++++++----------- 4 files changed, 41 insertions(+), 63 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index c25458e014cef..6f9f53e948a19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -80,14 +80,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { } private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = { - val conf: SQLConf = SQLConf.get - val factor = conf.sizeDeserializationFactor - if (catalogTable.isDefined && factor != 1.0 && - // TODO: The serde check should be in a utility function, since it is also checked elsewhere - catalogTable.get.storage.serde.exists(s => s.contains("Parquet") || s.contains("Orc"))) { - (sizeInBytes.toLong * factor).toLong + val factor = if (catalogTable.isDefined) { + catalogTable.get.properties.get("deserFactor").getOrElse("1.0").toDouble } else { - sizeInBytes + 1.0 } + (sizeInBytes.toLong * factor).toLong } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index d0628f6718b59..4a11eb417fa21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -1060,18 +1060,13 @@ private[hive] object HiveClientImpl { // return None. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. 
- val sqlConf = SQLConf.get - val factor = sqlConf.sizeDeserializationFactor - val adjustedSize = if (totalSize.isDefined && serde.isDefined && factor != 1.0D) { - // TODO: The serde check should be in a utility function, since it is also checked elsewhere - if (serde.get.contains("Parquet") || serde.get.contains("Orc")) { - Some(BigInt((totalSize.get.toLong * factor).toLong)) - } else { - totalSize - } + val factor = properties.get("deserFactor").getOrElse("1.0").toDouble + val adjustedSize = if (totalSize.isDefined && factor != 1.0D) { + Some(BigInt((totalSize.get.toLong * factor).toLong)) } else { totalSize } + val sqlConf = SQLConf.get if (adjustedSize.isDefined && adjustedSize.get > 0L) { Some(CatalogStatistics(sizeInBytes = adjustedSize.get, rowCount = rowCount.filter(_ > 0))) } else if (rawDataSize.isDefined && rawDataSize.get > 0 && !sqlConf.ignoreRawDataSize) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index c49087cee872f..16c477b966b93 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1383,28 +1383,20 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } test("Test deserialization factor") { - val formats = Seq("parquet", "orc", "textfile") val factor = 10 - formats.foreach { format => - val tableName = s"${format}SizeTest" - val tableNameAdj = s"${tableName}Adj" - withTable(tableName, tableNameAdj) { - sql(s"CREATE TABLE $tableName STORED AS $format AS SELECT * FROM SRC") - val relationStats = spark.table(tableName).queryExecution.optimizedPlan.stats - val sizeInBytes = relationStats.sizeInBytes - - // test deserialization factor on non-partitioned table - withSQLConf(("spark.sql.statistics.deserialization.factor", factor.toString)) { - sql(s"CREATE TABLE $tableNameAdj STORED AS $format AS SELECT * FROM SRC") - val relationStats = spark.table(tableNameAdj).queryExecution.optimizedPlan.stats - val expectedSizeInBytes = if (format != "textfile") { - sizeInBytes * factor - } else { - sizeInBytes - } - assert(relationStats.sizeInBytes == expectedSizeInBytes) - } - } + val tableName = s"sizeTest" + val tableNameAdj = s"${tableName}Adj" + withTable(tableName, tableNameAdj) { + sql(s"CREATE TABLE $tableName STORED AS PARQUET AS SELECT * FROM SRC") + val relationStats = spark.table(tableName).queryExecution.optimizedPlan.stats + val sizeInBytes = relationStats.sizeInBytes + + // test deserialization factor on non-partitioned table + sql(s"CREATE TABLE $tableNameAdj STORED AS PARQUET AS SELECT * FROM SRC") + sql(s"ALTER TABLE $tableNameAdj SET TBLPROPERTIES('deserFactor'='$factor')") + val relationStats2 = spark.table(tableNameAdj).queryExecution.optimizedPlan.stats + val expectedSizeInBytes = sizeInBytes * factor + assert(relationStats2.sizeInBytes == expectedSizeInBytes) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index cd1fa5100099a..06acf46b5927c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -94,33 +94,27 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te 
test("Test deserialization factor against partition") { val factor = 10 - val formats = Seq("parquet", "orc", "csv") - formats.foreach { format => - withTable("tbl") { - spark.range(10).selectExpr("id", "id % 3 as p").write.format(format) - .partitionBy("p").saveAsTable("tbl") - sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS") - - val df1 = sql("SELECT * FROM tbl WHERE p = 1") - - val sizes1 = df1.queryExecution.optimizedPlan.collect { - case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes - } - - withSQLConf("spark.sql.statistics.deserialization.factor" -> factor.toString) { - val df2 = sql("SELECT * FROM tbl WHERE p = 1") - - val sizes2 = df2.queryExecution.optimizedPlan.collect { - case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes - } - - if (format == "csv") { - assert(sizes2(0) == sizes1(0)) - } else { - assert(sizes2(0) == (sizes1(0) * factor)) - } - } + withTable("tbl") { + spark.range(10).selectExpr("id", "id % 3 as p").write.format("parquet") + .partitionBy("p").saveAsTable("tbl") + sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS") + + val df1 = sql("SELECT * FROM tbl WHERE p = 1") + + val sizes1 = df1.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes + } + assert(sizes1 != 0) + + sql(s"ALTER TABLE tbl SET TBLPROPERTIES('deserFactor'='$factor')") + sql("REFRESH TABLE TBL") + + val df2 = sql("SELECT * FROM tbl WHERE p = 1") + + val sizes2 = df2.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes } + assert(sizes2(0) == (sizes1(0) * factor)) } } } From c2d961e196e2f46e1244ce2547aa15a330c77c27 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Wed, 12 Sep 2018 14:04:42 -0700 Subject: [PATCH 03/13] Cleanup old code --- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 4a11eb417fa21..c2ceb3c18b7dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -466,7 +466,7 @@ private[hive] class HiveClientImpl( // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added // in the function toHiveTable. properties = filteredProperties, - stats = readHiveStats(properties, Option(h.getSerializationLib)), + stats = readHiveStats(properties), comment = comment, // In older versions of Spark(before 2.2.0), we expand the view original text and // store that into `viewExpandedText`, that should be used in view resolution. @@ -1035,17 +1035,14 @@ private[hive] object HiveClientImpl { createTime = apiPartition.getCreateTime.toLong * 1000, lastAccessTime = apiPartition.getLastAccessTime.toLong * 1000, parameters = properties, - stats = readHiveStats(properties, - Option(apiPartition.getSd.getSerdeInfo.getSerializationLib))) + stats = readHiveStats(properties)) } /** * Reads statistics from Hive. * Note that this statistics could be overridden by Spark's statistics if that's available. 
*/ - private def readHiveStats( - properties: Map[String, String], - serde: Option[String] = None): Option[CatalogStatistics] = { + private def readHiveStats(properties: Map[String, String]): Option[CatalogStatistics] = { val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_)) val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_)) val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)) From 55e340590cdffc2db39232921658c581101adf6a Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Wed, 12 Sep 2018 14:21:08 -0700 Subject: [PATCH 04/13] More old code cleanup --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ---------- .../datasources/PruneFileSourcePartitions.scala | 1 - .../execution/PruneFileSourcePartitionsSuite.scala | 2 -- 3 files changed, 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 59c6cf844c1da..a12d08eabb122 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -269,14 +269,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val SIZE_DESER_FACTOR = buildConf("spark.sql.statistics.deserialization.factor") - .doc("In the absence of uncompressed/raw data size, total file size will be used for " + - "statistics annotation. But the file may be compressed, encoded and serialized which may " + - "be lesser in size than the actual uncompressed/raw data size. This factor will be " + - "multiplied to file size to estimate the raw data size. ") - .doubleConf - .createWithDefault(1.0) - val IGNORE_RAWDATASIZE = buildConf("spark.sql.statistics.ignoreRawDataSize") .doc("Currently, the rawDataSize property of Hive tables is incorrect due to HIVE-20079. 
" + "When this setting is true, Spark will not use the rawDataSize property when calculating " + @@ -2020,8 +2012,6 @@ class SQLConf extends Serializable with Logging { def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG) - def sizeDeserializationFactor: Double = getConf(SQLConf.SIZE_DESER_FACTOR) - def ignoreRawDataSize: Boolean = getConf(SQLConf.IGNORE_RAWDATASIZE) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 6f9f53e948a19..3ab5ebda02d66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.internal.SQLConf private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 06acf46b5927c..edddd4d713ec0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -100,7 +100,6 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS") val df1 = sql("SELECT * FROM tbl WHERE p = 1") - val sizes1 = df1.queryExecution.optimizedPlan.collect { case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes } @@ -110,7 +109,6 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te sql("REFRESH TABLE TBL") val df2 = sql("SELECT * FROM tbl WHERE p = 1") - val sizes2 = df2.queryExecution.optimizedPlan.collect { case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes } From 837c56c0c99f69f929a27d977ff56eebb7804678 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 17 Sep 2018 13:55:52 -0700 Subject: [PATCH 05/13] Fix typos --- .../test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 2 +- .../sql/hive/execution/PruneFileSourcePartitionsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 16c477b966b93..b5a61ec9b0415 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1382,7 +1382,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } - test("Test deserialization factor") { + test("test deserialization factor") { val factor = 10 val tableName = s"sizeTest" val tableNameAdj = s"${tableName}Adj" diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index edddd4d713ec0..2b7b45373c871 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -103,7 +103,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te val sizes1 = df1.queryExecution.optimizedPlan.collect { case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes } - assert(sizes1 != 0) + assert(sizes1(0) != 0) sql(s"ALTER TABLE tbl SET TBLPROPERTIES('deserFactor'='$factor')") sql("REFRESH TABLE TBL") From 56f9b0aaff8fda74f8948e651f1235333140c6c2 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 17 Sep 2018 14:19:16 -0700 Subject: [PATCH 06/13] Handle bad deserialization factor --- .../datasources/PruneFileSourcePartitions.scala | 4 +++- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 3 ++- .../org/apache/spark/sql/hive/StatisticsSuite.scala | 9 ++++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 3ab5ebda02d66..c35078bf5e2a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import org.apache.commons.lang3.math.NumberUtils + import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -80,7 +82,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = { val factor = if (catalogTable.isDefined) { - catalogTable.get.properties.get("deserFactor").getOrElse("1.0").toDouble + NumberUtils.toDouble(catalogTable.get.properties.get("deserFactor").getOrElse("1.0"), 1.0) } else { 1.0 } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c2ceb3c18b7dd..7e8eef609365b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.commons.lang3.math.NumberUtils import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf @@ -1057,7 +1058,7 @@ private[hive] object HiveClientImpl { // return None. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. 
- val factor = properties.get("deserFactor").getOrElse("1.0").toDouble + val factor = NumberUtils.toDouble(properties.get("deserFactor").getOrElse("1.0"), 1.0) val adjustedSize = if (totalSize.isDefined && factor != 1.0D) { Some(BigInt((totalSize.get.toLong * factor).toLong)) } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index b5a61ec9b0415..705dd2917a17e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1386,7 +1386,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val factor = 10 val tableName = s"sizeTest" val tableNameAdj = s"${tableName}Adj" - withTable(tableName, tableNameAdj) { + val tableNameBad = s"${tableName}Bad" + withTable(tableName, tableNameAdj, tableNameBad) { sql(s"CREATE TABLE $tableName STORED AS PARQUET AS SELECT * FROM SRC") val relationStats = spark.table(tableName).queryExecution.optimizedPlan.stats val sizeInBytes = relationStats.sizeInBytes @@ -1397,6 +1398,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val relationStats2 = spark.table(tableNameAdj).queryExecution.optimizedPlan.stats val expectedSizeInBytes = sizeInBytes * factor assert(relationStats2.sizeInBytes == expectedSizeInBytes) + + // test bad deserialization factor + sql(s"CREATE TABLE $tableNameBad STORED AS PARQUET AS SELECT * FROM SRC") + sql(s"ALTER TABLE $tableNameBad SET TBLPROPERTIES('deserFactor'='bad')") + val relationStats3 = spark.table(tableNameBad).queryExecution.optimizedPlan.stats + assert(relationStats3.sizeInBytes == sizeInBytes) } } From 382dc24e54014e963dd52e65533b8d999adf82d8 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 17 Sep 2018 14:26:36 -0700 Subject: [PATCH 07/13] Remove extraneous toLong --- .../sql/execution/datasources/PruneFileSourcePartitions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index c35078bf5e2a1..c974105778ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -86,6 +86,6 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { } else { 1.0 } - (sizeInBytes.toLong * factor).toLong + (sizeInBytes * factor).toLong } } From c3345d6d5fa0eee7bbfc5c0a9fe704a7c6100ef1 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 17 Sep 2018 14:43:37 -0700 Subject: [PATCH 08/13] Update comment --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7e8eef609365b..b717739ccabe4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -1056,6 +1056,11 @@ private[hive] object HiveClientImpl { // When table is external, `totalSize` is always zero, which will influence join strategy. 
// So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero, // return None. + // If a table has a deserialization factor, the table owner expects the in-memory + // representation of the table to be larger than the table's totalSize value. In that case, + // multiply totalSize by the deserialization factor and use that number instead. + // If the user has set spark.sql.statistics.ignoreRawDataSize to true (because of HIVE-20079, + // for example), don't use rawDataSize. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. val factor = NumberUtils.toDouble(properties.get("deserFactor").getOrElse("1.0"), 1.0) From b24ba63f3bcda2b252e158aaf7f13bbcb1bf6038 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Tue, 18 Sep 2018 10:56:57 -0700 Subject: [PATCH 09/13] Can't use lang3.math.NumberUtils from a HiveClientImpl loaded via IsolatedClientLoader --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b717739ccabe4..35bd156e03702 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -25,7 +25,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.commons.lang3.math.NumberUtils import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf @@ -1063,7 +1062,11 @@ private[hive] object HiveClientImpl { // for example), don't use rawDataSize. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. 
- val factor = NumberUtils.toDouble(properties.get("deserFactor").getOrElse("1.0"), 1.0) + val factor = try { + properties.get("deserFactor").getOrElse("1.0").toDouble + } catch { + case _: NumberFormatException => 1.0 + } val adjustedSize = if (totalSize.isDefined && factor != 1.0D) { Some(BigInt((totalSize.get.toLong * factor).toLong)) } else { From 7f6abb4bbcfb9338a4eafae1e5a0db0bfaf1115d Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Tue, 18 Sep 2018 15:10:57 -0700 Subject: [PATCH 10/13] Remove some duplication --- .../execution/datasources/DataSourceUtils.scala | 6 ++++++ .../datasources/PruneFileSourcePartitions.scala | 7 +++---- .../spark/sql/hive/client/HiveClientImpl.scala | 17 +++++------------ 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 90cec5e72c1a7..2896335b50569 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.commons.lang3.math.NumberUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.AnalysisException @@ -59,4 +60,9 @@ object DataSourceUtils { val name = path.getName !(name.startsWith("_") || name.startsWith(".")) } + + def calcDataSize(properties: Map[String, String], sizeInBytes: Long): Long = { + val factor = NumberUtils.toDouble(properties.get("deserFactor").getOrElse("1.0"), 1.0) + (sizeInBytes * factor).toLong + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index c974105778ab2..71f3db45bba4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -81,11 +81,10 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { } private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = { - val factor = if (catalogTable.isDefined) { - NumberUtils.toDouble(catalogTable.get.properties.get("deserFactor").getOrElse("1.0"), 1.0) + if (catalogTable.isDefined) { + DataSourceUtils.calcDataSize(catalogTable.get.properties, sizeInBytes) } else { - 1.0 + sizeInBytes } - (sizeInBytes * factor).toLong } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 35bd156e03702..59ef1e1d4dbf1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -50,6 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} import 
org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.internal.SQLConf @@ -1062,19 +1063,11 @@ private[hive] object HiveClientImpl { // for example), don't use rawDataSize. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. - val factor = try { - properties.get("deserFactor").getOrElse("1.0").toDouble - } catch { - case _: NumberFormatException => 1.0 - } - val adjustedSize = if (totalSize.isDefined && factor != 1.0D) { - Some(BigInt((totalSize.get.toLong * factor).toLong)) - } else { - totalSize - } + val adjustedSize = BigInt(DataSourceUtils.calcDataSize(properties, + totalSize.getOrElse(BigInt(0)).toLong)) val sqlConf = SQLConf.get - if (adjustedSize.isDefined && adjustedSize.get > 0L) { - Some(CatalogStatistics(sizeInBytes = adjustedSize.get, rowCount = rowCount.filter(_ > 0))) + if (adjustedSize > 0L) { + Some(CatalogStatistics(sizeInBytes = adjustedSize, rowCount = rowCount.filter(_ > 0))) } else if (rawDataSize.isDefined && rawDataSize.get > 0 && !sqlConf.ignoreRawDataSize) { Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0))) } else { From f849a83d95c7d349ed85c5eca6e13f62ab4bbfc9 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Fri, 21 Sep 2018 14:48:21 -0700 Subject: [PATCH 11/13] Stick with BigInt for totalSize in readHiveStats --- .../spark/sql/execution/datasources/DataSourceUtils.scala | 6 +++--- .../execution/datasources/PruneFileSourcePartitions.scala | 2 +- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 3 +-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 2896335b50569..3687fa09917bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -61,8 +61,8 @@ object DataSourceUtils { !(name.startsWith("_") || name.startsWith(".")) } - def calcDataSize(properties: Map[String, String], sizeInBytes: Long): Long = { - val factor = NumberUtils.toDouble(properties.get("deserFactor").getOrElse("1.0"), 1.0) - (sizeInBytes * factor).toLong + def calcDataSize(properties: Map[String, String], sizeInBytes: BigInt): BigInt = { + val factor = NumberUtils.toInt(properties.get("deserFactor").getOrElse("1"), 1) + sizeInBytes * factor } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 71f3db45bba4c..588bd651059b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -82,7 +82,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = { if (catalogTable.isDefined) { - DataSourceUtils.calcDataSize(catalogTable.get.properties, sizeInBytes) + DataSourceUtils.calcDataSize(catalogTable.get.properties, BigInt(sizeInBytes)).toLong } else { sizeInBytes } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 59ef1e1d4dbf1..abfccaf1a0cea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -1063,8 +1063,7 @@ private[hive] object HiveClientImpl { // for example), don't use rawDataSize. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. - val adjustedSize = BigInt(DataSourceUtils.calcDataSize(properties, - totalSize.getOrElse(BigInt(0)).toLong)) + val adjustedSize = DataSourceUtils.calcDataSize(properties, totalSize.getOrElse(BigInt(0))) val sqlConf = SQLConf.get if (adjustedSize > 0L) { Some(CatalogStatistics(sizeInBytes = adjustedSize, rowCount = rowCount.filter(_ > 0))) From 37a5bc9aceb2c29b6177a2b781fca0461e941504 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Fri, 21 Sep 2018 14:56:08 -0700 Subject: [PATCH 12/13] No need to unbox and rebox size in PruneFileSourcePartitions --- .../execution/datasources/PruneFileSourcePartitions.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 588bd651059b3..1ebac5cd05b98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -68,7 +68,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { // Change table stats based on the sizeInBytes of pruned files val withStats = logicalRelation.catalogTable.map(_.copy( stats = Some(CatalogStatistics(sizeInBytes = - BigInt(calcPartSize(logicalRelation.catalogTable, prunedFileIndex.sizeInBytes)))))) + calcPartSize(logicalRelation.catalogTable, prunedFileIndex.sizeInBytes))))) val prunedLogicalRelation = logicalRelation.copy( relation = prunedFsRelation, catalogTable = withStats) // Keep partition-pruning predicates so that they are visible in physical planning @@ -80,11 +80,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { } } - private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = { + private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): BigInt = { if (catalogTable.isDefined) { - DataSourceUtils.calcDataSize(catalogTable.get.properties, BigInt(sizeInBytes)).toLong + DataSourceUtils.calcDataSize(catalogTable.get.properties, BigInt(sizeInBytes)) } else { - sizeInBytes + BigInt(sizeInBytes) } } } From ddfe945ef161e59fc2bbc1a12bf40563d2bdd400 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Fri, 21 Sep 2018 15:24:07 -0700 Subject: [PATCH 13/13] Fix improper conflict resolution --- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a12d08eabb122..dd85da40fafd0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -262,13 +262,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = - buildConf("spark.sql.statistics.fallBackToHdfs") - .doc("If the table statistics are not available from table metadata enable fall back to hdfs." + - " This is useful in determining if a table is small enough to use auto broadcast joins.") - .booleanConf - .createWithDefault(false) - val IGNORE_RAWDATASIZE = buildConf("spark.sql.statistics.ignoreRawDataSize") .doc("Currently, the rawDataSize property of Hive tables is incorrect due to HIVE-20079. " + "When this setting is true, Spark will not use the rawDataSize property when calculating " + @@ -276,15 +269,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val DEFAULT_SIZE_IN_BYTES = buildConf("spark.sql.defaultSizeInBytes") - .internal() - .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " + - "which is larger than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. " + - "That is to say by default the optimizer will not choose to broadcast a table unless it " + - "knows for sure its size is small enough.") - .longConf - .createWithDefault(Long.MaxValue) - val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions") .doc("The default number of partitions to use when shuffling data for joins or aggregations.") .intConf
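With the full series applied, the deserialization factor lives in a per-table `deserFactor` property read through `DataSourceUtils.calcDataSize`, and `spark.sql.statistics.ignoreRawDataSize` lets Spark skip Hive's `rawDataSize` when it is unreliable (HIVE-20079). A hedged end-to-end usage sketch, assuming a Hive-enabled SparkSession named `spark`; the table and source names are illustrative:

```scala
// The table owner marks a compressed columnar table with a deserialization
// factor; planning-time size estimates then reflect totalSize * deserFactor.
spark.sql("CREATE TABLE events STORED AS PARQUET AS SELECT * FROM staging_events")
spark.sql("ALTER TABLE events SET TBLPROPERTIES ('deserFactor' = '10')")
spark.sql("REFRESH TABLE events")  // pick up the new table property, as in the tests

// Optionally ignore Hive's rawDataSize, which HIVE-20079 can leave incorrect.
spark.conf.set("spark.sql.statistics.ignoreRawDataSize", "true")

// Inspect the size estimate the optimizer will use, as the suites above do.
val stats = spark.table("events").queryExecution.optimizedPlan.stats
println(stats.sizeInBytes)
```

A factor of 10 means a 100 MB Parquet table is treated as roughly 1 GB when the planner decides, for example, whether the table is small enough to broadcast.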