Skip to content

Commit

Permalink
Revert "[SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHd…
Browse files Browse the repository at this point in the history
…fs` in data source tables"

This reverts commit 485ae6d.
  • Loading branch information
gatorsmile committed Aug 23, 2019
1 parent 485ae6d commit 9a265d7
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,4 @@ object CommandUtils extends Logging {
private def isDataPath(path: Path, stagingDir: String): Boolean = {
!path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
}

def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
try {
val hadoopConf = session.sessionState.newHadoopConf()
path.getFileSystem(hadoopConf).getContentSummary(path).getLength
} catch {
case NonFatal(e) =>
logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
defaultSize
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.Path
import java.util.Locale

import scala.collection.mutable

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.types.{StructField, StructType}

Expand Down Expand Up @@ -70,13 +71,7 @@ case class HadoopFsRelation(

override def sizeInBytes: Long = {
val compressionFactor = sqlContext.conf.fileCompressionFactor
val defaultSize = (location.sizeInBytes * compressionFactor).toLong
location match {
case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
defaultSize)
case _ => defaultSize
}
(location.sizeInBytes * compressionFactor).toLong
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
Expand Down Expand Up @@ -118,8 +118,16 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
val table = relation.tableMeta
val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
session.sessionState.conf.defaultSizeInBytes)
try {
val hadoopConf = session.sessionState.newHadoopConf()
val tablePath = new Path(table.location)
val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
fs.getContentSummary(tablePath).getLength
} catch {
case e: IOException =>
logWarning("Failed to get table size from hdfs.", e)
session.sessionState.conf.defaultSizeInBytes
}
} else {
session.sessionState.conf.defaultSizeInBytes
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1484,44 +1484,4 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
}
}

test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
withTable("t1", "t2") {
sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
sql("INSERT INTO t1 VALUES (1, 'a')")
checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
sql("INSERT INTO t2 VALUES (1, 'a')")
checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE t1.id=t2.id"),
"BroadcastHashJoin")
}
}
}

test("SPARK-25474: should not fall back to hdfs when table statistics exists" +
" for CatalogFileIndex dataSourceTable") {

var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
Seq(true, false).foreach { fallBackToHdfs =>
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) {
withTable("t1") {
sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
sql("INSERT INTO t1 VALUES (1, 'a')")
// Analyze command updates the statistics of table `t1`
sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
val catalogTable = getCatalogTable("t1")
assert(catalogTable.stats.isDefined)

if (!fallBackToHdfs) {
sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
} else {
sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
}
checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
}
}
}
assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
}
}

0 comments on commit 9a265d7

Please sign in to comment.