[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size

## What changes were proposed in this pull request?

As per the discussion in #19864 (comment):

The current HadoopFsRelation bases its size estimate purely on the on-disk file size, which is inaccurate when the data is compressed and makes execution vulnerable to errors such as OOM.

Users can enable CBO with the functionality in #19864 to avoid this issue.

This JIRA proposes adding a configurable factor to the sizeInBytes method of the HadoopFsRelation class so that users can mitigate this problem without CBO.
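
For illustration only, a minimal usage sketch (the factor value and the path are hypothetical; the factor should reflect the actual compression ratio of your data):

```scala
// Hypothetical example: assume on-disk Parquet expands roughly 8x when decompressed,
// so scan-size estimates (and hence join-strategy choices) are scaled up accordingly.
spark.conf.set("spark.sql.sources.fileCompressionFactor", "8.0")

val df = spark.read.parquet("/tmp/example_table")  // hypothetical path
// The logical plan's size estimate now reflects the on-disk size multiplied by 8.0.
println(df.queryExecution.logical.stats.sizeInBytes)
```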

## How was this patch tested?

Existing tests, plus a new test in HadoopFsRelationSuite covering spark.sql.sources.fileCompressionFactor.

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <nanzhu@uber.com>

Closes #20072 from CodingCat/SPARK-22790.
CodingCat authored and gatorsmile committed Jan 13, 2018
1 parent bd4a21b commit ba891ec
Showing 3 changed files with 58 additions and 2 deletions.

SQLConf.scala
@@ -249,7 +249,7 @@ object SQLConf {
   val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
     .internal()
     .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
-      "plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
+      "plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
       "for certain kinds of query plans (such as those with a large number of predicates and " +
       "aliases) which might negatively impact overall runtime.")
     .booleanConf
@@ -263,6 +263,15 @@
     .booleanConf
     .createWithDefault(false)

+  val FILE_COMRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor")
+    .internal()
+    .doc("When estimating the output data size of a table scan, multiply the file size with this " +
+      "factor as the estimated data size, in case the data is compressed in the file and lead to" +
+      " a heavily underestimated result.")
+    .doubleConf
+    .checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0")
+    .createWithDefault(1.0)
+
   val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
     .doc("When true, the Parquet data source merges schemas collected from all data files, " +
       "otherwise the schema is picked from the summary file or a random data file " +
@@ -1255,6 +1264,8 @@ class SQLConf extends Serializable with Logging {

   def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)

+  def fileCompressionFactor: Double = getConf(FILE_COMRESSION_FACTOR)
+
   def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader)

   /**

HadoopFsRelation.scala
@@ -82,7 +82,11 @@ case class HadoopFsRelation(
     }
   }

-  override def sizeInBytes: Long = location.sizeInBytes
+  override def sizeInBytes: Long = {
+    val compressionFactor = sqlContext.conf.fileCompressionFactor
+    (location.sizeInBytes * compressionFactor).toLong
+  }
+

   override def inputFiles: Array[String] = location.inputFiles
 }
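
To make the effect of the new sizeInBytes concrete, a back-of-the-envelope sketch (all numbers are illustrative, not taken from the patch):

```scala
// Illustrative only: a 1 GiB compressed Parquet file that decompresses to roughly 4 GiB.
val onDiskBytes = 1L << 30                  // what location.sizeInBytes would report
val compressionFactor = 4.0                 // spark.sql.sources.fileCompressionFactor
val estimatedBytes = (onDiskBytes * compressionFactor).toLong
// estimatedBytes is ~4 GiB rather than 1 GiB, so the planner is less likely to
// broadcast a relation that is actually far larger than its file size suggests.
```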

HadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{File, FilenameFilter}

 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.test.SharedSQLContext

 class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
@@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
     }
   }
+
+  test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
+    import testImplicits._
+    Seq(1.0, 0.5).foreach { compressionFactor =>
+      withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
+        "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+        withTempPath { workDir =>
+          // the file size is 740 bytes
+          val workDirPath = workDir.getAbsolutePath
+          val data1 = Seq(100, 200, 300, 400).toDF("count")
+          data1.write.parquet(workDirPath + "/data1")
+          val df1FromFile = spark.read.parquet(workDirPath + "/data1")
+          val data2 = Seq(100, 200, 300, 400).toDF("count")
+          data2.write.parquet(workDirPath + "/data2")
+          val df2FromFile = spark.read.parquet(workDirPath + "/data2")
+          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
+          if (compressionFactor == 0.5) {
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.nonEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.isEmpty)
+          } else {
+            // compressionFactor is 1.0
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.isEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.nonEmpty)
+          }
+        }
+      }
+    }
+  }
 }
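
For context on the thresholds used in the new test: each Parquet file is about 740 bytes (per the comment in the test) and spark.sql.autoBroadcastJoinThreshold is set to 400, so the expected plans follow from simple arithmetic:

```scala
// Sketch of the arithmetic the test relies on (740 comes from the test's own comment).
val fileSizeInBytes = 740L
val broadcastThreshold = 400L               // spark.sql.autoBroadcastJoinThreshold in the test
assert((fileSizeInBytes * 0.5).toLong < broadcastThreshold)  // 370 < 400 -> BroadcastHashJoinExec expected
assert((fileSizeInBytes * 1.0).toLong > broadcastThreshold)  // 740 > 400 -> SortMergeJoinExec expected
```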
