From 9a265d7d448f23aad42dc73aff641f8e887a5f54 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 22 Aug 2019 21:57:57 -0700
Subject: [PATCH] Revert "[SPARK-25474][SQL] Support
 `spark.sql.statistics.fallBackToHdfs` in data source tables"

This reverts commit 485ae6d1818e8756a86da38d6aefc8f1dbde49c2.
---
 .../sql/execution/command/CommandUtils.scala | 11 -----
 .../datasources/HadoopFsRelation.scala       | 13 ++----
 .../spark/sql/hive/HiveStrategies.scala      | 14 +++++--
 .../spark/sql/hive/StatisticsSuite.scala     | 40 -------------------
 4 files changed, 15 insertions(+), 63 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 9a9d66b4b78f1..b644e6dc471d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -344,15 +344,4 @@ object CommandUtils extends Logging {
   private def isDataPath(path: Path, stagingDir: String): Boolean = {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
-
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
-    try {
-      val hadoopConf = session.sessionState.newHadoopConf()
-      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
-        defaultSize
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index f7d231503fa01..d278802e6c9f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.hadoop.fs.Path
+import java.util.Locale
+
+import scala.collection.mutable
 
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -70,13 +71,7 @@ case class HadoopFsRelation(
 
   override def sizeInBytes: Long = {
     val compressionFactor = sqlContext.conf.fileCompressionFactor
-    val defaultSize = (location.sizeInBytes * compressionFactor).toLong
-    location match {
-      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
-        CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
-          defaultSize)
-      case _ => defaultSize
-    }
+    (location.sizeInBytes * compressionFactor).toLong
   }
 
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d09c0ab0ffc86..7b28e4f401aba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
   ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -118,8 +118,16 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
-          session.sessionState.conf.defaultSizeInBytes)
+        try {
+          val hadoopConf = session.sessionState.newHadoopConf()
+          val tablePath = new Path(table.location)
+          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+          fs.getContentSummary(tablePath).getLength
+        } catch {
+          case e: IOException =>
+            logWarning("Failed to get table size from hdfs.", e)
+            session.sessionState.conf.defaultSizeInBytes
+        }
       } else {
         session.sessionState.conf.defaultSizeInBytes
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e20099a0bd9dc..b4e5058cfa5b8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1484,44 +1484,4 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
     }
   }
-
-  test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
-    withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
-      withTable("t1", "t2") {
-        sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-        sql("INSERT INTO t1 VALUES (1, 'a')")
-        checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
-        sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-        sql("INSERT INTO t2 VALUES (1, 'a')")
-        checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE t1.id=t2.id"),
-          "BroadcastHashJoin")
-      }
-    }
-  }
-
-  test("SPARK-25474: should not fall back to hdfs when table statistics exists" +
-    " for CatalogFileIndex dataSourceTable") {
-
-    var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
-    Seq(true, false).foreach { fallBackToHdfs =>
-      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) {
-        withTable("t1") {
-          sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-          sql("INSERT INTO t1 VALUES (1, 'a')")
-          // Analyze command updates the statistics of table `t1`
-          sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
-          val catalogTable = getCatalogTable("t1")
-          assert(catalogTable.stats.isDefined)
-
-          if (!fallBackToHdfs) {
-            sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          } else {
-            sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          }
-          checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
-        }
-      }
-    }
-    assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
-  }
 }
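
Note: with this revert applied, `spark.sql.statistics.fallBackToHdfs` once again affects only Hive table relations with no catalog statistics (the `DetermineTableStats` rule above), while `HadoopFsRelation.sizeInBytes` for data source tables unconditionally returns `(location.sizeInBytes * compressionFactor).toLong`. For reference, the restored Hive-only code path amounts to the minimal standalone sketch below, assuming only `hadoop-client` on the classpath; the object and method names (`FallbackSizeSketch`, `sizeOnDiskOrDefault`) are illustrative and not part of the patch:

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative helper (not part of the patch): compute the total size in
// bytes of all files under a table location, falling back to a
// caller-supplied default estimate when the filesystem call fails.
object FallbackSizeSketch {
  def sizeOnDiskOrDefault(location: String, conf: Configuration, defaultSize: Long): Long = {
    try {
      val tablePath = new Path(location)
      val fs: FileSystem = tablePath.getFileSystem(conf)
      // getContentSummary recursively sums the lengths of all files under
      // the path, which is why this can be expensive on large tables.
      fs.getContentSummary(tablePath).getLength
    } catch {
      case _: IOException =>
        // Same policy as DetermineTableStats: prefer a coarse default
        // estimate over failing query planning.
        defaultSize
    }
  }
}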