Skip to content

Commit

Permalink
[SPARK-25474][SQL] Support spark.sql.statistics.fallBackToHdfs in d…
Browse files Browse the repository at this point in the history
…ata source tables

In case of CatalogFileIndex datasource table, sizeInBytes is always coming as default size in bytes, which is  8.0EB (Even when the user give fallBackToHdfsForStatsEnabled=true) . So, the datasource table which has CatalogFileIndex, always prefer SortMergeJoin, instead of BroadcastJoin, even though the size is below broadcast join threshold.
In this PR, In case of CatalogFileIndex table, if we enable "fallBackToHdfsForStatsEnabled=true", then the computeStatistics  get the sizeInBytes from the hdfs and we get the actual size of the table. Hence, during join operation, when the table size is below broadcast threshold, it will prefer broadCastHashJoin instead of SortMergeJoin.

Added UT

Closes apache#22502 from shahidki31/SPARK-25474.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
shahidki31 authored and dongjoon-hyun committed Jul 28, 2019
1 parent 70f82fd commit 485ae6d
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,15 @@ object CommandUtils extends Logging {
private def isDataPath(path: Path, stagingDir: String): Boolean = {
!path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
}

def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
try {
val hadoopConf = session.sessionState.newHadoopConf()
path.getFileSystem(hadoopConf).getContentSummary(path).getLength
} catch {
case NonFatal(e) =>
logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
defaultSize
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.spark.sql.execution.datasources

import java.util.Locale

import scala.collection.mutable
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.types.{StructField, StructType}

Expand Down Expand Up @@ -71,7 +70,13 @@ case class HadoopFsRelation(

override def sizeInBytes: Long = {
val compressionFactor = sqlContext.conf.fileCompressionFactor
(location.sizeInBytes * compressionFactor).toLong
val defaultSize = (location.sizeInBytes * compressionFactor).toLong
location match {
case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
defaultSize)
case _ => defaultSize
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
Expand Down Expand Up @@ -118,16 +118,8 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
val table = relation.tableMeta
val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = session.sessionState.newHadoopConf()
val tablePath = new Path(table.location)
val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
fs.getContentSummary(tablePath).getLength
} catch {
case e: IOException =>
logWarning("Failed to get table size from hdfs.", e)
session.sessionState.conf.defaultSizeInBytes
}
CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
session.sessionState.conf.defaultSizeInBytes)
} else {
session.sessionState.conf.defaultSizeInBytes
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1484,4 +1484,44 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
}
}

test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
withTable("t1", "t2") {
sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
sql("INSERT INTO t1 VALUES (1, 'a')")
checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
sql("INSERT INTO t2 VALUES (1, 'a')")
checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE t1.id=t2.id"),
"BroadcastHashJoin")
}
}
}

test("SPARK-25474: should not fall back to hdfs when table statistics exists" +
" for CatalogFileIndex dataSourceTable") {

var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
Seq(true, false).foreach { fallBackToHdfs =>
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) {
withTable("t1") {
sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
sql("INSERT INTO t1 VALUES (1, 'a')")
// Analyze command updates the statistics of table `t1`
sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
val catalogTable = getCatalogTable("t1")
assert(catalogTable.stats.isDefined)

if (!fallBackToHdfs) {
sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
} else {
sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
}
checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
}
}
}
assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
}
}

0 comments on commit 485ae6d

Please sign in to comment.