[SPARK-22672][SQL][TEST] Refactor ORC Tests #19882

Closed
wants to merge 5 commits into from
28 changes: 0 additions & 28 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2775,32 +2775,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-21791 ORC should support column names with dot") {
val orc = classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat].getCanonicalName
withTempDir { dir =>
val path = new File(dir, "orc").getCanonicalPath
Seq(Some(1), None).toDF("col.dots").write.format(orc).save(path)
assert(spark.read.format(orc).load(path).collect().length == 2)
}
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
val e = intercept[AnalysisException] {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
}
assert(e.message.contains("Hive built-in ORC data source must be used with Hive support"))
}

withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(classOf[OrcFileFormat]))
}
}
}
}
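
The two tests deleted above cover ORC column names containing dots (SPARK-21791) and the switchable ORC implementation (SPARK-20728), presumably relocated into the ORC-specific suites below. For reference, a minimal, self-contained sketch of exercising the same switch outside the test harness, assuming the config key "spark.sql.orc.impl" behind SQLConf.ORC_IMPLEMENTATION; the path and app name below are illustrative, not from this PR:

import org.apache.spark.sql.SparkSession

// Illustrative session setup: "spark.sql.orc.impl" selects between the native ("native")
// and Hive-based ("hive") ORC readers.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("orc-impl-sketch")
  .config("spark.sql.orc.impl", "native")
  .getOrCreate()
import spark.implicits._

// Write and read back a small ORC dataset with a dotted column name, as in SPARK-21791.
Seq(Some(1), None).toDF("col.dots").write.mode("overwrite").format("orc").save("/tmp/orc_impl_sketch")
assert(spark.read.format("orc").load("/tmp/orc_impl_sketch").count() == 2)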
@@ -15,25 +15,32 @@
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc
package org.apache.spark.sql.execution.datasources.orc

import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}

import org.apache.spark.sql.{Column, DataFrame, QueryTest}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

/**
* A test suite that tests ORC filter API based filter pushdown optimization.
* A test suite that tests Apache ORC filter API based filter pushdown optimization.
* OrcFilterSuite and HiveOrcFilterSuite are logically duplicated to provide the same test coverage.
* The difference is the packages containing the 'Predicate' and 'SearchArgument' classes.
* - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package.
* - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package.
*/
class OrcFilterSuite extends QueryTest with OrcTest {
class OrcFilterSuite extends OrcTest with SharedSQLContext {

private def checkFilterPredicate(
df: DataFrame,
predicate: Predicate,
@@ -55,7 +62,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
assert(selectedFilters.nonEmpty, "No filter is pushed down")

val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray)
val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
checker(maybeFilter.get)
}
@@ -99,7 +106,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
assert(selectedFilters.nonEmpty, "No filter is pushed down")

val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray)
val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters")
}

@@ -284,40 +291,27 @@ class OrcFilterSuite extends QueryTest with OrcTest {

test("filter pushdown - combinations with logical operators") {
withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
// Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
// in string form in order to check filter creation including logical operators
// such as `and`, `or` or `not`. So, this function uses `SearchArgument.toString()`
// to produce string expression and then compare it to given string expression below.
// This might have to be changed after Hive version is upgraded.
checkFilterPredicate(
'_1.isNotNull,
"""leaf-0 = (IS_NULL _1)
|expr = (not leaf-0)""".stripMargin.trim
"leaf-0 = (IS_NULL _1), expr = (not leaf-0)"
)
checkFilterPredicate(
'_1 =!= 1,
"""leaf-0 = (IS_NULL _1)
|leaf-1 = (EQUALS _1 1)
|expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
"leaf-0 = (IS_NULL _1), leaf-1 = (EQUALS _1 1), expr = (and (not leaf-0) (not leaf-1))"
)
checkFilterPredicate(
!('_1 < 4),
"""leaf-0 = (IS_NULL _1)
|leaf-1 = (LESS_THAN _1 4)
|expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
"leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 4), expr = (and (not leaf-0) (not leaf-1))"
)
checkFilterPredicate(
'_1 < 2 || '_1 > 3,
"""leaf-0 = (LESS_THAN _1 2)
|leaf-1 = (LESS_THAN_EQUALS _1 3)
|expr = (or leaf-0 (not leaf-1))""".stripMargin.trim
"leaf-0 = (LESS_THAN _1 2), leaf-1 = (LESS_THAN_EQUALS _1 3), " +
"expr = (or leaf-0 (not leaf-1))"
)
checkFilterPredicate(
'_1 < 2 && '_1 > 3,
"""leaf-0 = (IS_NULL _1)
|leaf-1 = (LESS_THAN _1 2)
|leaf-2 = (LESS_THAN_EQUALS _1 3)
|expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim
"leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 2), leaf-2 = (LESS_THAN_EQUALS _1 3), " +
"expr = (and (not leaf-0) leaf-1 (not leaf-2))"
)
}
}
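
The expected strings above are plain SearchArgument.toString() output: each convertible predicate becomes a numbered leaf-N entry and expr combines the leaves. A greater-than predicate surfaces as a negated LESS_THAN_EQUALS leaf because the SearchArgument builder only exposes less-than style primitives. A minimal sketch of producing such a string directly, assuming it is added inside this suite (OrcFilters may not be visible outside this package) and uses the createFilter(schema, filters) entry point exercised later in this file:

// Hypothetical ad-hoc check placed inside OrcFilterSuite; the column name mirrors the tests above.
import org.apache.spark.sql.sources.{GreaterThan, IsNotNull}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

test("sketch - inspect a SearchArgument string") {
  val schema = StructType(Seq(StructField("_1", IntegerType, nullable = true)))
  val maybeSarg = OrcFilters.createFilter(schema, Seq(IsNotNull("_1"), GreaterThan("_1", 3)))
  assert(maybeSarg.isDefined)
  // Expected to look roughly like:
  // leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN_EQUALS _1 3), expr = (and (not leaf-0) (not leaf-1))
  println(maybeSarg.get.toString)
}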
@@ -344,4 +338,30 @@ class OrcFilterSuite extends QueryTest with OrcTest {
checkNoFilterPredicate('_1.isNotNull)
}
}

test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
import org.apache.spark.sql.sources._
// The `LessThan` should be converted while the `StringContains` shouldn't
val schema = new StructType(
Array(
StructField("a", IntegerType, nullable = true),
StructField("b", StringType, nullable = true)))
assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
OrcFilters.createFilter(schema, Array(
LessThan("a", 10),
StringContains("b", "prefix")
)).get.toString
}

// The `LessThan` should be converted while the whole inner `And` shouldn't
assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
OrcFilters.createFilter(schema, Array(
LessThan("a", 10),
Not(And(
GreaterThan("a", 1),
StringContains("b", "prefix")
))
)).get.toString
}
}
}
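
Per the class comment above, this suite and its sql/hive counterpart are meant to share the same test bodies and differ only in where the 'Predicate' and 'SearchArgument' classes come from. A sketch of that import-level difference, assuming HiveOrcFilterSuite mirrors these tests:

// sql/core (this file): the relocated storage-api classes shipped with Apache ORC.
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
// sql/hive (HiveOrcFilterSuite): the Hive-provided classes; commented out here because the
// simple names would clash with the import above.
// import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}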
@@ -15,48 +15,21 @@
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc
package org.apache.spark.sql.execution.datasources.orc

import java.io.File

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.util.Utils
import org.apache.spark.sql.test.SharedSQLContext

// The data where the partitioning key exists only in the directory structure.
case class OrcParData(intField: Int, stringField: String)

// The data that also includes the partitioning key
case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String)

// TODO This test suite duplicates ParquetPartitionDiscoverySuite a lot
class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
import spark._
import spark.implicits._

val defaultPartitionName = ConfVars.DEFAULTPARTITIONNAME.defaultStrVal

def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}

def makeOrcFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
data.toDF().write.mode("overwrite").orc(path.getCanonicalPath)
}


def makeOrcFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
df.write.mode("overwrite").orc(path.getCanonicalPath)
}
abstract class OrcPartitionDiscoveryTest extends OrcTest {
val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally spark.catalog.dropTempView(tableName)
@@ -90,7 +63,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")

withTempTable("t") {
checkAnswer(
@@ -137,7 +110,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")

withTempTable("t") {
checkAnswer(
@@ -186,8 +159,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

read
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
spark.read
.option("hive.exec.default.partition.name", defaultPartitionName)
Contributor
the new ORC didn't change these config names?

Member Author
Yes. In fact, Apache ORC doesn't have these parameters.

.orc(base.getCanonicalPath)
.createOrReplaceTempView("t")

@@ -228,8 +201,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

read
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
spark.read
.option("hive.exec.default.partition.name", defaultPartitionName)
.orc(base.getCanonicalPath)
.createOrReplaceTempView("t")

@@ -253,3 +226,4 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B
}
}

class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext
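
For reference, a minimal, self-contained sketch of the behavior the suite above exercises: partition columns are discovered from key=value directory names under a base path. The paths, values, and session setup are illustrative, not taken from this PR:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("orc-partition-sketch").getOrCreate()
import spark.implicits._

val base = "/tmp/orc_partition_sketch"
// Write ORC files under Hive-style partition directories.
Seq((1, "a"), (2, "b")).toDF("intField", "stringField")
  .write.mode("overwrite").orc(s"$base/pi=1/ps=foo")
Seq((3, "c"), (4, "d")).toDF("intField", "stringField")
  .write.mode("overwrite").orc(s"$base/pi=2/ps=bar")

// Reading the base path discovers pi and ps as partition columns from the directory layout.
val df = spark.read.orc(base)
df.printSchema()
df.show()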